RabbitMQ----消息可靠性传递

Posted 未来.....

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ----消息可靠性传递相关的知识,希望对你有一定的参考价值。

RabbitMQ的高级特性------消息可靠性传递

  在开始之前,先想想几个问题,我们采用了生产者(channel)---->交换机------>队列中,那么如果其中生产者在向交换机发送信息是能一定发送成功吗?,该如何确认信息的状态,或者说交换机在向队列发送信息的时候发送失败,那么该如何解决。
  举个例子:你购买了一件商品,商品要通过快递到你手里,就可以理解成你就是队列, 商品要到快递站,此时你的快递就有可能出现丢失的情况(这里可以理解成生产者(channel)---->交换机),快递丢了,是不是就要告诉一下商家,我没有收到快递,此时商家就会再发一个快递过来;
  好,此时快递已经到达快递站了,快递小哥开始派送了,但是你一直没有收到快递, 就给快递小哥打电话,说我没有收到快递, 此时快递小哥又再次给你派送快递 (这里可以理解成交换机------>队列)。
上面这个例子就简单说明如果在商品运输的过程中出现丢失,那么会再次发送,直到确定你收到了快递。
此时就引入了一个一个名词,消息可靠性传递

1、消息可靠性传递

那么如何保证消息可靠性传递呢?
此时就有两个模式:Confirm 确认模式Return 退回模式

1、1 confirm 确认机制:

  1.消息的确认,是指生产者投递消息后, 如果broker收到消息,则会给我们生产者一个应答
  2.生产者进行接收应答,用来确定这条消息是否正常发送到broker, 这种方式也是可靠性投递的核心保障

接下来上代码:
创建一个spring boot项目,并在项目里面创建maven项目

在java包中创建一个启动类

@SpringBootApplication
public class ProductApp 
    public static void main(String[] args) 
        SpringApplication.run(ProductApp.class,args);
    

创建application.yml文件

server:
  port: 8001

spring:
  rabbitmq:
  	#RabbitMQ的服务器IP地址
    host: 192.168.31.33
    # 开启rabbitMQ的生产方确认模式
    # none:禁用发布确认模式
    # correlated:发布消息成功到交换机后触发回调方法
    publisher-confirm-type: correlated

这里只需要操作生产者(发送方)
为了方便测试,在测试类中创建测试单元
此时的交换机和队列需要使用现有的,如果没有【点这里】,有代码可以直接拿着用,去创建交换机和队列,我采用的是路由模式
提示:需要保证测试的交换机所绑定的队列是不能有消息的,否则里面的输出语句是不会被执行的。

@SpringBootTest
public class TestRabbitMq 

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 保证发送方到交换机的可靠性
     *   1、开启confirm模式 默认是关闭的
     *   2、设置rabbitTemplate的确认回调函数,如果消息到达交换机则返回true
     *   如果消息没有到达交换机则返回一个false
     */
    @Test
    public void testConfrim()
        //确认回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) 
                //需要队列中不能存在消息,否则此语句不会输出
                System.out.println("信息是否发送到交换机"+b);
                if(b==false)//消息没有到达交换机  根据业务需求。
                    System.out.println("继续发送消息");
                    //取消订单
                
            
        );
        rabbitTemplate.convertAndSend("ban_exchange_direct","error","hello confirm");
    

如果测试发送失败是否会输出,那么要保证你所要发送交换机的名字是不存在的,这样就可以保证一定是false,同时修改发送语句。

rabbitTemplate.convertAndSend("ban_direct","","hello confirm");

1、2 Return 退回机制

  1.Return Listener 用于处理一些不可路由的消息, 也是生产端需要设置的, 我的的消息生产者,用过指定一个Exchange和Routingkey ,把消息送达到某一个对队列中去,然后我们的消费者监听队列,进行消费的处理操作;但是在某些情况下,如果我们在发送消息的时候, 当前的exchange 不存在或者指定的路由key 路由不到, 这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener;

同理,Return也是需要开启的,在刚才创建的yml文件中添加开启Return 的语句

# 开启发布者退回模式
    publisher-returns: true

同上,运行前也要保证队列中不能有信息,我采用的是路由模式的交换机。

/**
     * 退回模式
     * 1、开启退回模式
     * 2、设置RabbitTemplate的退回回调函数
     */
    @Test
    public void testReturn() 
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() 
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) 
                //只要交换机到队列失败时才会触发该方法。失败之后可以继续发送也可以取消相应的业务功能。
                System.out.println("消息从交换机到队列失败" + returnedMessage.getReplyText());
            
        );
        rabbitTemplate.convertAndSend("ban_exchange_direct", "abcd", "hello confirm2");
    

路由模式有routingKey,只需要写一个你没有绑定的key,就可以完成这个测试。

这个模式只有在发送失败的时候才会触发,如果发送成功,此时是不会有输出语句。
就拿上面的例子来说,你如果长时间没有给快递小哥打电话,那么他就认为你已经收到快递了,如果你联系了快递小哥就会触发Return 退回机制

上面所列出来的Confrim 确认模式Return 退回模式同时存在就可以保证生产者(发送方—》队列)过程中信息不会丢失。

现在要思考一个问题,如何保证队列中的信息能被消费者消费?

1、3 Consumer ACK

  • 表示消费端收到消息后的确认方式。
  • 有三种确认方式:自动确认手动确认根据异常情况确认(这种使用起来很麻烦,并且不常用,这里不做讲解)
  • 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ
    的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
  • 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

想象一下,如果队列中的信息在被消费者消费的时候,可能会出现异常,而这时你的业务功能还没有完成,那么就会导致该消息丢失。

1、3、1自动确认

在项目里再创建一个maven的comsumer项目

创建application.yml文件

server:
  port: 8001

spring:
  rabbitmq:
    host: 192.168.31.33
    listener:
      simple:
        # 表示自动确认模式
        acknowledge-mode: none

创建主启动类

@SpringBootApplication
public class ComsumerApp 
    public static void main(String[] args) 
        SpringApplication.run(ComsumerApp.class,args);
    

创建Listener消费者

@Component
public class MyListener 
    @RabbitListener(queues = "ban_queue_direct02")
    public void listeber(Message message)
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println(msg);
        //估计创建一个异常,测试是否可以消费掉队列中的信息
        int c=10/0;
        System.out.println("处理业务逻辑");
    

运行主启动类,发现报了异常,但同时也消费了队列中的信息。

这时可以理解成快递小哥把快递送到了,不管你有没有签收快递,他就认为你签收了快递,这种就是自动确认。
但是你可能因为各种原因,并没有收到这个快递,那么这个自动确认就很不好,这时就需要手动确认了。

1、3、2手动确认

修改application.yml文件

    listener:
      simple:
        # none表示自动确认模式
        # manual表示手动确认模式
        acknowledge-mode: manual

设置手动确认代码:

@Component
public class MyListener 
    @RabbitListener(queues = "ban_queue_direct02")
    public void listeber(Message message, Channel channel)throws Exception
        //获取信息的标识
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println(msg);
        try
            //手动创建异常
            int c=10/0;
            System.out.println("处理业务逻辑");
            //消费端手动确认消息
            //long deliveryTag, 表示的标识。
            // boolean multiple:是否允许多确认
            channel.basicAck(deliveryTag,true);//从队列中删除该消息。
        catch (Exception e)
            //(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
            channel.basicNack(deliveryTag,true,true);
        
    

此时运行代码,发现会一直获取队列中的信息,队列中的信息一直等待确认,并没有被消费掉。

把异常干掉,正常的运行一次。

同时队列中的信息也被消费掉了。

1、3、3 Consumer ACK小结

  • 在RabbitListener-container标签中设置acknowledge属性,设置ack方式none:自动确认,manual:手动确认
  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true);签收信息。
  • 如果出现异常,则在catch中调用channel.basicNack,让MQ重新发送信息。

2、消息可靠性传递的总结

  1. 保证消息从发送者到交换机的可靠性: 使用Confirm确认机制。
  2. 保证消息从交换机到队列的可靠性; 使用return回退机制。
  3. 消息在队列中的可靠性。 设置队列和消息的持久化。
  4. 保证消息从队列到消费者的可靠性。 使用消费端的手动确认机制。

以上是关于RabbitMQ----消息可靠性传递的主要内容,如果未能解决你的问题,请参考以下文章

史上最透彻的 RabbitMQ 可靠消息传输实战

如何保证消息队列的可靠性传输?

消息队列的常见问题

RabbitMQ

rabbitmq

RabbitMQ简介RabbitMQ 特点AMQP 概念生产者和消费者 Broker 服务节点Queue 队列Exchange 交换器如何保证消息的可靠性?