即使activemq中的队列不为空,JMS实现中的receiveNoWait也会返回null
Posted
技术标签:
【中文标题】即使activemq中的队列不为空,JMS实现中的receiveNoWait也会返回null【英文标题】:receiveNoWait in JMS implementation returning null even when queue not empty in activemq 【发布时间】:2016-05-27 03:23:38 【问题描述】:我正在尝试在我的项目中实现 JMS。我使用活动 mq 作为提供者,并使用持久队列。 以下是从活动 mq 中检索元素的代码
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ObjectMessage obj = (ObjectMessage) consumer.receiveNoWait();
此代码有时会返回数据,有时会返回 null,即使我可以在活动的 mq 管理控制台中看到未决消息的数量为非零。 我读了一堆文章,很少有人提到 JMS api 并不要求你每次都获取元素,你必须相应地编码。 由于在我的场景中,我依赖于队列,一旦队列返回null,我就会终止进程,所以我按以下方式修改了代码
我没有调用receiveNoWait,而是在通过队列浏览器检查队列中是否存在元素后开始使用receive。以下是修改后的代码
public static <T> T retrieveObjectFromQueue(Queue queue, Class<T> clazz)
synchronized (queue)
if(!queueHasMoreElements(queue))
return null;
Connection conn = null;
Session session = null;
try
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ObjectMessage obj = (ObjectMessage) consumer.receive();
return clazz.cast(obj.getObject());
catch(Exception e)
throw new RuntimeException(e);
finally
closeSessionAndConnection(session, conn);
public static boolean queueHasMoreElements(Queue queue)
Connection conn = null;
Session session = null;
try
conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
return enumeration.hasMoreElements();
catch(Exception e)
throw new RuntimeException(e);
finally
closeSessionAndConnection(session, conn);
现在我的代码在处理了大约 20-30 个元素后卡住了,我再次可以在管理控制台中看到待处理的元素。 当我尝试使用调试模式时,我意识到,在检索 20-30 个元素后,我的代码卡在 consumer.receive() 上,如果队列为空,这是预期的,但是当我检查我的管理控制台,它显示了队列中的很多元素。
我使用 jdbc(mysql) 作为 activemq 的持久存储。我使用的配置xml在activemq配置示例中给出(activemq/examples/conf/activemq-jdbc-performance.xml)
我使用 tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 作为 activemq url。
请让我知道我做错了什么。我正在使用 java8 和 apache activeMq 5.13.1
谢谢
【问题讨论】:
【参考方案1】:JMS 规范也没有强制 QueueBrowser 将返回 Queue 中的所有消息,您可能只会得到它启动时的快照,这取决于很多因素。
您试图在 JMS 上强加它不能保证的语义。您可以尝试将预取设置为零,这将导致客户端轮询代理以获取消息并等到代理告知存在或不存在消息。如果您在轮询时消息尚未进入队列,您可能仍然一无所获,这只是您需要处理的事情。
您还可以使用定时接收方法并设置一个您愿意等待的超时,然后再返回和终止您的应用程序。
【讨论】:
嗨蒂姆,我编辑了我的问题,让你有更好的看法。我在调用 receive() 函数时遇到了困难,如果有一个元素,我相信接收可以保证返回一个元素。就我而言,我使用队列浏览器仅用于检查队列是否为非空,然后调用接收。即使队列不为空,接收也不返回 浏览器可能不返回消息的原因有很多,一个是浏览器拍摄快照,因此不会显示新消息。还有一个 maxBrowsePageSize 会影响它可能返回的内容。 我将删除队列浏览器部分,但正如我之前提到的,我的代码卡在 MessageConsumer 的 receive 函数上。我相信,即使我删除队列浏览器,问题也不会得到解决。由于接收不返回元素,即使队列有更多元素以上是关于即使activemq中的队列不为空,JMS实现中的receiveNoWait也会返回null的主要内容,如果未能解决你的问题,请参考以下文章