ActiveMQ 重发机制与确认机制 实践

Posted 技术宅home

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ 重发机制与确认机制 实践相关的知识,希望对你有一定的参考价值。

一、配置spring-activemq.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 5 
 6     <!-- 第三方MQ工厂: ConnectionFactory -->
 7     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 8         <!-- ActiveMQ Address -->
 9         <property name="brokerURL" value="${activemq.brokerURL}"/>
10         <property name="userName" value="${activemq.userName}"/>
11         <property name="password" value="${activemq.password}"/>
12         <!-- 是否异步发送 -->
13         <property name="useAsyncSend" value="true"/>
14         <!-- 引用重发机制 -->
15         <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
16         <!-- 消息传输监听器 处理网络及服务器异常 -->
17         <!--<property name="transportListener">-->
18         <!--<bean class="com.schooling.activemq.ActiveMQTransportListener"/>-->
19         <!--</property>-->
20     </bean>
21 
22     <!--
23         ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
24         可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包
25      -->
26     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
27         <property name="connectionFactory" ref="targetConnectionFactory"/>
28         <property name="maxConnections" value="${activemq.pool.maxConnections}"/>
29     </bean>
30 
31     <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 -->
32     <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
33         <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
34         <property name="useExponentialBackOff" value="true"/>
35         <!--重发次数,默认为6次   这里设置为1次 -->
36         <property name="maximumRedeliveries" value="2"/>
37         <!--重发时间间隔,默认为1秒 -->
38         <property name="initialRedeliveryDelay" value="1000"/>
39         <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
40         <property name="backOffMultiplier" value="2"/>
41         <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,
42         第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
43         <property name="maximumRedeliveryDelay" value="1000"/>
44     </bean>
45 
46     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
47     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
48         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
49         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
50     </bean>
51 
52     <!--这个是目的地-->
53     <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
54         <constructor-arg value="${activemq.queueName}"/>
55     </bean>
56 
57     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
58     <!-- 队列模板 -->
59     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
60         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
61         <property name="connectionFactory" ref="connectionFactory"/>
62         <property name="defaultDestinationName" value="${activemq.queueName}"/>
63     </bean>
64 
65     <!-- 配置自定义监听:MessageListener -->
66     <bean id="msgQueueMessageListener" class="com.schooling.activemq.consumer.MsgQueueMessageListener"/>
67 
68     <!-- 将连接工厂、目标对了、自定义监听注入jms模板 -->
69     <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
70         <property name="connectionFactory" ref="connectionFactory"/>
71         <property name="destination" ref="msgQueue"/>
72         <property name="messageListener" ref="msgQueueMessageListener"/>
73         <!--应答模式是 INDIVIDUAL_ACKNOWLEDGE-->
74         <property name="sessionAcknowledgeMode" value="4"/>
75     </bean>
76 
77 </beans>

二、生产者

 1 @Service("activeMQProducer")
 2 public class ActiveMQProducer {
 3 
 4     private JmsTemplate jmsTemplate;
 5 
 6     public JmsTemplate getJmsTemplate() {
 7         return jmsTemplate;
 8     }
 9 
10     @Autowired
11     public void setJmsTemplate(JmsTemplate jmsTemplate) {
12         this.jmsTemplate = jmsTemplate;
13     }
14 
15     public void sendMessage(final String info) {
16         jmsTemplate.send(new MessageCreator() {
17             public Message createMessage(Session session) throws JMSException {
18                 return session.createTextMessage(info);
19             }
20         });
21     }
22 }

三、消费者(监听模式)

 1 public class MsgQueueMessageListener implements SessionAwareMessageListener<Message> {
 2 
 3     @Override
 4     public void onMessage(Message message, Session session) throws JMSException {
 5 
 6         if (message instanceof TextMessage) {
 7 
 8             String msg = ((TextMessage) message).getText();
 9 
10             System.out.println("============================================================");
11             System.out.println("消费者收到的消息:" + msg);
12             System.out.println("============================================================");
13 
14             try {
15                 if ("我是队列消息002".equals(msg)) {
16                     throw new RuntimeException("故意抛出的异常");
17                 }
18                 // 只要被确认后  就会出队,接受失败没有确认成功,会在原队列里面
19                 message.acknowledge();
20             } catch (Exception e) {
21                 // 此不可省略 重发信息使用
22                 session.recover();
23             }
24         }
25     }
26 }

四、测试方法

1 @Test
2 public void send() {
3     for (int i = 1; i < 5; i++) {
4         this.activeMQProducer.sendMessage("我是队列消息00" + i);
5     }
6     while (true) {}
7 }

五、测试结果

六、测试小结

“我是队列消息002”由于异常,未接收成功。在重发2次都失败的情况下被发送到“死信队列”。其他4条信息都接收成功。

 

源码地址:https://gitee.com/tab_sun/TestSpringActiveMQ

 

以上是关于ActiveMQ 重发机制与确认机制 实践的主要内容,如果未能解决你的问题,请参考以下文章

(7)消息的确认和重发机制

activemq-重发去重

ActiveMQ重试机制

ActiveMQ 重发机制(消息发送失败后的重新发送)

ActiveMQ中消息的重发与持久化保存

springboot+activemq中引入重发机制