Using DynamoDB Streams to Trigger Lambda Function

When you create an Amazon DynamoDB Table there is an option to Manage Stream. Changes made to the table can be captured via the stream, and these changes can be used to trigger a Lambda Function.

There are currently 4 options for specifying how records are emitted to the stream.

  • Keys only - only the key attributes of the modified item
  • New image - the entire item, as it appears after it was modified
  • Old image - the entire item, as it appeared before it was modified
  • New and old images - both the new and the old images of the item

When you create your Lambda Function you also need to give it privileges to read from the DynamoDB streams in addition to writing CloudWatch Logs. IAM has a role already configured with such policies.

  • AWSLambdaDynamoDBExecutionRole - provides list and read access to DynamoDB streams and write permissions to CloudWatch logs.

Processing New Books

I created an example where I have a DynamoDB books table and I want to capture new books being inserted into the table. The table has 3 pieces of information about each book, where id is the partition key for the table.

{
    "id" : "9780618640157",
    "title": "The Lord of the Rings",
    "author": "J.R.R. Tolkien"
}

I created a new stream for the table and used it as a trigger for a Lambda Function. The best way to get started with a new Lambda Function that uses a DynamoDB Stream as a trigger is to use one of the blueprints. In my case I wanted to use Python 3.

  • dynamodb-process-stream-python3 - An Amazon DynamoDB trigger that logs the updates made to a table.

Below is Python code that looks for only new books inserted into the DynamoDB table and logs them.

import json
import logging


logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            book = {
                'id': record['dynamodb']['NewImage']['id']['S'],
                'title': record['dynamodb']['NewImage']['title']['S'],
                'author': record['dynamodb']['NewImage']['author']['S']
            }
            logger.info('Book: ' + json.dumps(book, indent=2))

Although I am only logging the new books, one can easily publish the information to SNS or SQS as well as execute a Step Function.

Contents