Deliver data at scale to HAQM Managed Streaming for Apache Kafka (HAQM MSK)
with AWS IoT Core
In this tutorial you learn how to:
- Set up Private Certificate Authority (CA) with AWS Certificate Manager
- Set up an Apache Kafka cluster with HAQM MSK
- Configure Kafka authentication and test the stream using AWS Cloud9
- Prepare Java KeyStore and configure an AWS Secrets Manager secret
- Configure an AWS IoT Core rule to deliver messages to the Kafka cluster
- Set up error logging for AWS IoT Core rules and service
- Clean up resources
| About this Tutorial | |
| --- | --- |
| Time | 1 hour |
| Cost | $7 |
| Use Case | Internet of Things, Analytics |
| Products | HAQM MSK, AWS IoT Core |
| Audience | Developers |
| Level | Intermediate |
| Last Updated | April 21, 2021 |
Step 1: Set up Private CA with AWS Certificate Manager
In this step, you set up a Private Certificate Authority (Private CA) with AWS Certificate Manager (ACM). This Private CA lets you issue TLS certificates that authenticate clients communicating with the Apache Kafka cluster.
1.1 — Open the AWS Certificate Manager console and sign in with your AWS account credentials. If you do not have an AWS account, create a new AWS account to get started.
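If you prefer to script this step, the following is a minimal AWS CLI sketch of creating a root Private CA. The Subject common name and Region placeholder are illustrative. Note that a CA created this way starts in the PENDING_CERTIFICATE state; you still need to install the CA certificate before it becomes ACTIVE, which the console wizard handles for you.
aws acm-pca create-certificate-authority --certificate-authority-type ROOT --certificate-authority-configuration 'KeyAlgorithm=RSA_2048,SigningAlgorithm=SHA256WITHRSA,Subject={CommonName=kafka-tutorial-ca}' --region [YOUR REGION]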
Step 2: Set up Apache Kafka cluster with HAQM MSK
AWS IoT rules can be configured to deliver messages to Apache Kafka clusters on HAQM MSK. In this step, you set up a new Kafka cluster with the client authentication settings that AWS IoT rules support.
2.1 — Open the HAQM MSK console and sign in with your AWS account credentials. Verify you are in the same Region as your Private CA. Then, choose Create cluster.
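As an alternative to the console, the following AWS CLI sketch creates a two-broker cluster with TLS client authentication backed by your Private CA. The cluster name, subnet IDs, security group ID, and instance type are placeholders; adjust them to your VPC and sizing needs.
aws kafka create-cluster --cluster-name AWSKafkaTutorialCluster --kafka-version 2.2.1 --number-of-broker-nodes 2 --broker-node-group-info '{"InstanceType":"kafka.t3.small","ClientSubnets":["subnet-11111111","subnet-22222222"],"SecurityGroups":["sg-33333333"]}' --encryption-info '{"EncryptionInTransit":{"ClientBroker":"TLS"}}' --client-authentication '{"Tls":{"CertificateAuthorityArnList":["[YOUR PRIVATE CA ARN]"]}}' --region [YOUR REGION]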
Step 3: Prepare an AWS Cloud9 instance to configure Kafka authentication and test the stream
AWS Cloud9 comes with the AWS CLI and Java pre-installed, which simplifies development. In this step, you configure Kafka authentication, install the Kafka tools, and then test the consumer and producer.
3.1 — Open the AWS Cloud9 console, and choose Create environment. Verify that you launched the AWS Cloud9 environment in the same VPC where the Kafka cluster is deployed.
3.5 — Open the HAQM MSK console, and in the General section, verify that the cluster you created is Active, then copy the Cluster ARN.
Choose the View client information tab, and copy the Bootstrap servers and ZooKeeper connection strings to use in the following configuration.
3.6 — Open the Cloud9 IDE bash window, and run the following commands in the terminal window.
a. Download and extract the Kafka CLI tools
wget http://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz
b. Install jq to parse JSON CLI response
sudo yum install jq -y
c. Set up variables for CLUSTER_ARN, PRIVATE_CA_ARN, REGION_NAME, and TOPIC as per your environment.
CLUSTER_ARN="[YOUR CLUSTER ARN]"
PRIVATE_CA_ARN="[YOUR PRIVATE CA ARN]"
REGION_NAME="[YOUR REGION]"
TOPIC="AWSKafkaTutorialTopic"
d. The following commands refer to the variables configured in the previous step, and use the AWS CLI with the cluster ARN to fetch the ZooKeeper connection string and save it in the ZOOKEEPER_STRING variable.
ZOOKEEPER_STRING=$(aws kafka describe-cluster --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".ClusterInfo.ZookeeperConnectString")
echo $ZOOKEEPER_STRING
3.7 — Configure Security Group Inbound rules for your Kafka cluster.
a. Open the AWS Cloud9 console, and select the View details tab for your created environment.
c. Open the HAQM MSK console, and choose your Cluster name.
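If you prefer to add the inbound rules from the terminal, the following sketch opens the default MSK TLS broker port (9094) and the ZooKeeper port (2181) to the AWS Cloud9 security group. The sg-kafka and sg-cloud9 values are placeholders for the security group IDs you found in the two consoles above.
aws ec2 authorize-security-group-ingress --group-id sg-kafka --protocol tcp --port 9094 --source-group sg-cloud9 --region $REGION_NAME
aws ec2 authorize-security-group-ingress --group-id sg-kafka --protocol tcp --port 2181 --source-group sg-cloud9 --region $REGION_NAME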
3.8 — Create a new topic AWSKafkaTutorialTopic in the Kafka cluster. The producer and consumer use this topic to publish and subscribe to messages. If you set a different topic name in the TOPIC variable in step 3.6.c, the following commands create a topic with that name instead.
cd ~/environment/kafka_2.12-2.2.1
bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_STRING --replication-factor 2 --partitions 1 --topic $TOPIC
This creates a new topic named AWSKafkaTutorialTopic. Verify that the command returns the following message: Created topic AWSKafkaTutorialTopic.
Note: If a Client Session timed out error appears, check that your cluster Security Group Inbound Rules were configured correctly.
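You can also confirm that the topic exists by listing the topics registered in ZooKeeper:
bin/kafka-topics.sh --list --zookeeper $ZOOKEEPER_STRING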
Step 4: Prepare Java KeyStore and configure AWS Secrets Manager
Java KeyStore is used to store private keys, public keys, certificates, and secret keys. In this step, you create a Java KeyStore, create a certificate using keytool, sign the certificate with your Private CA, and save the KeyStore to Secrets Manager.
Note: The name of the Kafka directory changes with each Kafka version. Use the directory name for the version you downloaded.
Note: The path containing java-11-amazon-corretto.x86_64 depends on your installed Java version. Change this path to match your installation. Look for the file named cacerts in the JVM sub-directories and copy it to the client directory created below.
4.1 — In your AWS Cloud9 terminal, run the following commands to create a client folder to store certificate files, and copy Java KeyStore to this folder.
cd ~/environment/kafka_2.12-2.2.1/
mkdir client && cd client
cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka.client.truststore.jks
4.2 — Run the following commands to configure the Java KeyStore with ALIAS and PASSWORD.
Note: If you change ALIAS and PASSWORD variables, you must use the same in next steps.
ALIAS="keyAlias"
PASSWORD="myPassword"
4.3 — Run the following command to use keytool to generate a key with a Common Name (CN), ALIAS, and PASSWORD. Optionally, replace "Distinguished-Name" with a value that helps you identify your key, or leave it as is.
keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $PASSWORD -keypass $PASSWORD -dname "CN=Distinguished-Name" -alias $ALIAS -storetype pkcs12
4.4 — Run the following command to create a Certificate Signing Request for the key generated in the previous step.
keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias $ALIAS -storepass $PASSWORD -keypass $PASSWORD
4.5 — Run the following command to edit the certificate signing request created in the previous step into the correct format.
sed -i 's/NEW //' client-cert-sign-request
4.6 — Run the following command to issue a Certificate Signed by your Private CA and save the Certificate ARN to a variable.
In the following command, jq (previously installed) is used to parse the JSON output from AWS CLI to a string and save in variable CERTIFICATE_ARN.
CERTIFICATE_ARN=$(aws acm-pca issue-certificate --certificate-authority-arn $PRIVATE_CA_ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS" --region $REGION_NAME | jq -r ".CertificateArn")
4.7 — Run the following command to use AWS CLI to fetch the certificate using the received Certificate ARN in the previous step. The received response is in JSON format, which is parsed by jq, and saved into a file signed-certificate-from-acm.
aws acm-pca get-certificate --certificate-authority-arn $PRIVATE_CA_ARN --certificate-arn $CERTIFICATE_ARN --region $REGION_NAME | jq -r '"\(.CertificateChain)\n\(.Certificate)"' > signed-certificate-from-acm
4.8 — Run the following command to import the signed certificate to KeyStore.
keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias $ALIAS -storepass $PASSWORD -keypass $PASSWORD
a. Enter Yes when prompted in the terminal.
b. The client folder should now contain 4 files.
- client-cert-sign-request
- kafka.client.keystore.jks
- kafka.client.truststore.jks
- signed-certificate-from-acm
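Optionally, you can confirm that the signed certificate chain was imported under your alias by listing the KeyStore contents:
keytool -list -v -keystore kafka.client.keystore.jks -alias $ALIAS -storepass $PASSWORD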
4.9 — The MSK cluster that you configured previously requires all client communication to be TLS secured. The file kafka.client.keystore.jks has to be uploaded to Secrets Manager as a SecretBinary so that the AWS IoT Core rule can authenticate.
Run the following command to create a secret named Kafka_Keystore in AWS Secrets Manager.
aws secretsmanager create-secret --name Kafka_Keystore --secret-binary fileb://kafka.client.keystore.jks --region $REGION_NAME
You can verify the newly created Secret in the Secrets Manager console.
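You can also verify the secret from the terminal; the following command returns the secret's metadata (not the binary contents):
aws secretsmanager describe-secret --secret-id Kafka_Keystore --region $REGION_NAME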
4.10 — Open a new Terminal session for the Kafka consumer; you start the producer in your original Terminal session in the next step.
Note: A new Terminal can be opened by choosing the + symbol or pressing Alt+T in the AWS Cloud9 console.
In the new Terminal session, run the following commands to set the initial variables.
REGION_NAME="[YOUR REGION]"
TOPIC="[NAME OF THE TOPIC CREATED]"
CLUSTER_ARN="[YOUR CLUSTER ARN]"
4.11 — In your original Terminal session, run the following commands to configure the Kafka producer and consumer by creating a file named client.properties with the following contents. If applicable, adjust the truststore and keystore locations to the paths where you saved kafka.client.truststore.jks.
Note: If all your paths are the default values suggested in the previous steps, you don't need to modify the following commands.
cd ~/environment/kafka_2.12-2.2.1/client
sudo nano client.properties
a. Paste the following contents into the nano editor, then save and exit.
security.protocol=SSL
ssl.truststore.location=client/kafka.client.truststore.jks
ssl.keystore.location=client/kafka.client.keystore.jks
ssl.keystore.password=myPassword
ssl.key.password=myPassword
b. Run the following command to verify that the contents of client.properties were saved correctly.
cat client.properties
c. Run the following commands to start a console producer. The first command uses the AWS CLI to fetch the TLS bootstrap broker string for your cluster ARN and saves it in a variable, which is then used to connect to the Kafka cluster.
cd ~/environment/kafka_2.12-2.2.1
BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".BootstrapBrokerStringTls")
bin/kafka-console-producer.sh --broker-list $BOOTSTRAP_SERVER --topic $TOPIC --producer.config client/client.properties
4.12 — In the new Terminal session that you opened in step 4.10, run the following commands to start a Kafka consumer session.
cd ~/environment/kafka_2.12-2.2.1
BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".BootstrapBrokerStringTls")
bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --consumer.config client/client.properties --from-beginning
You should now have a Kafka producer session and consumer session initiated.
4.13 — In the producer terminal, type any string and press Enter to publish it to the Kafka stream. The string appears in the consumer terminal.
Step 5: Configure AWS IoT Core rule to deliver data to Kafka stream
5.1 — Create an AWS IAM role with AWS Secrets Manager permissions to allow an AWS IoT Core rule to access the Kafka KeyStore stored in Secrets Manager.
a. Open the IAM console, choose Roles from the left navigation pane, and choose Create role.
b. For Select type of trusted entity, choose AWS service. For Choose a use case, choose EC2, then choose Next: Permissions.
5.5 — Once the role is created, choose the newly created kafkaRole. On the Summary page for the role, choose the Trust relationships tab, choose Edit trust relationship, and copy and paste the following details.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
5.6 — Then, choose Update Trust Policy.
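The role also needs permission to read the KeyStore secret. If you did not already attach a Secrets Manager policy while creating the role, the following is a minimal sketch of an inline policy you can add from the AWS Cloud9 terminal. The secret ARN pattern is illustrative; you can copy the exact ARN from the Secrets Manager console.
aws iam put-role-policy --role-name kafkaRole --policy-name KafkaKeystoreSecretRead --policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["secretsmanager:GetSecretValue","secretsmanager:DescribeSecret"],"Resource":"arn:aws:secretsmanager:[YOUR REGION]:[YOUR ACCOUNT ID]:secret:Kafka_Keystore-*"}]}'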
5.7 — Open the AWS IoT Core console, and in the left navigation pane, choose Act. Then, choose Rules, and choose Create a rule.
5.11 — On the Create a VPC destination page, choose the same VPC ID where you created your Kafka cluster.
5.12 — Choose the Subnet IDs, and choose the Security group.
Note: Make sure you select a Security Group with access to Kafka cluster Security Group.
You can select the security group of the AWS Cloud9 (EC2) instance you created previously and/or the security group of the Kafka cluster itself.
5.13 — Choose Create Role. In the Create a new role window, name the role AWSKafkaTutorialTopic. Then, choose Create role.
5.14 — Choose Create Destination.
Note: It takes 5-10 minutes for the Destination to become Enabled. Once the status is Enabled, return to the Rule creation page.
5.15 — In your web browser, navigate to the previously opened AWS IoT Rules Configure action page. Then, choose the newly created VPC destination from the drop down, and enter AWSKafkaTutorialTopic for the Kafka topic. Leave Key and Partition empty.
5.16 —On the Configure action page, in the Client properties section, enter the following details.
- bootstrap.servers = The TLS bootstrap string for your Kafka cluster
- security.protocol = SSL
- ssl.truststore = Leave EMPTY
- ssl.truststore.password = Leave EMPTY
- ssl.keystore = ${get_secret('Kafka_Keystore','SecretBinary','arn:aws:iam::[YOUR AWS ACCOUNT ID]:role/kafkaRole')}
- ssl.keystore.password = myPassword
Note: The password is the value of the PASSWORD variable (myPassword) that you configured in the previous steps.
5.18 — On the Create a rule page, choose Create rule.
5.19 — Use the following procedure to test the MQTT Stream.
a. In the left navigation pane of AWS IoT Core, choose Test, and choose MQTT test client.
b. On the MQTT test client page, choose the publish to a topic tab.
c. Under Topic name, enter a name. Under Message payload, enter a message.
d. In the active AWS Cloud9 consumer session (created/opened in the previous step), you should see data published from the MQTT topic and streamed to Kafka consumer.
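As an alternative to the MQTT test client, you can publish a test message from the AWS Cloud9 terminal. The topic iot/kafka/test is a placeholder; replace it with the MQTT topic your rule's SQL statement subscribes to. The --cli-binary-format flag applies to AWS CLI v2; omit it if your environment uses AWS CLI v1.
aws iot-data publish --topic "iot/kafka/test" --cli-binary-format raw-in-base64-out --payload '{"message":"hello from AWS IoT Core"}' --region $REGION_NAME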
Step 6: Error logging
To help with troubleshooting errors, you can set up error logging for AWS IoT Core rules and service.
6.1 — Open the AWS IoT console. In the left navigation pane, choose Settings. Then, in the Logs section, choose Manage Logs.
a. In the Log level section, choose Debug (most verbosity) from the drop down, and choose Update.
6.2 — To configure an error action for your rule, open the AWS IoT console, and choose Rules. Then, choose the rule you created in Step 5.
6.3 — In the Error action section, choose Add action. Then, choose Republish message to an AWS IoT topic, and choose Configure action.
You can now view your AWS IoT logs in HAQM CloudWatch, under Log groups.
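You can also read the log events from the terminal. The following sketch assumes the default log group name for AWS IoT v2 logging, AWSIotLogsV2; adjust it if your account uses a different log group.
aws logs filter-log-events --log-group-name AWSIotLogsV2 --limit 25 --region $REGION_NAME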
Step 7: Clean up
In the following steps, you clean up the resources you created in this tutorial.
It is a best practice to delete resources that you are no longer using so that you are not continually charged for them.
Delete HAQM MSK cluster
7.1 — Open the HAQM MSK console.
7.2 — Choose Clusters and choose the MSK cluster that you created for this tutorial.
7.3 — Choose Delete.
7.4 — Type delete to confirm and choose Delete.
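Alternatively, if your AWS Cloud9 terminal still has the CLUSTER_ARN and REGION_NAME variables set, you can delete the cluster from the command line:
aws kafka delete-cluster --cluster-arn $CLUSTER_ARN --region $REGION_NAME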
Delete IoT Core rule
7.5 — Sign in to the AWS Cloud9 environment, choose Open IDE, and run the following command, replacing myrule with the name of the rule you created.
aws iot delete-topic-rule --rule-name myrule
Delete IAM role
7.6 — Open the IAM console.
7.7 — In the navigation pane, choose Roles, and then select the check box next to the role that was created for this tutorial (kafkaRole).
7.8 — Choose Delete role.
7.9 — Choose Yes, Delete to delete the IAM role.
Delete AWS Secrets Manager secret
7.10 — Open the Secrets Manager console and in the left navigation pane, choose Secrets.
7.11 — Choose the Kafka_Keystore secret.
7.12 — In the Secret details section, choose Delete secret.
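Alternatively, you can schedule the secret for deletion from the AWS Cloud9 terminal (Secrets Manager enforces a recovery window of 7 to 30 days before the secret is permanently removed):
aws secretsmanager delete-secret --secret-id Kafka_Keystore --recovery-window-in-days 7 --region $REGION_NAME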
Delete ACM Private CA
7.13 — Open the ACM Private CA console.
7.14 — Choose Private CAs.
7.15 — Choose your private CA from the list.
7.16 — If your CA is in the ACTIVE state, you must disable it. On the Actions menu, choose Disable.
7.17 — On the Actions menu, choose Delete.
7.18 — If your CA is in the PENDING_CERTIFICATE, EXPIRED, or DISABLED state, specify a restoration period of seven to 30 days. Then choose Delete.
Note: If your private CA is not in one of these states, it cannot be restored later.
7.19 — If you are certain that you want to delete the private CA, choose Permanently delete when prompted. The status of the private CA changes to DELETED.
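The same disable-and-delete flow is available from the AWS Cloud9 terminal if the PRIVATE_CA_ARN and REGION_NAME variables are still set:
aws acm-pca update-certificate-authority --certificate-authority-arn $PRIVATE_CA_ARN --status DISABLED --region $REGION_NAME
aws acm-pca delete-certificate-authority --certificate-authority-arn $PRIVATE_CA_ARN --permanent-deletion-time-in-days 7 --region $REGION_NAME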
Delete AWS Cloud9 environment
7.20 — Open the AWS Cloud9 console.
7.21 — In the top navigation bar, choose the AWS Region where the environment is located.
7.22 — In the list of environments, for the environment you want to delete, choose the title of the card for the environment.
7.23 — Choose Delete.
7.24 — In the Delete dialog box, type Delete, and then choose Delete.
Congratulations
You have configured IoT Rules to deliver messages to an Apache Kafka cluster using AWS IoT Core and HAQM MSK. You can now securely deliver MQTT messages to a highly scalable, durable, and reliable system using Apache Kafka.
Recommended next steps
Getting started with HAQM MSK
Learn how to migrate a self-managed Apache Kafka cluster
If you want to learn different ways of migrating a self-managed Apache Kafka cluster, whether on HAQM EC2 or on premises, to HAQM MSK, complete the Migration Lab.