亚马逊AWS官方博客

使用 HAQM Managed Service for Apache Flink 进行 Apache Paimon CDC 数据摄取

1. 背景

Apache Paimon 是一个开源的开放数据湖表格式,其突出的 Partial Update、Changelog Producer、Branch 等功能特性使得用户可以基于此来构建近实时的流式数据湖。Apache Paimon 项目还提供强大的 CDC Ingestion 功能,使用户能够以流式方式将数据库和业务变更数据快速摄取至基于 Apache Paimon 构建的数据湖中,从而实时构建流式数据湖,从数据中获得实时的洞察。在亚马逊云科技云中,可以向 HAQM EMR 集群中提交 CDC Ingestion Job 来使用 Apache Paimon CDC 的 Ingestion 功能,但这种方式需要用户维护和管理 EMR 集群,对于中小团队来讲运维成本过高。本文将研究和实现使用 HAQM Managed Service for Apache Flink 以 Serverless 方式来运行 Apache Paimon CDC Ingestion Job,帮助用户快速将业务数据摄取到数据湖中。

2. 方案与实现

HAQM Managed Service for Apache Flink(以下简称 HAQM MSF)是亚马逊云科技提供的无服务器流式数据处理服务,MSF 为 Apache Flink 应用提供底层的基础架构,如计算资源、AZ Failover 弹性、并行计算、自动扩展、应用备份(通过 Checkpoints 和 Snapshots 来实现)等。用户可以使用 Apache Flink API(Flink SQL, Table API, DataStream API, Process Function)来构建流式数据应用,然后将 Flink 应用部署到 MSF 上,而不用考虑底层资源的管理以及计算资源的扩容,对 Flink 流式应用的开发、部署十分友好。

Apache Paimon 本身提供了很好的 CDC Ingestion 能力,目前支持从 MySQL、PostgreQL、MongoDB 数据库中进行数据库和表级别的 CDC Ingestion,也支持从 Kafka / Pulsar 消息系统中摄取各种 CDC 格式的数据形成 Paimon 数据表,支持的 CDC 数据格式包括 Canal、Debezium、Maxwell、OGG、AWS DMS CDC。用户通过向 Flink 集群提交 Paimon Flink Action Job,就可以将对应数据源的数据摄取到数据湖中。

提交 Paimon Flink Action Job 时,需要给 Job 设置相应的参数,如指定具体的 action、Paimon 目标数据库和表信息、数据源等相关参数信息。MSF 支持通过 Web Console、AWS CLI、SDK 及 API 等多种方式提交 Flink Job,不过在向 MSF 提交 Flink Job 时,首先需要在 MSF 中创建 MSF 应用,然后对应用进行配置,如应用 Jar 包的位置、应用的参数配置、应用的并发度等信息。在 Job 中,需要通过 KinesisAnalyticsRuntime.getApplicationProperties() 方法来获取 Job 需要的参数。

要把 Paimon Flink Action Job 提交到 MSF 中,除了要对 Job 的参数获取和设置进行改造外,还需要考虑:

  1. MSF 写入 HAQM S3 支持。MSF 底层没有集成 EMRFS,使用 Flink 的 flink-s3-fs-hadoop 来实现对 HAQM S3 数据的读写。
  2. Paimon Hive Catalog。通过创建 Hive Catalog 并和 AWS Glue Data Catalog 集成,来实现 Paimon 表的元数据管理。
  3. Athena 查询和分析 Paimon 表数据。如果要通过 Athena 查询和分析 Paimon 表数据,可以通过 Paimon 的 Iceberg 元数据兼容能力,在表数据提交的同时生成 Iceberg 兼容的元数据,这样 Athena 就可以直接查询 Paimon 表数据。

CDC 数据通过 MSF 摄取成 Paimon 表后形成原始数据层,可以通过 AWS Glue / MSF / EMR 来对这些原始数据进一步进行处理,以此来构建基于 Paimon 的 Lakehouse。整体架构如下图所示:

本文将集中在如何通过 MSF 来实现 Paimon CDC Ingestion。MSF 应用的代码实现请参考代码。需要注意的是,如果需要使用 AWS Glue Data Catalog 作为 Paimon 的 Hive Metastore,在编译打包代码前需要做以下操作:

  1. 安装 aws-glue-datacatalog-hive3-client 到本地。可以通过自行编译安装来自 awslabs 的 aws-glue-data-catalog-client-for-apache-hive-metastore 到本地,或者从一个已有的 HAQM EMR 集群中拷贝该 jar 包(usr/share/aws/hmclient/lib/ 目录下)到本地并进行 maven 本地安装(对于 Paimon 1.0+版本,Paimon Hive Catalog 可以完美集成 AWS Glue Data Catalog,不需要额外进行 patch)。
  2. 拷贝已有的 HAQM EMR 集群 hive-site.xml(/etc/hive/conf.dist 目录下)文件到代码根目录,以便 Paimon Hive Catalog 能和 AWS Glue Data Catalog 进行连接。

下面将以从 HAQM MSK 中摄取 Debezium Json 格式的 CDC 数据到 Paimon 表为例,来创建、配置和运行基于 MSF 的 Paimon CDC Ingestion 应用。

1. 编译、打包应用

代码 Clone 到本地,进入 PaimonCDCSink 目录,执行 mvn 命令进行打包,并将打好的 jar 包放到 S3 中。

# Maven 打包
mvn clean package -P KafkaCDC
#将jar 包放到s3上
aws s3 cp target/kafka-paimon-cdc-sink-1.0.jar s3://YourBucket/path/kafka-paimon-cdc-sink.jar

2. 创建 MSF 应用

在亚马逊云科技控制台上创建 MSF 应用。

MSF 应用创建后会自动创建一个 service role,修改该 IAM role,为该 role 添加 AWS Glue、EC2 以及 S3 资源访问权限。 然后,可以为 Paimon CDC 应用进行配置。准备如下 JSON 文件,该文件中设置 Paimon CDC Ingestion Flink Job 的各种配置信息,如 CDC action name、kafka 集群信息、Paimon 表信息(本配置中将配置生成 Iceberg 兼容元数据,并将表的元数据信息同步到 AWS Glue Data Catalog)。

{
  "ApplicationName": "msk-paimon-cdc",
  "CurrentApplicationVersionId": 1,
  "ApplicationConfigurationUpdate": {
    "ApplicationCodeConfigurationUpdate": {
      "CodeContentTypeUpdate": "ZIPFILE",
      "CodeContentUpdate": {
        "S3ContentLocationUpdate": {
          "BucketARNUpdate": "arn:aws:s3:::YOURBUCKET",
          "FileKeyUpdate": "PATH/OF/YOUR/JAR"
        }
      }
    },
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "ActionConf",
          "PropertyMap": {
            "action": "kafka_sync_database",
            "database": "paimon_flink",
            "primary_keys": "id",
            "table_prefix": "ods_",
            "warehouse": "s3://YOURBUCKET/PATH/OF/DATA/"
          }
        },
        {
          "PropertyGroupId": "KafkaConf",
          "PropertyMap": {
            "kafka_conf@_properties.bootstrap.servers": "YOUR KAFKA SERVERS",
            "kafka_conf@_properties.auto.offset.reset": "earliest",
            "kafka_conf@_properties.group.id": "YOUR_GROUP_ID",
            "kafka_conf@_topic": "YOUR_TOPIC",
            "kafka_conf@_value.format": "debezium-json"
          }
        },
        {
          "PropertyGroupId": "CatalogConf",
          "PropertyMap": {
            "catalog_conf@_hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "catalog_conf@_hadoop.fs.s3.buffer.dir": "/var/tmp",
            "catalog_conf@_case-sensitive": "false"
          }
        },
        {
          "PropertyGroupId": "TableConf",
          "PropertyMap": {
            "table_conf@_bucket": "4",
            "table_conf@_metadata.iceberg.storage": "hive-catalog",
            "table_conf@_metadata.iceberg.manifest-legacy-version": "true",
            "table_conf@_metadata.iceberg.hive-client-class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient",
            "table_conf@_fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "table_conf@_fs.s3.buffer.dir": "/var/tmp",
            "table_conf@_sink.parallelism": "4"
          }
        }
      ]
    },
    "FlinkApplicationConfigurationUpdate": {
      "ParallelismConfigurationUpdate": {
        "AutoScalingEnabledUpdate": true,
        "ParallelismPerKPUUpdate": 1,
        "ParallelismUpdate": 4
      }
    }
  }
}

执行 AWS CLI 命令更新 MSF 应用配置:

aws kinesisanalyticsv2 update-application \
--cli-input-json file://path/of/update-msk-paimon-cdc.json

配置 MSF 应用在 VPC 中运行,准备如下命令 JSON 文件:

{
  "ApplicationName": "msk-paimon-cdc",
  "CurrentApplicationVersionId": 2,
  "VpcConfiguration": {
    "SecurityGroupIds": ["SecurityGroupId"],
    "SubnetIds": ["subnet-id1", "subnet-id2"]
  }
}
aws kinesisanalyticsv2 add-application-vpc-configuration \
--cli-input-json file://path/of/update-vpc-msk-paimon-cdc.json

命令执行完后,在 MSF 控制台可以看到完整的配置信息:

3. 运行 MSF 应用

准备如下 JSON 文件,这里我们指定不从指定的 snapshot 进行恢复:

{
    "ApplicationName": "msk-paimon-cdc",
    "RunConfiguration": {
        "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
         }
    }
}
aws kinesisanalyticsv2 start-application \
--cli-input-json file://path/of/start-msk-paimon-cdc.json

MSF 应用成功运行后,可以通过 MSF 提供的 Flink Web UI 来查看应用的运行情况。

CDC 数据解析摄取后,数据以 Paimon 格式写到 S3,并把元数据以 Iceberg 兼容的形式写入到 Glue Data Catalog 中。

最后,通过 Athena 可以查询 Paimon CDC Ingestion 的表中的数据。

3. 总结

本文通过代码实现了在 HAQM managed service for Apache Flink 服务中运行 Apache Paimon CDC Ingestion Job,支持从 MySQL / PostgreSQL / MongoDB / Kafka 这些源中摄取 CDC 数据,将数据以 Apache Paimon 的表格式的形存储到 HAQM S3 中,并支持将 Paimon 元数据同步到 AWS Glue Data Catalog 中,实现通过 Athena 来查询 Paimon 表数据。 HAQM managed service for Apache Flink 以 Serverless 的方式运行 Apache Paimon CDC Ingestion Job,用户无需关心 Flink 集群资源的管理。同时,借助 Paimon 强大的 CDC Ingestion 能力,用户可以在亚马逊云科技上快速构建基于 Paimon 的流式数据湖.

参考资料

本篇作者

程亮

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的解决方案咨询和设计. 有多年的互联网软件研发、系统架构设计及大数据产品开发经验。