Java HornetQ 客户端中的线程处理

Posted

技术标签:

【中文标题】Java HornetQ 客户端中的线程处理【英文标题】:Thread handling in Java HornetQ client 【发布时间】:2016-10-05 17:42:43 【问题描述】:

我试图了解如何处理连接到 HornetQ 的 Java 客户端中的线程。我没有收到具体的错误,但我不明白我应该如何处理线程(关于 HornetQ 客户端,特别是MessageHandler.onMessage()——线程对我来说一般没问题)。

如果这是相关的:我正在使用'org.hornetq:hornetq-server:2.4.7.Final' 来运行嵌入到我的应用程序中的服务器。我不打算这样做。在我的情况下,从操作的角度来看,这比运行独立的服务器进程更方便。

到目前为止我做了什么:

    创建嵌入式服务器:new EmbeddedHornetQ(), .setConfiguration()

    创建服务器定位器:HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

    创建会话工厂:serverLocator.createSessionFactory()

现在对我来说很明显我可以使用hornetqClientSessionFactory.createSession() 创建一个会话,为该会话创建一个生产者和消费者,并使用.send().receive() 在单个线程中处理消息。

但我也发现了consumer.setMessageHandler(),这告诉我我根本不了解客户端中的线程。我尝试使用它,但随后消费者在两个线程中调用messageHandler.onMessage(),这两个线程与创建会话的线程不同。这似乎与我查看代码的印象相符——HornetQ 客户端使用线程池来发送消息。

这让我很困惑。 javadocs 说会话是"single-thread object",并且代码同意——那里没有明显的同步。但是onMessage() 在多个线程中被调用,message.acknowledge() 也在多个线程中被调用,并且那个只是委托给会话。 这应该如何工作? MessageHandler 不从多个线程访问会话的场景会如何?

更进一步,我将如何从 onMessage() 中发送后续消息? 我将 HornetQ 用于持久的“待办事项”工作队列,因此发送后续消息是对我来说是一个典型的用例。但同样,在onMessage() 中,我在错误的线程中访问会话。

请注意,我可以远离 MessageHandler 并以允许我控制线程的方式使用 send() / receive()。但我确信我根本不了解整个情况,而与多线程结合起来只是自找麻烦。

【问题讨论】:

那些后续消息:您是否预先知道要发送它们的目的地?后续目的地是静态的还是动态的? 地址和队列与原始消息相同——工作单元的单个队列。不确定您所说的静态/动态是什么意思。它们是动态的,因为必须处理原始工作单元才能知道必须生成哪些后续消息。 我的意思是,如果你可以将生产者传递给消息处理程序(而不是 Session),为什么你会担心 Session 的线程安全。 有两个原因:首先,生产者 (ClientProducerImpl) 似乎在没有任何额外同步的情况下委托给会话的方法。例如,send() 到 doSend() 到 session.startCall()。其次,即使没有产生任何后续消息,我仍然必须确认每条消息,这具有相同的问题:它在没有任何额外同步的情况下委托给会话。 onMessage() 用于多个并发消费者,每个消费者都有自己的会话。例如,如果您喜欢源代码,请查看org.springframework.jms.listener.DefaultMessageListenerContainer。要在同一队列中重新调度消息,您可以使用属性 _HQ_SCHED_DELIVERY 创建新消息。同样,我建议使用org.springframework.jms.core.JmsTemplate 发送消息,它关心连接、会话等。 【参考方案1】:

我可以回答你的部分问题,但我希望你现在已经解决了这个问题。

形成HornetQ documentation on ClientConsumer(强调我的):

ClientConsumer 接收来自 HornetQ 队列的消息。 消息可以通过使用 receive() 方法同步消费,该方法将阻塞直到接收到消息(或超时到期)或通过设置 MessageHandler 异步消费。这两种消费是专有的 : 如果调用了它的 receive() 方法,带有 MessageHandler 集的 ClientConsumer 将抛出 HornetQException。

所以你有两种处理消息接收的选择:

    自己同步接收 不要向 HornetQ 提供 MessageListener 在您自己的用户线程中,随意调用.receive().receive(long itmeout) 检索调用返回的(可选)ClientMessage 对象 专业人士:使用您希望在消费者中携带的Session,您可以根据需要转发消息 缺点:所有这些消息处理都是按顺序进行的 将线程同步委托给 HornetQ 不要在消费者上调用.receive() 提供onMessage(ClientMessage)的MessageListener实现 专业:所有消息处理都将是并发的、快速的、无忧的 缺点:我认为无法从此对象中检索到 Session,因为它没有被接口公开。 未经测试的解决方法: 在我的应用程序(与您的一样在虚拟机中)中,我将底层的线程安全QueueConnection 作为应用程序范围内可用的静态变量公开。从您的 MessageListener 中,您可以在其上调用 QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 以获取新的 Session 并从中发送您的消息...这可能是 alright 就我而言 see 因为 Session 对象并没有真正重新创建.我这样做也是因为 Sessions 有变得陈旧的趋势。

我认为您不应该非常希望控制您的消息执行线程,尤其是仅转发消息的瞬态线程。如您所料,HornetQ 具有内置的线程池,并且可以有效地重用这些对象。

您也知道您不需要在单个线程中访问对象(如队列),因此无论是通过多个线程还是通过多个会话访问队列都没有关系。你只需要确保一个 Session 只能被一个线程访问,这是 MessageListener 的设计。

【讨论】:

以上是关于Java HornetQ 客户端中的线程处理的主要内容,如果未能解决你的问题,请参考以下文章

java中的NIO

java中的NIO

Java并发包中的几种ExecutorService

JMS 队列上多线程消息处理的最佳实践

简单聊聊java中的BIONIOAIO

如何解决空指针异常和 ThreadException 中的错误android