AWS Database Blog

Stream data with HAQM DocumentDB, HAQM MSK Serverless, and HAQM MSK Connect

A common trend in modern application development and data processing is the use of Apache Kafka as a standard delivery mechanism for data pipelines and fan-out approaches. HAQM Managed Streaming for Apache Kafka (HAQM MSK) is a fully managed, highly available, and secure service that makes it simple for developers and DevOps managers to run applications on Apache Kafka in the AWS Cloud without needing Apache Kafka infrastructure management expertise.

Document databases like HAQM DocumentDB (with MongoDB compatibility) are growing in usage as developers and application owners prefer the schema flexibility of the JSON document model in modern application development. HAQM DocumentDB is a scalable, durable, and fully managed database service for operating mission-critical MongoDB workloads. Increasingly, customers are using HAQM MSK with HAQM DocumentDB for various use cases.

In this post, we discuss how to run and configure the open-source MongoDB Kafka connector to move data between HAQM MSK and HAQM DocumentDB.

HAQM DocumentDB can act as both the data sink and data source to HAQM MSK in different use cases.

HAQM DocumentDB as a data sink

The following are example use cases in which you can use HAQM DocumentDB as a data sink behind HAQM MSK:

  • Streaming data for live video streaming or flash sale events: During a large live video streaming or flash sale event, the high volume of data generated by viewers’ reactions or buyers’ clickstreams can be fed to HAQM MSK as raw data. You can further stream this data to HAQM DocumentDB for downstream processing and aggregation.
  • Streaming telemetry data from IoT devices or website hit data: Telemetry data from Internet of Things (IoT) devices, website hit data, or meteorological data can be streamed into HAQM DocumentDB using the connector and then processed (for example, aggregation or min/max calculations).
  • Record replay or application recovery in HAQM DocumentDB: Rather than restoring an entire backup to the HAQM DocumentDB cluster, the application can replay specific item-level changes from HAQM MSK to the HAQM DocumentDB collection.

HAQM DocumentDB as a data source

The following are example use cases in which you can send HAQM DocumentDB change streams to HAQM MSK:

  • Replication of data to other HAQM DocumentDB clusters or data stores: HAQM MSK can be used as an intermediate layer for selective replication of collections from one HAQM DocumentDB cluster to another cluster or to other data stores.
  • Moving data out for advanced analytics and machine learning: HAQM DocumentDB offers a rich aggregation framework, but for advanced analytics and machine learning (ML), you can create a data pipeline from HAQM DocumentDB to various other data stores. You can use HAQM MSK as an intermediate layer to modify and filter change events before loading them into the target data store.

The MongoDB Kafka connector can act in either use case to transfer data between HAQM DocumentDB and HAQM MSK.

Solution overview

MSK Connect is a feature of HAQM MSK that makes it simple to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as data stores like HAQM DocumentDB, file systems, and search indexes.

In this post, we use the MongoDB Kafka connector running on MSK Connect to move the changes to and from HAQM DocumentDB to HAQM MSK.

With MSK Connect, you don’t need to provision infrastructure to run connectors. MSK Connect provides a serverless experience and scales the number of workers up and down, so you don’t have to provision servers or clusters, and you only pay for what you need to move your streaming data to and from your MSK cluster. With the auto scaling option, MSK Connect scales the number of workers based on the CPU utilization of the workload.

We divided this post into two main sections:

  • HAQM DocumentDB as a sink – In the first section of this post, we discuss data delivery to HAQM DocumentDB via HAQM MSK using the connector.
  • HAQM DocumentDB as a source – In the second section of this post, we cover pulling data from HAQM DocumentDB using the same connector and publishing it to a Kafka topic for a downstream Kafka consumer.

The following diagram illustrates the architecture and data flow:

Prerequisites

To follow along with this post, you need the following resources and configurations:

  • An HAQM DocumentDB cluster.
  • An MSK Serverless cluster.
  • An HAQM Elastic Compute Cloud (HAQM EC2) instance with Mongo shell and Java configured.
  • An HAQM Simple Storage Service (HAQM S3) bucket to store the connector plugin and JVM truststore file.
  • A custom plugin using the MongoDB Kafka connector and HAQM MSK config providers.
  • A customer-managed policy and role for MSK Connect.
  • A role for the EC2 instance.
  • A trust store for the JVM to connect to HAQM DocumentDB from MSK Connect.
  • Gateway endpoints for MSK Connect to access the trust store on HAQM S3.

You will incur costs in your account related to the HAQM DocumentDB, HAQM MSK, and HAQM EC2 resources. You can use the AWS Pricing Calculator to estimate the cost based on your configuration.

Complete the steps in this section to create these resources.

An HAQM DocumentDB cluster

You can use an existing instance-based cluster or create a new HAQM DocumentDB instance-based cluster.

You can also use an HAQM DocumentDB elastic cluster for the sink use case.
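
If you're creating a new instance-based cluster, the following AWS CLI commands are a minimal sketch of the general shape of the calls; the cluster and instance identifiers, the instance class, and the placeholder values are assumptions that you should adapt to your environment.

# Minimal sketch; adjust identifiers, instance class, and networking to your setup.
aws docdb create-db-cluster \
  --db-cluster-identifier docdb-kafka-cluster \
  --engine docdb \
  --master-username <docdbloginname> \
  --master-user-password <docdbpassword> \
  --vpc-security-group-ids <security-group-id> \
  --db-subnet-group-name <db-subnet-group-name>

aws docdb create-db-instance \
  --db-instance-identifier docdb-kafka-instance-1 \
  --db-instance-class db.r6g.large \
  --engine docdb \
  --db-cluster-identifier docdb-kafka-cluster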

An HAQM MSK cluster

You can use an existing MSK Serverless cluster or create a new MSK Serverless cluster using the quick create option. The cluster should be deployed in the same VPC as your HAQM DocumentDB cluster and configured with the same security group used for HAQM DocumentDB.

For MSK Serverless clusters, IAM role-based authentication is the default. With IAM role-based authentication, TLS is enabled automatically.
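
If you prefer to script the cluster creation, the following AWS CLI commands are a minimal sketch that creates an MSK Serverless cluster with IAM authentication and then retrieves its IAM bootstrap broker string; the cluster name and the subnet, security group, and ARN placeholders are assumptions.

# Minimal sketch; use the same VPC subnets and security group as your HAQM DocumentDB cluster.
aws kafka create-cluster-v2 \
  --cluster-name docdb-msk-serverless \
  --serverless '{
    "VpcConfigs": [{
      "SubnetIds": ["<subnet-id-1>", "<subnet-id-2>", "<subnet-id-3>"],
      "SecurityGroupIds": ["<security-group-id>"]
    }],
    "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": true}}}
  }'

# After the cluster becomes active, retrieve the SASL/IAM bootstrap broker string (port 9098).
aws kafka get-bootstrap-brokers --cluster-arn <msk-cluster-arn>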

An HAQM EC2 instance with mongo shell and Java configured

You can choose an existing HAQM EC2 instance or configure a new one. We use this EC2 instance for testing purposes. Your instance should have the following configurations:

  1. Deploy the instance in the same VPC as your HAQM DocumentDB cluster and MSK cluster, with the same security group.
  2. Configure the instance security group to allow connections to and from the MSK cluster (port 9098) and the HAQM DocumentDB cluster (port 27017).
  3. Install the mongo shell on the EC2 instance. For instructions, refer to Install the mongo shell.
  4. Install Java on the EC2 instance:
sudo yum install java-11-amazon-corretto-headless -y
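
After completing these steps, you can optionally verify the setup from the EC2 instance. The following commands are a quick sketch; the endpoint placeholders are assumptions, and the nc utility may need to be installed first (for example, with sudo yum install -y nmap-ncat).

# Confirm the client tools are installed.
mongo --version
java -version

# Optionally confirm network reachability to HAQM DocumentDB (27017) and HAQM MSK (9098).
nc -zv <docdbclusterendpoint> 27017
nc -zv <kafka_bootstrap_server_host> 9098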

An HAQM S3 bucket

You need an HAQM S3 bucket to store the connector plugin and the JVM trust store file. You can use an existing S3 bucket or create a new bucket. Make sure the bucket access policy is configured as follows. Update your HAQM S3 bucket name and vpc_id (where you created HAQM MSK and HAQM DocumentDB) in the policy. With HAQM S3 bucket policies, you can secure access to objects in your buckets so that only users and resources with the appropriate permissions can access them.

{
    "Version": "2012-10-17",
    "Id": "Access-to-bucket-using-specific-VPC",
    "Statement": [
        {
            "Sid": "Access-to-specific-VPC-only",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::<HAQM S3 Bucket>",
                "arn:aws:s3:::<HAQM S3 Bucket>/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:sourceVpc": "<vpc-id>"
                }
            }
        }
    ]
} 
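
If you manage the bucket policy from the command line, the following is a minimal sketch; it assumes the policy above is saved locally as bucket-policy.json (a hypothetical file name).

aws s3api put-bucket-policy \
  --bucket <HAQM S3 Bucket> \
  --policy file://bucket-policy.json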

Create a custom plugin using the MongoDB Kafka connector

A plugin contains the code that defines the logic of the connector. You need to create a custom plugin in HAQM MSK using the MongoDB Kafka connector. When you create the MSK Connect connector later, you specify this custom plugin.

Apache Kafka config providers integrate your connector with other systems, such as HAQM S3 for storing the trust store file, AWS Systems Manager Parameter Store for storing the trust store password, and AWS Secrets Manager for storing the HAQM DocumentDB user name, password, and other credentials.

In this post, you store the MongoDB Kafka connector and the trust store certificate in the HAQM S3 bucket that you created in the previous step. You need config providers so that MSK Connect can access HAQM S3.

Open a terminal, log in to the EC2 instance, and complete the following steps:

  1. Create the directory structure as follows:
docdb-connector
├── mongo-connector
│ └── <MONGODB-CONNECTOR-ALL>.jar
├── msk-config-providers
│ └── <MSK CONFIG PROVIDERS>
mkdir -p ~/docdb-connector
mkdir -p ~/docdb-connector/mongo-connector
mkdir -p ~/docdb-connector/msk-config-providers

The following steps download the connector JAR into the ~/docdb-connector/mongo-connector directory and the MSK config provider .zip file into ~/docdb-connector/msk-config-providers.

  2. Download the MongoDB Kafka connector JAR (v1.10.0 or later) from Maven Central:
cd ~/docdb-connector/mongo-connector
wget http://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.0/mongo-kafka-connect-1.10.0-all.jar
  3. Download the MSK config provider .zip file and unzip it:
cd ~/docdb-connector/msk-config-providers
wget http://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip
unzip msk-config-providers-0.1.0-with-dependencies.zip
rm msk-config-providers-0.1.0-with-dependencies.zip
  4. Combine the connector JAR and the config provider files into a single .zip file:
cd ~;zip -r docdb-connector-plugin.zip docdb-connector
  5. Before you create the custom MSK plugin, upload docdb-connector-plugin.zip to the S3 bucket you created in the previous step. You can upload it from the command line (see the following code) or using the HAQM S3 console.
cd ~;aws s3 cp docdb-connector-plugin.zip s3://<HAQM S3 Bucket>;

Now you can create the custom plugin for MSK Connect with the following steps:

  1. On the HAQM MSK console, choose Custom plugins in the navigation pane and choose Create custom plugin.
  2. Provide the S3 URI where you uploaded the connector plugin.
  3. Enter a name for the plugin.
  4. Choose Create custom plugin.

The docdb-connector-plugin custom plugin is now active and ready for creating the connector.
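
Alternatively, you can create the custom plugin with the AWS CLI instead of the console. The following is a minimal sketch; the bucket ARN and file key must point to the .zip object you uploaded.

aws kafkaconnect create-custom-plugin \
  --name docdb-connector-plugin \
  --content-type ZIP \
  --location '{"s3Location": {"bucketArn": "arn:aws:s3:::<HAQM S3 Bucket>", "fileKey": "docdb-connector-plugin.zip"}}'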

Create a customer-managed policy and role for MSK Connect

Create a customer-managed policy to access the MSK Serverless cluster from MSK Connect and the EC2 instance. Update your Region and account ID in the policy. The Region should be the same as where you provisioned your HAQM DocumentDB cluster, MSK cluster, and EC2 instance.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "arn:aws:kafka:::*/*/*"
        }
    ]
}

Now, create an IAM role with the preceding policy, and also attach the AWS managed HAQM S3 read-only access policy (HAQMS3ReadOnlyAccess) to this role, because MSK Connect needs to access the HAQM DocumentDB trust store certificate from HAQM S3.

Replace the role’s trust policy with the following so that MSK Connect can assume it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafkaconnect.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
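
If you prefer the AWS CLI for these IAM steps, the following is a minimal sketch. The policy name, role name, and local file names (msk-policy.json and trust-policy.json, holding the two JSON documents shown above) are assumptions.

# Create the customer-managed policy from the MSK access policy shown earlier.
aws iam create-policy \
  --policy-name msk-cluster-access \
  --policy-document file://msk-policy.json

# Create the role that MSK Connect assumes, using the trust policy shown above.
aws iam create-role \
  --role-name msk-connect-docdb-role \
  --assume-role-policy-document file://trust-policy.json

# Attach the customer-managed policy and the AWS managed HAQMS3ReadOnlyAccess policy.
aws iam attach-role-policy \
  --role-name msk-connect-docdb-role \
  --policy-arn arn:aws:iam::<account_id>:policy/msk-cluster-access
aws iam attach-role-policy \
  --role-name msk-connect-docdb-role \
  --policy-arn arn:aws:iam::aws:policy/HAQMS3ReadOnlyAccess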

Create a role for the EC2 instance

Create an IAM role with the preceding customer-managed policy so that the EC2 instance can access the MSK Serverless cluster, and attach the role to the EC2 instance. We use the EC2 instance for testing purposes.
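
The following AWS CLI commands are a minimal sketch of one way to do this; the role and instance profile names are assumptions, and the policy ARN refers to the customer-managed policy created earlier. Note that the EC2 role needs a trust policy for ec2.amazonaws.com rather than kafkaconnect.amazonaws.com.

# Create a role that the EC2 instance can assume and attach the MSK access policy.
aws iam create-role \
  --role-name ec2-msk-access-role \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy \
  --role-name ec2-msk-access-role \
  --policy-arn arn:aws:iam::<account_id>:policy/msk-cluster-access

# Wrap the role in an instance profile and associate it with the EC2 instance.
aws iam create-instance-profile --instance-profile-name ec2-msk-access-profile
aws iam add-role-to-instance-profile \
  --instance-profile-name ec2-msk-access-profile \
  --role-name ec2-msk-access-role
aws ec2 associate-iam-instance-profile \
  --instance-id <ec2-instance-id> \
  --iam-instance-profile Name=ec2-msk-access-profile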

Create a trust store for JVM

The HAQM DocumentDB cluster has SSL/TLS enabled by default, and the Kafka connector runs on a Java Virtual Machine (JVM), so you need to create a trust store with a password. For instructions, refer to Connecting Programmatically to HAQM DocumentDB. Create a local directory and copy your trust store file (rds-truststore.jks) into it. If you followed the steps to create the trust store correctly, the file is located in /tmp/certs.
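
The following is a minimal sketch of the trust store creation based on the linked instructions; the CA bundle URL, file names, and password placeholder reflect common defaults and may differ in your environment.

# Minimal sketch; verify the CA bundle URL and adjust paths for your setup.
mkdir -p /tmp/certs && cd /tmp/certs
wget http://truststore.pki.rds.amazonaws.com/global/global-bundle.pem

# Split the bundle into individual certificates and import each one into a new JKS trust store.
awk 'split_after == 1 {n++; split_after=0} /-----END CERTIFICATE-----/ {split_after=1} {print > "rds-ca-" n ".pem"}' < global-bundle.pem
for CERT in rds-ca-*.pem; do
  keytool -importcert -noprompt -trustcacerts \
    -alias "$CERT" -file "$CERT" \
    -keystore rds-truststore.jks -storepass <truststore_password>
done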

Copy the trust store file to the S3 bucket; the connector uses this file to connect to HAQM DocumentDB. You can use the same S3 bucket where you stored the connector plugin. See the following code:

cp /tmp/certs/rds-truststore.jks ~
cd ~;aws s3 cp rds-truststore.jks s3://<HAQM S3 Bucket>

Create a gateway endpoint for HAQM S3 to access the trust store

Because the trust store is stored in HAQM S3, you need to configure a gateway VPC endpoint for HAQM S3 so the connector can pull the trust store from HAQM S3.
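
The following AWS CLI command is a minimal sketch for creating the endpoint; the IDs are placeholders, and the route table should be one associated with the subnets that MSK Connect uses.

aws ec2 create-vpc-endpoint \
  --vpc-id <vpc-id> \
  --vpc-endpoint-type Gateway \
  --service-name com.amazonaws.<regionname>.s3 \
  --route-table-ids <route-table-id>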

HAQM DocumentDB as a sink

In this part of the post, we focus on the sink use case, as shown in the following diagram. We discuss how to create and run the connector (using MSK Connect) and use HAQM DocumentDB as a sink database to move data from the MSK Kafka topic, which is generated by a Kafka producer.

The configuration steps are as follows:

  1. Configure the connector as an HAQM DocumentDB sink connector.
  2. Test the MongoDB Kafka connector with HAQM DocumentDB as a sink.

Configure the connector as an HAQM DocumentDB sink connector

Complete the following steps:

  1. On the HAQM MSK console, choose Connectors in the navigation pane and choose Create connector.

  2. Select the custom plugin that you created in the prerequisite steps, then choose Next.

  3. Provide the connector name in basic information.
  4. Select the MSK Serverless cluster with IAM authentication.

  5. In the connector configuration, enter the following configuration. Update the HAQM DocumentDB login name, password, cluster endpoint, and port, as well as the Region name, S3 bucket name, and trust store password.
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=sink-topic
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all
# Connection String with Plain text secrets and cluster domain details:
connection.uri=mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&readPreference=secondaryPreferred&retryWrites=false
# Connection String with usage of AWS Secrets Manager:
#connection.uri=mongodb://${sm:/docdb/db1:username}:${sm:/docdb/db1:password}@${sm:/docdb/db1:host}:${sm:/docdb/db1:port}/?ssl=true&retryWrites=false
database=sinkdatabase
collection=sinkcollection
connection.ssl.truststore=${s3import:<regionname>:<s3-bucket-name>/rds-truststore.jks}
# Truststore password in PLAIN view:
connection.ssl.truststorePassword=<truststore_password>
# Truststore password using AWS System Manager Parameter Store:
#connection.ssl.truststorePassword=${ssm::/docdb/truststorePassword/caCertPass}
config.providers= s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
#config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
#config.providers.ssm.param.region=<regionname>
#config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
#config.providers.sm.param.region=<regionname>

The configuration contains the following details:

  • connector.class – The Java class for the connector. It’s the class responsible for moving data from Kafka.
  • tasks.max – The maximum number of tasks that should be created for this connector.
  • topics – The list of Kafka topics that this sink connector watches. The topic name is sink-topic.
  • key.converter – The converter class that instructs the connector on how to translate the key from Kafka serialized format. We use the string class converter.
  • value.converter – The converter class that instructs the connector on how to translate the value from Kafka serialized format. We have JSON data in our Kafka topic, so we configure Kafka Connect to use the JSON converter.
  • value.converter.schemas.enable – By default, the JSON converter expects a JSON schema, but we set this to false because our messages don’t include one.
  • connection.uri – Defines the connection string for the HAQM DocumentDB cluster. We use an endpoint with the SSL option. You can optionally store the HAQM DocumentDB credentials and cluster details in AWS Secrets Manager instead of plain text and have them retrieved dynamically during connector creation or task creation and recovery (see the commented-out connection.uri line in the configuration). For more information, refer to Finding a Cluster’s Endpoints.
  • database – The target HAQM DocumentDB database. We use the database name sinkdatabase.
  • collection – The collection name in the database to push the changes. The collection name is sinkcollection.
  • connection.ssl.truststore – Defines the HAQM DocumentDB trust store file location. It is defined as S3 URI format with the bucket and file name.
  • connection.ssl.truststorePassword – You need to put the trust store password here in plain text. You can also put the password in Parameter Store and define the config providers.

config.providers – To integrate your Kafka connector with other systems like HAQM S3 for storing the trust store file, Parameter Store for storing the trust store password, and Secrets Manager to store the HAQM DocumentDB user name, password, and other details, you need the providers configuration. Here, you only need the HAQM S3 config provider to access the trust store.

  • config.providers – The names of the config providers to enable. In this case, s3import (the ssm and sm providers are only needed if you use Parameter Store or Secrets Manager).
  • config.providers.s3import.class – The Java class of the HAQM S3 import config provider.
  • config.providers.s3import.param.region – The Region of the S3 bucket that the config provider reads from.
  6. Choose the IAM role that you created to access the MSK cluster and HAQM S3, then choose Next.

  7. Select Deliver to HAQM CloudWatch Logs and enter the log delivery location for the connector.
  8. Wait for the connector status to change to Running.

Test the MongoDB Kafka connector with HAQM DocumentDB as a sink

To test the connector, start a Kafka producer to push messages to the Kafka topic sink-topic. The Kafka connector reads the messages from this topic and writes them to HAQM DocumentDB based on the configuration.

  1. To run the local Kafka producer, log in to your EC2 instance, download the binary distribution of Apache Kafka, and extract the archive in the local_kafka directory:
mkdir ~/local_kafka;cd ~/local_kafka/
cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka_iam_truststore.jks
wget http://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
tar -xzf kafka_2.13-3.2.3.tgz
ln -sfn kafka_2.13-3.2.3 kafka
  2. To use IAM to authenticate with the MSK cluster, download the HAQM MSK Library for IAM and copy it to the local Kafka library directory, as shown in the following code. For complete instructions, refer to Configure clients for IAM access control.
wget http://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.3/aws-msk-iam-auth-1.1.3-all.jar
cp aws-msk-iam-auth-1.1.3-all.jar kafka/libs
  3. In the ~/local_kafka/kafka/config/ directory, create a client-config.properties file to configure a Kafka client to use IAM authentication for the Kafka console producer and consumer:
ssl.truststore.location=/home/ec2-user/local_kafka/kafka_iam_truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  4. Define the BOOTSTRAP_SERVERS environment variable to store the bootstrap servers of the MSK cluster, and add the local Kafka installation to the PATH environment variable:
export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  5. Create the Kafka topic sink-topic, which you defined in the connector config:
cd ~/local_kafka/kafka/config
kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sink-topic --command-config client-config.properties
  6. Run the Kafka console producer to write to the MSK topic sink-topic and submit the valid JSON documents {"name":"DocumentDB NoSQL"} and {"test":"DocumentDB Sink Connector"}:
cd ~/local_kafka/kafka/config
kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --producer.config client-config.properties --topic sink-topic
{"name":"DocumentDB NoSQL"}
{"test":"DocumentDB Sink Connector"}
  7. Open a second terminal and connect to the HAQM DocumentDB cluster using the mongo shell. The preceding two JSON documents should appear in the sinkcollection collection in sinkdatabase:
use sinkdatabase
db.sinkcollection.find()

We get the following output:

{ "_id" : ObjectId("62c3cf2ec3d9010274c7a37e"), "name" : "DocumentDB NoSQL" }
{ "_id" : ObjectId("62c3d048c3d9010274c7a37f"), "test" : "DocumentDB Sink Connector" }

You should see the JSON documents that we pushed using the console producer.

HAQM DocumentDB as the source

In this section, we discuss how to create and run the connector (using MSK Connect) and use HAQM DocumentDB as the source database to move the collection changes to the MSK Kafka topic.

The following diagram illustrates this data flow:

Now you need to set up another connector for the source use case with the following steps:

  1. Configure HAQM DocumentDB for a change stream.
  2. Configure the connector as an HAQM DocumentDB source connector.
  3. Test the MongoDB Kafka connector with HAQM DocumentDB as the source.

Configure HAQM DocumentDB for a change stream

The connector reads changes from the source collection through a change stream cursor. The change streams feature in HAQM DocumentDB provides a time-ordered sequence of change events that occur within your collections.

For this post, we use the collection sourcecollection in the sourcedatabase database in our HAQM DocumentDB cluster.

Connect to the HAQM DocumentDB cluster and enable the change stream for collection sourcecollection:

use sourcedatabase
db.createCollection("sourcecollection")
db.adminCommand({modifyChangeStreams: 1,database: "sourcedatabase",collection: "sourcecollection", enable:true});

Configure the connector as an HAQM DocumentDB source connector

Now we configure the source connector to read the changes in the HAQM DocumentDB collection and store those changes in the MSK topic. The connector reads these changes from the HAQM DocumentDB change stream that we configured.

The steps for creating the HAQM DocumentDB source connector are the same as for the sink connector except for the connector configuration.

For the source connector, perform similar steps from Step 1 to Step 8 of the sink connector configuration, but use the following connector configurations:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
errors.tolerance=all
connection.uri= mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&replicaSet=rs0&retryWrites=false
database= sourcedatabase
collection=sourcecollection
connection.ssl.truststore=${s3import:<regionname>:<HAQM S3 Bucket>/rds-truststore.jks}
connection.ssl.truststorePassword=<truststore_password>
config.providers=s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
config.providers.ssm.param.region=<regionname>
config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
config.providers.sm.param.region=<regionname>

The configuration contains the connector type and its properties:

  • connector.class – The Java class for the connector. It’s the class responsible for moving data from the HAQM DocumentDB collection to the MSK topic.
  • tasks.max – The maximum number of tasks that should be created for this connector.
  • connection.uri – The HAQM DocumentDB connection string used to connect to the HAQM DocumentDB cluster. We use an endpoint with the SSL option.
  • database – The source database. In this case, the database name is sourcedatabase.
  • collection – The collection in the database to watch the changes. The collection name is sourcecollection.
  • connection.ssl.truststore – Defines the HAQM DocumentDB trust store file location. It’s defined as S3 URI format with a bucket and file name.
  • connection.ssl.truststorePassword – Add the trust store password here in plain text. You can also store the password in AWS Systems Manager Parameter Store and define config providers.

To integrate your Kafka connector with other systems like HAQM S3, you need to define the config providers.

Note that connection.uri is different from the previous sink use case. We don’t set the read preference to secondary in connection.uri because HAQM DocumentDB supports change streams only on the primary instance.

Wait for the HAQM DocumentDB source connector status to change to Running.

Test the connector with HAQM DocumentDB as the source

To test the connector, we insert data in the HAQM DocumentDB collection. The Kafka connector reads the inserted data using the collection change stream and writes that to the Kafka topic.

  1. Open a new terminal on the EC2 instance and set the following environment variables:
export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  2. Create the Kafka topic sourcedatabase.sourcecollection:
cd ~/local_kafka/kafka/config
kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sourcedatabase.sourcecollection --command-config client-config.properties
  3. Run the Kafka console consumer to read the details from the sourcedatabase.sourcecollection Kafka topic. If you run it in a new terminal, make sure to set the BOOTSTRAP_SERVERS environment variable.
cd ~/local_kafka/kafka/config
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config client-config.properties --topic sourcedatabase.sourcecollection --from-beginning
  4. In a second terminal, add a record to sourcedatabase.sourcecollection in your HAQM DocumentDB cluster:
use sourcedatabase
db.sourcecollection.insert({"name":"HAQM DocumentDB"})
  5. Return to the first terminal, where the console consumer is reading from the MSK topic:
{"_id": {"_data": "0164263f9e0000000701000000070000426b"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1680228254, "i": 7}}, "ns": {"db": "sourcedatabase", "coll": "sourcecollection"}, "documentKey": {"_id": {"$oid": "64263f9ea715466fe5ff0c9d"}}, "fullDocument": {"_id": {"$oid": "64263f9ea715466fe5ff0c9d"}, "name": "HAQM DocumentDB"}}

We can observe that the insert operation made on the HAQM DocumentDB collection is available on the console consumer.

We’re now able to capture changes in HAQM DocumentDB as the source database using the MongoDB Kafka connector running on MSK Connect.

Cleanup

To clean up the resources you used in your account, delete them in the following order. You can delete them on the respective service consoles or with the AWS CLI, as sketched after the list.

  • HAQM EC2 instance
  • IAM role and customer managed policy
  • Gateway endpoints for HAQM S3
  • MSK Connect connectors
  • HAQM MSK custom plugin
  • HAQM MSK Serverless cluster
  • HAQM DocumentDB cluster
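
The following AWS CLI commands are a minimal sketch of the same cleanup; the ARNs and identifiers are placeholders, and the IAM role, instance profile, and customer-managed policy should be detached and deleted separately.

# Minimal sketch; replace the ARNs and identifiers with your own values.
aws ec2 terminate-instances --instance-ids <ec2-instance-id>
aws ec2 delete-vpc-endpoints --vpc-endpoint-ids <s3-gateway-endpoint-id>
aws kafkaconnect delete-connector --connector-arn <connector-arn>
aws kafkaconnect delete-custom-plugin --custom-plugin-arn <custom-plugin-arn>
aws kafka delete-cluster --cluster-arn <msk-cluster-arn>
aws docdb delete-db-instance --db-instance-identifier <docdb-instance-id>
aws docdb delete-db-cluster --db-cluster-identifier <docdb-cluster-id> --skip-final-snapshot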

Conclusion

In this post, we discussed how to run and configure the MongoDB Kafka connector to move data between HAQM DocumentDB and HAQM MSK for different sink and source use cases. You can use this solution for a variety of use cases, such as creating pipelines for large video streaming or flash sale events, streaming telemetry data from IoT devices, collecting website hit data, replicating collections from HAQM DocumentDB to other data stores, and moving data for advanced analytics and ML.

We first showed you how to use the connector to stream data from HAQM MSK to HAQM DocumentDB, where HAQM DocumentDB acts as a sink. We also showed how to configure a connector on MSK Connect. In the second half of this post, we showed you how to stream data from HAQM DocumentDB to HAQM MSK where HAQM DocumentDB acts as the source. We also discussed various configurations available with both use cases that you can adjust for your specific use case or workload requirement.

Leave a comment. We’d love to hear your thoughts and suggestions.


About the authors

Anshu Vajpayee is a Senior DocumentDB Specialist Solutions Architect at HAQM Web Services (AWS). He has been helping customers to adopt NoSQL databases and modernize applications leveraging HAQM DocumentDB. Before joining AWS, he worked extensively with relational and NoSQL databases.

Ed Berezitsky is a Principal Streaming Architect at HAQM Web Services. Ed helps customers design and implement solutions using streaming technologies, and specializes in HAQM MSK and Apache Kafka.