使用队列时跳过 SQS 消息
Posted
技术标签:
【中文标题】使用队列时跳过 SQS 消息【英文标题】:Skip SQS messages while consuming a queue 【发布时间】:2021-12-29 08:52:54 【问题描述】:我有一个消息生产者,将"user_id": 1, "message": "Ciao"
之类的消息发送到 SQS 队列。
我有三个 Websocket API 实例,它们的名称是 A、B 和 C。
假设有五个用户连接到该 API,他们的 ID 分别为 1、2、3、4 和 5。
每个用户都连接到 API,我的平衡器(sticky ofc)让客户端以这种方式连接到 API:
A: 1, 3
B: 5
C: 2, 4
现在,Websocket API 是上面提到的 SQS 队列的消费者。
当我将用户 1 的消息排入队列时,任何消费者都可以先将其出列,假设 C 可以。
实例 C
无法处理 user_id 1 的消息。A
确实持有与该用户的实际连接。
在我看来,这将是这样的:所有三个 API 实例都会收到任何消息。
C 将读取它,并将其留在队列中 B 将读取它,并将其留在队列中 A 将读取、处理并将其从队列中移除我的问题是:
上述工作流程是否可以通过 SQS 实施?
是否可以从队列中读取消息并将其留在那里?或者如果当前消费者无法处理,我是否必须将其出队并重新入队?
【问题讨论】:
【参考方案1】:这是发布者-订阅者架构的一个用例。要在 AWS 基础设施上实现这一点,首先假设 SQS 作为订阅者。在使用订阅的消息时不应进行消息过滤。这是 Publisher 的一部分。 AWS 提供 SNS,可以充当发布者。
假设以下设置。使用 SNS 作为发布者。在此,您可以进行消息过滤(https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)。这允许 SNS 有选择地发送消息。
在您的用例中,[Mesage for user 1/3]
-> SNS -> A 的 SQS -> Websocket A only gets messages for user 1/3
[Mesage for user 5]
-> SNS -> B 的 SQS -> Websocket A only gets messages for user 5
请注意,两种情况下的 SNS 主题保持不变。但是订阅者过滤了消息,您将拥有多个 SQS 队列。
【讨论】:
就我而言,每个用户不能有一个队列。我不能只使用带有过滤功能的 SNS,完全跳过 SQS 吗?【参考方案2】:仅使用 SQS 是可行的(但需要在消费者中自定义代码)。
示例: SQS 根据消费者发送的确认删除消息。如果消费者发送确认,SQS 将删除该条目。因此,一种设计方法是:
a) 消费消息 b)检查它是否是所需的消息 c) 如果是,则向 SQS 发送一个 ACK,SQS 将删除该条目 d) 如果消息不是想要的消息,不要发送和确认,SQS 会保留记录
其他方法:
您可以将 SQS 与 SNS 结合来实现。
您可以为 3 个 Websocket API 设置 3 个单独的队列,例如SQS_A, SQS_B, SQS_C
。您可以有一个 SNS 主题,例如 MyTopic
。然后,您可以根据订阅过滤使队列订阅SNS主题。
例子,
-
如果事件过滤器中包含
user_id 1 and 3
,SQS_A
将订阅 SNS 主题。
如果事件过滤器中包含user_id 5
,SQS_B
将订阅 SNS 主题。
如果事件过滤器中包含user_id 2
,SQS_C
将订阅 SNS 主题。
SQS_A 的事件过滤如下所示:
"EventType": [
1,
3"
]
所以...现在发布者在发布消息时,会将消息连同事件类型一起发布到 SNS 主题。
例子:
sns.PublishInput
Message: msg,
TopicArn: topic,
MessageAttributes:
EventType:
DataType: "String",
StringValue: <listOfUserId>
现在,SNS 只会将此消息发送到SQS_A
,而不是其余的 SQS。因此,只有 Websocket A 会消费该消息。
更多:https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html
注意:这不是一个健壮、易于扩展、可扩展的设计
【讨论】:
但是......这不是真的 SQS 不可行!我读到:Amazon SQS 在为您检索消息后不会自动删除它,以防您没有成功接收到消息(例如,如果消费者失败或您失去连接)。要删除消息,您必须发送一个单独的请求,确认您已成功接收并处理了该消息。请注意,您必须先收到一条消息,然后才能将其删除。 是的......你是对的。但是以您的方式,您必须在您的消费者中具有自定义逻辑,即 a) 使用消息 b) 检查它是否是我想要的消息 c) 如果是,则向 SQS 发送 ACK 并且 SQS 将删除消息 d) 如果它不,不发送ACK,SQS不会删除它。 --- 可以说,它也可以做到。但是正如我所说,您将使用自定义逻辑(这也不一定意味着坏事)。所以......是的,你也可以这样设计...... 更新了我的答案以上是关于使用队列时跳过 SQS 消息的主要内容,如果未能解决你的问题,请参考以下文章