[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)

Posted 爱折腾的稻草

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)相关的知识,希望对你有一定的参考价值。

正文

ActiveMQ是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。
消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。

然而ActiveMQ默认的配置性能偏低,需要优化,但是配置文件复杂,ActiveMQ本身不提供管理工具,主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,很难由浅入深进行了解。

准备

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- activeMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
添加配置
# ActiveMQ   
activemq: 
    url: failover:(tcp://127.0.0.1:61616) 
    username: admin
    password: admin 
    defaultQueueName: xiaojl.queue

No.1 点对点模式:

1.1、自定义ActiveMQ的配置类
package io.xiaojl.xmq.config;

import javax.jms.Queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

/**
 * <p> Title: ActiveMQConfig  </p>
 * <p> Description: 自定义ActiveMQ的配置 </p>
 * 
 * activemq的消息确认机制就是文档中说的ack机制有:<br>
 * AUTO_ACKNOWLEDGE = 1 自动确认  <br>
 * CLIENT_ACKNOWLEDGE = 2 客户端手动确认<br>
 * DUPS_OK_ACKNOWLEDGE = 3 自动批量确认<br>
 * SESSION_TRANSACTED = 0 事务提交并确认<br>
 * INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认 activemq 独有<br>
 *
 * @author jilong.xiao
 * @date 2018年9月30日
 */

@EnableJms
@Configuration
public class ActiveMQConfig {
    @Value("${activemq.defaultQueueName}")
    private String defaultQueueName;
    @Value("${activemq.url}")
    private String url;
    @Value("${activemq.username}")
    private String username;
    @Value("${activemq.password}")
    private String password;

    //配置默认的消息目的地
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(defaultQueueName);
    }

    /**
     *消息重发机制RedeliveryPolicy 以下情况会导致消息重发:
     * 1.在使用事务的Session中,调用rollback()方法;
     * 2.在使用事务的Session中,调用commit()方法之前就关闭了Session;
     * 3.在Session中使用CLIENT_ACKNOWLEDGE签收模式或者INDIVIDUAL_ACKNOWLEDGE模式,并且调用了recover()方法。 
     */

    @Bean
    public RedeliveryPolicy rediliveryPolicy() {
        RedeliveryPolicy rp = new RedeliveryPolicy();
        // 是否在每次尝试重新发送失败后,增长这个等待时间
        rp.setUseExponentialBackOff(true);
        // 重发次数,默认为6次 这里设置为10次
        rp.setMaximumRedeliveries(10);
        // 重发时间间隔,默认为1秒
        rp.setInitialRedeliveryDelay(1);
        // 第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
        rp.setBackOffMultiplier(2);
        // 是否避免消息碰撞
        rp.setUseCollisionAvoidance(false);
        // 设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        rp.setMaximumRedeliveryDelay(-1);

        return rp;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){  
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username,password, url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
        JmsTemplate jmsTemplate=new JmsTemplate();
        //进行持久化配置 1表示非持久化,2表示持久化
        jmsTemplate.setDeliveryMode(2);
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
        //此处可不设置默认,在发送消息时也可设置队列
        jmsTemplate.setDefaultDestination(queue); 
        return jmsTemplate;
    }

    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //设置连接数
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        return factory;
    }

}
1.2、编写消息生产者:
package io.xiaojl.xmq.producer;

import javax.jms.Destination;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**  
 * <p>Title: Producer</p>  
 *
 * <p>Description: 消息生产者</p>  
 *
 * @author jilong.xiao  
 * @date 2018年9月30日  
 */

@Component
public class Producer {
    private final static Logger log = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * <p>Title: sendMessage</p>  
     * <p>Description: 发送消息到指定的Destination中</p>  
     *
     * @param dest 
     * @param message
     */

    public void sendMessage(Destination dest, final String message){
        log.debug("[消息发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:"+dest.toString()+"\t发送的报文内容:"+message);
        jmsTemplate.convertAndSend(dest, message);
    }

    /**
     * <p>Title: sendMessage</p>  
     * <p>Description: 发送消息到默认的Queue中</p>  
     *
     * @param message
     */

    public void sendMessage(final String message){
        log.debug("[消息发送]-配送模式:"+jmsTemplate.getDeliveryMode()+"\t目的地:默认配置的Queue \t发送的报文内容:"+message);
        jmsTemplate.convertAndSend(message);
    }

}
1.3、编写消息消费者:
package io.xiaojl.xmq.consumer;


import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**  
 * <p>Title: AsynListenerConsumer</p>  
 * <p>Description: 点对点的消费者[异步监听模式]</p>  
 *
 * @author jilong.xiao  
 * @date 2018年9月30日  
 */

@Component
public class AsynListenerConsumer {
    private static final Logger log = LoggerFactory.getLogger(AsynListenerConsumer.class);

    @JmsListener(destination="xiaojl.queue",containerFactory="jmsQueueListener")
    public void receiveQueue(final TextMessage text, Session session) throws JMSException{
        try {
            log.debug("[消息处理]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());

            boolean f = true;
            if(f){
                throw new JMSException("假设出现消息在消费过程中出现异常!");
            }

            // 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
            text.acknowledge();
        } catch (JMSException e) {
            log.error("[消息处理]-出现异常",e.getLocalizedMessage());
            session.recover();// 此不可省略 重发信息使用
        }

    }

}
1.4、编写单元测试:
package io.xiaojl;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import io.xiaojl.xmq.ActivemqApp;
import io.xiaojl.xmq.producer.Producer;

/**  
 * <p>Title: ApplicationTest</p>  
 *
 * <p>Description: </p>  
 *
 * @author jilong.xiao  
 * @date 2018年9月30日  
 */

@RunWith(SpringRunner.class)
@SpringBootTest(classes=ActivemqApp.class)
public class ApplicationTest {
    @Autowired
    private Producer producer;

    @Test
    public void t_send(){
        String message = "x11111";
        Destination dest = new ActiveMQQueue("xiaojl.queue");
        producer.sendMessage(dest, message);

        //模拟服务在线
        while(true){}
    }

}
1.5、执行单元测试:
2018-09-30 16:21:41.968  INFO 2420 --- [           main] io.xiaojl.ApplicationTest                : Started ApplicationTest in 3.705 seconds (JVM running for 5.228)
2018-09-30 16:21:42.245 DEBUG 2420 --- [           main] io.xiaojl.xmq.producer.Producer          : [消息发送]-配送模式:2    目的地:queue://xiaojl.queue    发送的报文内容:x11111
2018-09-30 16:21:42.256  INFO 2420 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://127.0.0.1:61616
2018-09-30 16:21:42.512 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.516 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.532 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.532 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.536 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.540 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.581 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.582 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.872 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.872 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.912 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.912 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:42.946 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:42.946 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:43.014 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:43.014 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:43.143 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:43.143 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:43.400 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:43.400 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常
2018-09-30 16:21:43.913 DEBUG 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-消息ID:ID:x6x8-20180227TY-53116-1538295701662-1:4:1:1:1    收到的报文内容:x11111
2018-09-30 16:21:43.914 ERROR 2420 --- [enerContainer-1] i.x.xmq.consumer.AsynListenerConsumer    : [消息处理]-出现异常

从执行结果可以看到,消费者AsynListenerConsumer,在处理消息的过程中出现了异常,从而ActiveMQ重发了10次给AsynListenerConsumer。

No.2 发布/订阅模式:

2.1、添加ActiveMQ的配置类

在1.1中自定义的配置类中添加对Topic的支持:

//定义一个消息监听器连接工厂,这里定义的是发布/订阅模式的监听器连接工厂
@Bean(name = "jmsTopicListener")
public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory){
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(activeMQConnectionFactory);
    //设置jms并发数
    factory.setConcurrency("1-10");
    //重连间隔时间
    factory.setRecoveryInterval(1000L);
    //CLIENT_ACKNOWLEDGE = 2   客户端手动确认
    factory.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE); 
    //默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
    factory.setPubSubDomain(true); 

    return factory;
}
2.2、编写消息消费者:
  • 消费者1:

package io.xiaojl.xmq.consumer;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**  
 * <p>Title: TopicConsumer</p>  
 * <p>Description: 发布/订阅模式的消费者</p>  
 *
 * @author jilong.xiao  
 * @date 2018年9月30日  
 */

@Component
public class TopicConsumer {
    private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);

    @JmsListener(destination="xiaojl.topic", containerFactory="jmsTopicListener")
    public void receiveTopic(final TextMessage text, Session session) throws JMSException{
        try {
            log.debug("[消息处理]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());

            boolean f = true;
            if(f){
                throw new JMSException("假设出现消息在消费过程中出现异常!");
            }

            // 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
            text.acknowledge();
        } catch (JMSException e) {
            log.error("[消息处理]-出现异常",e.getLocalizedMessage());
            session.recover();
        }
    }
}
  • 消费者2:

package io.xiaojl.xmq.consumer;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**  
 * <p>Title: TopicConsumer</p>  
 * <p>Description: 发布/订阅模式的消费者</p>  
 *
 * @author jilong.xiao  
 * @date 2018年9月30日  
 */

@Component
public class TopicConsumer2 {
    private static final Logger log = LoggerFactory.getLogger(TopicConsumer2.class);

    @JmsListener(destination="xiaojl.topic",containerFactory="jmsTopicListener")
    public void receiveTopic(final TextMessage text,Session session) throws JMSException{
        try {
            log.debug("[消息处理2]-消息ID:"+text.getJMSMessageID()+"\t收到的报文内容:"+text.getText());

            // 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
            text.acknowledge();
        } catch (JMSException e) {
            log.error("[消息处理2]-出现异常",e.getLocalizedMessage());
            session.recover();
        }
    }
}
2.3、编写单元测试:
@Test
public void t_sendTopic(){
    String message = "xiaojlTopic-111";
    Destination dest = new ActiveMQTopic("xiaojl.topic");
    producer.sendMessage(dest, message);

    //模拟服务在线
    while(true){}
}
2.4、执行单元测试:
2018-09-30 16:20:23.419  INFO 6732 --- [           main] io.xiaojl.ApplicationTest                : Started ApplicationTest in 3.893 seconds (JVM running for 5.388)
2018-09-30 16:20:23.624 DEBUG 6732 --- [           main] io.xiaojl.xmq.producer.Producer          : [消息发送]-配送模式:2    目的地:topic://xiaojl.topic    发送的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.818  INFO 6732 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://127.0.0.1:61616
2018-09-30 16:20:23.892 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer2    : [消息处理2]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.923 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.924 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:23.962 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.964 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:23.971 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.971 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:23.980 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.982 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:23.990 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:23.991 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:24.011 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.011 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:24.050 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.051 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:24.134 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.135 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:24.266 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.266 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:24.523 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:24.523 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常
2018-09-30 16:20:25.037 DEBUG 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-消息ID:ID:x6x8-20180227TY-53107-1538295623053-1:4:1:1:1    收到的报文内容:xiaojlTopic-111
2018-09-30 16:20:25.037 ERROR 6732 --- [enerContainer-1] io.xiaojl.xmq.consumer.TopicConsumer     : [消息处理]-出现异常

从执行结果可以看到,消费者TopicConsumer,在处理消息的过程中出现了异常,从而ActiveMQ重发了10次给TopicConsumer。

总结

本文中通过一个示例程序讲解ActiveMQ对点对点模式和发布/订阅模式的配置和使用。在此同时,我们还配置了消息失败重试机制。

·end·
- 如果喜欢,快分享给你的朋友们吧 -
我们一起愉快的玩耍吧

以上是关于[进阶之路]ActiveMQ实现消息失败重发机制和两种模式(PTP和PubSub)的主要内容,如果未能解决你的问题,请参考以下文章

学习ActiveMQ:JMS消息的确认与重发机制

springboot+activemq中引入重发机制

(7)消息的确认和重发机制

使用SpringCloud Stream结合rabbitMQ实现消息消费失败重发机制

activemq-重发去重

ActiveMQ中消息的重发与持久化保存