RabbitMQ发布确认高级部分

Posted 北海冥鱼未眠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ发布确认高级部分相关的知识,希望对你有一定的参考价值。

首先复习一下我们前面学习的发布确认,生产者将信道设置成 confirm 模式,这样就可以确定消息是否被队列接收,这样就可以在回调中进行相应的处理了。

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,
导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

实验演示环境搭建

编写配置类

package com.dongmu.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig 
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";
    //声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange()
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    
    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue()
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    
    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    


编写生产者

package com.dongmu.rabbitmq.controller;

import com.dongmu.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController 

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("sendMessage/message")
    public void sendMessage (@PathVariable("message") String message)
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message);

        log.info("发送消息内容:",message);
    


编写消费者

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfigQueue(Message message)
        String s = new String(message.getBody());
        log.info("接收到的队列confirm.queue的消息",s);
    


上面的环境已经搭建完成并且可以成功访问,消费者也能够正常消费,但是我们为了能够在交换机服务失败的时候能够保证服务不出现问题,就还需要下面的内容

RabbitTemplate.ConfirmCallback是一个内部接口我们需要实现这个接口并且创建这个对象,并且把这个对象注入到RabbitTemplate实例当中去。

package com.dongmu.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback 

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /*
     首先,JVM在加载阶段通过类的全路径找到该类并把类的信息加载到JVM的方法区,
     然后在堆区实例化有关java.lang.Class对象,作为方法区中这个类的信息的入口;
     然后在连接阶段,为类的静态变量logClinet分配内存并赋予JVM默认初始值null;
     因为注解@PostConstruct的缘故,在类初始化之前会先加载该使用该注解的方法;然后再执行类的初始化。

    注:
    构造方法 ——> @Autowired —— > @PostConstruct ——> 静态方法 (按此顺序加载)

     */
    @PostConstruct
    public void init()
        rabbitTemplate.setConfirmCallback(this);
    

    /*
    交换机确认回调的方法
    1:生产者发送消息到交换机,交换机接收到了,就会回调
        correlationData:保存回调消息的id等信息
        b:交换机接收到了消息:true
        s:null
    2:发消息接收机接收失败了
        correlationData:保存回调消息的id等相关信息
        b:false
        s:失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) 
        if (b)
            log.info("交换机收到了消息,id是:",correlationData.getId());
        else 
            log.info("接收机收到消息失败了,消息id是:,失败的原因是:",correlationData.getId(),s);
        
    


代码中我们注意到
correlationData:保存回调消息的id等信息这个里面的消息不是凭空而来的,而是我们在生产者中需要注入的信息,这样才能回去到。

这个时候我们就要把前面生产者中的代码添加下面的参数

//发消息
    @GetMapping("sendMessage/message")
    public void sendMessage (@PathVariable("message") String message)

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("10011");
        



        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送消息内容:",message);
    

最后我们还需要在配置文件中添加一句
spring.rabbitmq.publisher-confirm-type=correlated

  • 他的默认值是none,表示禁用发布确认模式。
  • correlated:表示发布消息成功到交换机后的回调方法
  • simple:效果和correlated一样,区别是在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

这个时候我们启动项目然后访问接口,结果如下

下面延时接收机接收消息失败的情况
把生产者的代码改成下面这样,即只是修改一下交换机的名字

//发消息
    @GetMapping("sendMessage/message")
    public void sendMessage (@PathVariable("message") String message)

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("10011");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123",
                ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送消息内容:",message);
    

然后继续访问

2022-06-10 19:42:25.226  INFO 104744 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
2022-06-10 19:42:25.312  INFO 104744 --- [nio-8080-exec-1] c.d.r.controller.ProducerController      : 发送消息内容:dongmubeihai
2022-06-10 19:42:25.349 ERROR 104744 --- [08.176.110:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
2022-06-10 19:42:25.351  INFO 104744 --- [nectionFactory2] c.d.rabbitmq.config.MyConfirmCallback    : 接收机收到消息失败了,消息id是:10011,失败的原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)

可以发现发送消息失败了一样会出现提示,这样我们就可以知道消息是不是真的到达了交换机,进而做进一步的处理。但是这只是能保证生产者到交换机不会出现丢失,无法保证交换机到队列能够不丢失消息,这怎么办呢?

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如
果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

我们继续完善前面的配置类,再继承一个接口RabbitTemplate.ReturnsCallback,重写方法

/*
        在当消息传递过程中不可达目的地时将消息返回给生产者
    */

    @Override
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) 
        log.error("消息,被交换机退回,退回的原因是:,路由routingkey是:",new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),
                returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    

@PostConstruct
    public void init()
		rabbitTemplate.setConfirmCallback(this);
		rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
    

下面我们把routingkey改了进行测试
把前面生产者发送消息的内容改成

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData);

这样就由于routinhkey错误可以到达交换机但是无法到达队列,最后实验结果如下

以上是关于RabbitMQ发布确认高级部分的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ——发布确认高级 & 备份交换机的概念理解及应用举例

RabbitMQ——发布确认高级 & 备份交换机的概念理解及应用举例

RabbitMQ RabbitMQ高级特性

RabbitMQ RabbitMQ高级特性

RabbitMQ学习(下)——发布确认高级幂等性优先级惰性和RabbitMQ集群

RabbitMQ:发布确认高级