RabbitMQ-发布确认高级

Posted weixin_43956692

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-发布确认高级相关的知识,希望对你有一定的参考价值。

  一、介绍

         背景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。特别是比较极端的情况,rabbitmq集群不可用时。

  二、实现
        1、正常情况下代码

        配置类

@Configuration
public class ConfirmConfig 
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange()
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue()
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    
    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) 
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    

      生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController 

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //发消息
    @GetMapping("/sendMessage/message")
    public void sendMessage(@PathVariable String message)
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message);
        log.info("发送消息的内容为:",message);
    

      消费者

@Slf4j
@Component
public class ConfirmConsumer 
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message)
        log.info("接收到队列confirm_queue消息:",new String(message.getBody()));
    
      2、发布确认实现

         实现回调接口RabbitTemplate.ConfirmCallback,并且在配置文件中添加下面配置

spring.rabbitmq.publisher-confirm-type=correlated

        NONE:禁用发布模式,默认值

       CORRELATED:发布消息成功到交换机后出发回调方法

       SIMPLE:经测试有两种效果

                    1、和CORRELATED一样触发回调

                    2、在消息发布成功使用rabbitTemplate调用waitForConfirms或waitForConfirmOrDie方法等待broker节返回发送结果,根据返回结果来判定下一步的逻辑,需要注意的是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback 

  
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init()
        //注入
        rabbitTemplate.setConfirmCallback(this);
    
    /**
     * 交换机回调方法
     * @param correlationData    保存的回调消息的id及相关信息
     * @param ack   交换机收到消息  成功为true  失败为false
     * @param cause  失败原因     成功为null 
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) 
        String id = correlationData.getId() != null?correlationData.getId():"";
        if(ack)
            log.info("交换机已经收到了id为的消息",id);
        else
            log.info("交换机没有收到id为的消息,由于原因",id,cause);
        
    

       修改消费者代码

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController 

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //发消息
    @GetMapping("/sendMessage/message")
    public void sendMessage(@PathVariable String message)
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
        log.info("发送消息的内容为:",message);
    

        但这样只是解决了交换机出现故障,可以有回调,而队列出现问题的时候,还无法解决。

     3、回退消息

          Mandatory参数

          在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个时事件的。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

         实现

         1>添加如下配置

spring.rabbitmq.publisher-returns=true

         2>实现RabbitTemplate.ReturnCallback 接口,重写returnedMessage方法

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback 


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init()
        //注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    
    /**
     * 交换机回调方法
     * @param correlationData    保存的回调消息的id及相关信息
     * @param ack   交换机收到消息  成功为true  失败为false
     * @param cause  失败原因     成功为null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) 
        String id = correlationData.getId() != null?correlationData.getId():"";
        if(ack)
            log.info("交换机已经收到了id为的消息",id);
        else
            log.info("交换机没有收到id为的消息,由于原因",id,cause);
        
    
    //消息不可达时,将消息返回给生产者
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 
        log.info("消息:。被交换机:退回,退回的原因:,路由key:",
                new String(message.getBody()),exchange,replyText,routingKey);
    
  4、备份交换机

       1>修改之前确认交换机的配置列,添加备份交换机与备份队列,以及报警队列及绑定,同时修改确认交换机的声明(添加备份交换机的配置)

@Configuration
public class ConfirmConfig 
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    //备份交换机
    @Bean("backupExchange")
    public FanoutExchange backupExchange()
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    
    //备份队列
    @Bean("backupQueue")
    public Queue backupQueue()
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    
    //报警队列
    @Bean("warningQueue")
    public Queue warningQueue()
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    
    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange()

        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME) //指定备份交换机
                .build();
    
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue()
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    
    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) 
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    
    //绑定
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                    @Qualifier("backupExchange") FanoutExchange backupExchange)
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    
    //绑定
    @Bean
    public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange)
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    

   2>添加报警消费者

@Slf4j
@Component
public class WarningConsumer 
   @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMessage(Message message)
       String msg = new String(message.getBody());
       log.info("报警发现不可路由消息:",msg);
   

    当消息回退和备份交换机同时配置,则备份交换机优先

以上是关于RabbitMQ-发布确认高级的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:发布确认高级

RabbitMQ发布确认高级

RabbitMQ-发布确认高级

RabbitMQ学习--发布确认高级

RabbitMQ学习--发布确认高级

09-rabbitMq-发布确认高级-springBoot版本