Spring JMS(ActiveMQ) 延迟消息传递
Posted
技术标签:
【中文标题】Spring JMS(ActiveMQ) 延迟消息传递【英文标题】:Spring JMS(ActiveMQ) delayed delivery of messages 【发布时间】:2014-09-27 01:39:47 【问题描述】:我们正在尝试对一些 JMS 消息设置延迟,以便消息只会在 x 时间后被添加到队列中/由侦听器接收。到目前为止,我们已经尝试了 2 种无效的方法。
1) 根据spring文档,我们可以在JMSTemplate上设置delivery delay。这是我们尝试的示例代码:
@Autowired
private JmsTemplate jmsTemplate;
...
long deliveryDelay = ...;
this.jmsTemplate.setDeliveryDelay(deliveryDelay);
this.jmsTemplate.convertAndSend(
queue.getName(),
event);
...
然而,我们得到以下异常,即使我们的 spring jms 版本是 4.0.5:
java.lang.IllegalStateException: setDeliveryDelay requires JMS 2.0
2) 我们也尝试在消息本身上设置延迟,但看起来延迟被忽略了,而且消息还是立即送达。
@Component
public class MyMessageConverter implements MessageConverter
...
@Override
public Message toMessage(Object eventObject, Session session) throws JMSException, MessageConversionException
...
long deliveryDelay = ...;
objectMessage.setLongProperty(
ScheduledMessage.AMQ_SCHEDULED_DELAY,
deliveryDelay);
return objectMessage;
spring xml中的jmsTemplate定义:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="messageConverter" ref="myMessageConverter" />
<property name="sessionTransacted" value="true" />
</bean>
是否有人对问题是什么/有关如何实现延迟消息传递的其他想法有任何建议? 谢谢!
【问题讨论】:
您的第一种方法仅适用于符合 JMS 2.0 的 JMS 代理,ActiveMQ 不是(目前)其中之一。仅当您还在 JMS 代理端启用了调度时,第二个选项才会起作用,否则该属性将不执行任何操作。 谢谢@M.Deinum。我在 activemq.xml 中设置了 schedulerSupport="true",现在它可以工作了。 延迟消息存储在客户端? 【参考方案1】:cmets 给出了答案。默认情况下,计划消息支持被禁用。您必须在代理 XML 配置文件中启用它,如 documentation 页面所述。
启用调度程序支持的示例代理标记:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">
您当然必须重新启动代理才能使配置更改生效。然后,当您发送消息时,您需要添加 JMS 标头,告诉代理您想要什么类型的延迟。
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);
【讨论】:
嘿,@Tim Bish!我想知道我应该在哪里定义这条线以启用调度程序支持<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">
。我检查了您共享的文档,它没有说明任何内容。我的倾向是conf/activem/xml
,但是,我找不到与启用schedulerSupport
相关的任何内容。谢谢!【参考方案2】:
需要做两件事来解决这个问题。
默认情况下,计划消息支持被禁用。我们必须在代理 XML 配置文件中启用它。代理 xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">
在发送消息之前设置延迟。
public void send(Object object)
log.info("put <" + object + ">");
jmsTemplate.convertAndSend(QUEUE_NAME, object, m ->
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
return m;
);
【讨论】:
您能告诉我在conf/activemq.xml
的哪个位置,我应该定义这一行吗? <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="$activemq.data" schedulerSupport="true">
【参考方案3】:
同样根据 ActiveMQ BrokerService 类中的方法,您应该配置持久性以能够使用调度程序功能。
public boolean isSchedulerSupport()
return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);
【讨论】:
这实际上取决于您使用的代理版本,以后的版本确实有一个内存调度程序存储,用于在没有持久性的情况下运行的代理。【参考方案4】: jmsTemplate.convertAndSend(destination, message, new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws JMSException
message.setIntProperty("JMS_OracleDelay", 200);
return message;
);
【讨论】:
您能解释一下这段代码 sn-p 是如何回答这个问题的吗?【参考方案5】:broker-url: vm://embedded?broker.persistent=true&broker.useShutdownHook=false&broker.schedulerSupport=true
【讨论】:
感谢您提供此代码 sn-p,它可能会提供一些有限的即时帮助。 proper explanation 将通过展示为什么这是解决问题的好方法,并使其对有其他类似问题的未来读者更有用,从而大大提高其长期价值。请edit您的回答添加一些解释,包括您所做的假设。【参考方案6】:activemq 包不支持 JMS 2.0。尝试改用阿尔忒弥斯。尝试替换包名
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>1.5.6.RELEASE</version>
</dependency>
进入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
<version>1.5.6.RELEASE</version>
</dependency>
并在 application.properties 中添加
spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=admin
spring.artemis.password=admin
关注这个article
【讨论】:
【参考方案7】:文档http://activemq.apache.org/delay-and-schedule-message-delivery.html
示例:10 秒后消费者收到消息
public void send(Object object)
log.info("put <" + object + ">");
jmsTemplate.convertAndSend(QUEUE_NAME, object, m ->
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
return m;
);
【讨论】:
【参考方案8】:我在活动 mq 配置 xml 中添加了schedulerSupport=true
。请不要忘记在配置更改后重新启动活动的 mq 服务器。重新启动并登录到 activemq 管理控制台上的“已调度”选项卡后,您将看到已调度的消息详细信息。
jmsTemplate.setDeliveryDelay
对我不起作用,所以我添加了以下代码:
用过
jmsTemplate.convertAndSend(queueName, object, new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws JMSException
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);
return message;
);
请注意:一开始它对我不起作用。但是需要的是在 activeMQ 服务器上重新启动以反映配置中的更改
【讨论】:
以上是关于Spring JMS(ActiveMQ) 延迟消息传递的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot的JMS发送和接收队列消息,基于ActiveMQ
Spring JMS 和 ActiveMQ 在哪里查看死信队列中的消息