使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?

Posted

技术标签:

【中文标题】使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?【英文标题】:Concurrent Synchronous Request-Reply with JMS/ActiveMQ - Patterns/Libraries? 【发布时间】:2012-07-30 02:34:12 【问题描述】:

我有一个网络应用程序,当用户提交请求时,我们将 JMS 消息发送到远程服务,然后等待回复。 (还有异步请求,我们为消息重放等设置了各种细节,所以我们更愿意坚持使用 JMS 而不是 HTTP)

在How should I implement request response with JMS? 中,ActiveMQ 似乎不鼓励每个请求使用临时队列或在 JMSCorrelationID 上使用选择器的临时消费者的想法,因为启动它们会产生开销。

但是,如果我对回复使用池消费者,我如何从回复消费者分派回原始请求线程?

我当然可以编写自己的线程安全回调注册/调度,但我讨厌编写我怀疑已经由比我更了解的人编写的代码。

那个 ActiveMQ 页面推荐 Lingo,它自 2006 年以来就没有更新过,还有 Camel Spring Remoting,我的团队因为它的许多陷阱错误而被地狱禁止。

是否有更好的解决方案,以库的形式实现此模式,或者以不同的模式来模拟 JMS 上的同步请求-回复?


相关的 SO 问题:

Is it a good practice to use JMS Temporary Queue for synchronous use?,这表明使用 JMSCorrelationID 上的选择器启动消费者实际上是低开销的,这与 ActiveMQ 文档所说的相矛盾。谁是对的?

【问题讨论】:

+1 表示研究得很好的问题。 【参考方案1】:

我一直使用 CorrelationID 进行请求/响应,从未遇到任何性能问题。我无法想象为什么这会是一个性能问题,对于任何消息传递系统来说,它都应该是超快的,并且是一个非常重要的特性,并且实现得很好。

http://www.eaipatterns.com/RequestReplyJmsExample.html有两个主流解决方案,使用replyToQueue或correlationID。

【讨论】:

问题不在于使用 CorrelationID(这是基本的),而是启动临时目的地或消费者的开销。 为什么不能对多个消费者使用单个响应队列? @ams 使用单个响应队列和过滤器在关联 ID 上的唯一缺点是,任何流氓消费者都可能破坏整个架构。【参考方案2】:

在过去的项目中,我们遇到过类似的情况,同步 WS 请求是使用一对 Async req/res JMS 消息处理的。我们当时使用的是 Jboss JMS impl 和临时的 destinations,开销很大。

我们最终编写了一个线程安全的调度程序,让 WS 等待 JMS 响应到来。我们使用 CorrelationID 将响应映射回请求。

这个解决方案都是自产的,但我遇到了一个很好的阻塞映射实现,它解决了将响应与请求匹配的问题。

BlockingMap

如果您的解决方案是集群的,您需要注意将响应消息分派到集群中的正确节点。我不知道 ActiveMQ,但我记得 JBoss 消息传递在其可集群目标的引擎盖下存在一些故障。

【讨论】:

在这种情况下,集群不应该成为 ActiveMQ 的问题,因为默认情况下所有队列都是集群的并且在集群中是“全局的”。无需手动将消息转发到正确的节点。 @Petter 我认为如果 requesting web service 是集群的,而不是 MQ,就会出现问题。我猜你必须有一个全局响应队列,并且所有节点都使用 JMSCorrelationID 上的选择器?【参考方案3】:

我仍然会考虑使用 Camel 并让它处理线程,也许没有弹簧远程处理,但只是原始的 ProducerTemplates。

Camel 有一些关于该主题的不错的文档,并且与 ActiveMQ 配合得非常好。 http://camel.apache.org/jms#JMS-RequestreplyoverJMS

对于您关于启动基于选择器的消费者和开销的问题,ActiveMQ 文档实际上说明的是它需要往返于 ActiveMQ 代理,该代理可能位于地球的另一端或在高延迟网络上.这种情况下的开销是到 AMQ 代理的 TCP/IP 往返时间。我会认为这是一种选择。已成功使用过多次。

【讨论】:

【参考方案4】:

一位同事提出了一个潜在的解决方案——每个 webapp 线程一个响应队列/消费者,我们可以将返回地址设置为该特定线程拥有的响应队列。由于这些线程通常是长期存在的(并且被重新用于后续的 Web 请求),因此我们只需要在线程由池生成时承受开销。

也就是说,整个练习让我重新思考 JMS 与 HTTP...:)

【讨论】:

嗨@joshwa。我面临同样的问题,但我不明白你是如何解决这个问题的?你能启发我吗? @M.Heydari 我们最终使用了具有共享持久状态的异步回调模型。我仍然喜欢每个线程一个响应队列的想法,将状态保持在 ThreadLocal 中,但我们最终选择了不同的路线。 感谢重播。 @joshwa 嗨 Joshwa,我对使用异步回调模型让 JMS 以同步方式工作很感兴趣,您能否分享更多关于这些内容的 cmets?谢谢。【参考方案5】:

这是一个旧的,但我已经登陆这里寻找其他东西并且实际上确实有一些见解(希望对某人有所帮助)。

我们已经实现了非常相似的用例,使用 Hazelcast 作为我们的底盘 集群的节点间通信。本质是 2 个数据集:1 个用于响应的分布式地图,1 个响应等待者的“本地”列表(在集群中的每个节点上)。

每个请求(从 Jetty 接收它自己的线程)都会在本地等待者的映射中创建一个条目;该条目显然具有相关 UID 和将用作信号量的对象 然后请求被分派到远程(REST/JMS)并且原始线程开始等待信号量; UID 必须是请求的一部分 remote 返回响应并使用相关的 UID 将其写入响应映射 正在监听响应映射;如果在本地等待者的映射中找到新响应的 UID,则通知其信号量,释放原始请求的线程,从响应映射中提取响应并将其返回给客户端

这是一个笼统的描述,我可以通过一些优化来更新答案,以防有任何兴趣。

【讨论】:

以上是关于使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出JMS--ActiveMQ简单的HelloWorld实例

深入浅出JMS--ActiveMQ简单的HelloWorld实例

ActiveMQ从入门到精通

从入门到精通的ActiveMQ

Spring JMS ActiveMQ 跟踪作业状态

使用 JMS (ActiveMQ) 进行单元测试