亚马逊AWS官方博客

基于 Apache Kafka 和 AWS 构建端到端的无服务器流式 ETL 管道

Apache Kafka 是全球广泛使用的消息代理,因其在保持稳健性的同时具有高性能而备受青睐。HAQM Redshift 是一个强大的数据仓库,用户可以在其上构建实时和离线场景的业务分析工作负载。

从 Kafka 向 Redshift 传递消息有多种方法,例如,如果基于 HAQM Managed Stream for Apache Kafka,可以简单地使用 HAQM MSK connector,详情参考这篇博客

另外,如果您使用自托管的 Apache Kafka,您可以选择使用 Glue Visual ETL,从 Glue Kafka Connector 开始,这只需要很少的代码,能够通过简单地拖动 UI,来完成一些基本的转换工作。

然而,在某些情况下,如果您无法使用 Apache Kafka 的内置连接器(可能是由于需要复杂的数据转换,或者仅仅是因为 HAQM MSK connector 不支持 SASL/SCRAM-SHA-256 等认证),您还有第三个选择:在 HAQM EMR serverless 上运行 Spark Streaming 作业

关于 HAQM EMR Serverless

HAQM EMR Serverless 是 HAQM EMR 的一个新选项,它使数据工程师和分析师能够轻松且经济高效地运行使用开源大数据框架(如 Apache Spark、Hive 或 Presto)构建的应用程序,而无需调优、操作、优化、保护或管理集群。

关于此解决方案

此解决方案实施简单,并且使用 Jupyter Notebook on HAQM EMR Studio 进行调试非常容易。该解决方案使用 PySpark 编写,如果您不了解 PySpark 也没关系,因为我们将在本博客中为您详细讲解代码。

然而,本博客假设读者具备 Apache Spark 的基础知识。如果您从未听说过 Apache Spark,我们强烈建议您先了解 Hadoop Map Reduce 和 Apache Spark 的基本概念。

架构

架构很简单,您只需要:

1. 创建 S3 bucket 作为容器来存储数据

2. 创建 Redshift Serverless workgroup 作为目标

3. 然后创建 EMR Serverless Application:

  1. 在 EMR Studio workspace 中创建 Jupyter Notebook
  2. 编写您的 PySpark 代码并使其工作

4. 最后将您的代码提交到 EMR Serverless application

实施步骤

行动胜于空谈,让我们开始动手。

创建 S3 bucket

创建一个 S3 bucket,或者您可以重用现有的 S3 bucket。

该 bucket 主要有以下两个用途:

  • 用于存储 EMR 操作,如 Kafka checkpoint、Jupyter Notebook 文件等。
  • 作为 CSV 数据的临时存储。因为 EMR Redshift connector 会将转换后的数据写入 S3,然后 Redshift 将执行 COPY,从 S3 拉取数据到 Redshift。

创建 Redshift Serverless Workgroup

  1. 转到 AWS Console -> HAQM Redshift -> Serverless Dashboard -> Create workgroup
  2. 按如下输入参数。对于基本 RPU,如果您的搜索工作负载不重,可以从 8 个 RPU 开始。查看更多关于 Redshift RPU 的详细信息。
  3. 为 Redshift workgroup 设置 VPC。请确保 Kafka、HAQM EMR Serverless、HAQM Redshift Serverless 之间的网络连接没有问题。您可以将它们放在一个 VPC 中,或者如果它们不在同一个 VPC 中,请确保 VPC 之间没有连接问题。
  4. 创建 Namespace,Namespace 是数据库对象和用户的集合。数据属性包括数据库名称和密码、权限以及加密和安全性。
  5. 设置数据库的凭证。
  6. 将 IAM 角色关联到 Namespace 以运行这些 SQL 命令。您可以创建一个具有 HAQMRedshiftAllCommandsFullAccess 策略的 IAM 角色作为此配置的默认角色,并根据您的要求稍后调整权限。
  7. 验证所有内容并点击创建。

创建 EMR Serverless Application

  1. 在 AWS Console 的 HAQM EMR 页面中转到 EMR Serverless,点击 Get Started。在弹出窗口中点击 launch EMR Studio
  2. 在创建应用程序页面,选择 Use custom settings。您只需要设置两项:
    一个是启用 EMR studio 的端点,这用于运行 Jupyter Notebook;

    另一个是 EMR 应用程序的 VPC。请确保 Kafka、HAQM EMR Serverless、HAQM Redshift Serverless 之间的网络连接没有问题。

  3. 点击创建并启动应用程序。在创建 EMR 应用程序后,我们就可以创建 Jupyter Notebook(EMR Studio Workspace)了。

创建 Jupyter Notebook

一旦创建了 EMR Application,我们就可以创建 Jupyter Notebook 了。

1. 在 EMR Studio 中,转到 Workspaces 并点击 Create Workspace。

2. 给工作空间命名并指定 S3 位置来存储笔记本,然后点击创建。

3. 工作空间创建后,您可以点击工作空间名称来启动 Jupyter Notebook。

4. 您需要 spark-sql-kafka-0-10_2.12 及其依赖项来启动您的 PySpark 应用程序以从 Kafka 读取数据。转到 Maven 或 Apache 网站下载 spark-sql-kafka-0-10_2.12:3.5.1 的 jar 文件,并将其上传到步骤 2 中配置的 S3。

<!-- http://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.5.1</version>
    <scope>test</scope>
</dependency>

编写 PySpark 代码

本博客的重点是从 Kafka 读取数据并写入 Redshift 的基本管道,因此我们不会涉及其他 Spark 功能。

1. 在 Jupyter Notebook 中,点击 PySpark 创建一个空白笔记本。

第一步是引用我们刚刚上传的 spark-sql-kafka jar 文件。

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
    }
}

2. 相应地填写基本参数,启动 Spark Session。

import sys
from pyspark.context import SparkContext
import json
import boto3
from urllib.parse import urlparse
from io import StringIO
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import from_json, col, to_json, lit, udf, length, when, encode, expr, coalesce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, TimestampType, LongType, DoubleType

aws_region = "us-west-2"
checkpoint_interval = 60
checkpoint_location = "s3://xueyao-emrserverless-spark/check_point/"
consumer_group = "group1"
kafka_broker = "kafka_broker_address:9092"
startingOffsets = "earliest"
redshift_host = "myredshift.redshift-serverless.amazonaws.com"
redshift_port = "5439"
redshift_username = "user_name"
redshift_password = "password"
redshift_database = "dev"
redshift_schema = "public"
redshift_table = "your_table_name"

#指定 S3 文件夹位置,用于临时存储要被 Redshift COPY 的 CSV 文件
redshift_tmpdir = "s3://xueyao-emrserverless-spark/redshift_tmp/"
tempformat = "CSV"

#指定具有 S3 bucket 访问权限的 redshift 角色 ARN
redshift_iam_role = "Redshift_IAM_Role_ARN"

#创建 SparkSession
spark = SparkSession.builder \
.config('spark.scheduler.mode', 'FAIR') \
.getOrCreate()
sc = spark.sparkContext
maxerror = 0

执行代码块后,您将能够看到 SparkSession 可用,如图所示:

3. 将数据从 Kafka 加载到 Spark Data Frame 中。

topic = "mytopic9"

kafka_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("subscribe", topic)
    .option("kafka.consumer.commit.groupid", consumer_group)
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "PLAINTEXT") \
    .load()
)

kafka_json_df = kafka_df.withColumn("value", expr("CAST(value AS STRING)"))

您可以通过调用以下命令查看数据框的初始结构:

kafka_json_df.printSchema()

您可以通过调用以下命令验证数据是否成功加载:

kafka_json_df.show()

4. 如您所见,来自 Kafka 的初始数据模式是键值对,以及一些元数据信息。我们将进一步展开数据。

发送到 Kafka 的原始 json 数据如下:

{
    "tlsDetails": {
      "tlsVersion": "TLSv1.3",
      "cipherSuite": "TLS_AES_128_GCM_SHA256",
      "clientProvidedHostHeader": "ec2.ap-southeast-1.amazonaws.com"
    },
    "userIdentity": {
      "type": "IAMUser",
      "principalId": "AIDATNBC4RC76RXDO7ERO",
      "arn": "arn:aws:iam::xxxxx:user/rd-monitor",
      "accountId": "xxxxx",
      "accessKeyId": "xxxxxx",
      "userName": "rd-monitor"
    },
    "requestParameters": {
      "instancesSet": {},
      "filterSet": {
        "items": [
          {
            "name": "instance-state-name",
            "valueSet": {
              "items": [
                {
                  "value": "running"
                }
              ]
            }
          }
        ]
      },
      "includeAllInstances": "false"
    }
  }

为了将数据存储到如下表中:

CREATE TABLE public.test_13022025 (
    tlsversion character varying(65535) ENCODE lzo,
    ciphersuite character varying(65535) ENCODE lzo,
    clientprovidedhostheader character varying(65535) ENCODE lzo,
    type character varying(65535) ENCODE lzo,
    principalid character varying(65535) ENCODE lzo,
    arn character varying(65535) ENCODE lzo,
    accountid character varying(65535) ENCODE lzo,
    accesskeyid character varying(65535) ENCODE lzo,
    username character varying(65535) ENCODE lzo
) DISTSTYLE EVEN;

我们需要为存储在数据框 kafka_json_df 中的数据创建相应的模式来展开字段。

我们通过使用 StructType 来实现这一点,如下所示:

schema = StructType([
    StructField("tlsDetails", StructType([
        StructField("tlsVersion", StringType(), True),
        StructField("cipherSuite", StringType(), True),
        StructField("clientProvidedHostHeader", StringType(), True)
    ]), True),
    StructField("userIdentity", StructType([
        StructField("type", StringType(), True),
        StructField("principalId", StringType(), True),
        StructField("arn", StringType(), True),
        StructField("accountId", IntegerType(), True),
        StructField("accessKeyId", StringType(), True),
        StructField("userName", StringType(), True)
    ]), True)
])
streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), schema)).selectExpr("values_json.*")

这将提取 tlsDetailsuserIdentity 中的字段,您可以通过 streaming_df.show() 验证:

5. 我们进一步将所有字段选择到 flattended_df 中:

flattened_df = streaming_df.select(
    col("tlsDetails.tlsVersion").alias("tlsVersion"),
    col("tlsDetails.cipherSuite").alias("cipherSuite"),
    col("tlsDetails.clientProvidedHostHeader").alias("clientProvidedHostHeader"),
    col("userIdentity.type").alias("type"),
    col("userIdentity.principalId").alias("principalId"),
    col("userIdentity.arn").alias("arn"),
    col("userIdentity.accountId").alias("accountId"),
    col("userIdentity.accessKeyId").alias("accessKeyId"),
    col("userIdentity.userName").alias("userName") 
)

我们可以看到扁平化的数据框包含来自 Kafka 的数据:

现在您可以对数据进行任何必要的修改、聚合等操作。由于我们不会涉及转换部分,我们将把扁平化的数据框保存到 Redshift。

6. 为了避免向 Redshift 表写入 null,我们将为所有没有值的列填入 ““。

final_df = flattened_df.select([coalesce(col(c), lit("")).alias(c) for c in flattened_df.columns])

7. 我们将从 flattened data frame 创建 GlobalTempView,并准备从数据框到 redshift 表的列映射。

flattened_df.createOrReplaceGlobalTempView("internal_source_table")

columnMap = {
    "tlsVersion": "tlsVersion",
    "cipherSuite": "cipherSuite",
    "clientProvidedHostHeader": "clientProvidedHostHeader",
    "type": "type",
    "principalId": "principalId",
    "arn": "arn",
    "accountId": "accountId",
    "accessKeyId": "accessKeyId",
    "userName": "userName"
}

8. 最后,我们使用 spark sql 查询从上一步创建的 internal source table 中选择数据,并写入 Redshift:

select_expr = [f"{original} AS {new}" for original, new in columnMap.items()]
query = f"SELECT {', '.join(select_expr)} FROM global_temp.internal_source_table"
csdf = spark.sql(query)
csdf.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://{0}:{1}/{2}".format(redshift_host, redshift_port, redshift_database)) \
    .option("dbtable", "{0}.{1}".format(redshift_schema, "test_0224")) \
    .option("user", redshift_username) \
    .option("password", redshift_password) \
    .option("tempdir", redshift_tmpdir) \
    .option("tempformat", tempformat) \
    .option("tempdir_region", aws_region) \
    .option("aws_iam_role", redshift_iam_role).mode("append").save()

注意,如果表不存在,EMR Serverless connector 将创建该表。如果表已存在,数据将被追加到表中。

您应该能够在 Spark Job Progress 中看到完成信息:

在 Redshift Query Editor V2 中,我们可以看到数据已插入到表中。

故障排除提示

在某些情况下,您可能会发现数据没有写入 Redshift 表,或者表没有被创建。您可以首先检查在之前步骤中配置的 S3 bucket 中是否创建了临时 CSV 文件。

您应该能够看到创建的 CSV 文件:

您可以下载 CSV 文件查看里面是否有数据。如果有,您的 PySpark 代码应该工作正常。

接下来,您可以转到 Query Editor V2,从表 “SYS_LOAD_ERROR_DETAIL” 中选择错误信息 select * from SYS_LOAD_ERROR_DETAIL,这将提供 Redshift 从 S3 COPY 数据失败的详细错误消息。

总结

在本博客中,我们使用 EMR Serverless 和 Redshift Serverless 构建了无服务器 ETL 管道,并提供了关于如何使用 PySpark 编码以及如何调试管道的分步说明。

您只需要具备 Apache Spark 和 Python 的基础知识,就可以动手构建一个经过验证的强大数据分析解决方案。

这个解决方案已被一个 AWS 客户采用,该客户在 Redshift Serverless 中存储了超过 7000 亿条记录,每天摄入超过 150 亿条新记录,同时只需要最少的数据工程师和系统维护人员。如果您需要分析来自 Kafka 源的数据,我们鼓励读者尝试使用它。

本篇作者

白雪尧

亚马逊云科技解决方案架构师,毕业于瑞典皇家理工学院,曾任职于 SAP、微软的开发、技术支持部门。对高并发低延迟现代化应用架构、数据分析有丰富经验,对 HPC/半导体设计行业有行业经验。

潘超

亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。

谢佰臻

亚马逊云科技解决方案架构师,负责基于云计算方案架构的咨询和设计,目前专注于 Serverless、DevSecOps。