同步 Spring ActiveMQ 接收器
Posted
技术标签:
【中文标题】同步 Spring ActiveMQ 接收器【英文标题】:Synchronous Spring ActiveMQ receiver 【发布时间】:2014-03-30 21:12:01 【问题描述】:我的应用程序是基于 Spring 的应用程序。我正在使用 activemq 作为代理。我在我的应用中管理两个不同的队列接收消息。
对于每个队列,我的应用程序的目标是侦听在代理上发送的消息,然后处理消息(读取它,根据来自该消息的信息执行数据库操作等),然后处理下一条消息(因此处理是同步的,我想按照它们到达的顺序处理消息)。
我的实际设计是这个:
我创建了一个线程
<bean id="pollThread" class="my.app.receiver" init-method="start" destroy-method="interrupt">
</bean>
线程在while(true)循环中调用run()方法的方法:
此线程创建与活动 mq 的连接并使用接收阻塞 收到后,我关闭连接并处理消息(数据库等) 治疗完成,方法结束然后治疗重新开始(收听、治疗等)
我的问题是:有什么方法可以更好地设计它吗?我唯一的强制性流程是按到达的顺序处理消息,并在处理下一条消息之前进行处理。
我已经阅读了很多关于 JMSTemplate 等的内容,但我对所有信息都不知所措。
实际上,我最好的猜测是创建一个 PooledConnectionFactory(因为我们将只使用 ActiveMQ 并且 CachingConnectionFactory 似乎在我们的架构中具有重要的限制)并将其限制为一个并发消费者。然后使用 MessageListener 接口继续我的消息。
谢谢
【问题讨论】:
【参考方案1】:您可以使用 Spring 框架提供的消息驱动 POJO 基础架构。它将对目的地进行轮询,并在代理失败等情况下恢复。
您可以更改您的代码(例如YourMessageListener
)来实现MessageListener
。然后下面的配置可以让你开始
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="messageListener" class="org.foo.YourMessageListener"/>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://your-server:61616"/>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue/yourQueue"/>
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="concurrency" value="1"/>
<property name="messageListener" ref="messageListener"/>
</bean>
</beans>
重要的设置是concurrency
元素,它限制了可以同时处理该目标上的消息的线程数。通过将其设置为 1,一次只有一个线程使用一条消息。
查看documentation了解更多详情
【讨论】:
抱歉,我的回复延迟了。您的回答与我在帖子和您回答时间之间实施的类似。我终于使用了 PooledConnectionFactory 和 jms 命名空间:spring-jms-3.0 concurrency 参数是键,maxConcurrentConsumers 也设置为 1。以上是关于同步 Spring ActiveMQ 接收器的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot的JMS发送和接收队列消息,基于ActiveMQ
JAVAEE——宜立方商城09:Activemq整合spring的应用场景添加商品同步索引库商品详情页面动态展示与使用缓存
ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用