在 Kinesis 中使用分区键来保证具有相同键的记录由相同的记录处理器 (lambda) 处理

Posted

技术标签:

【中文标题】在 Kinesis 中使用分区键来保证具有相同键的记录由相同的记录处理器 (lambda) 处理【英文标题】:Using the partition key in Kinesis to guarantee that records with the same key are processed by the same record processor (lambda) 【发布时间】:2018-12-19 06:21:11 【问题描述】:

我正在使用 AWS kinesis 和 lambda 开发实时数据管道,我正在尝试弄清楚如何保证来自相同数据生产者的记录由相同的分片处理,并最终由相同的 lambda 函数实例处理.

我的方法是使用分区键来确保来自同一个生产者的记录由同一个分片处理。但是,我无法让来自同一个分片的记录由同一个 lambda 函数实例处理。

基本设置如下:

有多个数据源将数据发送到运动流。 流有多个分片来处理负载。 有一个 lambda 函数通过事件源映射(批量大小为 500)连接到尖叫声。 lambda 函数正在处理记录,进行一些数据转换和其他一些事情,然后将所有内容放入 firehose。 稍后会发生更多事情,但这与问题无关。

看起来是这样的:

如图所示,调用了三个lambda函数实例进行处理;每个分片一个。 在此管道中,来自同一数据源的记录由同一 lambda 函数实例处理非常重要。根据我的阅读,这可以通过确保来自同一来源的所有记录使用相同的分区键来保证,以便它们由相同的分片处理。

分区键

分区键用于按分片对数据进行分组 溪流。 Kinesis Data Streams 服务隔离数据记录 属于一个流到多个分片,使用分区键 与每个数据记录关联以确定给定数据的哪个分片 记录属于。分区键是 Unicode 字符串,最大 长度限制为 256 字节。一个 MD5 哈希函数用于映射 分区键为 128 位整数值并映射关联数据 记录到分片。当应用程序将数据放入流中时,它 必须指定一个分区键。

来源:https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key

这行得通。所以具有相同分区键的记录由同一个分片处理。但是,它们由不同的 lambda 函数实例处理。因此,每个分片调用一个 lambda 函数实例,但它不仅处理来自一个分片的记录,还处理来自多个分片的记录。这里似乎没有将记录移交给 lambda 的模式。

这是我的测试设置: 我将一堆测试数据发送到流中并在 lambda 函数中打印记录。这是三个函数实例的输出(检查每行末尾的分区键。每个键应该只出现在三个日志之一中,而不是多个日志中):

Lambda 实例 1:

'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'
'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'
'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'
'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'
'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'

Lambda 实例 2:

'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'
'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'
'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'
'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'
'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'

Lambda 实例 3:

'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'
'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'
'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'
'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'

这就是我将数据插入流的方式(如您所见,分区键设置为源 ID):

processed_records = []
for r in records:
    processed_records.append(
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    )

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)

所以我的问题是:

为什么每个 lambda 函数不只处理一个分片的记录? 如何做到这一点?

谢谢!

【问题讨论】:

您是否使用 KCL 来消费流中的记录? 我正在通过事件源映射 (docs.aws.amazon.com/de_de/lambda/latest/dg/…) 直接使用流。所以记录通过传递给函数的事件交给lambda函数。 @oneschilling 你解决过这个问题吗?我也有同样的问题 @gfree 显然这是不可能的。我还与 AWS 支持人员进行了交谈,他们证实了这一点。我通过他们提出了功能请求。我认为他们最初计划将其作为与 lambda 结合的 kinesis 的一个特征,因为文档在一开始就说明了这一点,但在某些时候发生了变化。所以对我们来说,此时最好的选择是改变我们插入数据的方式,因为我们依赖于管道中的顺序。我们将该逻辑转移到数据生产者,使管道顺序不可知,这意味着我们可以毫无问题地增加分片的数量。 【参考方案1】:

您为什么要关心哪个 Lambda 实例处理分片? Lambda 实例无论如何都没有状态,因此哪个实例读取哪个分片并不重要。更重要的是,任何时候 Lambda 实例都只会从一个分片中读取。在完成调用后,它可能会从另一个分片中读取。

【讨论】:

当传入的消息来自不同的系统(签名)时,您可能希望使用不同的 lambda 进行处理,这需要以不同的方式进行处理,因此不必将 if...else 放入单个 lambda让单独的 lambdas 负责处理这些消息可能会有所帮助

以上是关于在 Kinesis 中使用分区键来保证具有相同键的记录由相同的记录处理器 (lambda) 处理的主要内容,如果未能解决你的问题,请参考以下文章

具有相同键的(嵌套)字典的 Pythonic 替代方案?

如果自定义分区器为具有相同键的记录选择不同的分区怎么办?

如何确定 AWS kinesis 流中的分区键总数?

kafka主题分区的数量和数据中不同键的数量

Amazon Kinesis Streams - 每个分片有多个“主题”?

如何使用包含Swift中相同键的多个值的查询参数构建URL?