同步 Spring ActiveMQ 接收器

Posted

技术标签:

【中文标题】同步 Spring ActiveMQ 接收器【英文标题】:Synchronous Spring ActiveMQ receiver 【发布时间】:2014-03-30 21:12:01 【问题描述】:

我的应用程序是基于 Spring 的应用程序。我正在使用 activemq 作为代理。我在我的应用中管理两个不同的队列接收消息。

对于每个队列,我的应用程序的目标是侦听在代理上发送的消息,然后处理消息(读取它,根据来自该消息的信息执行数据库操作等),然后处理下一条消息(因此处理是同步的,我想按照它们到达的顺序处理消息)。

我的实际设计是这个:

我创建了一个线程

<bean id="pollThread" class="my.app.receiver" init-method="start" destroy-method="interrupt">
</bean>

线程在while(true)循环中调用run()方法的方法:

此线程创建与活动 mq 的连接并使用接收阻塞 收到后,我关闭连接并处理消息(数据库等) 治疗完成,方法结束

然后治疗重新开始(收听、治疗等)

我的问题是:有什么方法可以更好地设计它吗?我唯一的强制性流程是按到达的顺序处理消息,并在处理下一条消息之前进行处理。

我已经阅读了很多关于 JMSTemplate 等的内容,但我对所有信息都不知所措。

实际上,我最好的猜测是创建一个 PooledConnectionFactory(因为我们将只使用 ActiveMQ 并且 CachingConnectionFactory 似乎在我们的架构中具有重要的限制)并将其限制为一个并发消费者。然后使用 MessageListener 接口继续我的消息。

谢谢

【问题讨论】:

【参考方案1】:

您可以使用 Spring 框架提供的消息驱动 POJO 基础架构。它将对目的地进行轮询,并在代理失败等情况下恢复。

您可以更改您的代码(例如YourMessageListener)来实现MessageListener。然后下面的配置可以让你开始

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans 
   http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="messageListener" class="org.foo.YourMessageListener"/>

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://your-server:61616"/>
    </bean>

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue/yourQueue"/>
    </bean>

    <bean id="jmsContainer" 
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="destination"/>
        <property name="concurrency" value="1"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>

重要的设置是concurrency 元素,它限制了可以同时处理该目标上的消息的线程数。通过将其设置为 1,一次只有一个线程使用一条消息。

查看documentation了解更多详情

【讨论】:

抱歉,我的回复延迟了。您的回答与我在帖子和您回答时间之间实施的类似。我终于使用了 PooledConnectionFactory 和 jms 命名空间:spring-jms-3.0 concurrency 参数是键,maxConcurrentConsumers 也设置为 1。

以上是关于同步 Spring ActiveMQ 接收器的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot的JMS发送和接收队列消息,基于ActiveMQ

JAVAEE——宜立方商城09:Activemq整合spring的应用场景添加商品同步索引库商品详情页面动态展示与使用缓存

ActiveMQ消息队列介绍(转)

ActiveMQ结合Spring开发

学习ActiveMQ:spring与ActiveMQ整合

ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用