从 JMS 队列中批量获取

Posted

技术标签:

【中文标题】从 JMS 队列中批量获取【英文标题】:Batch fetching from JMS queue 【发布时间】:2021-10-16 04:10:47 【问题描述】:

我需要实现以下工作流程:

    每 N 毫秒从 JMS 队列中获取所有可用消息,但不超过 K 个项目 做一些处理 处理完成后确认所有这些

我对如何使用 JMS 正确实施步骤 1 很感兴趣。

我尝试的是:

    创建BlockingQueue 大小 K 在我的 JMS MessageListeneronMessage 方法中,将消息放入 BlockingQueue。 每 N 毫秒耗尽 BlockingQueue,处理消息并确认它们。

一目了然,但问题是,如果在批处理过程中在onMessage 中获取了一些消息,那么一旦批处理完成,它也会在BlockingQueue 中被确认,并且没有尚未处理 (reference)。因此,如果我的应用出现故障,则根本不会处理消息。

实施步骤 #1 的更好方法是什么?是 JMS API 还是一些标准方法?

【问题讨论】:

【参考方案1】:

JMS 不支持“批量获取”。必须一次获取/接收一条消息。但是,可以使用事务处理会话将多条消息的工作批处理在一起。

此外,通过使用MessageListener 实现,您将放弃对应用程序何时接收消息的控制,因为只要将消息放入队列,就会调用onMessage 方法。

我建议您完全消除MessageListener。相反,每 N 秒使用一个已处理的 javax.jms.Session 并调用 javax.jms.MessageConsumer.receive(long) 直到它返回 null(这意味着队列为空)或者您有 K 条消息。然后,您可以处理所有这些消息并在完成提交事务会话并确认所有消息后调用javax.jms.Session.commit()

请记住,您的批次越大,您重复工作的机会就越大。例如,如果您收到 K 条消息并在发生某种故障之前处理了 K - 1 条消息,从而阻止您提交事务会话,那么这些消息将被取消回队列,并可能在稍后重新传递和处理。

【讨论】:

以上是关于从 JMS 队列中批量获取的主要内容,如果未能解决你的问题,请参考以下文章

获取Weblogic中JMS队列处理的消息的历史记录

JMS 和 Spring 批处理

JMS学习

Spring JMS ActiveMQ 跟踪作业状态

从 mule 中的队列/主题中读取消息

如何找到未传递的 JMS 消息的原始目的地。