如果记录顺序无关紧要,我可以使用单个 Kinesis 分片并行调用 Lambda 函数吗?

Posted

技术标签:

【中文标题】如果记录顺序无关紧要,我可以使用单个 Kinesis 分片并行调用 Lambda 函数吗?【英文标题】:Can I invoke Lambda functions in parallel using a single Kinesis shard if record order doesn't matter? 【发布时间】:2016-03-17 21:15:57 【问题描述】:

我有一个应用程序,我只需要 1 个 Kinesis 分片的带宽,但我需要并行调用许多 lambda 函数来跟上记录处理的速度。我的记录大小处于高端(其中一些超出了 1000 KB 的限制),但传入速率仅为 1 MB/s,因为我使用单个 EC2 实例来填充流。由于每条记录都包含一个内部时间戳,所以我不关心按顺序处理它们。基本上我有几个月的数据需要迁移,我想并行迁移。

处理后的记录为可以处理 1000 个并发客户端的数据库集群提供记录,因此我之前的解决方案是将我的 Kinesis 流拆分为 50 个分片。然而,事实证明这很昂贵,因为我需要分片来并行处理。我使用的带宽不到 1%,我不得不延长保留期。

从长远来看,我想答案包括拆分我的记录,这样消耗时间就不会是生产时间的巨大倍数。这不是现在的选择,但我意识到我有点滥用系统。

有没有一种方法可以让一个保持顺序的 lambda 函数与单分片 Kinesis 流相关联,并让它在一批记录上异步调用另一个 lambda 函数?然后我可以使用单个 Kinesis 分片(或其他数据源)并仍然享受大规模并行处理。

我真正需要的是 Lambda 事件源配置中的一个选项,让 Kinesis 说“我不在乎保留这些记录的顺序”。但是我想跟上失败执行的迭代器位置变得更具挑战。

【问题讨论】:

你能链接你的 lambda 函数吗?第一个函数将获取元事件,它主要将其拆分为较小的事件,您可以使用这些事件触发另一个 lambda 函数。第二个 lambda 函数可以并行触发。 我相信是这样,但现在我需要将我的记录缓存在方便的地方(如 DynamoDB)以正确处理故障和重试,并且由于 lambda 函数不能超过 300 秒,所以我不能具有长期运行的协调器功能,因此它必须在过期(并被重新调用)后仍然存在。 这取决于您可能遇到的错误类型。例如,如果您的数据中有“毒丸”,您只想将它​​们扔掉。您还可以考虑将 Kinesis 流链作为中间缓冲机制。另一种机制是这些异常的“死信队列”,也在 Kinesis 或 SQS 中,取决于此类错误的频率。 您是否考虑过使用 SQS?例如,使用 Elastic Beanstalk 和 SQS 作为工作环境 (docs.aws.amazon.com/elasticbeanstalk/latest/dg/…) 与带有 kinesis 的 lambda 非常相似,但可能更适合您的用例。 谢谢,我去看看 SQS。通过 Event 方法调用 Lambda 时,您还遇到了另一个问题。也就是说,在这种情况下,有效负载大小有 128 KB 的限制,因此我们真正能够直接传递给 lambda 的是有关从何处获取实际有效负载的信息(例如 S3 存储桶和密钥)。 【参考方案1】:

According to somebody that works in AWS,可以将多个 Lambda 函数附加到同一个 Kinesis 流。也就是说,我现在正在测试它,但没有成功。

编辑:

它工作正常。

【讨论】:

据我了解,kinesis 维护数据记录的顺序,那么如果有多个 lambdas 消耗来自单个分片的数据,kinesis 如何为您工作?你能详细说明一下吗?

以上是关于如果记录顺序无关紧要,我可以使用单个 Kinesis 分片并行调用 Lambda 函数吗?的主要内容,如果未能解决你的问题,请参考以下文章

可以移动记录集中的单个记录吗?

具有约 5 亿条记录的 Java 嵌入式数据库 [关闭]

UITableview 的名称部分按排序顺序排列

如果顺序无关紧要,如何比较python中的两个字符串?

关于Promise.all

将多条记录插入 SQL 表时,它不遵循顺序