AWS Big Data Blog

HAQM DocumentDB zero-ETL integration with HAQM OpenSearch Service is now available

Today, we are announcing the general availability of HAQM DocumentDB (with MongoDB compatibility) zero-ETL integration with HAQM OpenSearch Service.

HAQM DocumentDB provides native text search and vector search capabilities. With HAQM OpenSearch Service, you can perform advanced search analytics, such as fuzzy search, synonym search, cross-collection search, and multilingual search, on HAQM DocumentDB data.

Zero-ETL integration simplifies your architecture for advanced search analytics. It frees you from the undifferentiated heavy lifting of building and managing a data pipeline and from the costs of keeping data synchronized between the two services.

In this post, we show you how to configure zero-ETL integration of HAQM DocumentDB with OpenSearch Service using HAQM OpenSearch Ingestion. The integration performs a full load of your HAQM DocumentDB data and then continuously streams the latest changes to OpenSearch Service using change streams. For other ingestion methods, see the HAQM OpenSearch Ingestion documentation.

Solution overview

At a high level, this solution involves the following steps:

  1. Enable change streams on the HAQM DocumentDB collections.
  2. Create the OpenSearch Ingestion pipeline.
  3. Load sample data on the HAQM DocumentDB cluster.
  4. Verify the data in OpenSearch Service.

Prerequisites

To implement this solution, you need the following prerequisites:

  • An HAQM DocumentDB cluster (the source for the integration)
  • An OpenSearch Service domain (the target for the integration)
  • An HAQM Simple Storage Service (HAQM S3) bucket that the pipeline can use to temporarily stage data
  • An IAM role that OpenSearch Ingestion can assume, with access to the HAQM DocumentDB cluster, the S3 bucket, and the OpenSearch Service domain

Zero-ETL performs an initial full load of your collection by running a collection scan on the primary instance of your HAQM DocumentDB cluster. Depending on the size of your data, the load may take several minutes to complete, and you may notice elevated resource consumption on your cluster.

Enable change streams on the HAQM DocumentDB collections

HAQM DocumentDB change stream events comprise a time-ordered sequence of data changes due to inserts, updates, and deletes on your data. We use these change stream events to transmit data changes from the HAQM DocumentDB cluster to the OpenSearch Service domain.

Change streams are disabled by default; you can enable them at the individual collection level, database level, or cluster level. To enable change streams on your collections, complete the following steps:

  1. Connect to HAQM DocumentDB using the mongo shell.
  2. Enable change streams on your collection with the following code. For this post, we use the HAQM DocumentDB database inventory and collection product:
    db.adminCommand({modifyChangeStreams: 1,
        database: "inventory",
        collection: "product", 
        enable: true});

If you have more than one collection for which you want to stream data into OpenSearch Service, enable change streams for each collection. If you want to enable it at the database or cluster level, see Enabling Change Streams.
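
For example, to enable change streams for every collection in the inventory database rather than one collection at a time, you can pass an empty string as the collection name. The following is a sketch of the database-level form of the same modifyChangeStreams command:

// Enable change streams for all collections in the inventory database
// by passing an empty collection name.
db.adminCommand({modifyChangeStreams: 1,
    database: "inventory",
    collection: "",
    enable: true});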

We recommend enabling change streams only for the collections you need to sync to OpenSearch Service.

Create an OpenSearch Ingestion pipeline

OpenSearch Ingestion is a fully managed data collector that delivers real-time log and trace data to OpenSearch Service domains. OpenSearch Ingestion is powered by the open source data collector Data Prepper. Data Prepper is part of the open source OpenSearch project.

With OpenSearch Ingestion, you can filter, enrich, transform, and deliver your data for downstream analysis and visualization. OpenSearch Ingestion is serverless, so you don’t need to worry about scaling your infrastructure, operating your ingestion fleet, and patching or updating the software.

For a comprehensive overview of OpenSearch Ingestion, visit HAQM OpenSearch Ingestion, and for more information about the Data Prepper open source project, visit Data Prepper.

To create an OpenSearch Ingestion pipeline, complete the following steps:

  1. On the OpenSearch Service console, choose Pipelines in the navigation pane.
  2. Choose Create pipeline.
  3. For Pipeline name, enter a name (for example, zeroetl-docdb-to-opensearch).
  4. Set up the pipeline capacity so that compute resources scale automatically based on the current ingestion workload.
  5. Enter the minimum and maximum Ingestion OpenSearch Compute Units (OCUs). In this example, we use the default pipeline capacity settings of a minimum of 1 Ingestion OCU and a maximum of 4 Ingestion OCUs.

Each OCU is a combination of approximately 8 GiB of memory and 2 vCPUs and can handle an estimated 8 GiB of data per hour. OpenSearch Ingestion supports up to 96 OCUs, and it automatically scales up and down based on your ingestion workload demand.

  6. Choose the configuration blueprint, and under Use case in the navigation pane, choose ZeroETL.
  7. Select Zero-ETL with DocumentDB to build the pipeline configuration.

The pipeline configuration combines a source section with the HAQM DocumentDB settings and a sink section for OpenSearch Service.

You must set the AWS Identity and Access Management (IAM) role (sts_role_arn), which appears in multiple places in the configuration, with the necessary permissions to read data from the HAQM DocumentDB database and collection and write to an OpenSearch Service domain. The OpenSearch Ingestion pipeline assumes this role to make sure the right security posture is always maintained when moving the data from source to destination. To learn more, see Setting up roles and users in HAQM OpenSearch Ingestion.
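
For reference, the role referenced by sts_role_arn must have a trust relationship with osis-pipelines.amazonaws.com, as noted in the blueprint comments. The following is a minimal sketch of such a trust policy; the permissions policies granting access to HAQM DocumentDB, HAQM S3, and OpenSearch Service are attached to the role separately:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}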

You need one OpenSearch Ingestion pipeline per HAQM DocumentDB collection.

version: "2"
documentdb-pipeline:
  source:
    documentdb:
      acknowledgments: true
      host: "<<docdb-2024-01-03-20-31-17.cluster-abcdef.us-east-1.docdb.amazonaws.com>>"
      port: 27017
      authentication:
        username: ${{aws_secrets:secret:username}}
        password: ${{aws_secrets:secret:password}}
      aws:
        sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
      
      s3_bucket: "<<bucket-name>>"
      s3_region: "<<bucket-region>>" 
      # optional s3_prefix for OpenSearch Ingestion to write the records
      # s3_prefix: "<<path_prefix>>"
      collections:
        # collection format: <databaseName>.<collectionName>
        - collection: "<<databaseName.collectionName>>"
          export: true
          stream: true
  sink:
    - opensearch:
        # REQUIRED: Provide an AWS OpenSearch endpoint
        hosts: [ "<<http://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com>>" ]
        index: "<<index_name>>"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        # DocumentDB record creation or event timestamp
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          # REQUIRED: Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
          # Provide the region of the domain.
          region: "<<us-east-1>>"
          # Enable the 'serverless' flag if the sink is an HAQM OpenSearch Serverless collection
          # serverless: true
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection
            # network_policy_name: "network-policy-name"
          
extension:
  aws:
    secrets:
      secret:
        # Secret name or secret ARN
        secret_id: "<<my-docdb-secret>>"
        region: "<<us-east-1>>"
        sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
        refresh_interval: PT1H 

Provide the following parameters from the blueprint:

  • HAQM DocumentDB endpoint – Provide your HAQM DocumentDB cluster endpoint.
  • HAQM DocumentDB collection – Provide your HAQM DocumentDB database name and collection name in the format dbname.collection within the collections section. For example, inventory.product.
  • s3_bucket – Provide your S3 bucket name along with the AWS Region and S3 prefix. This will be used temporarily to hold the data from HAQM DocumentDB for data synchronization.
  • OpenSearch hosts – Provide the OpenSearch Service domain endpoint for the host and provide the preferred index name to store the data.
  • secret_id – Provide the ARN for the secret for the HAQM DocumentDB cluster along with its Region.
  • sts_role_arn – Provide the ARN for the IAM role that has permissions for the HAQM DocumentDB cluster, S3 bucket, and OpenSearch Service domain.

To learn more, see Creating HAQM OpenSearch Ingestion pipelines.

  8. After entering all the required values, validate the pipeline configuration for any errors.
  9. For a production workload, deploy your pipeline within a VPC: select Attach to VPC, choose the corresponding VPC CIDR range, and then choose your VPC, subnets, and security groups.

The security group inbound rule should have access to the HAQM DocumentDB port. For more information, refer to Securing HAQM OpenSearch Ingestion pipelines within a VPC.
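
If you prefer to create the pipeline programmatically, you can use the AWS CLI instead of the console. The following is a minimal sketch that assumes the pipeline configuration above is saved locally as pipeline.yaml and reuses the example pipeline name and Region from this post:

# Create the pipeline from the YAML configuration saved as pipeline.yaml.
# The pipeline name, file path, and Region are the example values used in this post.
aws osis create-pipeline \
    --pipeline-name zeroetl-docdb-to-opensearch \
    --min-units 1 \
    --max-units 4 \
    --pipeline-configuration-body file://pipeline.yaml \
    --region us-east-1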

Load sample data on the HAQM DocumentDB cluster

Complete the following steps to load the sample data:

  1. Connect to your HAQM DocumentDB cluster.
  2. Insert some documents into the collection product in the inventory database by running the following commands. For creating and updating documents on HAQM DocumentDB, refer to Working with Documents.
    use inventory;
    
     db.product.insertMany([
       {
          "Item":"Ultra GelPen",
          "Colors":[
             "Violet"
          ],
          "Inventory":{
             "OnHand":100,
             "MinOnHand":35
          },
          "UnitPrice":0.99
       },
       {
          "Item":"Poster Paint",
          "Colors":[
             "Red",
             "Green",
             "Blue",
             "Black",
             "White"
          ],
          "Inventory":{
             "OnHand":47,
             "MinOnHand":50
          }
       },
       {
          "Item":"Spray Paint",
          "Colors":[
             "Black",
             "Red",
             "Green",
             "Blue"
          ],
          "Inventory":{
             "OnHand":47,
             "MinOnHand":50,
             "OrderQnty":36
          }
       }
    ])
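
Before verifying the data in OpenSearch Service, you can optionally confirm the inserts on the HAQM DocumentDB side with a quick check in the mongo shell:

// Confirm that the three sample documents exist in the product collection.
db.product.countDocuments()

// Inspect one of the inserted documents.
db.product.find({ "Item": "Spray Paint" }).pretty()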

Verify the data in OpenSearch Service

You can use the OpenSearch Dashboards dev console to search for the synchronized items within a few seconds. For more information, see Creating and searching for documents in HAQM OpenSearch Service.
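
For example, if you set the index name in the pipeline configuration to inventory-product (a hypothetical name; substitute the index name you chose), the following dev console query returns the synchronized documents:

GET inventory-product/_search
{
  "query": {
    "match_all": {}
  }
}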

To verify the change data capture (CDC), run the following command to update the OnHand and MinOnHand fields for the existing document item Ultra GelPen in the product collection:

db.product.updateOne({
   "Item":"Ultra GelPen"
},
{
   "$set":{
      "Inventory":{
         "OnHand":300,
         "MinOnHand":100
      }
   }
});

Verify that the update to the Ultra GelPen document is reflected in the index on the OpenSearch Service domain.
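
For example, the following dev console query (again using the hypothetical index name inventory-product) should show the Ultra GelPen document with OnHand set to 300 and MinOnHand set to 100:

GET inventory-product/_search
{
  "query": {
    "match": {
      "Item": "Ultra GelPen"
    }
  }
}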

Monitor the CDC pipeline

You can monitor the state of a pipeline by checking its status on the OpenSearch Service console. Additionally, HAQM CloudWatch provides real-time metrics and logs, which let you set up alerts when user-defined thresholds are breached.
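
For example, you can check pipeline health from the AWS CLI. The following sketch assumes the pipeline name used earlier in this post and that the pipeline publishes metrics under the AWS/OSIS CloudWatch namespace:

# Check the current status of the pipeline (for example, ACTIVE).
aws osis get-pipeline --pipeline-name zeroetl-docdb-to-opensearch

# List the CloudWatch metrics published by OpenSearch Ingestion.
aws cloudwatch list-metrics --namespace AWS/OSIS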

Clean up

To prevent additional billing, clean up the AWS resources that you created while following this post. Follow these steps to clean up your AWS account:

  1. On the OpenSearch Service console, choose Domains under Managed clusters in the navigation pane.
  2. Select the domain you want to delete and choose Delete.
  3. Choose Pipelines under Ingestion in the navigation pane.
  4. Select the pipeline you want to delete and on the Actions menu, choose Delete.
  5. On the HAQM S3 console, select the S3 bucket and choose Delete.

Conclusion

In this post, you learned how to enable zero-ETL integration between HAQM DocumentDB change data streams and OpenSearch Service. To learn more about zero-ETL integrations available with other data sources, see Working with HAQM OpenSearch Ingestion pipeline integrations.


About the Authors

Praveen Kadipikonda is a Senior Analytics Specialist Solutions Architect at AWS based out of Dallas. He helps customers build efficient, performant, and scalable analytic solutions. He has worked with building databases and data warehouse solutions for over 15 years.

Kaarthiik Thota is a Senior HAQM DocumentDB Specialist Solutions Architect at AWS based out of London. He is passionate about database technologies and enjoys helping customers solve problems and modernize applications using NoSQL databases. Before joining AWS, he worked extensively with relational databases, NoSQL databases, and business intelligence technologies for over 15 years.

Muthu Pitchaimani is a Search Specialist with HAQM OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.