[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)
Posted 爱折腾的稻草
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)相关的知识,希望对你有一定的参考价值。
正文
ActiveMQ是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。
消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
然而ActiveMQ默认的配置性能偏低,需要优化,但是配置文件复杂,ActiveMQ本身不提供管理工具,主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,很难由浅入深进行了解。
准备
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- activeMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
添加配置
# ActiveMQ
activemq:
url: failover:(tcp://127.0.0.1:61616)
username: admin
password: admin
defaultQueueName: xiaojl.queue
No.1 点对点模式:
1.1、自定义ActiveMQ的配置类
package io.xiaojl.xmq.config;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
/**
* <p> Title: ActiveMQConfig </p>
* <p> Description: 自定义ActiveMQ的配置 </p>
*
* activemq的消息确认机制就是文档中说的ack机制有:<br>
* AUTO_ACKNOWLEDGE = 1 自动确认 <br>
* CLIENT_ACKNOWLEDGE = 2 客户端手动确认<br>
* DUPS_OK_ACKNOWLEDGE = 3 自动批量确认<br>
* SESSION_TRANSACTED = 0 事务提交并确认<br>
* INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认 activemq 独有<br>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@EnableJms
@Configuration
public class ActiveMQConfig {
@Value("${activemq.defaultQueueName}")
private String defaultQueueName;
@Value("${activemq.url}")
private String url;
@Value("${activemq.username}")
private String username;
@Value("${activemq.password}")
private String password;
//配置默认的消息目的地
@Bean
public Queue queue() {
return new ActiveMQQueue(defaultQueueName);
}
/**
*消息重发机制RedeliveryPolicy 以下情况会导致消息重发:
* 1.在使用事务的Session中,调用rollback()方法;
* 2.在使用事务的Session中,调用commit()方法之前就关闭了Session;
* 3.在Session中使用CLIENT_ACKNOWLEDGE签收模式或者INDIVIDUAL_ACKNOWLEDGE模式,并且调用了recover()方法。
*/
@Bean
public RedeliveryPolicy rediliveryPolicy() {
RedeliveryPolicy rp = new RedeliveryPolicy();
// 是否在每次尝试重新发送失败后,增长这个等待时间
rp.setUseExponentialBackOff(true);
// 重发次数,默认为6次 这里设置为10次
rp.setMaximumRedeliveries(10);
// 重发时间间隔,默认为1秒
rp.setInitialRedeliveryDelay(1);
// 第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
rp.setBackOffMultiplier(2);
// 是否避免消息碰撞
rp.setUseCollisionAvoidance(false);
// 设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
rp.setMaximumRedeliveryDelay(-1);
return rp;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username,password, url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
JmsTemplate jmsTemplate=new JmsTemplate();
//进行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setDeliveryMode(2);
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
//此处可不设置默认,在发送消息时也可设置队列
jmsTemplate.setDefaultDestination(queue);
return jmsTemplate;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//设置连接数
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
return factory;
}
}
1.2、编写消息生产者:
package io.xiaojl.xmq.producer;
import javax.jms.Destination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* <p>Title: Producer</p>
*
* <p>Description: 消息生产者</p>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@Component
public class Producer {
private final static Logger log = LoggerFactory.getLogger(Producer.class);
@Autowired
private JmsTemplate jmsTemplate;
/**
* <p>Title: sendMessage</p>
* <p>Description: 发送消息到指定的Destination中</p>
*
* @param dest
* @param message
*/
public void sendMessage(Destination dest, final String message){
log.debug("[消息发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:"+dest.toString()+"\t发送的报文内容:"+message);
jmsTemplate.convertAndSend(dest, message);
}
/**
* <p>Title: sendMessage</p>
* <p>Description: 发送消息到默认的Queue中</p>
*
* @param message
*/
public void sendMessage(final String message){
log.debug("[消息发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:默认配置的Queue \t发送的报文内容:"+message);
jmsTemplate.convertAndSend(message);
}
}
1.3、编写消息消费者:
package io.xiaojl.xmq.consumer;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* <p>Title: AsynListenerConsumer</p>
* <p>Description: 点对点的消费者[异步监听模式]</p>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@Component
public class AsynListenerConsumer {
private static final Logger log = LoggerFactory.getLogger(AsynListenerConsumer.class);
@JmsListener(destination="xiaojl.queue",containerFactory="jmsQueueListener")
public void receiveQueue(final TextMessage text, Session session) throws JMSException{
try {
log.debug("[消息处理]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());
boolean f = true;
if(f){
throw new JMSException("假设出现消息在消费过程中出现异常!");
}
// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
text.acknowledge();
} catch (JMSException e) {
log.error("[消息处理]-出现异常",e.getLocalizedMessage());
session.recover();// 此不可省略 重发信息使用
}
}
}
1.4、编写单元测试:
package io.xiaojl;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import io.xiaojl.xmq.ActivemqApp;
import io.xiaojl.xmq.producer.Producer;
/**
* <p>Title: ApplicationTest</p>
*
* <p>Description: </p>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes=ActivemqApp.class)
public class ApplicationTest {
@Autowired
private Producer producer;
@Test
public void t_send(){
String message = "x11111";
Destination dest = new ActiveMQQueue("xiaojl.queue");
producer.sendMessage(dest, message);
//模拟服务在线
while(true){}
}
}
1.5、执行单元测试:
2018-09-30 16:21:41.968 INFO 2420 --- [ main] io.xiaojl.ApplicationTest : Started ApplicationTest in 3.705 seconds (JVM running for 5.228)
2018-09-30 16:21:42.245 DEBUG 2420 --- [ main] io.xiaojl.xmq.producer.Producer : [消息发送]-配送模式:2 目的地:queue://xiaojl.queue 发送的报文内容:x11111
2018-09-30 16:21:42.256 INFO 2420 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://127.0.0.1:61616
2018-09-30 16:21:42.512 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.516 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.532 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.532 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.536 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.540 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.581 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.582 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.872 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.872 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.912 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.912 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:42.946 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:42.946 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:43.014 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:43.014 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:43.143 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:43.143 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:43.400 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:43.400 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
2018-09-30 16:21:43.913 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1 收到的报文内容:x11111
2018-09-30 16:21:43.914 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer : [消息处理]-出现异常
从执行结果可以看到,消费者AsynListenerConsumer,在处理消息的过程中出现了异常,从而ActiveMQ重发了10次给AsynListenerConsumer。
No.2 发布/订阅模式:
2.1、添加ActiveMQ的配置类
在1.1中自定义的配置类中添加对Topic的支持:
//定义一个消息监听器连接工厂,这里定义的是发布/订阅模式的监听器连接工厂
@Bean(name = "jmsTopicListener")
public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//设置jms并发数
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
//CLIENT_ACKNOWLEDGE = 2 客户端手动确认
factory.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE);
//默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
factory.setPubSubDomain(true);
return factory;
}
2.2、编写消息消费者:
消费者1:
package io.xiaojl.xmq.consumer;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* <p>Title: TopicConsumer</p>
* <p>Description: 发布/订阅模式的消费者</p>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@Component
public class TopicConsumer {
private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);
@JmsListener(destination="xiaojl.topic", containerFactory="jmsTopicListener")
public void receiveTopic(final TextMessage text, Session session) throws JMSException{
try {
log.debug("[消息处理]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());
boolean f = true;
if(f){
throw new JMSException("假设出现消息在消费过程中出现异常!");
}
// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
text.acknowledge();
} catch (JMSException e) {
log.error("[消息处理]-出现异常",e.getLocalizedMessage());
session.recover();
}
}
}
消费者2:
package io.xiaojl.xmq.consumer;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* <p>Title: TopicConsumer</p>
* <p>Description: 发布/订阅模式的消费者</p>
*
* @author jilong.xiao
* @date 2018年9月30日
*/
@Component
public class TopicConsumer2 {
private static final Logger log = LoggerFactory.getLogger(TopicConsumer2.class);
@JmsListener(destination="xiaojl.topic",containerFactory="jmsTopicListener")
public void receiveTopic(final TextMessage text,Session session) throws JMSException{
try {
log.debug("[消息处理2]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());
// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
text.acknowledge();
} catch (JMSException e) {
log.error("[消息处理2]-出现异常",e.getLocalizedMessage());
session.recover();
}
}
}
2.3、编写单元测试:
@Test
public void t_sendTopic(){
String message = "xiaojlTopic-111";
Destination dest = new ActiveMQTopic("xiaojl.topic");
producer.sendMessage(dest, message);
//模拟服务在线
while(true){}
}
2.4、执行单元测试:
2018-09-30 16:20:23.419 INFO 6732 --- [ main] io.xiaojl.ApplicationTest : Started ApplicationTest in 3.893 seconds (JVM running for 5.388)
2018-09-30 16:20:23.624 DEBUG 6732 --- [ main] io.xiaojl.xmq.producer.Producer : [消息发送]-配送模式:2 目的地:topic://xiaojl.topic 发送的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.818 INFO 6732 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://127.0.0.1:61616
2018-09-30 16:20:23.892 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer2 : [消息处理2]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.923 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.924 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:23.962 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.964 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:23.971 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.971 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:23.980 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.982 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:23.990 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.991 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:24.011 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.011 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:24.050 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.051 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:24.134 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.135 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:24.266 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.266 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:24.523 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.523 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
2018-09-30 16:20:25.037 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1 收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:25.037 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer : [消息处理]-出现异常
从执行结果可以看到,消费者TopicConsumer,在处理消息的过程中出现了异常,从而ActiveMQ重发了10次给TopicConsumer。
总结
本文中通过一个示例程序讲解ActiveMQ对点对点模式和发布/订阅模式的配置和使用。在此同时,我们还配置了消息失败重试机制。
·end·
- 如果喜欢,快分享给你的朋友们吧 -
我们一起愉快的玩耍吧
以上是关于[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)的主要内容,如果未能解决你的问题,请参考以下文章