AWS Compute Blog

Indexing Amazon DynamoDB Content with Amazon Elasticsearch Service Using AWS Lambda

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.

Stephan Hadinger
Sr Mgr, Solutions Architecture

Mathieu Cadet
Account Representative

NOTE: It was recently brought to our attention that this post contains instructions that reference a now deprecated Lambda blueprint. We are in the process of updating this post to correct this.

Many AWS customers have adopted Amazon DynamoDB for its predictable performance and seamless scalability. The main querying capabilities of DynamoDB are centered around lookups using a primary key. However, some applications require richer querying capabilities. Indexing the content of your DynamoDB tables with a search engine such as Elasticsearch enables full-text search.

In this post, we show how you can send changes to the content of your DynamoDB tables to an Amazon Elasticsearch Service (Amazon ES) cluster for indexing, using the DynamoDB Streams feature combined with AWS Lambda.

 

Architectural overview

Here’s a high-level overview of the architecture:

DynamoDB Streams to Elasticsearch bridge

We’ll cover the main steps required to put this bridge in place:

  1. Choosing the DynamoDB tables to index and enabling DynamoDB Streams on them.
  2. Creating an IAM role for accessing the HAQM ES cluster.
  3. Configuring and enabling the Lambda blueprint.

 

Choosing the DynamoDB table to index

In this post, you add full-text search to a product catalog by indexing the content of a DynamoDB table called all_products, which acts as the catalog of all products.

Here’s an example of an item stored in that table:

{
  "product_id": "B016JOMAEE",
  "name": "Serverless Single Page Apps: Fast, Scalable, and Available",
  "category": "ebook",
  "description": "AWS Lambda - A Guide to Serverless Microservices takes a comprehensive look at developing serverless workloads using the new Amazon Web Services Lambda service.",
  "author": "Matthew Fuller",
  "price": 15.0,
  "rating": 4.8
}

Enabling DynamoDB Streams

In the DynamoDB console, enable DynamoDB Streams on the all_products table by selecting the table and choosing Manage Stream.

Enabling DynamoDB Streams

Multiple options are available for the stream. For this use case, the stream records must include the item as it appears after the change, so choose either New image or New and old images. For more information, see Capturing Table Activity with DynamoDB Streams.
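The same setting can also be applied programmatically. The following minimal sketch assumes boto3 and calls the DynamoDB client's update_table operation with a StreamSpecification. The helper name enable_stream is hypothetical, and the client is passed in so you can create it with your own region and credentials. The helper refuses view types whose records leave out the new item image.

```python
# Stream view types whose records include the item as it looks after the change.
VIEW_TYPES_WITH_NEW_IMAGE = {"NEW_IMAGE", "NEW_AND_OLD_IMAGES"}


def enable_stream(dynamodb_client, table_name, view_type="NEW_AND_OLD_IMAGES"):
    """Turn on DynamoDB Streams for table_name (hypothetical helper)."""
    if view_type not in VIEW_TYPES_WITH_NEW_IMAGE:
        # KEYS_ONLY and OLD_IMAGE records carry no new image to index.
        raise ValueError(f"{view_type} records do not include the new item image")
    return dynamodb_client.update_table(
        TableName=table_name,
        StreamSpecification={"StreamEnabled": True, "StreamViewType": view_type},
    )


# Example (requires AWS credentials):
#   enable_stream(boto3.client("dynamodb", region_name="us-east-1"), "all_products")
```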

DynamoDB Streams Options

After the stream is set up, note the stream ARN. You’ll need it later, when configuring the access permissions.

Finding a DynamoDB Stream ARN
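You can also read the stream ARN back from the table description. In this sketch, which assumes boto3, get_stream_arn is a hypothetical helper. It checks that the stream is enabled in the describe_table response and then returns the LatestStreamArn field.

```python
def get_stream_arn(dynamodb_client, table_name):
    """Return the ARN of the table's enabled stream (hypothetical helper)."""
    table = dynamodb_client.describe_table(TableName=table_name)["Table"]
    if not table.get("StreamSpecification", {}).get("StreamEnabled"):
        raise RuntimeError(f"DynamoDB Streams is not enabled on {table_name}")
    # LatestStreamArn identifies the most recent stream for the table.
    return table["LatestStreamArn"]
```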

Creating a new IAM role

The Lambda function needs read access to the DynamoDB stream you just enabled. The function also needs access to the Amazon ES cluster to submit new records for indexing.

In the AWS Identity and Access Management (IAM) console, create a new role for the Lambda function and name it ddb-elasticsearch-bridge.

Creating new IAM role

As this role will be used by the Lambda function, choose AWS Lambda from the AWS Service Roles list.

Attaching policy to the role

On the following screens, choose the AWSLambdaBasicExecutionRole managed policy, which allows the Lambda function to send logs to Amazon CloudWatch Logs.
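For reference, the console steps above roughly correspond to two IAM API calls. The first is create_role with a trust policy that lets the Lambda service (lambda.amazonaws.com) assume the role. The second is attach_role_policy with the ARN of the AWSLambdaBasicExecutionRole managed policy. This is a sketch that assumes boto3, and create_bridge_role is a hypothetical name.

```python
import json

# Trust policy that lets the Lambda service assume the role.
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}
# AWS managed policy that allows the function to write to CloudWatch Logs.
BASIC_EXECUTION_POLICY_ARN = (
    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
)


def create_bridge_role(iam_client, role_name="ddb-elasticsearch-bridge"):
    """Create the Lambda execution role and return its ARN (hypothetical helper)."""
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(LAMBDA_TRUST_POLICY),
    )
    iam_client.attach_role_policy(
        RoleName=role_name, PolicyArn=BASIC_EXECUTION_POLICY_ARN
    )
    return response["Role"]["Arn"]
```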

Configuring access to the Amazon ES cluster

First, you need a running Amazon ES cluster. In this example, create a search domain called inventory. After the domain has been created, note its ARN:

Attaching policy to the role

In the IAM console, select the ddb-elasticsearch-bridge role created earlier and add two inline policies to that role:

Attaching policy to the role

Here’s the policy to add to allow the Lambda code to push new documents to Amazon ES (replace the resource ARN with the ARN of your Amazon ES cluster):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:ESHttpPost"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:us-east-1:0123456789:domain/inventory/*"
        }
    ]
}

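Important: you need to append /* to the domain ARN, as shown above. Requests to Amazon ES target paths under the domain (an index, for example), so the policy must cover the domain's subresources and not only the domain itself.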
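The trailing /* is easy to forget. A small helper that builds the policy above from a domain ARN can append it for you. This is an illustrative sketch, and es_post_policy is a hypothetical helper, not part of any AWS SDK.

```python
def es_post_policy(domain_arn):
    """Build the es:ESHttpPost policy shown above for a domain ARN (hypothetical helper)."""
    resource = domain_arn.rstrip("/")
    if not resource.endswith("/*"):
        # Cover every path under the domain, not just the domain itself.
        resource += "/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Action": ["es:ESHttpPost"], "Effect": "Allow", "Resource": resource}
        ],
    }
```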

Next, add a second policy for read access to the DynamoDB stream (replace the resource ARN with the ARN of your DynamoDB stream):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:dynamodb:us-east-1:0123456789:table/all_products/stream/2016-02-16T23:13:07.600"
            ]
        }
    ]
}
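If you script the role setup, you can attach each inline policy with the IAM put_role_policy operation. That operation adds the policy, or updates an existing inline policy with the same name. The sketch below assumes boto3. The helper names, and any policy names you pass (for example, ddb-stream-read), are hypothetical.

```python
import json


def stream_read_policy(stream_arn):
    """Read access to one DynamoDB stream, matching the policy above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "dynamodb:DescribeStream",
                    "dynamodb:GetRecords",
                    "dynamodb:GetShardIterator",
                    "dynamodb:ListStreams",
                ],
                "Effect": "Allow",
                "Resource": [stream_arn],
            }
        ],
    }


def put_inline_policies(iam_client, role_name, policies):
    """Attach each {policy_name: document} pair as an inline policy on role_name."""
    for policy_name, document in policies.items():
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(document),
        )
```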

Enabling the Lambda blueprint

When you sign in to the Lambda console and choose Create a Lambda Function, you are presented with a list of blueprints. Select the blueprint called dynamodb-to-elasticsearch.

dynamodb-to-elasticsearch blueprint

Next, select the DynamoDB table all_products as the event source:

Lambda event source
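Selecting the table as the event source creates an event source mapping between the stream and the function. A boto3 sketch of the equivalent API call follows. The helper name, the batch size default, and the function name you pass (for example, ddb-to-es) are hypothetical. StartingPosition LATEST reads only records written after the mapping is created. TRIM_HORIZON starts from the oldest record still retained in the stream.

```python
def add_stream_trigger(lambda_client, function_name, stream_arn,
                       starting_position="LATEST", batch_size=100):
    """Create an event source mapping from a stream to a Lambda function.

    Hypothetical helper; returns the UUID that identifies the mapping.
    """
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        # LATEST: only new records; TRIM_HORIZON: oldest records still retained.
        StartingPosition=starting_position,
        BatchSize=batch_size,
        Enabled=True,
    )
    return response["UUID"]
```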

Then, customize the Lambda code to specify the Elasticsearch endpoint:

Customizing the blueprint
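The blueprint's own code isn't reproduced here. To illustrate what such a function has to do, the following sketch turns a DynamoDB Streams event into an Elasticsearch _bulk request body. INSERT and MODIFY records become index actions built from NewImage, and REMOVE records become delete actions.

The sketch makes several assumptions:

- It targets an Elasticsearch version that no longer requires a mapping type (7.x or later).
- It derives the index name from the table name and the document ID from the item's primary key. Both are illustrative choices.
- It does not handle binary attribute types.
- It leaves out signing and sending the request. That request would be a POST to the domain's _bulk endpoint, which fits the es:ESHttpPost permission granted earlier.

```python
import json


def from_ddb(value):
    """Convert one DynamoDB attribute value, e.g. {"S": "x"} or {"N": "1.5"}, to plain Python."""
    tag, raw = next(iter(value.items()))
    if tag in ("S", "BOOL"):
        return raw
    if tag == "NULL":
        return None
    if tag == "N":
        # Numbers arrive as strings; keep integers as int, everything else as float.
        return int(raw) if raw.lstrip("-").isdigit() else float(raw)
    if tag == "M":
        return {key: from_ddb(item) for key, item in raw.items()}
    if tag == "L":
        return [from_ddb(item) for item in raw]
    if tag == "SS":
        return list(raw)
    if tag == "NS":
        return [from_ddb({"N": n}) for n in raw]
    raise ValueError(f"unsupported DynamoDB type: {tag}")


def to_bulk_body(event):
    """Turn a DynamoDB Streams event into an Elasticsearch _bulk request body."""
    lines = []
    for record in event["Records"]:
        # eventSourceARN looks like ...:table/<table name>/stream/<label>;
        # Elasticsearch index names must be lowercase.
        index = record["eventSourceARN"].split("/")[1].lower()
        change = record["dynamodb"]
        # Deriving the document ID from the primary key makes updates overwrite.
        doc_id = json.dumps(from_ddb({"M": change["Keys"]}), sort_keys=True)
        action = {"_index": index, "_id": doc_id}
        if record["eventName"] == "REMOVE":
            lines.append(json.dumps({"delete": action}))
        else:  # INSERT or MODIFY
            lines.append(json.dumps({"index": action}))
            lines.append(json.dumps(from_ddb({"M": change["NewImage"]})))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""
```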

Finally, select the ddb-elasticsearch-bridge role created earlier to give the Lambda function the permissions required to interact with DynamoDB and the Amazon ES cluster:

Choosing a role

Testing the result

You’re all set!

After a few records have been added to your DynamoDB table, go back to the Amazon ES console and verify that a new index for your items has been created automatically:

Amazon ES indices
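To add a few test records from Python, keep in mind that boto3's DynamoDB Table resource rejects Python float values and expects decimal.Decimal for numbers. The sketch below converts floats before calling put_item. The helpers to_dynamodb_item and put_sample_items are hypothetical, and table is assumed to be a boto3 Table resource such as boto3.resource("dynamodb").Table("all_products").

```python
from decimal import Decimal


def to_dynamodb_item(value):
    """Recursively replace floats with Decimal, which boto3's DynamoDB resource requires."""
    if isinstance(value, float):
        # Going through str() keeps 4.8 as Decimal("4.8") instead of its binary expansion.
        return Decimal(str(value))
    if isinstance(value, dict):
        return {key: to_dynamodb_item(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_dynamodb_item(item) for item in value]
    return value


def put_sample_items(table, items):
    """Write each item with Table.put_item (table is a boto3 Table resource)."""
    for item in items:
        table.put_item(Item=to_dynamodb_item(item))
```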

Playing with Kibana (Optional)

Elasticsearch is commonly used with Kibana for visual exploration of data.

To start querying the indexed data, create an index pattern in Kibana. Use the name of the DynamoDB table as the index pattern:

Kibana Index pattern

Kibana picks up the field types that Elasticsearch detected automatically when it indexed the documents:

Kibana Index pattern

Use a simple query to search the product catalog for all items in the category book that contain the word aws in any field:

Kibana Index pattern
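If you want to run the same search outside Kibana, the request body below is one way to express it. It uses the Elasticsearch query_string query, which accepts Lucene query syntax. The sketch assumes the Kibana search was written as category:book AND aws, and catalog_search_body is a hypothetical helper.

```python
def catalog_search_body(category, term, size=10):
    """Build a _search request body for items in category that mention term.

    Hypothetical helper: uses a query_string query with Lucene syntax.
    """
    return {
        "size": size,
        "query": {
            "query_string": {
                # category:<value> matches one field; the bare term is matched
                # against the default fields (all eligible fields in recent
                # versions, unless index.query.default_field is set).
                "query": f"category:{category} AND {term}"
            }
        },
    }


# Send the body as POST /all_products/_search (request signing not shown).
```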

Other considerations

Indexing pre-existing content

The solution presented earlier ensures that new data is indexed as soon as it is added to the DynamoDB table. But what about data that was already stored in the table before the stream was enabled?

Luckily, the Lambda function used earlier can also process data from an Amazon Kinesis stream, as long as the data is formatted like DynamoDB Streams records.

Provided that you have an Amazon Kinesis stream set up as an additional input source for the Lambda function above, you can use the (very naive) sample Python 3 code below to read the entire content of a DynamoDB table and push it to an Amazon Kinesis stream called ddb-all-products for indexing in Amazon ES.

import json
import boto3
from boto3.dynamodb.types import TypeSerializer

# Load the service resources in the desired region.
# Note: AWS credentials should be passed as environment variables
# or through IAM roles.
dynamodb = boto3.resource('dynamodb', region_name="us-east-1")
kinesis = boto3.client('kinesis', region_name="us-east-1")

# Load the DynamoDB table.
ddb_table_name = "all_products"
ks_stream_name = "ddb-all-products"
table = dynamodb.Table(ddb_table_name)

# Get the primary key attribute name(s). Use key_schema rather than
# attribute_definitions, which also lists secondary index key attributes.
ddb_keys_name = [k['AttributeName'] for k in table.key_schema]

# A single serializer converts Python values into DynamoDB notation.
serializer = TypeSerializer()

# Each Scan call returns at most 1 MB of data.
# Iterate until all records have been scanned.
response = None
while True:
    if not response:
        # Scan from the start.
        response = table.scan()
    else:
        # Scan from where you stopped previously.
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])

    for i in response["Items"]:
        # Get a dict of primary key(s).
        ddb_keys = {k: i[k] for k in ddb_keys_name}
        # Serialize Python dictionaries into DynamoDB notation.
        ddb_data = serializer.serialize(i)["M"]
        ddb_keys = serializer.serialize(ddb_keys)["M"]
        # The record must contain "Keys" and "NewImage" attributes to be similar
        # to a DynamoDB Streams record. Additionally, you inject the name of
        # the source DynamoDB table in the record so you can use it as an index
        # for Amazon ES.
        record = {"Keys": ddb_keys, "NewImage": ddb_data, "SourceTable": ddb_table_name}
        # Convert the record to JSON.
        record = json.dumps(record)
        # Push the record to Amazon Kinesis.
        res = kinesis.put_record(
            StreamName=ks_stream_name,
            Data=record,
            PartitionKey=i["product_id"])
        print(res)

    # Stop the loop if no additional records are
    # available.
    if 'LastEvaluatedKey' not in response:
        break
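The script above makes one PutRecord call per item, which is slow for large tables. One possible refinement is sketched below. The helper sends the same stream-shaped records with PutRecords, which accepts up to 500 records per request, and returns the records that failed so you can retry them.

put_in_batches is hypothetical. It assumes that the serialized primary key stays within the Kinesis partition key length limit, and that each batch stays within the PutRecords request size limit.

```python
import json

# PutRecords accepts at most 500 records per request.
MAX_RECORDS_PER_CALL = 500


def put_in_batches(kinesis_client, stream_name, records):
    """Send stream-shaped records with PutRecords and return the ones that failed.

    Each record is a dict with "Keys" and "NewImage" in DynamoDB notation,
    like the records built by the script above.
    """
    failed = []
    for start in range(0, len(records), MAX_RECORDS_PER_CALL):
        batch = records[start:start + MAX_RECORDS_PER_CALL]
        entries = [
            {
                "Data": json.dumps(record),
                # Same primary key, same shard: updates to one item stay in order.
                "PartitionKey": json.dumps(record["Keys"], sort_keys=True),
            }
            for record in batch
        ]
        response = kinesis_client.put_records(StreamName=stream_name, Records=entries)
        if response.get("FailedRecordCount", 0):
            # Results come back in request order; failed entries carry an ErrorCode.
            failed.extend(
                record
                for record, result in zip(batch, response["Records"])
                if "ErrorCode" in result
            )
    return failed
```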

Note: In the code example above, you are passing the name of the source DynamoDB table as an extra record attribute, SourceTable. The Lambda function uses that attribute to build the Amazon ES index name. Another approach for passing that information is tagging the Amazon Kinesis stream.
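To illustrate how one function can handle both input sources, the hypothetical index_name_for helper below reads the SourceTable attribute from Kinesis records, whose payload Lambda delivers base64-encoded. For DynamoDB Streams records, it falls back to the table name embedded in eventSourceARN. This is a sketch of the approach described in the note, not the blueprint's code.

```python
import base64
import json


def index_name_for(record):
    """Pick the Amazon ES index name for one Lambda event record (hypothetical helper)."""
    if record["eventSource"] == "aws:kinesis":
        # Kinesis payloads are base64-encoded; SourceTable was added by the loader script.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        table_name = payload["SourceTable"]
    else:
        # DynamoDB Streams: eventSourceARN ends with table/<name>/stream/<label>.
        table_name = record["eventSourceARN"].split("/")[1]
    # Elasticsearch index names must be lowercase.
    return table_name.lower()
```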

Now, create the Amazon Kinesis stream ddb-all-products, and then add permissions to the ddb-elasticsearch-bridge role in IAM to allow the Lambda function to read from the stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:Get*",
                "kinesis:DescribeStream"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:0123456789:stream/ddb-all-products"
            ]
        }
    ]
}
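If you prefer to script this step, the following sketch does it with boto3:

1. Creates the stream.
2. Waits for the stream to become active, using the stream_exists waiter.
3. Looks up the stream ARN.
4. Attaches the read policy above as an inline policy.

The helper name and the inline policy name it generates are hypothetical.

```python
import json


def create_stream_and_grant_read(kinesis_client, iam_client, stream_name,
                                 role_name, shard_count=1):
    """Create a Kinesis stream and let role_name read from it; return the stream ARN."""
    kinesis_client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    # The stream_exists waiter polls until the stream becomes ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
    summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
    stream_arn = summary["StreamDescriptionSummary"]["StreamARN"]
    # Same permissions as the policy above, scoped to the new stream.
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:Get*", "kinesis:DescribeStream"],
                "Resource": [stream_arn],
            }
        ],
    }
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{stream_name}-read",  # hypothetical policy name
        PolicyDocument=json.dumps(policy),
    )
    return stream_arn
```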

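Finally, set the Amazon Kinesis stream as an additional input source to the Lambda function:

Amazon Kinesis input source

Neat tip: Doing a full re-index of the content this way does not create duplicate entries in Amazon ES, because each document is identified by the primary key of its DynamoDB item. Sending an item again overwrites its existing document.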

Paying attention to attribute types

With DynamoDB, you can use different types for the same attribute on different items, but Amazon ES expects a given attribute to have only one type. Similarly, changing the type of an existing attribute after it has been indexed in Amazon ES causes problems: documents with the new type can be rejected because of mapping conflicts, and some searches won’t work as expected.

In these cases, you must rebuild the Amazon ES index. For more information, see Reindexing Your Data in the Elasticsearch documentation.
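Before rebuilding an index, it can help to find out which attributes actually carry more than one type. The following sketch takes items in DynamoDB notation, as returned by the low-level client's Scan operation, and reports every attribute seen with more than one type tag. find_type_conflicts is a hypothetical helper.

```python
from collections import defaultdict


def find_type_conflicts(items):
    """Return {attribute: sorted type tags} for attributes seen with more than one type.

    items are in DynamoDB notation, e.g. {"price": {"N": "15.0"}}.
    """
    types_by_attribute = defaultdict(set)
    for item in items:
        for name, value in item.items():
            # Each attribute value is a one-entry dict keyed by its type tag.
            types_by_attribute[name].update(value.keys())
    return {
        name: sorted(tags)
        for name, tags in types_by_attribute.items()
        if len(tags) > 1
    }


# Example (requires AWS credentials):
#   pages = boto3.client("dynamodb").get_paginator("scan").paginate(TableName="all_products")
#   print(find_type_conflicts(item for page in pages for item in page["Items"]))
```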

Conclusion

In this post, you have seen how you can use AWS Lambda with DynamoDB to index your table content in Amazon ES as changes happen.

Because you are relying entirely on Lambda for the business logic, you don’t have to deal with servers at any point: everything is managed by the AWS platform in a highly available and scalable fashion. To learn more about Lambda and serverless infrastructures, see the Microservices without the Servers blog post.

Now that you have added full-text search to your DynamoDB table, you might be interested in exposing its content through a small REST API. For more information, see Using Amazon API Gateway as a proxy for DynamoDB.