AWS Big Data Blog
Break data silos and stream your CDC data with HAQM Redshift streaming and HAQM MSK
Data loses value over time. We hear from our customers that they’d like to analyze business transactions in real time. Traditionally, customers used batch-based approaches for data movement from operational systems to analytical systems. A batch load can run once or several times a day, which introduces latency in data movement and reduces the value of data for analytics. A Change Data Capture (CDC)-based approach has emerged as an alternative to batch-based approaches. A CDC-based approach captures data changes and makes them available in data warehouses for further analytics in real time.
CDC tracks changes made in the source database, such as inserts, updates, and deletes, and continually applies those changes to the target database. When the CDC frequency is high, the source database changes rapidly, and the target database (usually a data warehouse) needs to reflect those changes in near real time.
With the explosion of data, the number of data systems in organizations has grown. Data silos cause data to live in different sources, which makes it difficult to perform analytics.
To gain deeper and richer insights, you can bring all the changes from different data silos into one place, like a data warehouse. This post showcases how to use streaming ingestion to bring data to HAQM Redshift.
Redshift streaming ingestion provides low-latency, high-throughput data ingestion, which enables customers to derive insights in seconds instead of minutes. It’s simple to set up, and it directly ingests streaming data into your data warehouse from HAQM Kinesis Data Streams and HAQM Managed Streaming for Apache Kafka (HAQM MSK) without the need to stage the data in HAQM Simple Storage Service (HAQM S3). You can create materialized views using SQL statements, and then use materialized view refresh to ingest hundreds of megabytes of data per second.
Solution overview
In this post, we create low-latency data replication from HAQM Aurora MySQL to an HAQM Redshift data warehouse, using Redshift streaming ingestion from HAQM MSK. With HAQM MSK, we securely stream data using a fully managed, highly available Apache Kafka service. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. We store CDC events in HAQM MSK for a set duration of time, which makes it possible to deliver the CDC events to additional destinations such as an HAQM S3 data lake.
We deploy the Debezium MySQL source Kafka connector on HAQM MSK Connect. HAQM MSK Connect makes it easy to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. HAQM MSK Connect is fully compatible with Apache Kafka Connect, which enables you to lift and shift your Apache Kafka Connect applications with zero code changes.
This solution uses HAQM Aurora MySQL to host the example database salesdb. Users of the database can perform row-level INSERT, UPDATE, and DELETE operations to produce change events in the example salesdb database. The Debezium MySQL source Kafka connector reads these change events and emits them to Kafka topics in HAQM MSK. HAQM Redshift then reads the messages from the Kafka topics in HAQM MSK using the HAQM Redshift streaming feature. HAQM Redshift stores these messages in a materialized view and processes them as they arrive.
You can see how CDC represents a create event in the example that follows the list below. We use the op field, a mandatory string that describes the type of operation that caused the connector to generate the event, for processing in our solution. In this example, c indicates that the operation created a row. Valid values for the op field are:
- c = create
- u = update
- d = delete
- r = read (applies only to snapshots)
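The following is an abbreviated sketch of what a Debezium create event can look like for the example CUSTOMER table. The exact fields and values depend on your connector configuration and source schema, so treat the CUST_ID, NAME, and MKTSEGMENT fields and the sample values as placeholders.

```json
{
  "payload": {
    "before": null,
    "after": {
      "CUST_ID": 8888,
      "NAME": "Customer Name 8888",
      "MKTSEGMENT": "AUTOMOBILE"
    },
    "source": {
      "connector": "mysql",
      "db": "salesdb",
      "table": "CUSTOMER"
    },
    "op": "c",
    "ts_ms": 1687200000000
  }
}
```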
The following diagram illustrates the solution architecture:
The solution workflow consists of the following steps:
- HAQM Aurora MySQL has a binary log (binlog) that records all operations (INSERT, UPDATE, DELETE) in the order in which they are committed to the database.
- HAQM MSK Connect runs the source Kafka connector called Debezium connector for MySQL, which reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics in HAQM MSK.
- An HAQM Redshift-provisioned cluster is the stream consumer and can read messages from Kafka topics from HAQM MSK.
- A materialized view in HAQM Redshift is the landing area for data read from the stream, which is processed as it arrives.
- When the materialized view is refreshed, HAQM Redshift compute nodes allocate a group of Kafka partitions to a compute slice.
- Each slice consumes data from the allocated partitions until the view reaches parity with the last offset for the Kafka topic.
- Subsequent materialized view refreshes read data from the last offset of the previous refresh until it reaches parity with the topic data.
- Inside HAQM Redshift, we create a stored procedure to process the CDC records and update the target table.
Prerequisites
This post assumes you have a running HAQM MSK Connect stack in your environment with the following components:
- Aurora MySQL hosting a database. In this post, you use the example database salesdb.
- The Debezium MySQL connector running on HAQM MSK Connect, which connects to HAQM MSK in your HAQM Virtual Private Cloud (HAQM VPC).
- An HAQM MSK cluster
If you don’t have an HAQM MSK Connect stack, then follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the HAQM MSK topics.
You should provision the HAQM Redshift cluster in the same VPC as the HAQM MSK cluster. If you haven’t deployed one, then follow the steps here in the AWS Documentation.
We use AWS Identity and Access Management (AWS IAM) authentication for communication between HAQM MSK and the HAQM Redshift cluster. Make sure you have created an AWS IAM role with a trust policy that allows your HAQM Redshift cluster to assume the role. For information about how to configure the trust policy for the AWS IAM role, see Authorizing HAQM Redshift to access other AWS services on your behalf. After it’s created, the role should have an AWS IAM policy like the following, which provides permission for communication with the HAQM MSK cluster.
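The following is a sketch of such a policy, based on the permissions HAQM Redshift streaming ingestion needs to read from HAQM MSK with IAM authentication. The Region and the account ID (xxx) are placeholders for your own values.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:xxx:cluster/*/*",
                "arn:aws:kafka:us-east-1:xxx:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "*"
        }
    ]
}
```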
Replace the ARNs containing xxx in the above example policy with your HAQM MSK cluster’s ARN.
- Also, verify that the HAQM Redshift cluster has access to the HAQM MSK cluster. In the HAQM Redshift cluster’s security group, add an inbound rule for the MSK security group allowing port 9098. To see how to manage the Redshift cluster security group, refer to Managing VPC security groups for a cluster.
- And, in the HAQM MSK cluster’s security group, add an inbound rule allowing port 9098 for the leader IP address of your HAQM Redshift cluster, as shown in the following diagram. You can find the IP address for your HAQM Redshift cluster’s leader node on the Properties tab of the HAQM Redshift cluster in the AWS Management Console.
Walkthrough
Navigate to the HAQM Redshift service from the AWS Management Console, then set up HAQM Redshift streaming ingestion for HAQM MSK by performing the following steps:
- Set enable_case_sensitive_identifier to true – If you are using the default parameter group for the HAQM Redshift cluster, you won’t be able to set enable_case_sensitive_identifier to true. You can create a new parameter group with enable_case_sensitive_identifier set to true and attach it to the HAQM Redshift cluster. After you modify parameter values, you must reboot any clusters that are associated with the modified parameter group. It may take a few minutes for the HAQM Redshift cluster to reboot.
This configuration value determines whether name identifiers of databases, tables, and columns are case sensitive. Once done, open a new HAQM Redshift Query Editor V2 session so that the config changes are reflected, then follow the next steps.
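To confirm that the setting took effect in your new Query Editor V2 session, you can run a quick check like the following:

```sql
-- Verify that case-sensitive identifiers are enabled for this session
SHOW enable_case_sensitive_identifier;
```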
- Create an external schema that maps to the streaming data source.
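A minimal sketch of the external schema definition is shown below. The schema name, IAM role ARN, and MSK cluster ARN are placeholders that you replace with your own values.

```sql
CREATE EXTERNAL SCHEMA msk_external_schema
FROM MSK
IAM_ROLE 'arn:aws:iam::xxx:role/redshift-streaming-role'
AUTHENTICATION iam
CLUSTER_ARN 'arn:aws:kafka:us-east-1:xxx:cluster/your-msk-cluster/xxx';
```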
Once done, verify that you can see the below tables created from the MSK topics:
- Create a materialized view that references the external schema.
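The following is a sketch of the materialized view definition, assuming the Debezium connector writes the CUSTOMER table’s change events to a topic named salesdb.salesdb.CUSTOMER; adjust the external schema and topic names to match your setup. The Debezium event is stored as a SUPER value so that individual fields can be navigated later.

```sql
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
    kafka_partition,
    kafka_offset,
    refresh_time,
    JSON_PARSE(kafka_value) AS payload   -- Debezium event stored as SUPER
FROM msk_external_schema."salesdb.salesdb.CUSTOMER"
WHERE CAN_JSON_PARSE(kafka_value);
```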
Now, you can query the newly created materialized view customer_debezium using the below command.
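For example, a simple query like the following (using the view name from the sketch above) lets you inspect the most recently ingested CDC events:

```sql
SELECT * FROM customer_debezium ORDER BY refresh_time DESC LIMIT 10;
```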
Check that the materialized view is populated with the CDC records.
- REFRESH MATERIALIZED VIEW (optional). This step is optional because we already specified AUTO REFRESH YES while creating the MV (materialized view).
NOTE: The above materialized view is auto-refreshed, which means that if you don’t see the records immediately, you have to wait a few seconds and rerun the select statement. HAQM Redshift streaming ingestion views also come with the option of a manual refresh, which allows you to refresh the object on demand. You can use the following query to pull streaming data into the Redshift object immediately.
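A manual refresh for the view sketched above looks like this:

```sql
REFRESH MATERIALIZED VIEW customer_debezium;
```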
Process CDC records in HAQM Redshift
In the following steps, we create the staging table that holds the CDC data, the target table that holds the latest snapshot, and the stored procedure that processes the CDC records and updates the target table.
- Create staging table: The staging table is a temporary table that holds all of the data that will be used to make changes to the target table, including both updates and inserts.
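The following sketch shows one possible staging table layout, assuming the columns of the example CUSTOMER table (CUST_ID, NAME, MKTSEGMENT); the table and column names are illustrative, so adapt them to your source schema.

```sql
CREATE TABLE customer_stg (
    customer_id    VARCHAR(100) ENCODE lzo,
    customer_name  VARCHAR(100) ENCODE lzo,
    market_segment VARCHAR(100) ENCODE lzo,
    op             VARCHAR(2)   ENCODE lzo,   -- Debezium operation: c, u, d, or r
    kafka_offset   BIGINT,                    -- used to pick the latest change per key
    refresh_time   TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id);
```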
- Create target table: We use the customer_target table to load the processed CDC events.
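A matching target table sketch follows; the column names mirror the staging table, and the op and refresh_time columns let you see the most recent change applied to each row.

```sql
CREATE TABLE customer_target (
    customer_id    VARCHAR(100) ENCODE lzo,
    customer_name  VARCHAR(100) ENCODE lzo,
    market_segment VARCHAR(100) ENCODE lzo,
    op             VARCHAR(2)   ENCODE lzo,   -- last operation applied to this row
    refresh_time   TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id);
```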
- Create the last extract time table and insert a dummy value: We need to store the timestamp of the last extracted CDC events. We use the debezium_last_extract table for this purpose. For the initial record, we insert a dummy value, which enables us to perform a comparison between the current and the next CDC processing timestamp.
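A minimal sketch of this bookkeeping table, seeded with a dummy initial timestamp (the table and column names here are illustrative):

```sql
CREATE TABLE debezium_last_extract (
    process_name        VARCHAR(256) ENCODE lzo,
    latest_refresh_time TIMESTAMP
);

-- Seed with a dummy timestamp so the first comparison always passes
INSERT INTO debezium_last_extract VALUES ('customer', '1983-01-01 00:00:00');

SELECT * FROM debezium_last_extract;
```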
- Create stored procedure: This stored procedure processes the CDC records and updates the target table with the latest changes.
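The following sketch shows one way to write such a procedure. It assumes the materialized view, staging table, target table, and bookkeeping table defined in the earlier sketches, and that the Debezium payload exposes the CUSTOMER columns CUST_ID, NAME, and MKTSEGMENT; adjust the column references for your own schema.

```sql
CREATE OR REPLACE PROCEDURE incremental_sync_customer()
AS $$
DECLARE
    max_refresh_time TIMESTAMP;
    staged_record_count BIGINT := 0;
BEGIN
    -- Timestamp of the last CDC batch processed for the customer table
    SELECT INTO max_refresh_time latest_refresh_time
    FROM debezium_last_extract
    WHERE process_name = 'customer';

    -- The staging table only holds the current batch, so clear it first
    TRUNCATE customer_stg;

    -- Stage CDC events that arrived after the previous run.
    -- For deletes, the "after" image is null, so fall back to "before" for the key.
    INSERT INTO customer_stg
    SELECT
        COALESCE(payload.after."CUST_ID", payload.before."CUST_ID")::VARCHAR AS customer_id,
        payload.after."NAME"::VARCHAR       AS customer_name,
        payload.after."MKTSEGMENT"::VARCHAR AS market_segment,
        payload.op::VARCHAR                 AS op,
        kafka_offset,
        refresh_time
    FROM customer_debezium
    WHERE refresh_time > max_refresh_time;

    SELECT INTO staged_record_count COUNT(*) FROM customer_stg;
    RAISE INFO 'Staged % CDC records', staged_record_count;

    IF staged_record_count > 0 THEN
        -- Remove the old version of every row that changed in this batch
        DELETE FROM customer_target
        USING customer_stg
        WHERE customer_target.customer_id = customer_stg.customer_id;

        -- Apply only the latest change per key (Debezium keys events by primary key,
        -- so all changes for a row land on the same partition and offsets are ordered)
        INSERT INTO customer_target
        SELECT customer_id, customer_name, market_segment, op, refresh_time
        FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY kafka_offset DESC) AS rn
            FROM customer_stg
        ) AS latest
        WHERE rn = 1;

        -- Record the high-water mark for the next run
        UPDATE debezium_last_extract
        SET latest_refresh_time = (SELECT MAX(refresh_time) FROM customer_stg)
        WHERE process_name = 'customer';
    END IF;
END;
$$ LANGUAGE plpgsql;
```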
Test the solution
Update the example salesdb hosted on HAQM Aurora:
- This will be your HAQM Aurora database, and we access it from the HAQM Elastic Compute Cloud (HAQM EC2) instance with Name=KafkaClientInstance.
- Replace the HAQM Aurora endpoint with the value of your HAQM Aurora endpoint, execute a command like the following, and then run use salesdb.
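A sketch of the connection command from the EC2 client instance (the user name and how you supply the password depend on how your Aurora cluster was set up):

```bash
# Connect to the Aurora MySQL endpoint from the EC2 client instance,
# then run "use salesdb;" at the mysql prompt
mysql -h <your-aurora-endpoint> -u <your-user> -p
```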
- Do an update, insert, and delete in any of the tables created, as in the sketch below. You can also do the update more than once to check the last updated record later in HAQM Redshift.
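For example, assuming the example CUSTOMER table with CUST_ID, NAME, and MKTSEGMENT columns (your table and column names may differ):

```sql
-- Run against salesdb on Aurora MySQL
INSERT INTO CUSTOMER VALUES (8888, 'Customer Name 8888', 'AUTOMOBILE');
UPDATE CUSTOMER SET NAME = 'Customer Name Update 8888' WHERE CUST_ID = 8888;
INSERT INTO CUSTOMER VALUES (8889, 'Customer Name 8889', 'MACHINERY');
DELETE FROM CUSTOMER WHERE CUST_ID = 8889;
```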
- Invoke the stored procedure incremental_sync_customer created in the above steps from HAQM Redshift Query Editor v2. You can manually run the procedure using the following command or schedule it.
call incremental_sync_customer();
- Check the target table for the latest changes. This step checks the latest values in the target table. You’ll see that all the updates and deletes that you did in the source table are shown at the top as a result of ordering by refresh_time.
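With the table names from the earlier sketches, the check looks like this:

```sql
SELECT * FROM customer_target ORDER BY refresh_time DESC;
```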
Extending the solution
In this solution, we showed CDC processing for the customer table, and you can use the same approach to extend it to other tables in the example salesdb database or add more databases to the MSK Connect configuration property database.include.list.
Our proposed approach can work with any MySQL source supported by the Debezium MySQL source Kafka connector. Similarly, to extend this example to your workloads and use cases, you need to create the staging and target tables according to the schema of the source table. Then you need to update the coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id statements with the column names and types in your source and target tables. In the example stated in this post, we used LZO encoding, which works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT as well if it matches your use case. Another consideration to keep in mind while creating target and staging tables is choosing a distribution style and key based on the data in the source database. Here we chose the KEY distribution style with customer_id as the distribution key, based on the source data and schema, following the best practices mentioned here.
Cleaning up
- Delete all the HAQM Redshift clusters
- Delete the HAQM MSK cluster and MSK Connect connector
- In case you don’t want to delete the HAQM Redshift clusters, you can manually drop the MV and tables created during this post using the below commands:
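Assuming the object names from the sketches in this post, the cleanup commands look like this:

```sql
DROP MATERIALIZED VIEW customer_debezium;
DROP TABLE customer_stg;
DROP TABLE customer_target;
DROP TABLE debezium_last_extract;
DROP PROCEDURE incremental_sync_customer();
DROP SCHEMA msk_external_schema;
```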
Also, remove the inbound security rules added to your HAQM Redshift and HAQM MSK clusters, along with the AWS IAM roles created in the Prerequisites section.
Conclusion
In this post, we showed you how HAQM Redshift streaming ingestion provides high-throughput, low-latency ingestion of streaming data from HAQM Kinesis Data Streams and HAQM MSK into an HAQM Redshift materialized view. We increased the speed and reduced the cost of streaming data into HAQM Redshift by eliminating the need to use any intermediary services.
Furthermore, we also showed how CDC data can be processed rapidly after generation, using a simple SQL interface that enables customers to perform near real-time analytics on a variety of data sources, such as Internet of Things (IoT) devices, system telemetry data, or clickstream data from a busy website or application.
As you explore the options to simplify and enable near real-time analytics for your CDC data, we hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.
About the Authors
Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real-time data processing systems. He has 13 years of experience in software engineering, including architecting, designing, and developing data analytics systems.
Vishal Khatri is a Sr. Technical Account Manager and Analytics specialist at AWS. Vishal works with state and local government customers, helping to educate and share best practices by leading and owning the development and delivery of technical content while designing end-to-end customer solutions.