使用队列时跳过 SQS 消息

Posted

技术标签:

【中文标题】使用队列时跳过 SQS 消息【英文标题】:Skip SQS messages while consuming a queue 【发布时间】:2021-12-29 08:52:54 【问题描述】:

我有一个消息生产者,将"user_id": 1, "message": "Ciao" 之类的消息发送到 SQS 队列。

我有三个 Websocket API 实例,它们的名称是 A、B 和 C。

假设有五个用户连接到该 API,他们的 ID 分别为 1、2、3、4 和 5。

每个用户都连接到 API,我的平衡器(sticky ofc)让客户端以这种方式连接到 API:

A: 1, 3
B: 5
C: 2, 4

现在,Websocket API 是上面提到的 SQS 队列的消费者。

当我将用户 1 的消息排入队列时,任何消费者都可以先将其出列,假设 C 可以。

实例 C 无法处理 user_id 1 的消息。A 确实持有与该用户的实际连接。

在我看来,这将是这样的:所有三个 API 实例都会收到任何消息。

C 将读取它,并将其留在队列中 B 将读取它,并将其留在队列中 A 将读取、处理并将其从队列中移除

我的问题是:

上述工作流程是否可以通过 SQS 实施?

是否可以从队列中读取消息并将其留在那里?或者如果当前消费者无法处理,我是否必须将其出队并重新入队?

【问题讨论】:

【参考方案1】:

这是发布者-订阅者架构的一个用例。要在 AWS 基础设施上实现这一点,首先假设 SQS 作为订阅者。在使用订阅的消息时不应进行消息过滤。这是 Publisher 的一部分。 AWS 提供 SNS,可以充当发布者。

假设以下设置。使用 SNS 作为发布者。在此,您可以进行消息过滤(https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)。这允许 SNS 有选择地发送消息。

在您的用例中,[Mesage for user 1/3] -> SNS -> A 的 SQS -> Websocket A only gets messages for user 1/3[Mesage for user 5] -> SNS -> B 的 SQS -> Websocket A only gets messages for user 5

请注意,两种情况下的 SNS 主题保持不变。但是订阅者过滤了消息,您将拥有多个 SQS 队列。

【讨论】:

就我而言,每个用户不能有一个队列。我不能只使用带有过滤功能的 SNS,完全跳过 SQS 吗?【参考方案2】:

仅使用 SQS 是可行的(但需要在消费者中自定义代码)。

示例: SQS 根据消费者发送的确认删除消息。如果消费者发送确认,SQS 将删除该条目。因此,一种设计方法是:

a) 消费消息 b)检查它是否是所需的消息 c) 如果是,则向 SQS 发送一个 ACK​​,SQS 将删除该条目 d) 如果消息不是想要的消息,不要发送和确认,SQS 会保留记录

其他方法:

您可以将 SQS 与 SNS 结合来实现。

您可以为 3 个 Websocket API 设置 3 个单独的队列,例如SQS_A, SQS_B, SQS_C。您可以有一个 SNS 主题,例如 MyTopic。然后,您可以根据订阅过滤使队列订阅SNS主题。

例子,

    如果事件过滤器中包含user_id 1 and 3SQS_A 将订阅 SNS 主题。 如果事件过滤器中包含user_id 5SQS_B 将订阅 SNS 主题。 如果事件过滤器中包含user_id 2SQS_C 将订阅 SNS 主题。

SQS_A 的事件过滤如下所示:


  "EventType": [
    1,
    3"
  ]

所以...现在发布者在发布消息时,会将消息连同事件类型一起发布到 SNS 主题。

例子:

sns.PublishInput
        Message:  msg,
        TopicArn: topic,
        MessageAttributes: 
            EventType: 
                DataType: "String",
                StringValue: <listOfUserId>
            
        

现在,SNS 只会将此消息发送到SQS_A,而不是其余的 SQS。因此,只有 Websocket A 会消费该消息。

更多:https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html

注意:这不是一个健壮、易于扩展、可扩展的设计

【讨论】:

但是......这不是真的 SQS 不可行!我读到:Amazon SQS 在为您检索消息后不会自动删除它,以防您没有成功接收到消息(例如,如果消费者失败或您失去连接)。要删除消息,您必须发送一个单独的请求,确认您已成功接收并处理了该消息。请注意,您必须先收到一条消息,然后才能将其删除。 是的......你是对的。但是以您的方式,您必须在您的消费者中具有自定义逻辑,即 a) 使用消息 b) 检查它是否是我想要的消息 c) 如果是,则向 SQS 发送 ACK 并且 SQS 将删除消息 d) 如果它不,不发送ACK,SQS不会删除它。 --- 可以说,它也可以做到。但是正如我所说,您将使用自定义逻辑(这也不一定意味着坏事)。所以......是的,你也可以这样设计...... 更新了我的答案

以上是关于使用队列时跳过 SQS 消息的主要内容,如果未能解决你的问题,请参考以下文章

使用SQS接收SES消息

在 Azure 机器人服务中发送消息时跳过机器人

在 SQS 队列中使用多个消费者

如何让我的不和谐机器人等到当前歌曲结束而不是在添加新歌曲时跳过队列中的当前歌曲

如何使用 spring 集成来调整 sqs 队列的消耗

当消息存在于 SQS 队列中时触发 AWS 中的 Lambda 函数