将最后发送的消息发送给 jms 主题的新消费者

Posted

技术标签:

【中文标题】将最后发送的消息发送给 jms 主题的新消费者【英文标题】:Send last sent message to new consumer on a jms topic 【发布时间】:2014-03-28 23:34:15 【问题描述】:

是否可以将主题配置为仅存储最后一条消息的副本并将其发送到新连接而不知道客户端标识符或其他信息?

更新: 从 Shashi 提供的信息中,我发现这两页使用retroactive consumer 和subscription recovery policy 描述了一个类似于我的用例(适用于股票价格)。我怎么没有得到想要的行为。我目前做的是:

在activemq中包含topic=">"的policyEntry中的以下行

<subscriptionRecoveryPolicy>
  <fixedCountSubscriptionRecoveryPolicy maximumSize="1"/> 
</subscriptionRecoveryPolicy>

添加到用于连接代理的 URL(使用 activemq-cpp)consumer.retroactive=true

设置消费者具有耐用性。 (但我强烈认为这是不想要的,因为我只需要最后一个,但没有它我在第二次启动消费者时没有收到任何消息)

    启动代理。

    启动消费者。

    使用 activemq Web 管理控制台向主题发送消息。 (正如预期的那样,我在消费者中收到它)

    停止消费者。

    向主题发送另一条消息。

    启动消费者。我也按预期收到了消息。

但是,如果消费者收到一条消息,那么它会离线(停止进程)然后我重新启动它,它不会返回最后一条消息。

目标是无论何时消费者开始获取最后一条消息,无论如何(显然,除非没有消息发送到主题)。

关于我缺少什么的任何想法?

背景:

我有一个设备,当它的数据发生变化时,它会将他的数据发布到一个主题。可变数量的消费者可以连接到这个主题,从 0 到小于 10。主题中只有一个发布者,并且总是将他的所有数据作为单个消息发布(数据很少,只是传感器的几个字段读)。此信息的发布率是可变的,不一定基于时间,当发生更改时,会将新的更新消息发送到代理。 问题是当新的消费者连接到主题时,它没有设备读数的数据,直到设备向主题发送新消息。这可以通过创建一个额外的队列来解决,以便新连接可以订阅该主题,然后请求设备通过队列进行当前读取(设备将使用队列消息,这将是对数据的请求,然后在同一个队列)。 但是由于发送到主题的消息始终是完整的信息,我想知道是否可以将主题配置为仅存储最后一条消息的副本并将其发送到不知道客户端标识符或其他信息的新连接?

当前使用的代理是 ActiveMQ。

【问题讨论】:

【参考方案1】:

您想要的是拥有retroactive consumers 并在主题上设置lastImageSubscriptionRecoveryPolicy subscription recovery policy。 Shashi 正确地说,以下将消费者设置为追溯的语法仅适用于 Openwire

topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");

在您的情况下,您可以使用alwaysRetroactive="true" 将所有消费者配置为在代理配置中具有追溯性。我测试了这甚至适用于 AMQP 协议(​​库 qpid-jms-client),我怀疑它适用于所有协议。

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>" alwaysRetroactive="true">
        <subscriptionRecoveryPolicy>
          <lastImageSubscriptionRecoveryPolicy />
        </subscriptionRecoveryPolicy>
      </policyEntry>

配置示例取自https://github.com/apache/activemq/blob/master/activemq-unit-tests/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml

【讨论】:

【参考方案2】:

消息传递提供程序(例如 WebSphere MQ)有一个名为 Retained Publication 的特性。使用此功能,消息提供者会保留有关主题的最后发布消息,并在消息发布给给定主题后将其传递给新的消费者。

Retained Publication 可能在 Active MQ 的本机接口中得到支持。这个link 谈到了consumer.retroactive,它仅适用于 OpenWire。

发布者将在发布之前通过设置消息的属性来告诉消息传递提供者保留发布。下面是如何使用 WebSphere MQ 完成的。

// set as a retained publication
msg.setIntProperty(JmsConstants.JMS_IBM_RETAIN, JmsConstants.RETAIN_PUBLICATION)

【讨论】:

以上是关于将最后发送的消息发送给 jms 主题的新消费者的主要内容,如果未能解决你的问题,请参考以下文章

JMS 工作流 - 混合队列和主题

发送到JMS队列的消息将仅由单个消费者使用?

Jms:具有多个消费者的 Pub/Sub

Kafka消息的偏移量和顺序消费原理

JMS 2.0 - 如何接收来自共享消费者主题的消息?

JMS编程模型