在 SQS 队列中使用多个消费者
Posted
技术标签:
【中文标题】在 SQS 队列中使用多个消费者【英文标题】:Using many consumers in SQS Queue 【发布时间】:2016-09-25 02:57:46 【问题描述】:我知道可以使用多个线程来使用 SQS 队列。我想保证每条消息都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如连接速度较慢),其他线程可以使用相同的消息。
保证消息被处理一次的最佳方法是什么?
【问题讨论】:
FIFO 模式改变了“exactly once”处理的可能性:aws.amazon.com/about-aws/whats-new/2016/11/… 【参考方案1】:保证消息被处理一次的最佳方法是什么?
您要求保证 - 您不会得到保证。您可以将消息被多次处理的可能性降低到非常少,但您不会获得保证。
我将解释原因,以及减少重复的策略。
重复从何而来
-
当您将消息放入 SQS 时,SQS 实际上可能会多次收到该消息
例如:发送消息时的轻微网络中断导致了自动重试的瞬时错误 - 从消息发送者的角度来看,它失败了一次,成功发送了一次,但 SQS 收到了两条消息。
SQS can internally generate duplicates
与第一个示例类似 - 有很多计算机在后台处理消息,SQS 需要确保不会丢失任何内容 - 消息存储在多个服务器上,这会导致重复。
在大多数情况下,通过利用SQS message visibility timeout,从这些来源复制的机会已经非常小——比如小到百分之几。
如果处理重复确实不是那么糟糕(strive to make your message consumption idempotent!),我认为这已经足够好了 - 进一步减少重复的机会是复杂的并且可能很昂贵......
您的应用程序可以做些什么来进一步减少重复?
好的,我们进入兔子洞……在高层次上,您需要为您的消息分配唯一的 id,并在开始处理之前检查正在进行或已完成的 id 的原子缓存:
-
确保您的消息在插入时提供了唯一标识符
没有这个,您将无法区分重复项。
在消息的“行尾”处理重复。
如果您的消息接收者需要在机外发送消息以进行进一步处理,那么它可能是另一个重复源(原因与上述类似)
您需要在某个地方以原子方式存储和检查这些唯一 ID(并在超时后刷新它们)。有两个重要的状态:“InProgress”和“Completed”
InProgress 条目应该有一个超时,这取决于您在处理失败时需要恢复的速度。
已完成的条目应该有一个超时,具体取决于您希望重复数据删除窗口的时间长度
最简单的可能是Guava cache,但仅适用于单个处理应用程序。如果您有大量消息或分布式消费,请考虑为此作业使用数据库(使用后台进程扫描过期条目)
在处理消息之前,尝试将 messageId 存储在“InProgress”中。如果它已经存在,请停止 - 您刚刚处理了一个副本。
检查消息是否“已完成”(如果存在则停止)
您的线程现在对该 messageId 具有独占锁定 - 处理您的消息
将 messageId 标记为“已完成” - 只要此 messageId 保留在此处,您就不会处理该 messageId 的任何重复项。
但您可能买不起无限存储空间。
从“InProgress”中删除 messageId(或者让它从这里过期)
一些注意事项
请记住,如果没有所有这些,重复的机会已经很低了。根据消息重复数据删除对您来说价值多少时间和金钱,您可以随意跳过或修改任何步骤 例如,您可以省略“InProgress”,但这会导致两个线程同时处理重复消息的可能性很小(第二个线程在第一个线程“完成”之前开始) 只要您可以将 messageIds 保持在“已完成”状态,您的重复数据删除窗口就会长。由于您可能无法承受无限存储,因此请至少将其持续时间为 SQS 消息可见性超时的 2 倍;之后重复的机会减少了(在已经非常低的机会之上,但仍然不保证)。 尽管如此,仍有重复的机会 - 所有预防措施和 SQS 消息可见性超时都有助于将这种机会减少到非常小,但机会仍然存在: 您的应用程序可能会在处理完消息后,但在 messageId 为“已完成”之前崩溃/挂起/执行很长时间的 GC(可能您正在使用数据库进行此存储并且与它的连接已断开)李> 在这种情况下,“正在处理”最终会过期,另一个线程可以处理此消息(在 SQS 可见性超时也过期或因为 SQS 中有重复消息)。【讨论】:
【参考方案2】:当您收到消息时,将消息或对消息的引用存储在对消息 ID 具有唯一约束的数据库中。如果 ID 存在于表中,那么您已经收到它,并且数据库将不允许您再次插入它——因为唯一性约束。
【讨论】:
【参考方案3】:当您使用 API 等阅读消息时,AWS SQS API 不会自动“使用”消息。开发者需要自己调用删除消息。
SQS 确实有一个称为“重新驱动策略”的功能,作为“死信队列设置”的一部分。您只需将读取请求设置为 1。如果消费进程崩溃,则对同一消息的后续读取会将消息放入死信队列。
SQS 队列可见性超时最长可设置为 12 小时。除非您有特殊需要,否则您需要实现将消息处理程序存储在数据库中以供检查的过程。
【讨论】:
【参考方案4】:您可以对消息和批处理使用 setVisibilityTimeout(),以延长可见时间,直到线程完成对消息的处理。
这可以通过使用 scheduleExecutorService 来完成,并在初始可见时间的一半之后安排一个可运行事件。下面的代码 sn-p 每隔一半的 visibilityTime 创建和执行 VisibilityTimeExtender,周期为可见时间的一半。 (保证消息被处理的时间,以visibilityTime/2延长)
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);
VisibilityTimeExtender 必须实现 Runnable,并且是您更新新可见性时间的地方。
当线程处理完消息后,你可以将它从队列中删除,并调用futureEvent.cancel(true)来停止调度的事件。
【讨论】:
以上是关于在 SQS 队列中使用多个消费者的主要内容,如果未能解决你的问题,请参考以下文章
CloudWatch SQS 指标问题上的 AWS Autoscaling