Spring JMS(ActiveMQ) 延迟消息传递

Posted

技术标签:

【中文标题】Spring JMS(ActiveMQ) 延迟消息传递【英文标题】:Spring JMS(ActiveMQ) delayed delivery of messages 【发布时间】:2014-09-27 01:39:47 【问题描述】:

我们正在尝试对一些 JMS 消息设置延迟,以便消息只会在 x 时间后被添加到队列中/由侦听器接收。到目前为止,我们已经尝试了 2 种无效的方法。

1) 根据spring文档,我们可以在JMSTemplate上设置delivery delay。这是我们尝试的示例代码:

@Autowired
private JmsTemplate jmsTemplate;

...
long deliveryDelay = ...;
this.jmsTemplate.setDeliveryDelay(deliveryDelay);
this.jmsTemplate.convertAndSend(
                    queue.getName(),
                    event);
...

然而,我们得到以下异常,即使我们的 spring jms 版本是 4.0.5:

java.lang.IllegalStateException: setDeliveryDelay requires JMS 2.0

2) 我们也尝试在消息本身上设置延迟,但看起来延迟被忽略了,而且消息还是立即送达。

@Component
public class MyMessageConverter implements MessageConverter 

...

@Override
public Message toMessage(Object eventObject, Session session) throws JMSException, MessageConversionException 

...
long deliveryDelay = ...;
objectMessage.setLongProperty(
                  ScheduledMessage.AMQ_SCHEDULED_DELAY,
                  deliveryDelay);
return objectMessage;


spring xml中的jmsTemplate定义:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="messageConverter" ref="myMessageConverter" />
    <property name="sessionTransacted" value="true" />
</bean>

是否有人对问题是什么/有关如何实现延迟消息传递的其他想法有任何建议? 谢谢!

【问题讨论】:

您的第一种方法仅适用于符合 JMS 2.0 的 JMS 代理,ActiveMQ 不是(目前)其中之一。仅当您还在 JMS 代理端启用了调度时,第二个选项才会起作用,否则该属性将不执行任何操作。 谢谢@M.Deinum。我在 activemq.xml 中设置了 schedulerSupport="true",现在它可以工作了。 延迟消息存储在客户端? 【参考方案1】:

cmets 给出了答案。默认情况下,计划消息支持被禁用。您必须在代理 XML 配置文件中启用它,如 documentation 页面所述。

启用调度程序支持的示例代理标记:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">

您当然必须重新启动代理才能使配置更改生效。然后,当您发送消息时,您需要添加 JMS 标头,告诉代理您想要什么类型的延迟。

message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);

【讨论】:

嘿,@Tim Bish!我想知道我应该在哪里定义这条线以启用调度程序支持&lt;broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true"&gt;。我检查了您共享的文档,它没有说明任何内容。我的倾向是conf/activem/xml,但是,我找不到与启用schedulerSupport 相关的任何内容。谢谢!【参考方案2】:

需要做两件事来解决这个问题。

默认情况下,计划消息支持被禁用。我们必须在代理 XML 配置文件中启用它。

代理 xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">

在发送消息之前设置延迟。

public void send(Object object) 
    log.info("put <" + object + ">");
    jmsTemplate.convertAndSend(QUEUE_NAME, object, m -> 
       m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
       return m;
    );

【讨论】:

您能告诉我在conf/activemq.xml 的哪个位置,我应该定义这一行吗? &lt;broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true"&gt;【参考方案3】:

同样根据 ActiveMQ BrokerService 类中的方法,您应该配置持久性以能够使用调度程序功能。

public boolean isSchedulerSupport() 
    return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);

【讨论】:

这实际上取决于您使用的代理版本,以后的版本确实有一个内存调度程序存储,用于在没有持久性的情况下运行的代理。【参考方案4】:
  jmsTemplate.convertAndSend(destination, message, new MessagePostProcessor() 
            @Override
            public Message postProcessMessage(Message message) throws JMSException 
                message.setIntProperty("JMS_OracleDelay", 200);
                return message;
            
        );

【讨论】:

您能解释一下这段代码 sn-p 是如何回答这个问题的吗?【参考方案5】:
broker-url: vm://embedded?broker.persistent=true&broker.useShutdownHook=false&broker.schedulerSupport=true

【讨论】:

感谢您提供此代码 sn-p,它可能会提供一些有限的即时帮助。 proper explanation 将通过展示为什么这是解决问题的好方法,并使其对有其他类似问题的未来读者更有用,从而大大提高其长期价值。请edit您的回答添加一些解释,包括您所做的假设。【参考方案6】:

activemq 包不支持 JMS 2.0。尝试改用阿尔忒弥斯。尝试替换包名

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>1.5.6.RELEASE</version> 
    </dependency>

进入

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
        <version>1.5.6.RELEASE</version> 
    </dependency>

并在 application.properties 中添加

    spring.artemis.mode=native
    spring.artemis.host=localhost
    spring.artemis.port=61616
    spring.artemis.user=admin
    spring.artemis.password=admin

关注这个article

【讨论】:

【参考方案7】:

文档http://activemq.apache.org/delay-and-schedule-message-delivery.html

示例:10 秒后消费者收到消息

public void send(Object object) 
    log.info("put <" + object + ">");
    jmsTemplate.convertAndSend(QUEUE_NAME, object, m -> 
        m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
        return m;
    );

【讨论】:

【参考方案8】:

    我在活动 mq 配置 xml 中添加了schedulerSupport=true。请不要忘记在配置更改后重新启动活动的 mq 服务器。重新启动并登录到 activemq 管理控制台上的“已调度”选项卡后,您将看到已调度的消息详细信息。

    jmsTemplate.setDeliveryDelay 对我不起作用,所以我添加了以下代码: 用过

    jmsTemplate.convertAndSend(queueName, object, new MessagePostProcessor() 
            @Override
            public Message postProcessMessage(Message message) throws JMSException 
                message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);
                return message;
            
        );

请注意:一开始它对我不起作用。但是需要的是在 activeMQ 服务器上重新启动以反映配置中的更改

【讨论】:

以上是关于Spring JMS(ActiveMQ) 延迟消息传递的主要内容,如果未能解决你的问题,请参考以下文章

Spring整合JMS——基于ActiveMQ实现

Spring Boot的JMS发送和接收队列消息,基于ActiveMQ

Spring JMS 和 ActiveMQ 在哪里查看死信队列中的消息

深入浅出JMS--Spring和ActiveMQ整合的完整实例

Spring整合activeMQ消息队列

ActiveMQ实例2--Spring JMS发送消息