AWS Compute Blog
Using HAQM MSK as an event source for AWS Lambda
HAQM Managed Streaming for Apache Kafka (HAQM MSK) is a fully managed, highly available service that uses Apache Kafka to process real-time streaming data. Many producers can send messages to Kafka, which can then be routed to and processed by multiple consumers. Lambda now supports HAQM MSK as an event source, so it can consume messages and integrate with downstream serverless workflows.
Apache Kafka is a distributed streaming platform that is similar to HAQM Kinesis. HAQM MSK simplifies the setup, scaling, and management of clusters running Kafka. It makes it easier to configure the cluster across multiple Availability Zones and to secure it with IAM. It’s fully compatible with Kafka and supports familiar community-built tools such as MirrorMaker, Apache Flink, and Prometheus.
In this blog post, I explain how to set up a test HAQM MSK cluster and configure key elements in the networking configuration. I also show how to create a Lambda function that is invoked by messages in HAQM MSK topics.
Overview
Using HAQM MSK as an event source operates in a similar way to using HAQM SQS or HAQM Kinesis. In all cases, the Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
Lambda is a consumer application for your Kafka topic. It processes records from one or more partitions and sends the payload to the target function. Lambda continues to process batches until there are no more messages in the topic.
The Lambda function’s event payload contains an array of records. Each record includes the topic and Kafka partition identifier, together with a timestamp and a base64-encoded message:
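For reference, a trimmed payload in this shape might look like the following sketch. The cluster ARN, partition, offset, and timestamp values are illustrative, and the value field decodes to “Message #1”:

{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 0,
                "timestamp": 1596480920837,
                "timestampType": "CREATE_TIME",
                "value": "TWVzc2FnZSAjMQ=="
            }
        ]
    }
}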
Network configuration overview
HAQM MSK is a highly available service, so it must be configured to run in a minimum of two Availability Zones in your preferred Region. To comply with security best practice, the brokers are usually configured in private subnets in each Availability Zone.
For HAQM MSK to invoke Lambda, you must ensure that there is a NAT Gateway running in the public subnet of each Availability Zone. It’s possible to route the traffic to a single NAT Gateway in one AZ for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT Gateway available in each Availability Zone.
The Lambda function target in the event source mapping does not need to be running in a VPC to receive messages from HAQM MSK. To learn more about configuring the private subnet route table to use a NAT Gateway, see this Premium Support response. For an example of how to configure the infrastructure with AWS CloudFormation, see this template.
Required Lambda function permissions
The Lambda function must have permission to describe VPCs and security groups, and to manage elastic network interfaces. These execution role permissions are:
- ec2:CreateNetworkInterface
- ec2:DescribeNetworkInterfaces
- ec2:DescribeVpcs
- ec2:DeleteNetworkInterface
- ec2:DescribeSubnets
- ec2:DescribeSecurityGroups
To access the HAQM MSK data stream, the Lambda function also needs two Kafka permissions: kafka:DescribeCluster and kafka:GetBootstrapBrokers. The policy template AWSLambdaMSKExecutionRole includes these permissions.
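If you prefer not to attach the managed policy, a minimal inline policy sketch covering the actions listed above might look like this (resource scoping is omitted for brevity):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "kafka:DescribeCluster",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "*"
        }
    ]
}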
In AWS SAM templates, you can configure a Lambda function with an HAQM MSK event source mapping and the necessary permissions. For example:
Resources:
  ProcessMSKfunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: code/
      Timeout: 3
      Handler: app.handler
      Runtime: nodejs12.x
      Policies:
        - AWSLambdaMSKExecutionRole
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1
            Topics:
              - MyTopic
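One way to deploy a template like this is with the AWS SAM CLI, for example:

sam build
sam deploy --guided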
Setting up an HAQM MSK cluster
Before starting, configure a new VPC with public and private subnets in two Availability Zones using this AWS CloudFormation template. In this configuration, the private subnets are set up to use a NAT Gateway.
To create the HAQM MSK cluster:
- From the HAQM MSK console, choose Create Cluster.
- Select Create cluster with custom settings, apply the name “my-MSK-cluster”, and keep the recommended Apache Kafka version.
- In the Configuration panel, keep the “Use the MSK default configuration” option selected.
- In Networking, select the new VPC and choose “2” for Number of Availability Zones. From the dropdowns, select the two Availability Zones in the VPC, and choose the private subnets for each.
- In the Brokers panel, choose kafka.t3.small for Broker instance type, and enter “1” for Number of brokers per Availability Zone.
- For Storage, enter “1” for EBS storage volume per broker.
- Keep the existing defaults and choose Create cluster. It takes up to 15 minutes to create the cluster and the status is displayed in the Cluster Summary panel.
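If you prefer to script this step, the AWS CLI provides an equivalent create-cluster command. The following is a sketch only: the Kafka version shown is an example, and brokernodegroupinfo.json is assumed to contain your private subnet IDs, security group, and broker storage settings.

# Sketch only: adjust the Kafka version and brokernodegroupinfo.json to your environment.
aws kafka create-cluster \
    --cluster-name my-MSK-cluster \
    --kafka-version "2.6.2" \
    --number-of-broker-nodes 2 \
    --broker-node-group-info file://brokernodegroupinfo.json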
Configuring the Lambda event source mapping
You can create the Lambda event source mapping using the AWS CLI or an AWS SDK, both of which provide the CreateEventSourceMapping API; an example CLI command is shown below. In this walkthrough, you use the AWS Management Console to create the event source mapping.
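For reference, a CLI invocation roughly equivalent to the console steps in this walkthrough might look like the following sketch. The cluster ARN matches the placeholder used earlier, and the function name is taken from the AWS SAM example above:

# Sketch only: replace the ARN and function name with your own values.
aws lambda create-event-source-mapping \
    --function-name ProcessMSKfunction \
    --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1 \
    --topics mytopic \
    --starting-position LATEST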
First, you must create an HAQM MSK topic:
- Launch an EC2 instance, selecting t2.micro as the instance size. Connect to the instance once it is available.
- Follow these instructions in the HAQM MSK documentation to create a topic. Use the name “mytopic” and a replication-factor of 2, since there are only two Availability Zones in this test. An example command is shown after this list.
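The topic creation step typically uses the kafka-topics.sh script from the Kafka client tools installed on the EC2 instance. A sketch, assuming ZookeeperConnectString is the Apache ZooKeeper connection string shown on your cluster’s details page (newer Kafka client versions use --bootstrap-server instead of --zookeeper):

bin/kafka-topics.sh --create \
    --zookeeper ZookeeperConnectString \
    --replication-factor 2 \
    --partitions 1 \
    --topic mytopic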
Now create a Lambda function that uses the HAQM MSK cluster and topic as an event source:
- From the Lambda console, select Create function.
- Enter a function name, and select Node.js 12.x as the runtime.
- Select the Permissions tab, and select the role name in the Execution role panel to open the IAM console.
- Choose Attach policies.
- In the filter box, enter “MSK”, then check the AWSLambdaMSKExecutionRole policy name.
- Choose Attach policy.
- Back in the Lambda function, select the Configuration tab. In the Designer panel, choose Add trigger.
- In the dropdown, select MSK. In the MSK cluster box, select the cluster you configured earlier. Enter “mytopic” for Topic name and choose “Latest” for Starting Position. Choose Add.
Note: it takes several minutes for the trigger status to update from Creating to Enabled.
- In the Function code panel, replace the contents of index.js with:
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
        console.log('Key: ', key)
        // Iterate through records
        event.records[key].map((record) => {
            console.log('Record: ', record)
            // Decode base64
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
        })
    }
}
- Choose Save.
Testing the event source mapping
At this point, you have created a VPC with two private and two public subnets and a NAT Gateway. You have placed a new HAQM MSK cluster in the two private subnets, and set up a target Lambda function with the necessary IAM permissions and an event source mapping. Next, you publish messages to the HAQM MSK topic and see the resulting invocation in the Lambda function’s logs.
- From the EC2 instance you created earlier, follow these instructions to produce messages. In the Kafka console producer script running in the terminal, enter “Message #1” (an example producer command is shown after this list).
- Back in the Lambda function, select the Monitoring tab and choose View logs in CloudWatch. Select the first log stream.
- In the Log events panel, expand the entries to see the message sent from the HAQM MSK topic. Note that the value attribute containing the message is base64 encoded.
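The producer step above typically uses the kafka-console-producer.sh script. A sketch, assuming BootstrapBrokerString is the plaintext bootstrap broker string from your cluster’s client information (newer Kafka client versions use --bootstrap-server instead of --broker-list):

bin/kafka-console-producer.sh \
    --broker-list BootstrapBrokerString \
    --topic mytopic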
Conclusion
HAQM MSK provides a fully managed, highly available service that uses Kafka to process real-time streaming data. Now that Lambda supports HAQM MSK as an event source, you can invoke Lambda functions from messages in Kafka topics to integrate with your downstream serverless workflows.
In this post, I give an overview of how the integration compares with other event source mappings. I show how to create a test HAQM MSK cluster, configure the networking, and create the event source mapping with Lambda. I also show how to set up the Lambda function in the AWS Management Console, and refer to the equivalent AWS SAM syntax to simplify deployment.
To learn more about how to use this feature, read the documentation.