从 AWS Lambda 读取 SQS 队列
Posted
技术标签:
【中文标题】从 AWS Lambda 读取 SQS 队列【英文标题】:Read SQS queue from AWS Lambda 【发布时间】:2016-04-13 03:27:01 【问题描述】:我有以下基础设施:
我有一个带有 NodeJS+Express 进程的 EC2 实例,该进程在端口上侦听消息(进程 1)。每次进程接收到消息时,它都会将其发送到 SQS 队列。然后我在同一台机器上有另一个进程使用长轮询(process 2)读取队列。当它在队列中找到一条消息时,它会将数据插入 RDS 实例上的 MariaDB 数据库中。
(澄清一下,消息是由用户生成的,他们向进程 1 正在侦听的端点发送一段可以包含任意信息的数据)
现在我想将读取 SQS 的进程(process 2)放在 Lambda 函数中,以便写入队列的进程和从队列读取的进程完全独立。问题是我不知道这是否可能。
我知道调用 Lambda 函数是为了响应事件,目前支持的事件是 S3、SNS、SES、DynamoDB、Kinesis、Cognito、CloudWatch 和 Cloudformation,但不是 SQS。
我正在考虑使用 SNS 通知来调用 Lambda 函数,以便每次将消息推送到队列时,都会触发 SNS 通知并调用 Lambda 函数,但在玩了一会儿之后,我意识到这是无法从 SQS 创建 SNS 通知,只能将 SNS 通知写入队列。
现在我有点卡住了,因为我不知道如何继续。由于 AWS 服务的当前限制,我感觉无法创建此基础设施。有没有其他方法可以做我想做的事,还是我陷入了死胡同?
只是用我所做的一些研究来扩展我的问题,这个 github 存储库显示了如何从 Lambda 函数读取 SQS 队列 但是 lambda 函数只有在从命令行触发时才有效:
https://github.com/robinjmurphy/sqs-to-lambda
在自述文件中,作者提到以下内容:
更新:Lambda 现在支持将 SNS 通知作为事件源, 这使得这种 hack 对于 SNS 通知完全没有必要。你 如果您喜欢使用 Lambda 的想法,它可能仍然有用 处理 SQS 队列上的作业的函数。
但我认为这并不能解决我的问题,SNS 通知可以调用 Lambda 函数,但我不知道如何在 SQS 队列中收到消息时创建通知。
谢谢
【问题讨论】:
2 件可以用来摆脱困境的东西 (1) Lambda 可以收听 SNS。如果这不是您想要的,那么 (2) 使 SQS 队列成为 SNS 主题的订阅者之一 [每条 SNS 消息都将写入 SQS 队列] 我认为这是我开始感到困惑的地方。我不想将 SNS 消息写入队列。进入队列的消息由用户生成(他们将数据发布到 URL,我的 nodejs 线程处理请求,格式化数据并将其发送到 SQS 队列)。然后我想做的是,每次将用户消息插入队列时,以某种方式触发 SNS 通知以调用 Lambda 函数(实际上是通过使 Lambda 函数监听 SNS 来实现的) 而不是连接 Lambda、SQS 和 SNS 之间的点。我想建议考虑调度 lambda 函数来查看队列,如果存在则处理这些项目。另一种变体是使用 2 个 Lambda 函数 - 一个将从 SQS[scheduled] 中读取项目并将项目推送到 SNS,然后由另一个处理 Lambda 函数处理。 这是有道理的。事实上,这个解决方案类似于我们目前使用 cronjobs 并每隔几分钟或几秒检查一次队列的系统。我还在考虑使用 CloudWatch 检查队列统计信息并在有任何消息时触发 lambda 函数。感谢您的建议。我将进一步调查。 很高兴有帮助。我会写这个作为答案。 【参考方案1】:有几种策略可用于连接点,(A)Synchronously 或 Run-Sleep-Run 以保持 SNS、SQS、Lambda 之间的数据处理流程。
策略 1:让 Lambda 函数监听 SNS 并实时处理有助于记录/审计/重试处理]
策略 2 :假设您正在获取来自 SQS 队列的数据。您可以尝试使用 2 个 Lambda 函数 [Feeder & Worker]。
Feeder 将是
scheduled lambda function
,其工作是取走物品 来自 SQS(如果有)并将其作为 SNS 主题推送(并永远继续这样做)Worker 将被链接以监听 SNS 主题,该主题将执行
actual data processing
【讨论】:
任何一种策略的示例代码都可以显示吗? 策略2的好例子可以在这里找到:cloudonaut.io/…【参考方案2】:我们现在可以使用 SQS 消息来触发 AWS Lambda 函数。此外,不再需要运行消息轮询服务或创建 SQS 到 SNS 的映射。
更多细节: https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
【讨论】:
伟大的发现。这里给出一个教程:docs.aws.amazon.com/AWSSimpleQueueService/latest/…【参考方案3】:我也遇到过类似的情况(现在部署了一个可行的解决方案)。我已经通过以下方式解决了它:
即向 SNS 发布事件;然后将其扇出到 Lambda 和 SQS。
注意:这不适用于必须按特定顺序处理的事件。
存在一些问题(有可能的解决方案),例如:
竞速条件:lambda 可能会在消息存入队列之前被调用 SQS 队列的分布式特性可能导致即使有消息 note1 也不会返回任何消息。这两种情况的解决方案都是对 SQS 队列进行长轮询;但这确实使您的 lambda 账单更加昂贵。
注释1
短轮询是在 ReceiveMessage 调用中对一组加权随机机器进行采样的默认行为。这意味着只返回采样机器上的消息。如果队列中的消息数量很少(少于 1000 条),那么您收到的消息可能会少于每次 ReceiveMessage 调用请求的消息。如果队列中的消息数量非常少,您可能不会在特定的 ReceiveMessage 响应中收到任何消息;在这种情况下,您应该重复请求。 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
【讨论】:
【参考方案4】:AWS SQS 是亚马逊最古老的产品之一,直到 2018 年 6 月才支持轮询(长和短)。正如this answer 中提到的,AWS SQS 现在支持在新消息到达时触发 lambda 函数的功能质量保证。 this document 提供了完整的教程。
我曾经使用不同的机制来解决这个问题,下面给出了一些你可以使用的方法。
您可以在 Lambda 中开发一个简单的轮询应用程序,并使用 AWS CloudWatch 每 5 分钟左右调用一次。您可以通过使用 CloudWatch 事件在较短的停机时间内调用 lambda 来实现近乎实时的操作。为此,请使用 this tutorial 或 this tutorial。 (这可能会在 Lambda 上花费更多)
如果你不需要持久化消息也不需要保证传递的顺序,你可以认为 SQS 是多余的。您可以使用 AWS SNS(简单通知服务)直接调用 lambda 函数并执行所需的任何处理。为此目的使用this tutorial。这将实时发生。但主要缺点是在给定时间每个区域可以启动的 lambda 数量。请阅读this 并在遵循此方法之前了解限制。尽管如此,AWS SNS 保证交付顺序。 SNS 也可以直接调用 HTTP 端点并将消息存储在您的数据库中。
【讨论】:
【参考方案5】:我们有一些类似的要求,因此我们最终构建了一个库并将其开源以帮助 SQS 到 Lambda 异步。我不确定这是否满足您的特定要求,但认为它可能值得一看:https://read.iopipe.com/sqs-lambda-teaming-up-92c4096be49c
【讨论】:
您能否总结一下您提供的链接中的信息?虽然您提供了链接很好,但如果它死了,那么从那时起答案就变得毫无用处了。谢谢! 好的,基本上我们开源了一个项目,可以在 Github 上找到:github.com/iopipe/sqs-to-lambda-async 这允许您通过 SQS 异步触发 Lambda 函数。以上是关于从 AWS Lambda 读取 SQS 队列的主要内容,如果未能解决你的问题,请参考以下文章
当消息存在于 SQS 队列中时触发 AWS 中的 Lambda 函数