RabbitMQ发布确认和交换机基础总结与实战

Posted 崇尚学技术的科班人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ发布确认和交换机基础总结与实战相关的知识,希望对你有一定的参考价值。

📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《RabbitMQ发布确认和交换机基础总结与实战》🍣
🍣这是RabbitMQ的发布确认和交换机基础总结与实战🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗

🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣源码地址🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣

1、发布确认

1.1、发布确认的引出

一个消息的持久化需要经历的步骤:

  1. 设置要求队列持久化。
  2. 设置要求队列中的消息必须持久化。
  3. 发布确认
  • 如果缺少了发布确认的话,那么消息在磁盘上持久化之前会发生丢失,从而不能满足消息持久化的目的。

1.2、发布确认的策略

1.2.1、开启发布确认的方法

Channel channel = RabbitmqUtil.getChannel();
//开启发布确认
channel.confirmSelect();
  • 发布确认默认是没有开启的,如果需要开启需要调用confirmSelect,每当需要使用发布确认的时候,都需要调用该方法。

1.2.2、单个确认发布

  • 单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布
  • 该确认方式主要通过waitForConfirms方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。
  • 这种确认方式的最大的缺点就是:发布速度特别慢
public static void ConfirmMessageIndividually() throws Exception
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 进行单个发布确认
            boolean flag = channel.waitForConfirms();
            if(flag)
                System.out.println("消息发送成功");
            
        

        long end = System.currentTimeMillis();

        System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
    

1.2.3、批量确认发布

  • 先发布一批消息然后一起确认。
  • 缺点:当发生故障导致发布出现问题时,不知道那个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息。
    public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException 
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // 批量处理消息的个数
        int batchSize = 100;

        for (int i = 1; i <= MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 进行批量发布确认
            if(i % batchSize == 0)
                channel.waitForConfirms();
                System.out.println("批量处理消息成功");
            
        

        long end = System.currentTimeMillis();

        System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");

    

1.2.4、异步确认发布

原理

  • 异步确认发布是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

代码

    public static void ConfirmMessageAsync() throws Exception
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();


        ConfirmCallback ackCallback = (var1,var2)->
            System.out.println("已确认的消息" + var1);
        ;


        ConfirmCallback nackCallback = (var1,var2)->
            System.out.println("未确认的消息" + var1);
        ;


        /**
         * 1. 确认收到消息的回调函数
         * 2. 未确认收到消息的回调函数
         */

        channel.addConfirmListener(ackCallback,nackCallback);

        for (int i = 1; i <= MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        

        long end = System.currentTimeMillis();

        System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
    

1.2.5、如何处理异步未确认消息

  • 最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说ConcurrentSkipListMap
    public static void ConfirmMessageAsync() throws Exception
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();



        /**
         * 1. 消息标识
         * 2. 是否批量处理
         */
        ConfirmCallback ackCallback = (var1,var2)->
            if(var2)
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
                longStringConcurrentNavigableMap.clear();
            else
                map.remove(var1);
            
            String message = map.get(var1);
            System.out.println("已确认的消息是:" + message + "     已确认的消息tag:" + var1);
        ;


        ConfirmCallback nackCallback = (var1,var2)->
            // 未确认的消息
            String s = map.get(var1);
            System.out.println(s);
            System.out.println("未确认的消息" + var1);
        ;


        /**
         * 1. 确认收到消息的回调函数
         * 2. 未确认收到消息的回调函数
         */

        channel.addConfirmListener(ackCallback,nackCallback);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 1. 将消息保存到一个线程安全地队列中
            map.put(channel.getNextPublishSeqNo(),message);
        

        long end = System.currentTimeMillis();

        System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
    

1.2.6、以上3种发布确认的速度对比

  • 单个发布确认:同步等待确认,简单,但吞吐量非常有限。
  • 批量确认发布:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪一条消息出现了问题。
  • 异步确认发布:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.xiao.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class ConfirmMessage 

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception 
        // 单个发布确认
        ConfirmMessageIndividually(); // 单个确认发送1000条消息所消耗的时间是680ms

        // 批量发布确认
        ConfirmMessageBatch(); //批量确认发送1000条消息所消耗的时间是112ms

        //异步发布确认
        ConfirmMessageAsync(); // 异步确认发送1000条消息所消耗的时间是41ms
                                // 异步确认发送1000条消息所消耗的时间是33ms
    

    public static void ConfirmMessageIndividually() throws Exception
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 进行单个发布确认
            boolean flag = channel.waitForConfirms();
            if(flag)
                System.out.println("消息发送成功");
            
        

        long end = System.currentTimeMillis();

        System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
    

    public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException 
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // 批量处理消息的个数
        int batchSize = 100;

        for (int i = 1; i <= MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 进行批量发布确认
            if(i % batchSize == 0)
                channel.waitForConfirms();
                System.out.println("批量处理消息成功");
            
        

        long end = System.currentTimeMillis();

        System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");

    

    public static void ConfirmMessageAsync() throws Exception
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();

        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();



        /**
         * 1. 消息标识
         * 2. 是否批量处理
         */
        ConfirmCallback ackCallback = (var1,var2)->
            if(var2)
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
                longStringConcurrentNavigableMap.clear();
            else
                map.remove(var1);
            
            String message = map.get(var1);
            System.out.println("已确认的消息是:" + message + "     已确认的消息tag:" + var1);
        ;


        ConfirmCallback nackCallback = (var1,var2)->
            // 未确认的消息
            String s = map.get(var1);
            System.out.println(s);
            System.out.println("未确认的消息" + var1);
        ;


        /**
         * 1. 确认收到消息的回调函数
         * 2. 未确认收到消息的回调函数
         */

        channel.addConfirmListener(ackCallback,nackCallback);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= MESSAGE_COUNT; i++) 
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 1. 将消息保存到一个线程安全地队列中
            map.put(channel.getNextPublishSeqNo(),message);
        

        long end RabbitMQ基础总结

RabbitMQ基础总结

RabbitMQ基础总结

RabbitMQ学习总结

Java SpringBoot集成RabbitMq实战和总结

Java SpringBoot集成RabbitMq实战和总结