Java HornetQ 客户端中的线程处理
Posted
技术标签:
【中文标题】Java HornetQ 客户端中的线程处理【英文标题】:Thread handling in Java HornetQ client 【发布时间】:2016-10-05 17:42:43 【问题描述】:我试图了解如何处理连接到 HornetQ 的 Java 客户端中的线程。我没有收到具体的错误,但我不明白我应该如何处理线程(关于 HornetQ 客户端,特别是MessageHandler.onMessage()
——线程对我来说一般没问题)。
如果这是相关的:我正在使用'org.hornetq:hornetq-server:2.4.7.Final'
来运行嵌入到我的应用程序中的服务器。我不打算这样做。在我的情况下,从操作的角度来看,这比运行独立的服务器进程更方便。
到目前为止我做了什么:
创建嵌入式服务器:new EmbeddedHornetQ(),
.setConfiguration()
创建服务器定位器:HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))
serverLocator.createSessionFactory()
现在对我来说很明显我可以使用hornetqClientSessionFactory.createSession()
创建一个会话,为该会话创建一个生产者和消费者,并使用.send()
和.receive()
在单个线程中处理消息。
但我也发现了consumer.setMessageHandler()
,这告诉我我根本不了解客户端中的线程。我尝试使用它,但随后消费者在两个线程中调用messageHandler.onMessage()
,这两个线程与创建会话的线程不同。这似乎与我查看代码的印象相符——HornetQ 客户端使用线程池来发送消息。
这让我很困惑。 javadocs 说会话是"single-thread object"
,并且代码同意——那里没有明显的同步。但是onMessage()
在多个线程中被调用,message.acknowledge()
也在多个线程中被调用,并且那个只是委托给会话。
这应该如何工作? MessageHandler 不从多个线程访问会话的场景会如何?
更进一步,我将如何从 onMessage() 中发送后续消息? 我将 HornetQ 用于持久的“待办事项”工作队列,因此发送后续消息是对我来说是一个典型的用例。但同样,在onMessage()
中,我在错误的线程中访问会话。
请注意,我可以远离 MessageHandler 并以允许我控制线程的方式使用 send() / receive()
。但我确信我根本不了解整个情况,而与多线程结合起来只是自找麻烦。
【问题讨论】:
那些后续消息:您是否预先知道要发送它们的目的地?后续目的地是静态的还是动态的? 地址和队列与原始消息相同——工作单元的单个队列。不确定您所说的静态/动态是什么意思。它们是动态的,因为必须处理原始工作单元才能知道必须生成哪些后续消息。 我的意思是,如果你可以将生产者传递给消息处理程序(而不是 Session),为什么你会担心 Session 的线程安全。 有两个原因:首先,生产者 (ClientProducerImpl) 似乎在没有任何额外同步的情况下委托给会话的方法。例如,send() 到 doSend() 到 session.startCall()。其次,即使没有产生任何后续消息,我仍然必须确认每条消息,这具有相同的问题:它在没有任何额外同步的情况下委托给会话。 onMessage() 用于多个并发消费者,每个消费者都有自己的会话。例如,如果您喜欢源代码,请查看org.springframework.jms.listener.DefaultMessageListenerContainer
。要在同一队列中重新调度消息,您可以使用属性 _HQ_SCHED_DELIVERY 创建新消息。同样,我建议使用org.springframework.jms.core.JmsTemplate
发送消息,它关心连接、会话等。
【参考方案1】:
我可以回答你的部分问题,但我希望你现在已经解决了这个问题。
形成HornetQ documentation on ClientConsumer(强调我的):
ClientConsumer 接收来自 HornetQ 队列的消息。 消息可以通过使用 receive() 方法同步消费,该方法将阻塞直到接收到消息(或超时到期)或通过设置 MessageHandler 异步消费。这两种消费是专有的 : 如果调用了它的 receive() 方法,带有 MessageHandler 集的 ClientConsumer 将抛出 HornetQException。
所以你有两种处理消息接收的选择:
-
自己同步接收
不要不向 HornetQ 提供 MessageListener
在您自己的用户线程中,随意调用
.receive()
或.receive(long itmeout)
检索调用返回的(可选)ClientMessage
对象
专业人士:使用您希望在消费者中携带的Session
,您可以根据需要转发消息
缺点:所有这些消息处理都是按顺序进行的
将线程同步委托给 HornetQ
不要不在消费者上调用.receive()
提供onMessage(ClientMessage)
的MessageListener实现
专业:所有消息处理都将是并发的、快速的、无忧的
缺点:我认为无法从此对象中检索到 Session
,因为它没有被接口公开。
未经测试的解决方法: 在我的应用程序(与您的一样在虚拟机中)中,我将底层的线程安全QueueConnection 作为应用程序范围内可用的静态变量公开。从您的 MessageListener 中,您可以在其上调用 QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
以获取新的 Session 并从中发送您的消息...这可能是 alright 就我而言 see 因为 Session 对象并没有真正重新创建.我这样做也是因为 Sessions 有变得陈旧的趋势。
我认为您不应该非常希望控制您的消息执行线程,尤其是仅转发消息的瞬态线程。如您所料,HornetQ 具有内置的线程池,并且可以有效地重用这些对象。
您也知道您不需要在单个线程中访问对象(如队列),因此无论是通过多个线程还是通过多个会话访问队列都没有关系。你只需要确保一个 Session 只能被一个线程访问,这是 MessageListener 的设计。
【讨论】:
以上是关于Java HornetQ 客户端中的线程处理的主要内容,如果未能解决你的问题,请参考以下文章