AWS Big Data Blog
Optimize downstream data processing with HAQM Data Firehose and HAQM EMR running Apache Spark
February 9, 2024: HAQM Kinesis Data Firehose has been renamed to HAQM Data Firehose. Read the AWS What’s New post to learn more.
For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge. Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis. HAQM S3 is a natural landing spot for data of all types. However, the way data is stored in HAQM S3 can make a significant difference in the efficiency and cost of downstream data processing. Specifically, Apache Spark can be overburdened with file operations if it is processing a large number of small files rather than fewer larger files. Each of these files incurs its own overhead of a few milliseconds for opening, reading metadata, and closing. Across a large number of files, this overhead adds up and results in slow processing. This blog post shows how to use HAQM Data Firehose to merge many small messages into larger messages for delivery to HAQM S3, which results in faster processing with HAQM EMR running Spark.
Like HAQM Kinesis Data Streams, HAQM Data Firehose accepts a maximum incoming message size of 1 MB. If a single message is greater than 1 MB, it can be compressed before it is placed on the stream. However, at large volumes, a message or file size of 1 MB or less is usually too small. Although there is no single right answer for file size, for many datasets 1 MB would yield too many files and file operations.
This post also shows how to use Apache Spark to read compressed files in HAQM S3 that lack a proper file name extension, and how to store the results back in HAQM S3 in Parquet format.
Solution overview
The steps we follow in this blog post are:
- Create a virtual private cloud (VPC) and an HAQM S3 bucket.
- Provision a Kinesis data stream and an AWS Lambda function to process the messages from the Kinesis data stream.
- Provision HAQM Data Firehose to deliver messages to HAQM S3 sent from the Lambda function in step 2. This step also provisions an HAQM EMR cluster to process the data in HAQM S3.
- Generate test data with custom code running on an HAQM EC2 instance.
- Run a sample Spark program from the HAQM EMR cluster’s master instance to read the files from HAQM S3, convert them into Parquet format, and write them back to an HAQM S3 destination.
The following diagram explains how the services work together:
The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to HAQM Data Firehose. We do this because most customers need some enrichment of the data before it arrives in HAQM S3.
HAQM Data Firehose can buffer incoming messages into larger records before delivering them to your HAQM S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.
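As an illustration of these two conditions, the following sketch shows how a delivery stream with equivalent buffering hints could be created with the AWS SDK for Java. This is not part of the solution's templates (step 3 configures Data Firehose through AWS CloudFormation); the bucket ARN and role ARN are placeholders.

```java
import com.amazonaws.services.kinesisfirehose.HAQMKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.HAQMKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.BufferingHints;
import com.amazonaws.services.kinesisfirehose.model.CompressionFormat;
import com.amazonaws.services.kinesisfirehose.model.CreateDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamType;
import com.amazonaws.services.kinesisfirehose.model.ExtendedS3DestinationConfiguration;

public class CreateDeliveryStreamSketch {
    public static void main(String[] args) {
        HAQMKinesisFirehose firehose = HAQMKinesisFirehoseClientBuilder.defaultClient();

        // Deliver when either condition is met: 128 MB buffered or 900 seconds elapsed.
        BufferingHints hints = new BufferingHints()
                .withSizeInMBs(128)
                .withIntervalInSeconds(900);

        ExtendedS3DestinationConfiguration s3Config = new ExtendedS3DestinationConfiguration()
                .withBucketARN("arn:aws:s3:::<S3_BUCKET_NAME>")                    // placeholder
                .withRoleARN("arn:aws:iam::<ACCOUNT_NUMBER>:role/<FIREHOSE_ROLE>") // placeholder
                .withBufferingHints(hints)
                // The producer already gzip-compresses each message, so Data Firehose
                // applies no additional compression.
                .withCompressionFormat(CompressionFormat.UNCOMPRESSED);

        firehose.createDeliveryStream(new CreateDeliveryStreamRequest()
                .withDeliveryStreamName("AWSBlogs-LambdaToFireHose")
                .withDeliveryStreamType(DeliveryStreamType.DirectPut)
                .withExtendedS3DestinationConfiguration(s3Config));
    }
}
```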
An Apache Spark job reads the messages from HAQM S3 and stores them in Parquet format. With Parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like HAQM Athena.
Considerations
The maximum size of a record sent to Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Data Firehose is the best approach. Data Firehose also offers compression of messages after they are written to the delivery stream. Unfortunately, this does not overcome the message size limitation, because that compression happens after the message is written. When Data Firehose delivers a previously compressed message to HAQM S3, it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Data Firehose, it is delivered to HAQM S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing, because a .gz extension is required.
We will see how to overcome this issue by reading the files using the HAQM S3 API operations later in this blog.
Prerequisites and assumptions
To follow the steps outlined in this blog post, you need the following:
- An AWS account that provides access to AWS services.
- An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
- The templates and code are intended to work in the US East (N. Virginia) Region only.
Additionally, be aware of the following:
- We configure all services in the same VPC to simplify networking considerations.
- Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without modification.
Implementing the solution
You can use this downloadable template for single-click deployment. The template launches in the US East (N. Virginia) Region by default and is designed to work only in that Region; do not change to a different Region. To launch directly through the console, choose the Launch Stack button.
This template takes the following parameters. Some parameters have default values that you can’t edit, because these predefined names are hardcoded in the code; for the other parameters, you must provide values. The following table provides additional details.
| For this parameter | Provide this |
| --- | --- |
| StackName | Provide the stack name. |
| ClientIP | The IP address range of the client that is allowed to connect to the cluster using SSH. |
| FirehoseDeliveryStreamName | The name of the HAQM Data Firehose delivery stream. The default value is set to "AWSBlogs-LambdaToFireHose". |
| InstanceType | The EC2 instance type. |
| KeyName | The name of an existing EC2 key pair to enable SSH access. |
| KinesisStreamName | The name of the HAQM Kinesis data stream. The default value is set to "AWS-Blog-BaseKinesisStream". |
| Region | The AWS Region. By default, it is us-east-1, US East (N. Virginia). Do not change this; the scripts are developed to work only in this Region. |
| EMRClusterName | A name for the EMR cluster. |
| S3BucketName | The name of the bucket that is created in your account. Provide a unique name for this bucket. It is used for storing the messages and the output from the Spark code. |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check boxes for I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND, and then choose Create.
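Alternatively, if you script your deployments, a stack can be created from a template with the same acknowledgments using the AWS CLI; the template URL below is a placeholder for the downloadable template referenced above:

$ aws cloudformation create-stack \
    --stack-name <stack_name> \
    --template-url <TEMPLATE_URL> \
    --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
    --region us-east-1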
If you use this one-step solution, you can skip to step 7: Generate the test dataset and load into Kinesis Data Streams.
To create each component individually, use the following steps.
1. Use the AWS CloudFormation template to configure HAQM VPC and create an HAQM S3 bucket
In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules: the first allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard HAQM S3 bucket with the provided bucket name to store the incoming and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this |
| --- | --- |
| StackName | Provide the stack name. |
| S3BucketName | Provide a unique HAQM S3 bucket name. This bucket is created in your account. |
| ClientIp | Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from the checkip.haqm.com web URL. |
After you specify the template details, choose Next. On the Review page, choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value |
| --- | --- |
| StackName | Name |
| VPCID | vpc-xxxxxxx |
| SubnetID | subnet-xxxxxxxx |
| SecurityGroup | sg-xxxxxxxxxx |
| S3BucketDomain | <S3_BUCKET_NAME>.s3.amazonaws.com |
| S3BucketARN | arn:aws:s3:::<S3_BUCKET_NAME> |
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
2. Use the AWS CloudFormation template to create the necessary IAM roles
In this step, we set up two AWS IAM roles. The first role is used by the AWS Lambda function to allow access to HAQM S3, HAQM Data Firehose, HAQM CloudWatch Logs, and HAQM EC2 instances. The second role is used by HAQM Data Firehose to access HAQM S3. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this |
| --- | --- |
| StackName | Provide the stack name. |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value |
| --- | --- |
| LambdaRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole |
| FirehoseRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole |
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
3. Use an AWS CloudFormation template to configure the HAQM Data Firehose delivery stream
In this step, we set up HAQM Data Firehose with HAQM S3 as the destination for the incoming messages. We select the Uncompressed option for the compression format, and set the buffering options to a size of 128 MB and an interval of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this |
| --- | --- |
| StackName | Provide the stack name. |
| FirehoseDeliveryStreamName | Provide the name of the HAQM Data Firehose delivery stream. The default value is set to "AWSBlogs-LambdaToFirehose". |
| Role | Provide the Data Firehose IAM role ARN that was created as part of step 2. |
| S3BucketARN | Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output. |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function
In this step, we set up a Kinesis data stream and an AWS Lambda function. The AWS Lambda function processes the incoming messages from the Kinesis data stream. An event source mapping is also created as part of this template; it adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mappings, see Creating an Event Source Mapping. The Kinesis data stream is created with 10 shards, and the Lambda function is created with a Java 8 runtime, a memory size of 1,920 MB, and a timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
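For reference, an event source mapping equivalent to the one this template creates could be defined with the AWS CLI as follows; the function and stream names are the template defaults, the account number is a placeholder, and the batch size shown is illustrative:

$ aws lambda create-event-source-mapping \
    --function-name LambdaForProcessingKinesisRecords \
    --event-source-arn arn:aws:kinesis:us-east-1:<ACCOUNT_NUMBER>:stream/AWS-Blog-BaseKinesisStream \
    --starting-position LATEST \
    --batch-size 100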
This template takes the following parameters. The following table provides details.
| For this parameter | Do this |
| --- | --- |
| StackName | Provide the stack name. |
| KinesisStreamName | Provide the name of the HAQM Kinesis data stream. The default value is set to "AWS-Blog-BaseKinesisStream". |
| Role | Provide the IAM role created for the Lambda function as part of the second AWS CloudFormation template. Get the value from the output of the second AWS CloudFormation template. |
| S3Bucket | Provide the existing HAQM S3 bucket name that was created using the first AWS CloudFormation template. Do not use the domain name; provide the bucket name only. |
| Region | Select the AWS Region. By default, it is us-east-1, US East (N. Virginia). |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
5. Use an AWS CloudFormation template to configure the HAQM EMR cluster
In this step, we set up an HAQM EMR 5.16.0 cluster with the Spark, Ganglia, and Hive applications. We create this cluster with one master node and two core nodes, using the r4.xlarge instance type. The template uses the AWS Glue Data Catalog as the Hive metastore for HAQM EMR. This HAQM EMR cluster is used to process the messages that the HAQM Data Firehose delivery stream writes to the HAQM S3 bucket. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this |
| --- | --- |
| EMRClusterName | Provide a name for the EMR cluster. |
| ClusterSecurityGroup | Select the security group ID that was created as part of the first AWS CloudFormation template. |
| ClusterSubnetID | Select the subnet ID that was created as part of the first AWS CloudFormation template. |
| AllowedCIDR | Provide the IP address range of the client that is allowed to connect to the cluster. |
| KeyName | Provide the name of an existing EC2 key pair to access the HAQM EMR cluster. |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value |
| --- | --- |
| EMRClusterMaster | ssh hadoop@ec2-XX-XXX-XXX-XXX.us-east-1.compute.amazonaws.com -i <KEY_PAIR_NAME>.pem |
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
6. Use an AWS CloudFormation template to create an HAQM EC2 Instance to generate test data
In this step, we set up an HAQM EC2 instance. The AWS CloudFormation script that creates this EC2 instance runs two additional steps: first, it downloads and installs OpenJDK 1.8; second, it downloads a Java program JAR file to the ec2-user home directory on the instance. We use this Java program to generate test data messages of approximately 900 KB each and send them to the Kinesis data stream that was created in the previous steps. The Java JAR file name is: “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.
You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this |
| --- | --- |
| EC2SecurityGroup | Select the security group ID that was created by the first AWS CloudFormation template. |
| EC2Subnet | Select the subnet that was created by the first AWS CloudFormation template. |
| InstanceType | Select the instance type. By default, r4.4xlarge is selected. |
| KeyName | Provide the name of an existing EC2 key pair to enable SSH access to the EC2 instance. |
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names, and then choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value |
| --- | --- |
| EC2Instance | ssh ec2-user@<Public-IP> -i <KEY_PAIR_NAME>.pem |
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
7. Generate the test dataset and load into Kinesis Data Streams
After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of step 6, using the ssh command shown in the CloudFormation stack output. The template copies the sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar file to the instance; we use it to generate the test data and send it to HAQM Kinesis Data Streams. You can find the code for this sample Kinesis producer in this Git repository.
Make sure that your EC2 instance’s security group allows inbound SSH (port 22) from your IP address. If not, update the security group’s inbound rules.
ssh ec2-user@<Public IP Address of the EC2 Instance> -i <SSH_KEY_PAIR_NAME>.pem
Run the following commands to generate some test data.
$ cd
$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
$ java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000
This Java program uses the PutRecords API method, which allows many records to be sent with a single HTTP request. For more information, see this AWS blog post. When you run the program, you see output showing that the messages are being sent to the Kinesis data stream.
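The following is a minimal sketch of the PutRecords pattern the producer uses, written with the AWS SDK for Java. The message-building helper and the batch size are illustrative, not the repository code verbatim; PutRecords accepts up to 500 records or 5 MB per request, and each record can be at most 1 MB.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.HAQMKinesis;
import com.amazonaws.services.kinesis.HAQMKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

public class SampleProducerSketch {
    public static void main(String[] args) {
        HAQMKinesis kinesis = HAQMKinesisClientBuilder.defaultClient();

        // With ~900 KB messages, keep each batch under the 5 MB PutRecords request limit.
        List<PutRecordsRequestEntry> batch = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            batch.add(new PutRecordsRequestEntry()
                    .withPartitionKey("pk-" + i)
                    .withData(ByteBuffer.wrap(buildTestMessage(i))));
        }

        // One HTTP request carries the whole batch.
        PutRecordsResult result = kinesis.putRecords(new PutRecordsRequest()
                .withStreamName("AWS-Blog-BaseKinesisStream")
                .withRecords(batch));
        System.out.println("Failed records: " + result.getFailedRecordCount());
    }

    // Hypothetical helper: pads a JSON payload to roughly 900 KB.
    private static byte[] buildTestMessage(int id) {
        StringBuilder sb = new StringBuilder("{\"id\":" + id + ",\"payload\":\"");
        while (sb.length() < 900_000) {
            sb.append("0123456789abcdef");
        }
        return sb.append("\"}").toString().getBytes(StandardCharsets.UTF_8);
    }
}
```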
When running the sample Kinesis producer JAR, notice that the number of messages is 10,000. This program generates test data messages to demonstrate the use case presented in this post; it is not a replacement for your load testing tool.
After all of the messages are generated and sent to HAQM Kinesis Data Streams, the program exits gracefully.
The sample JSON input message format is shown as follows:
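(The exact fields are defined by the sample producer code in the Git repository; the structure below is only an illustrative placeholder.)

```json
{
  "sensorId": "sensor-0001",
  "timestamp": "2018-10-29T21:19:00Z",
  "payload": "..."
}
```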
Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of step 4. Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 minutes to generate enough data.
8. Processing Kinesis Data Streams messages using AWS Lambda
As part of the previously described setup, we also use an AWS Lambda function (name: LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads each message’s content and appends additional data to it. This demonstrates reading the incoming message from the Kinesis data stream and appending additional information, which makes the message size greater than 1 MB. Several customers have a use case like this, enriching the incoming messages with additional information. After the AWS Lambda function appends additional data to the incoming messages, it sends them to HAQM Data Firehose. Because Data Firehose accepts only messages that are smaller than 1 MB, we must compress the messages before sending them. In the Lambda function, we compress each message using gzip before sending it to Data Firehose. In addition to compressing each message, we also append a newline character (“\n”) to each compressed message to separate the messages.
We set the buffer size to 128 MB and the buffer interval to 900 seconds when creating the Data Firehose delivery stream. This helps merge the incoming compressed messages into larger objects that are delivered to the provided HAQM S3 bucket.
After reading each message from the Kinesis data stream, the AWS Lambda function appends additional enrichment content to the original message.
If we do not compress a message before sending it to Data Firehose, the record exceeds the 1 MB limit and an error message is written to HAQM CloudWatch Logs.
Here is the approach used to compress the message in the AWS Lambda function; the complete code can be found in this Git repository.
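A minimal sketch of that logic follows, using the AWS SDK for Java. It gzip-compresses the enriched message, appends the newline separator, and forwards the record to the delivery stream; the stream name shown is the template default, and the method is illustrative rather than the repository code verbatim.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import com.amazonaws.services.kinesisfirehose.HAQMKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.HAQMKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

public class CompressAndForwardSketch {

    private final HAQMKinesisFirehose firehose = HAQMKinesisFirehoseClientBuilder.defaultClient();

    public void sendToFirehose(String enrichedMessage) throws Exception {
        // Gzip-compress the enriched message so the record stays under the 1 MB limit.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(enrichedMessage.getBytes(StandardCharsets.UTF_8));
        }

        // Append a newline character to separate the compressed messages.
        out.write('\n');

        firehose.putRecord(new PutRecordRequest()
                .withDeliveryStreamName("AWSBlogs-LambdaToFireHose")
                .withRecord(new Record().withData(ByteBuffer.wrap(out.toByteArray()))));
    }
}
```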
You can check the provided bucket to see whether the messages are flowing into it. The objects are written under the fromfirehose/ prefix in the HAQM S3 bucket.
You see that the files generated by Data Firehose do not have an extension. By default, Data Firehose does not add an extension to the files that it generates in the HAQM S3 bucket unless you select a compression option. In our use case, because the size of the uncompressed input message is greater than 1 MB, we compress it before sending it to Data Firehose. Because the message is already compressed, we do not select a compression option in Data Firehose; doing so would double-compress the message, and the downstream Spark application could not process it.
9. Reading and converting the data into Parquet format using an Apache Spark program with HAQM EMR
As noted previously, Data Firehose by default does not add file name extensions to the files that it writes to the HAQM S3 bucket. This creates a problem when reading the files with Apache Spark. By default, Apache Spark determines whether a file is compressed from its file name extension; for gzip compression, it looks for <filename>.gz to read the file successfully.
To overcome this issue, we can use HAQM S3 API operations, particularly the HAQMS3 client, to list all of the HAQM S3 keys, and then use Spark’s parallelize method to read the contents of the files. After reading the file content, we can uncompress it using the GZIPInputStream class. The complete code can be found in the Git repository.
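A condensed sketch of this approach follows. For brevity, it treats each HAQM S3 object as a single gzip stream; the repository code additionally handles the newline-separated concatenation of multiple compressed messages within one object. The class and argument names here are illustrative, not the repository’s.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import com.amazonaws.services.s3.HAQMS3;
import com.amazonaws.services.s3.HAQMS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ProcessFilesSketch {
    public static void main(String[] args) {
        String bucket = args[0];                               // e.g., <S3_BUCKET_NAME>
        String inputPrefix = args[1];                          // e.g., fromfirehose/2018/
        String outputPath = "s3://" + bucket + "/" + args[2];  // e.g., output-emr-parquet/

        SparkSession spark = SparkSession.builder().appName("ProcessFilesSketch").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // List all object keys under the input prefix with the HAQM S3 API.
        HAQMS3 s3 = HAQMS3ClientBuilder.defaultClient();
        List<String> keys = new ArrayList<>();
        ListObjectsV2Request request = new ListObjectsV2Request()
                .withBucketName(bucket).withPrefix(inputPrefix);
        ListObjectsV2Result listing;
        do {
            listing = s3.listObjectsV2(request);
            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
                keys.add(summary.getKey());
            }
            request.setContinuationToken(listing.getNextContinuationToken());
        } while (listing.isTruncated());

        // Distribute the keys across the cluster; each task downloads its object
        // and uncompresses the content with GZIPInputStream.
        JavaRDD<String> jsonLines = jsc.parallelize(keys).flatMap(key -> {
            HAQMS3 taskS3 = HAQMS3ClientBuilder.defaultClient(); // client created on the executor
            List<String> records = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(taskS3.getObject(bucket, key).getObjectContent()),
                    StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    records.add(line);
                }
            }
            return records.iterator();
        });

        // Interpret each record as JSON and write the result in Parquet format.
        Dataset<Row> df = spark.read().json(spark.createDataset(jsonLines.rdd(), Encoders.STRING()));
        df.write().mode("overwrite").parquet(outputPath);

        spark.stop();
    }
}
```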
After the HAQM EMR cluster is created successfully, log in to the HAQM EMR master node using the following command. You can get the ssh login command from the EMRClusterMaster output of the step 5 AWS CloudFormation stack.
- ssh hadoop@ec2-XX-XX-XX-XX.compute-1.amazonaws.com -i <KEYPAIR_NAME>.pem
- Make sure that port 22 is open in the security group so that you can connect to the HAQM EMR master node.
Run the Spark program using the following Spark submit command.
spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/
Replace the <S3_BUCKET_NAME> and <YEAR> placeholders in the previous Spark command with your values.
| Argument # | Property | Value |
| --- | --- | --- |
| 1 | --class | com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet |
| 2 | --master | yarn |
| 3 | --deploy-mode | client |
| 4 | (application JAR) | s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar |
| 5 | <S3_BUCKET_NAME> | The HAQM S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket. |
| 6 | <INPUT S3 LOCATION> | "fromfirehose/<YYYY>/". The input files are created in this HAQM S3 key location under the bucket that was created. "YYYY" represents the current year. For example, "fromfirehose/2018/". |
| 7 | <OUTPUT S3 LOCATION> | Provide an output directory name that will be created under the above provided HAQM S3 bucket. For example: "output-emr-parquet/". |
When the program finishes running, you can check the HAQM S3 output location to see the files that are written in Parquet format.
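For example, you can list the Parquet output with the AWS CLI:

$ aws s3 ls s3://<S3_BUCKET_NAME>/output-emr-parquet/ --recursive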
Cleaning up
After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. Stack deletion fails if there are any files in the created HAQM S3 bucket, so make sure to empty the bucket before you delete the stacks.
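For example, you can empty the bucket and then delete a stack with the AWS CLI:

$ aws s3 rm s3://<S3_BUCKET_NAME> --recursive
$ aws cloudformation delete-stack --stack-name <stack_name> --region us-east-1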
Conclusion
In this post, we described how to avoid the creation of many small files in HAQM S3 by buffering and merging the incoming messages with HAQM Data Firehose. We also went through the process of reading that data and storing it in Parquet format using Apache Spark on an HAQM EMR cluster.
About the Author
Srikanth Kodali is a Sr. IoT Data Analytics Architect at HAQM Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.