Amazon Kinesis 和 AWS Lambda 重试

Posted

技术标签:

【中文标题】Amazon Kinesis 和 AWS Lambda 重试【英文标题】:Amazon Kinesis & AWS Lambda Retries 【发布时间】:2015-12-06 17:55:31 【问题描述】:

我对 Amazon Kinesis 很陌生,所以这可能只是我理解的一个问题,但在 AWS Lambda FAQ 中它说:

发送到您的 AWS Lambda 函数的 Amazon Kinesis 和 DynamoDB Streams 记录按分片严格序列化。这意味着如果您将两条记录放在同一个分片中,Lambda 保证您的 Lambda 函数将在使用第二条记录调用之前成功使用第一条记录调用。如果对一条记录的调用超时、受到限制或遇到任何其他错误,Lambda 将重试直到成功(或记录达到其 24 小时到期),然后再继续下一条记录。不保证跨不同分片的记录顺序,并且每个分片的处理是并行发生的。

我的问题是,如果由于某种原因某些格式错误的数据被生产者放入分片中,而当 Lambda 函数将其拾取时出错并不断重试,会发生什么情况?这意味着该特定分片的处理将被错误阻止 24 小时。

通过将问题包装在自定义错误中并将此错误与所有成功处理的记录一起发送到下游并让消费者处理来处理此类应用程序错误的最佳做法是?当然,这对于像空指针一样使程序崩溃的不可恢复错误仍然无济于事:在接下来的 24 小时内,我们将再次回到阻塞重试循环。

【问题讨论】:

【参考方案1】:

不要想太多,Kinesis 只是一个队列。您必须成功使用一条记录(即从队列中弹出)才能继续下一条。就像一个 FIFO 堆栈。

适当的方法应该是:

从流中获取记录。 在 try-catch-finally 块中处理它。 如果记录处理成功,没问题。 但如果失败,请将其记录到另一个地方以调查 失败的原因。 在逻辑块的末尾,始终保持位置为 动态数据库。 如果您的系统发生内部故障(内存错误、硬件错误 等)那是另一个故事;因为它可能会影响处理所有 记录,而不仅仅是一个。

顺便说一句,如果处理记录的时间超过 1 分钟,那么很明显你做错了什么。由于 Kinesis 旨在每秒处理数千条记录,因此您不应该为每条记录处理如此长的作业。

您要问的问题是队列系统的一般问题,有时称为“有毒消息”。为了安全起见,您必须在业务逻辑中处理它们。

http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages

【讨论】:

听起来很合理,但只是一个关于 DynamoDb 位的快速问题,为什么我需要保留位置(我想你是指序列号)? 因为当您停止“Kinesis Consumer Application”节点并稍后启动时;您应该能够从上一个点继续。 啊,是的,这很有道理。 两个答案都很好,说的也差不多,但我要给@az3答案,因为他先回答了。 在 worker.java 中,它调用 runProcessLoop 并调用 shardConsumer.consumeShard() ,它调用 checkAndSubmitNextTask() 以检查 readyForNextTask 与否。如果 notReady 它不消费新记录。那么工人如何在没有记录处理器处理以前的记录的情况下检索新记录。【参考方案2】:

这是有关在 Kinesis 中处理事件的常见问题,我将尝试为您提供一些要点来构建您的 Lambda 函数来处理“损坏”数据的此类问题。由于最佳实践是将系统的部分写入 Kinesis 流和从 Kinesis 流读取的其他部分分开,因此您通常会遇到此类问题。

首先,为什么会有这样的问题事件

使用 Kinesis 处理您的事件是一种很好的方法,可以将一个复杂的系统分解为一个很好的方法,该系统同时进行前端处理(服务最终用户)和/代码后端处理(分析事件),系统的两个独立部分。前端人员可以专注于他们的业务,而后端人员不需要将代码更改推送到前端,如果他们想添加功能来服务于他们的分析用例。 Kinesis 是一个事件缓冲区,既打破了同步的需求,又简化了业务逻辑代码。

因此,我们希望写入流的事件在其“schema”中具有灵活性,如果前端团队希望更改事件格式,添加字段、删除字段、更改协议或加密密钥,他们应该能够随心所欲地执行此操作。

现在,需要从流中读取数据的团队能够以有效的方式处理此类灵活的事件,而不是在每次发生此类更改时中断他们的处理。因此,您的 Lambda 函数会看到它无法处理的事件应该很常见,并且“poison-pill”并不像您预期​​的那样罕见。

其次,您如何处理此类有问题的事件?

您的 Lambda 函数将获得要处理的批次事件。请注意,您不应该一个一个地获取事件,而是批量获取事件。如果您的批次太小,您将很快在流上出现较大的滞后。

对于每个批次,您将遍历事件、处理它们,然后在 DynamoDB 中检查该批次的最后一个序列 ID。 Lambda 会自动执行大部分这些步骤(在此处查看更多信息:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):

console.log('Loading function');

exports.handler = function(event, context) 
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) 
        // Kinesis data is base64 encoded so decode here
        payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
    );
    context.succeed();
;

这就是“幸福路径”中发生的事情,如果所有事件都被处理没有任何问题的话。但是,如果您在批处理中遇到任何问题并且您没有“commit”带有成功通知的事件,则批处理将失败,您将再次获得批处理中的所有事件。

现在您需要确定处理失败的原因是什么。

临时问题(限制、网络问题...) - 稍等片刻再试几次即可。在许多情况下,问题会自行解决。

偶尔问题(内存不足...) - 最好增加 Lambda 函数的内存分配或减少批处理大小。在许多情况下,此类修改将解决问题。

Constant 失败 - 这意味着您必须忽略有问题的事件(将其放入 DLQ - 死信队列)或修改您的代码来处理它。

问题在于识别代码中的故障类型并以不同的方式处理它。您需要以某种方式编写 Lambda 代码以识别它(例如异常类型)并做出不同的反应。

您可以使用与 CloudWatch 的集成将此类故障写入控制台并创建相关警报。您还可以使用 CloudWatch Logs 作为记录“死信队列”并查看问题根源的一种方式。

【讨论】:

如果批处理中的一些事件成功,而其他事件失败,你如何处理?考虑一个 lambda,它使用 SES 为它收到的每个事件发送一封电子邮件。我可能会收到一批 100 个事件,并正确发送前 20 封电子邮件,但随后 SES 在其余时间中断。我想报告前 20 个事件的成功(这样我就不会向人们发送垃圾邮件),但我想重试后 80 个事件。这可能吗? 您可以使用查找功能管理列表以避免重复。您可以使用 DynamoDB 表,其中键为电子邮件,最后发送的电子邮件的值。另一种常见的解决方案是在 ElastiCache 中使用带有电子邮件密钥 TTL 的 Redis。在您发送电子邮件之前,您会检查上次向他发送电子邮件的时间,并在每次成功发送时更新记录。 我面临着同样的情况@CamJackson。 DynamoDB 现在支持可能对此有用的 TTL 在消息顺序不重要的情况下,将 80 个失败的事件重新插入同一流(很快重试)或新的retry_5_minutes_later 流是否有效?

以上是关于Amazon Kinesis 和 AWS Lambda 重试的主要内容,如果未能解决你的问题,请参考以下文章

Amazon Kinesis:在同步 Kinesis 分片和租约时捕获异常

将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录

将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录

如何在 Amazon Kinesis 服务上部署和运行 Amazon Kinesis 应用程序

Kinesis Firehose HTTP_Endpoint 目标响应格式

来玩 Serverless: 如何把 Express 应用迁移到 Amazon API 网关和 AWS Lambda 上