
Logstash with AWS Elasticsearch Service



Data can be shipped and ingested into an AWS Elasticsearch domain in multiple ways:
  • Using a Kinesis Data Stream to ingest logs into AWS Elasticsearch.
  • Using a Kinesis Firehose delivery stream to ingest logs into AWS Elasticsearch.
  • Using a Filebeat and Logstash combination to ingest logs into AWS Elasticsearch.
In this blog post we will cover how to send logs/data from an EC2 instance to our AWS managed Elasticsearch domain using Logstash.

Assumptions and Requirements:
  1. We already have an Elasticsearch domain created within the AWS Elasticsearch service.
  2. A user with an IAM role that has AmazonESFullAccess; this could be a more granular policy, but for now we assume full access to the Elasticsearch service.
  3. The user must have programmatic access configured, i.e. an Access Key ID and an AWS Secret Access Key.
  4. An EC2 instance that can use the above IAM role and has a security group that allows it to reach the Elasticsearch endpoint; the domain endpoint can be copied from the domain's overview page in the AWS console (a quick connectivity check is shown right after this list).
  5. I will not explain the full Logstash pipeline (input, filter, output); input and filter remain the same, and we will focus on what to define in the output section to ingest data into the Elasticsearch domain.
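
Optional sanity check: before touching Logstash, you can confirm from the EC2 instance that the domain endpoint is reachable. The endpoint below is a placeholder; a 403 response from an IAM-protected domain still proves network connectivity, while a timeout points to a security group or routing problem.
$ curl -sv https://search-mydomain-xxxxxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com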





Installation and Configuration.

Let's proceed with the installation first; we will install two components here:
  • Logstash 
  • Logstash-output-amazon_es plugin

Logstash can be installed directly from apt/yum or from a binary; refer to the official Elastic installation guide, or follow our previous post for a complete ELK stack installation.
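
For reference, a typical yum-based install on Amazon Linux looks like the following; this is a sketch assuming the Elastic 7.x repository, so adjust the version to match your Elasticsearch domain.
$ sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ sudo tee /etc/yum.repos.d/logstash.repo <<'EOF'
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
$ sudo yum install -y logstash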

The logstash-output-amazon_es plugin is mandatory; without it we cannot ingest data into our AWS Elasticsearch domain.
Please note, Logstash must be installed before the logstash-output-amazon_es plugin can be installed.

So drop to a command prompt and run the command below. Locate your Logstash bin directory first; for Amazon Linux the default path is used below.
# /usr/share/logstash/bin/logstash-plugin install logstash-output-amazon_es
You will get a success message upon a successful installation.
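To double-check that the plugin is present, you can list the installed plugins and filter for it (same default bin path assumed); it should print logstash-output-amazon_es if the installation succeeded.
# /usr/share/logstash/bin/logstash-plugin list | grep amazon_es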

Now let's put the lines below in the output section of your Logstash pipeline configuration.

Replace the endpoint, region, credentials, and index name with your own values.
output {
        stdout {codec => rubydebug
        }
        amazon_es {
                hosts => ["search-myekkdomain-rcridsesoz23h6svyyyju4pnmy.us-west-2.es.amazonaws.com"]
                region => "us-west-2"
                aws_access_key_id => 'AjkjfjkNAPE7IHGZDDZ'
                aws_secret_access_key => '3yuefiuqeoixPRyho837WYwo0eicBVZ'
                index => "your-ownelasticsearch-index-name"
    }
}

Once inserted and configured, restart the Logstash service for the changes to take effect.
Verify ingestion in the Logstash logs, the Kibana dashboard, or the Indices section of the ES domain.
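
From the command line, one quick check (using the same placeholder endpoint as above, and assuming the domain's access policy allows the request) is to list the indices directly:
$ curl -s "https://search-myekkdomain-rcridsesoz23h6svyyyju4pnmy.us-west-2.es.amazonaws.com/_cat/indices?v"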

Overall, my entire Logstash pipeline, defined in the file logstash.conf inside /etc/logstash/conf.d/, looks like the one below; maybe someone can use it as a reference.

Note: My demo.log contains logs generated by a Spring Boot app.
input {
  file {
    path => "/tmp/demo.log*"
    start_position => "beginning"
    # Join lines that do not start with a timestamp into the previous event (multi-line stack traces / JSON payloads)
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => previous
    }
  }
}

filter {

    grok {
          match => {
            "message" => [
                  "%{TIMESTAMP_ISO8601:timestamp}*%{LOGLEVEL:level}*--- *\[%{DATA:thread}] %{JAVACLASS:class} *:%{GREEDYDATA:json_data}"
                  ]
         }
     }
}

filter {
      # Parse the JSON payload captured by grok into structured fields
      json {
        source => "json_data"
      }
 }
output {
        stdout {codec => rubydebug
        }
        amazon_es {
                hosts => ["search-myekkdomain-rcridsesoz23h6svyyyju4pnmy.us-west-2.es.amazonaws.com"]
                region => "us-west-2"
                aws_access_key_id => 'AKs3IAuoisoosoweIHGZDDZ'
                aws_secret_access_key => '3d0w8bwuywbwi6IxPRyho837WYwo0eicBVZ'
                index => "your-ownelasticsearch-index-name"
    }
}
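
Before restarting the service, it can save time to validate the pipeline syntax first (a sketch, assuming the default paths used above):
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --config.test_and_exit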


Thanks, do comment; I will be happy to help you.



AWS Kinesis Agent configuration to process and parse multi line logs.


Within the Kinesis Agent configuration, in order to preprocess data/logs before they are sent to a Kinesis stream or directly to Firehose, we can use its dataProcessingOptions configuration settings.

Below are three of the commonly used conversion options:
  • SINGLELINE
  • CSVTOJSON
  • LOGTOJSON
There are many standard data/log formats the Kinesis agent already understands and that need no preprocessing, such as Apache logs. But in many cases our logs, like Wildfly logs, custom data, or stack traces, are not predefined, and we need to use the Kinesis Agent's dataProcessingOptions to parse them into JSON.

So here we will be using the "CSVTOJSON" and "SINGLELINE" options to parse our custom logs.

Here is what our sample log looks like; it contains three complete log entries, and the third one spans multiple physical lines.
14:36:21,753 | INFO  --- xyzignorelogs-1842 | orrelationIdGeneratorInterceptor | 1135 - com.xyz.ppp.def-orchestration-api-impl-bundle-v1 - 0.0.2211 | Execution started in CorrelationIdGeneratorInterceptor : somerandomnumber09897w9w7w
14:36:21,753 | INFO  --- xyzignorelogs-1842 | LogInInterceptor                 | 1135 - com.xyz.ppp.def-orchestration-api-impl-bundle-v1 - 0.0.2211 | Execution started in LogInInterceptor : somerandomnumber09897w9w7w
14:36:21,759 | INFO  ---  xyzignorelogs-1842 | Util                             | 1134 - com.xyz.ppp.def-orchestration- 0.0.2211 | {
  "consumer-workflow-DATA" : {
    "correlationId" : "somerandomnumber09897w9w7w",
    "flowName" : "carate",
    "Url" : "www.cyberkeeda.com",
    "Request/Response Type" : "tracecustomer",
    "API request" : {"fromAddress": "LA", "country" : "US", "DOB": "19-12-1977"}
  }
}
Before we do anything, we will tell the agent to read our logs as multi-line records; for this we need to define the start of each record with a regex pattern.

The Kinesis agent will start a new record only when it finds that pattern again.
From the logs above we can clearly see that every new entry starts with a time string like the ones below.
14:36:21,753 | INFO
14:36:21,753 | INFO
14:36:21,759 | INFO

So, to process the above log file into JSON format before it is sent to the stream, we will use the configuration below.
{
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "kinesisStream": "myapplogstream",
            "multiLineStartPattern": "^[0-9]{2}-[0-9]{2}-[0-9]{2}",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "timeframe", "message" ],
                    "delimiter": "---"
                }
            ]

        }
    ]
}

    
We have defined the regex "^[0-9]{2}:[0-9]{2}:[0-9]{2}" to let the Kinesis agent know where each multi-line record starts; it matches the HH:MM:SS timestamp at the beginning of every entry.

Further, we are collapsing each record into a SINGLELINE and then splitting it on a delimiter.
Here we use "---", so overall we divide each record into two parts, which are then separated by a comma (',') to make it a CSV value.
"customFieldNames": [ "timeframe", "message" ],
Since we are breaking each record into a CSV value with only two fields, we name those fields "timeframe" and "message".
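
A quick way to sanity-check the start-of-record pattern against the log file before restarting the agent (file path as used in the configuration above); the count should equal the number of log entries, not the number of physical lines:
$ grep -cE '^[0-9]{2}:[0-9]{2}:[0-9]{2}' /tmp/app.log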

Thus the final processed records sent to the stream will look like the ones below.
"timeframe" :"14:36:21,753 | INFO", "message": "xyzignorelogs-1842 | orrelationIdGeneratorInterceptor | 1135 - com.xyz.ppp.def-orchestration-api-impl-bundle-v1 - 0.0.2211 | Execution started in CorrelationIdGeneratorInterceptor : somerandomnumber09897w9w7w"

"timeframe" :"14:36:21,753" | INFO",  "message": "xyzignorelogs-1842 | LogInInterceptor                 | 1135 - com.xyz.ppp.def-orchestration-api-impl-bundle-v1 - 0.0.2211 | Execution started in LogInInterceptor : somerandomnumber09897w9w7w"


"timeframe" :" "14:36:21,759 | INFO", "message": "xyzignorelogs-1842 | Util                             | 1134 - com.xyz.ppp.def-orchestration- 0.0.2211 | {
  "consumer-workflow-DATA" : {
    "correlationId" : "somerandomnumber09897w9w7w",
    "flowName" : "rate",
    "Url" : "www.cyberkeeda.com",
    "Request/Response Type" : "tracecustomer",
    "API request" : {"fromAddress": "LA", "country" : "US", "DOB": "19-12-1977"}
  }
}"

    
Do let me know, if it works for you or not.


AWS Elasticsearch Service with Kinesis Data Stream and Kinesis Data Firehose


EKK -- ElasticSearch Kinesis Kibana


The EKK stack is a collective approach of using end-to-end AWS services to run an Elasticsearch-based logging pipeline.

We will replace all the open-source products of a normal ELK stack with AWS services.

All together, the EKK stack can manage and parse a huge amount of log data that can be used for analytics, troubleshooting, central monitoring and alarming through its efficient GUI; and because every component is an AWS service, we get availability and scalability without carrying the burden of the underlying infrastructure.

Below is the architecture we will be using, along with the AWS services used in place of the components of a usual ELK stack.


EKK Stack Components

    • Elasticsearch and Kibana will be replaced by Amazon Elasticsearch Service, which includes the Kibana dashboard too.
    • Logstash will be replaced by Kinesis Data Streams and Kinesis Data Firehose.
    • The Logstash client agent (Filebeat) will be replaced by the Kinesis Agent.
    You can have a look at one of my previous posts, "How to install ELK Stack", to get an overview of how the ELK stack components work together.

    I have created a very basic AWS CloudFormation template; I will explain it section by section and paste the complete script at the end. Let's start with the Parameters section.

    Parameters :
    # Author : Jackuna (https://github.com/Jackuna)
    # Website : www.cyberkeeda.com
    AWSTemplateFormatVersion: 2010-09-09
    Description: CloudFormation Stack to Create an AWS Managed Elastic Service using Kinesis Streaming Services.
    
    Parameters:
      LogBucketName:
        Type: String
        Description: Name of Amazon S3 bucket for log [a-z][a-z0-9]*
    
      KinesisStreamName:
        Type: String
        Description: Name of Kinesis Stream Name for log [a-z][a-z0-9]*
    
      ElasticsearchDomainName:
        Type: String
        Description: Name of Elasticsearch domain for log [a-z][a-z0-9]*
    
      ElasticsearchIndexName:
        Type: String
        Description: Name of Elasticsearch index from Kinesis Firehose [a-z][a-z0-9]*
        
      FirehoseName:
        Type: String
        Description: DeliveryStream for ES and S3 [a-z][a-z0-9]*
    Here are the parameters explained:
    • LogBucketName: Name of the S3 bucket that will be used to keep failed records and logs while Amazon Kinesis Firehose ingests data into the Elasticsearch domain.
    • ElasticsearchDomainName: Creation of AWS Elasticsearch starts with the creation of a domain, so that in case we wish to manage multiple Elasticsearch services each can be identified as a separate domain.
    • ElasticsearchIndexName: Name of the index; it will be used later while configuring index patterns on the Kibana dashboard.
    • KinesisStreamName: Name of the Kinesis data stream.
    • FirehoseName: Name of the Kinesis Firehose delivery stream.
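
    For reference, once the complete template at the end of this post is saved (for example as ekk-stack.yaml, an assumed file name), the stack can be launched from the AWS CLI by passing these parameters; all values below are placeholders:
    $ aws cloudformation create-stack \
        --stack-name ekk-demo-stack \
        --template-body file://ekk-stack.yaml \
        --capabilities CAPABILITY_NAMED_IAM \
        --parameters \
            ParameterKey=LogBucketName,ParameterValue=my-ekk-failed-logs \
            ParameterKey=KinesisStreamName,ParameterValue=myapplogstream \
            ParameterKey=ElasticsearchDomainName,ParameterValue=myekkdomain \
            ParameterKey=ElasticsearchIndexName,ParameterValue=applogs \
            ParameterKey=FirehoseName,ParameterValue=myappfirehose
    CAPABILITY_NAMED_IAM is required because the template creates a named IAM role.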

    Resources:

    We will look into each resource one by one, and at the end I will paste the entire Resources section.

    KinesisDomainCreation
    Resources: 
      KinesisDomainCreation:
        Type: "AWS::Kinesis::Stream"
        Properties:
          Name: !Sub "${KinesisStreamName}"
          ShardCount: 5
    Here are the properties explained for "KinesisDomainCreation":
    • Type: "AWS::Kinesis::Stream" : Creates a Kinesis stream that captures and transports data records emitted from data sources.
    • Name: !Sub "${KinesisStreamName}" : Kinesis data stream name, substituted from our parameter "KinesisStreamName".
    • ShardCount: 5 : The number of shards that the stream uses. For greater provisioned throughput, increase the number of shards.
    ElasticsearchDomain
    This resource section is responsible for the Elasticsearch domain configuration along with the underlying instances used for Elasticsearch.
    ElasticsearchDomain:
        Type: AWS::Elasticsearch::Domain
        Properties:
          DomainName: !Sub "${ElasticsearchDomainName}"
          ElasticsearchVersion: '6.8'
          ElasticsearchClusterConfig:
            InstanceCount: '1'
            InstanceType: t2.small.elasticsearch
          EBSOptions:
            EBSEnabled: 'true'
            Iops: 0
            VolumeSize: 10
            VolumeType: gp2
          SnapshotOptions:
            AutomatedSnapshotStartHour: '0'
          AccessPolicies:
            Version: 2012-10-17
            Statement:
            - Effect: Allow
              Principal:
                AWS: '*' # Need to be replaced with appropriate value
              Action: es:*
              Resource: '*' # Need to be replaced with appropriate value
              #Resource: !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/*"
          AdvancedOptions:
            rest.action.multi.allow_explicit_index: 'true'
    Here are the properties explained for "ElasticsearchDomain":
    • Type: AWS::Elasticsearch::Domain : The AWS::Elasticsearch::Domain resource creates an Amazon Elasticsearch Service (Amazon ES) domain that encapsulates the Amazon ES engine instances.
    • DomainName: !Sub "${ElasticsearchDomainName}" : Elasticsearch domain name, substituted from our parameter "ElasticsearchDomainName".
    • ElasticsearchVersion: '6.8' : Elasticsearch version.
    • ElasticsearchClusterConfig : This section contains the instance count and instance type used for the Elasticsearch cluster.
    • EBSOptions : The volume type and its properties are defined within this section.
    • SnapshotOptions : Automated snapshot settings for the Elasticsearch domain.
    • AccessPolicies : The domain access policy; the wide-open example above should be replaced with an appropriate value in real use.

    ESDeliverystream
    This resource section is responsible for creating the Amazon Kinesis Firehose delivery stream and configuring it to send data to the Elasticsearch domain created above.
    ESDeliverystream:
        Type: AWS::KinesisFirehose::DeliveryStream
        DependsOn:
          - ElasticsearchDomain
          - DeliveryRole
          - DeliveryPolicy
        Properties:
          DeliveryStreamName: !Sub "${FirehoseName}"
          DeliveryStreamType: KinesisStreamAsSource
          KinesisStreamSourceConfiguration:
            KinesisStreamARN: !GetAtt KinesisDomainCreation.Arn
            RoleARN: !GetAtt DeliveryRole.Arn
          ElasticsearchDestinationConfiguration:
            BufferingHints:
              IntervalInSeconds: 60
              SizeInMBs: 1
            CloudWatchLoggingOptions: 
                Enabled: false
            DomainARN: !GetAtt ElasticsearchDomain.DomainArn
            IndexName: "demoLogs"
            IndexRotationPeriod: "NoRotation" # NoRotation, OneHour, OneDay, OneWeek, or OneMonth.
            TypeName: "fromFirehose"
            RetryOptions:
              DurationInSeconds: 60
            RoleARN: !GetAtt DeliveryRole.Arn
            S3BackupMode: FailedDocumentsOnly
            S3Configuration:
              BucketARN: !Sub "arn:aws:s3:::${LogBucketName}"
              BufferingHints:
                IntervalInSeconds: 60
                SizeInMBs: 1
              CompressionFormat: "UNCOMPRESSED"
              RoleARN: !GetAtt DeliveryRole.Arn 
              CloudWatchLoggingOptions: 
                Enabled: true
                LogGroupName: "deliverystream"
                LogStreamName: "s3Backup"
    Here are the resources explained for "ESDeliverystream":
    • Type: AWS::KinesisFirehose::DeliveryStream : The AWS::KinesisFirehose::DeliveryStream resource creates an Amazon Kinesis Data Firehose delivery stream that delivers real-time streaming data to an Amazon Elasticsearch Service destination. Within the "Properties" section we define the Firehose delivery stream name and the stream source type, which is a Kinesis data stream.
    • DependsOn : This is a predefined statement in AWS CloudFormation templates that ensures the listed resources are created before the current resource; here it ensures the Elasticsearch domain and the IAM role and policy exist before the delivery stream is created.
    • ElasticsearchDestinationConfiguration : This section defines the delivery of Firehose data to the Elasticsearch domain created above.
    DeliveryRole and DeliveryPolicy
    This resource section is responsible for creating the IAM role and policy required to read and write data across the AWS resources involved (S3, Elasticsearch, CloudWatch Logs and Kinesis).
    DeliveryRole:
        Type: 'AWS::IAM::Role'
        Properties:
          AssumeRolePolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 'sts:AssumeRole'
                Principal:
                  Service:
                    - 'firehose.amazonaws.com'
                Condition:
                  StringEquals:
                    'sts:ExternalId' : !Ref 'AWS::AccountId'
          RoleName: "DeliveryRole"
    
      DeliveryPolicy:
        Type: 'AWS::IAM::Policy'
        Properties:
          PolicyName: "DeliveryPolicy"
          Roles:
            - !Ref "DeliveryRole"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
                  - 's3:PutObjectAcl'
                Resource:
                  - !Sub 'arn:aws:s3:::${LogBucketName}'
                  - !Sub 'arn:aws:s3:::${LogBucketName}/*'
              - Effect: Allow
                Action:
                  - 'es:DescribeElasticsearchDomain'
                  - 'es:DescribeElasticsearchDomains'
                  - 'es:DescribeElasticsearchDomainConfig'
                  - 'es:ESHttpPost'
                  - 'es:ESHttpPut'
                Resource:
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}"
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/*"
              - Effect: Allow
                Action:
                  - 'es:ESHttpGet'
                Resource:
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_all/_settings'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_cluster/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/${ElasticsearchIndexName}*/_mapping/superstore'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes/*/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/${ElasticsearchIndexName}*/_stats'
              - Effect: Allow
                Action:
                  - 'logs:PutLogEvents'
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/:log-stream:*'
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                Resource: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%'
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                  - 'kinesis:CreateStream'
                Resource: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStreamName}'
    
    LogBucket:

    This resource section is responsible for creating the S3 bucket meant to keep failed records.
    LogBucket:
        Type: 'AWS::S3::Bucket'
        Properties:
          BucketName: !Ref "LogBucketName"
          AccessControl: Private

    Once the stack is created, we need the Kinesis agent installed on the clients that will ship logs to the AWS Kinesis data stream.

    Installation and Configuration of Kinesis Agent:

    We are using Amazon Linux here as the client to ship log data; install the agent using the command below.
    $ sudo yum install -y aws-kinesis-agent
    For Redhat/CentOS:
    $ sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
    Open the Kinesis agent configuration file (/etc/aws-kinesis/agent.json) and edit it as per your requirement; below is a basic configuration. Since, in our stack, Firehose reads from the Kinesis data stream, the agent ships to that stream (the names below are placeholders).
    { 
       "flows": [
            { 
                "filePattern": "/tmp/your_app.log*", 
                "kinesisStream": "your-kinesis-stream-name"
            } 
       ] 
    } 
    For more detailed configuration options, please refer to the official AWS documentation.

    Save the file and start the agent.
    $ sudo service aws-kinesis-agent start
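    To confirm records are flowing end to end, the agent's log at /var/log/aws-kinesis-agent/aws-kinesis-agent.log reports parse and send statistics, and the document count in the destination index should start growing; the endpoint and index name below are placeholders:
    $ curl -s "https://search-yourdomain-xxxxxxxxxxxx.us-west-2.es.amazonaws.com/your-index/_count?pretty"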
    There are multiple ways to preprocess logs with the Kinesis agent; do look into the official documentation and use the one that suits your logs.

    Complete AWS Cloudformation Script.
    # Author : Jackuna (https://github.com/Jackuna)
    # Website : www.cyberkeeda.com
    AWSTemplateFormatVersion: 2010-09-09
    Description: CloudFormation Stack to Create an AWS Managed Elastic Service using Kinesis Streaming Services.
    
    Parameters:
      LogBucketName:
        Type: String
        Description: Name of Amazon S3 bucket for log [a-z][a-z0-9]*
    
      KinesisStreamName:
        Type: String
        Description: Name of Kinesis Stream Name for log [a-z][a-z0-9]*
    
      ElasticsearchDomainName:
        Type: String
        Description: Name of Elasticsearch domain for log [a-z][a-z0-9]*
    
      ElasticsearchIndexName:
        Type: String
        Description: Name of Elasticsearch index from Kinesis Firehose [a-z][a-z0-9]*
        
      FirehoseName:
        Type: String
        Description: DeliveryStream for ES and S3 [a-z][a-z0-9]*
    
    Resources: 
      KinesisDomainCreation:
        Type: "AWS::Kinesis::Stream"
        Properties:
          Name: !Sub "${KinesisStreamName}"
          ShardCount: 5
    
      ElasticsearchDomain:
        Type: AWS::Elasticsearch::Domain
        Properties:
          DomainName: !Sub "${ElasticsearchDomainName}"
          ElasticsearchVersion: '6.8'
          ElasticsearchClusterConfig:
            InstanceCount: '1'
            InstanceType: t2.small.elasticsearch
          EBSOptions:
            EBSEnabled: 'true'
            Iops: 0
            VolumeSize: 10
            VolumeType: gp2
          SnapshotOptions:
            AutomatedSnapshotStartHour: '0'
          AccessPolicies:
            Version: 2012-10-17
            Statement:
            - Effect: Allow
              Principal:
                AWS: '*' # Need to be replaced with appropriate value
              Action: es:*
              Resource: '*' # Need to be replaced with appropriate value
              #Resource: !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/*"
          AdvancedOptions:
            rest.action.multi.allow_explicit_index: 'true'
    
      ESDeliverystream:
        Type: AWS::KinesisFirehose::DeliveryStream
        DependsOn:
          - ElasticsearchDomain
          - DeliveryRole
          - DeliveryPolicy
        Properties:
          DeliveryStreamName: !Sub "${FirehoseName}"
          DeliveryStreamType: KinesisStreamAsSource
          KinesisStreamSourceConfiguration:
            KinesisStreamARN: !GetAtt KinesisDomainCreation.Arn
            RoleARN: !GetAtt DeliveryRole.Arn
          ElasticsearchDestinationConfiguration:
            BufferingHints:
              IntervalInSeconds: 60
              SizeInMBs: 1
            CloudWatchLoggingOptions: 
                Enabled: false
            DomainARN: !GetAtt ElasticsearchDomain.DomainArn
            IndexName: "demoLogs"
            IndexRotationPeriod: "NoRotation" # NoRotation, OneHour, OneDay, OneWeek, or OneMonth.
            TypeName: "fromFirehose"
            RetryOptions:
              DurationInSeconds: 60
            RoleARN: !GetAtt DeliveryRole.Arn
            S3BackupMode: FailedDocumentsOnly
            S3Configuration:
              BucketARN: !Sub "arn:aws:s3:::${LogBucketName}"
              BufferingHints:
                IntervalInSeconds: 60
                SizeInMBs: 1
              CompressionFormat: "UNCOMPRESSED"
              RoleARN: !GetAtt DeliveryRole.Arn 
              CloudWatchLoggingOptions: 
                Enabled: true
                LogGroupName: "deliverystream"
                LogStreamName: "s3Backup"
    
      DeliveryRole:
        Type: 'AWS::IAM::Role'
        Properties:
          AssumeRolePolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 'sts:AssumeRole'
                Principal:
                  Service:
                    - 'firehose.amazonaws.com'
                Condition:
                  StringEquals:
                    'sts:ExternalId' : !Ref 'AWS::AccountId'
          RoleName: "DeliveryRole"
    
      DeliveryPolicy:
        Type: 'AWS::IAM::Policy'
        Properties:
          PolicyName: "DeliveryPolicy"
          Roles:
            - !Ref "DeliveryRole"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
                  - 's3:PutObjectAcl'
                Resource:
                  - !Sub 'arn:aws:s3:::${LogBucketName}'
                  - !Sub 'arn:aws:s3:::${LogBucketName}/*'
              - Effect: Allow
                Action:
                  - 'es:DescribeElasticsearchDomain'
                  - 'es:DescribeElasticsearchDomains'
                  - 'es:DescribeElasticsearchDomainConfig'
                  - 'es:ESHttpPost'
                  - 'es:ESHttpPut'
                Resource:
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}"
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/*"
              - Effect: Allow
                Action:
                  - 'es:ESHttpGet'
                Resource:
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_all/_settings'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_cluster/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/${ElasticsearchIndexName}*/_mapping/superstore'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_nodes/*/stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/_stats'
                  - !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${ElasticsearchDomainName}/${ElasticsearchIndexName}*/_stats'
              - Effect: Allow
                Action:
                  - 'logs:PutLogEvents'
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/:log-stream:*'
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                Resource: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%'
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                  - 'kinesis:CreateStream'
                Resource: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStreamName}'
    
      LogBucket:
        Type: 'AWS::S3::Bucket'
        Properties:
          BucketName: !Ref "LogBucketName"
          AccessControl: Private
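
    Before creating the stack, the template can be checked for syntax errors with the AWS CLI (the file name is an assumption; see the create-stack example earlier for launching it):
    $ aws cloudformation validate-template --template-body file://ekk-stack.yaml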
    
    
    Do comment, I will be happy to help.
