Using Amazon Kinesis Firehose to Store Data in S3

Although I don’t believe Amazon Kinesis will be on the AWS Certified Solutions Architect or Developer exams, the flash cards I am using to study for the exams contain a few questions on Kinesis, so I thought I would experiment with the service a bit.

Amazon Kinesis offers three services, and Firehose looks like the easiest to learn:

  • Streams - Process your streaming data. Ingest and process streaming data with custom applications.
  • Firehose - Capture, transform, and load streaming data. Deliver real-time streaming data to AWS destinations.
  • Analytics - Continuously run SQL queries on streaming data. Analyze and process streaming data from Firehose and Streams.

Amazon Kinesis Firehose is all about getting data from point A to point B, with an optional transformation along the way. My goal is to add data to a Firehose delivery stream, transform it with a Lambda Function, and store it in an S3 Bucket.

Kinesis Firehose - Create Delivery Stream

Creating a Kinesis Firehose delivery stream in the AWS Console is simple: you only need to specify 1) how the source records will be added to the stream, 2) whether the source records should be transformed by a Lambda Function, and 3) the final destination.

For my example, I will add records to the stream using the AWS CLI put-record command for Firehose. I will specify a Lambda Function, called Transform, that could transform the records, though in reality it won’t change a thing. Finally, the destination of the records will be an Amazon S3 Bucket, called Logs.

Using a Lambda Function to Transform Records for Amazon Kinesis Firehose
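
Since the console walkthrough is mostly point-and-click, here is roughly the same setup expressed in code. This is a minimal boto3 sketch rather than the exact configuration I used; the IAM role, bucket, Lambda ARNs, account ID, and region are all placeholders for resources created beforehand.

import boto3

firehose = boto3.client('firehose')

# A minimal sketch of the console setup above; the ARNs are placeholders
# for an IAM role, S3 bucket, and Lambda Function created beforehand.
firehose.create_delivery_stream(
    DeliveryStreamName='Logs',
    DeliveryStreamType='DirectPut',  # records are added with put-record
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        'BucketARN': 'arn:aws:s3:::example-logs-bucket',
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:Transform',
                }],
            }],
        },
    },
)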

Adding Records to Firehose Delivery Stream

Using the AWS CLI with an IAM role that has sufficient privileges, one can list the Firehose delivery streams with the list-delivery-streams command. I called my stream Logs.

> aws firehose list-delivery-streams

{
    "DeliveryStreamNames": [
        "Logs"
    ],
    "HasMoreDeliveryStreams": false
}

Now we can add a sample record to the Firehose delivery stream using the put-record command. I recommend running this command several times so Firehose has a batch of records to buffer and deliver.

> aws firehose put-record \
     --delivery-stream-name Logs \
     --record "{\"Data\": \"Some sample data.\n\"}"

{
    "RecordId": "..."
}
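
The put-record command sends one record per invocation. To add a batch in a single call, here is a minimal boto3 sketch assuming the same Logs stream; put_record_batch accepts up to 500 records at a time.

import boto3

firehose = boto3.client('firehose')

# Add several sample records in one call; Firehose buffers them before
# invoking the Transform function and writing to S3.
response = firehose.put_record_batch(
    DeliveryStreamName='Logs',
    Records=[{'Data': b'Some sample data.\n'} for _ in range(10)],
)
print('Failed records: {}'.format(response['FailedPutCount']))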

AWS Lambda to Transform Firehose Source Data

As mentioned earlier, I added a Lambda Function, called Transform, to the Firehose delivery stream to transform the data. I didn’t change the Lambda Function and left it as the default Python code.

import base64


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose delivers each record's data as a base64-encoded string
        payload = base64.b64decode(record['data'])

        # Do custom processing on the record payload here
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',  # 'Ok', 'Dropped', or 'ProcessingFailed'
            # Re-encode the (unchanged) payload; decode to str so the
            # result is JSON-serializable on Python 3
            'data': base64.b64encode(payload).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
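
To see the shape of the event the function receives, you can invoke it locally with a hand-built event. This sketch assumes lambda_handler from above is defined in the same file; the recordId and timestamp are made-up values, only the structure matters.

import base64
import json

# A hand-built Firehose transformation event with one record; the
# recordId and timestamp values are made up for illustration.
sample_event = {
    'records': [{
        'recordId': '49546986683135544286507457936321625675700192471156785154',
        'approximateArrivalTimestamp': 1495072949453,
        'data': base64.b64encode(b'Some sample data.\n').decode('utf-8'),
    }]
}

print(json.dumps(lambda_handler(sample_event, None), indent=2))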

CloudWatch Logs

After adding several records to the Firehose delivery stream at varying times, I visited CloudWatch and viewed the logs for the Transform Lambda Function. In one invocation it successfully processed 18 records.

START RequestId: a28ae... Version: $LATEST
Successfully processed 18 records.
END RequestId: a28ae...

There is also a CloudWatch log group for the Logs Firehose delivery stream, /aws/kinesisfirehose/Logs, which will contain any errors that occurred while delivering to S3.

In my case that log group is empty, which means all of the data was delivered to S3 successfully.
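
One way to confirm that from a script is to query the log group with boto3; an empty events list here means Firehose has not reported any delivery errors.

import boto3

logs = boto3.client('logs')

# Any S3 delivery errors for the stream end up in this log group.
response = logs.filter_log_events(logGroupName='/aws/kinesisfirehose/Logs')
for event in response['events']:
    print(event['message'])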

S3 Bucket for Kinesis Firehose

The S3 bucket I specified as the destination contains a number of objects holding the sample data I added to the stream. Firehose stores the objects under a time-based key prefix, YYYY/MM/DD/HH by default.

Amazon S3 > Logs / YYYY / MM / DD / HH

Logs-1-YYYY-MM-DD-HH-{ID}

Some sample data.
Some sample data.
Some sample data.
Some sample data.
Some sample data.
...
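
To inspect the delivered objects without clicking through the console, a short boto3 sketch works as well; the bucket name and year prefix below are placeholders.

import boto3

s3 = boto3.client('s3')
bucket = 'example-logs-bucket'  # placeholder for the destination bucket

# Firehose writes objects under a YYYY/MM/DD/HH key prefix by default;
# the year used here is a placeholder.
listing = s3.list_objects_v2(Bucket=bucket, Prefix='2017/')
for obj in listing.get('Contents', []):
    body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
    print(obj['Key'])
    print(body.decode('utf-8'))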

Achievement unlocked!
