Redisson,工作队列/ dequeu。系统/ pod关闭时不完整消息处理的消息/元素处理策略

Posted

技术标签:

【中文标题】Redisson,工作队列/ dequeu。系统/ pod关闭时不完整消息处理的消息/元素处理策略【英文标题】:Redisson, working queue / dequeu. Strategies on processing message / element on incomplete message handling on system / pod shutdown 【发布时间】:2020-12-13 22:20:25 【问题描述】:

所以我 takeHead 然后我开始处理。如果要对其进行扩展并为每个 pod/实例使用 8 线程执行器服务,那么在系统突然关闭时,“消息”将被视为已被占用/消耗。

我猜一个策略是使用一个单独的分布式 Redisson list / map per queue / dequeue,表示这些元素/消息正在被处理。我想这会奏效,但是跟踪该列表也不是一件容易的事。

有没有办法获取头,以某种方式使用活动对象,然后将队列中的元素标记为受管理,然后在完成 O(1) 时以某种方式将其删除?

【问题讨论】:

【参考方案1】:

所以我采取Head 然后我开始处理。如果要对其进行扩展并为每个 pod/实例使用 8 线程执行器服务,那么在系统突然关闭时,“消息”将被视为已被占用/消耗。

您需要使用 RStream 对象,而不是它允许跟踪每个消费者未使用的元素。

【讨论】:

啊,那是你在 kafka 问题上提到的新 Redis 5 东西。我需要进一步研究一下。然而,我的队列由包含系统 ID 的映射支持,以列出“处理消息”方法。几乎是固体。但会看看 RStream 方法。 RStream 添加为什么需要一个键和一个值。队列方法只需要添加。需要什么钥匙?也许应该提供一个基于 RStream 的队列?文档目前还不是很好,因为我认为它是新编写的,所以很难弄清楚。 所以 ack 将添加的内容标记为已处理,是否也将其删除。我们readgroup的时候,是多个消费者拿到同一个消息要处理,还是只有一个消费者?如何确保只有一个人处理一条消息? StreamMessageId.NEVER_DELIVERED 将确保它只得到一个? stream.readGroup("specialgroup", "consumer1", 1) 将确保我一次只收到一条消息?如果我得到太多,并且在消息处理成功之前发生核,它们将被标记为已传递但未处理,但没有 StreamMessagId.REPRESENTING_THAT_CASE。这些最终会在没有确认的超时时出现在 NEVER_DELIVERED 堆栈上吗?超时时间是多少?

以上是关于Redisson,工作队列/ dequeu。系统/ pod关闭时不完整消息处理的消息/元素处理策略的主要内容,如果未能解决你的问题,请参考以下文章

STL之双向队列(dequeue)

基于Redisson实现延迟队列

redisson 应用(一)

redisson 应用(二)

综合中间件Redisson实战

activemq 控制面板里的 Number Of Pending Messages Messages EnqueuedMessages Dequeued含义