ActiveMQ CMS:在创建消费者和设置监听器之间会丢失消息吗?
Posted
技术标签:
【中文标题】ActiveMQ CMS:在创建消费者和设置监听器之间会丢失消息吗?【英文标题】:ActiveMQ CMS: Can messages be lost between creating a consumer and setting a listener? 【发布时间】:2020-10-17 21:52:52 【问题描述】:使用侦听器设置 CMS 消费者涉及两个单独的调用:首先,获取消费者:
cms::MessageConsumer* cms::Session::createConsumer( const cms::Destination* );
然后,在消费者上设置一个监听器:
void cms::MessageConsumer::setMessageListener( cms::MessageListener* );
如果实现在侦听器激活之前订阅目的地(并从代理/路由器接收消息),消息会丢失吗?还是这些消息在内部排队并在激活时传递给侦听器?
为什么没有 API 调用来创建使用侦听器作为构造参数的消费者? (是因为 JMS 规范没有吗?)
(附录:这可能是 API 本身的一个缺陷。更合乎逻辑的顺序是从会话中实例化消费者,并在 API 中有一个cms::Consumer::subscribe( cms::Destination*, cms::MessageListener* )
方法。)
【问题讨论】:
activemq-cpp 发行版中的SimpleAsyncConsumer
示例是一部彻头彻尾的恐怖秀,是一部糟糕的 C++ 经典。它看起来像是 Java 程序员尝试编写 C++ 的工作。
如果 ActiveMQ CMS 不能满足您的需求,总会有使用 AMQP 的 C++ 客户端。此外,C++ STOMP 客户端。 ActiveMQ 支持这两种协议。
关于 Qpid Proton C++ 客户端的说法越少越好。 (代码库看起来更合理,但是基于一些愚蠢的“proactor”设计模式的 API 来自 Mars;而且 AMQP 1.0 似乎不太适合 pub/sub)不过,我仍然从 CMS 获得有用的里程。也许粗糙的边缘并非不可克服。
如果需要,我也不反对修改源代码。有一些非常明显的嚎叫,例如开发人员忘记 - 或者不知道 - C++ 支持协变返回类型,所以你会发现像 ActiveMQConnectionFactory::createConnection()
这样的 Java 主义返回 cms::Connection*
,无缘无故失去了需要 dynamic_cast
才能在客户端恢复的子类类型代码...叹息
它是开源的。我相信捐款会受到欢迎。另外,我相信正在开发一个新的 Qpid C++ 客户端。请参阅qpid.2158936.n2.nabble.com/… 进行讨论。
【参考方案1】:
我认为 API 不一定有缺陷。显然它可以设计成不同的方式,但我相信您所谓的问题的解决方案来自Connection
对象上的start
方法(通过Startable
继承)。 Connection
的文档指出:
CMS 客户端通常会创建一个连接、一个或多个会话以及许多消息生产者和消费者。创建连接时,它处于停止模式。这意味着没有消息被传递。
在设置完成之前(即,直到所有消息使用者都创建完毕),通常会保持连接处于停止模式。此时,客户端调用连接的 start 方法,消息开始到达连接的消费者。此设置约定可最大限度地减少客户端仍在自行设置过程中异步消息传递可能导致的任何客户端混淆。
可以立即开始连接,之后可以进行设置。执行此操作的客户端必须准备好在它们仍在设置过程中时处理异步消息传递。
这与 JMS 遵循的模式相同。
在任何情况下,无论您何时调用start()
,我都不认为有任何消息丢失的风险。如果消费者使用自动确认模式,则只有在通过其中一种接收方法同步传递或通过侦听器的onMessage
异步传递消息后,才应自动确认消息。否则将是我估计的错误。在过去的 10 年里,我一直在使用 JMS 进行各种实现,我从未见过任何与此相关的消息丢失的情况。
如果您想在调用 start()
之后添加消费者,您当然可以先调用 stop()
,但我认为简单地即时添加它们没有任何问题。
【讨论】:
嗯,是的,这适用于在start()
调用之前设置的订阅。如果我想稍后在start()
调用之后添加另一个订阅怎么办?我应该打电话给stop()
,设置新的消费者,然后再打电话给start()
吗?
我仍然认为这是 API 的一个缺陷。正常的过程(在大多数消息传递系统中)是在注册订阅之前设置一个监听器/接收器,正是为了避免竞争条件。 (同样的逻辑适用于请求/响应场景,顺便说一句 - 在发送请求之前设置响应处理程序。)
谢谢!我同意你关于 JMS 的观点,但我的重点是 CMS,据说是一个“克隆”API。至少可以说,代码库并没有激发信心,所以我仍然对使用 C++ 客户端库的体验感到好奇。
另外,不管怎样,如果 JMS 中的请求/回复使用点对点语义(即队列),那么在发送请求之前不需要设置响应处理程序。
好点。我的重点是发布/订阅系统(主题)。以上是关于ActiveMQ CMS:在创建消费者和设置监听器之间会丢失消息吗?的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
ActiveMQ 消息消费者不主动监听消息队列是不是有消息,只监听是是不是有消息进去消息队列