RabbitMQ:发布确认模式

Posted 不断前进的皮卡丘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:发布确认模式相关的知识,希望对你有一定的参考价值。

✨ RabbitMQ:发布确认模式


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.基本介绍

生产者把信道设置成为confirm(确认)模式,一旦信道进入confirm模式,所有在这个信道上面发布的消息都会被指定唯一的一个ID(ID从1开始).一旦消息被投递到所有匹配的队列以后,broker就会发送一个确认给生产者(包含ID),这样使得生产者知道消息已经正确到底目的队列了。如果消息和队列是可持久化的,那么确认消息就会在消息被写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag包含了确认消息的序列号。

2.实现消息可靠传递的三个条件

2.1队列持久化

生产者发送消息到队列的时候,把durable参数设置为true(表示队列持久化)

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

2.3发布确认

  • 队列接收到生产者发送的数据以后,队列把消息保存在磁盘(为了实现持久化),队列会把最终的可靠性传递结果告诉给生产者,这就是发布确认。
  • 三种常用的发布确认策略:单个确认发布、批量确认发布、异步确认发布

3.发布确认模式

RabbitMQ的发布确认模式默认是没有开启的,我们可以通过调用channel.confirmSelect()方法来手动开启发布确认模式。

3.1单个确认发布模式

  • 单个确认发布模式是一种简单的同步确认发布的方式。也就是说发布一个消息以后,只要确认它被确认发布,才可以继续发布后续的消息。
  • waitForConfirms(long)这一个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认,就会抛出异常。
  • 缺点:速度慢,因为如果没有确认消息的话,后面的消息都会被阻塞
public class ConfirmMessage 
    //消息数量
    public static final int MSG_CNT=200;
    public static void main(String[] args) 
        //调用单个确认发布方法
        confirmSingleMessage();
    

    public static void confirmSingleMessage() 
        try 
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start= System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) 
                String msg="消息:"+i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
                //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                boolean flag=channel.waitForConfirms();
                if (flag)
                    System.out.println("————————第"+(i+1)+"条消息发送成功————————");
                else 
                    System.out.println("========第"+(i+1)+"条消息发送失败=========");
                

            
            //记录结束时间
            long end=System.currentTimeMillis();
            System.out.println("发布:"+MSG_CNT+"个单独确认消息,耗时:"+(end-start)+"毫秒");


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    




3.2批量确认发布模式

  • 先发布一批信息然后一起确认可以大大提高吞吐量
  • 缺点:当故障发生的时候,我们不知道是哪一个消息出现了问题,我们需要把整个批处理保存在内存中,记录重要的信息后重新发布消息
  • 这种方案仍然是同步的方式,会阻塞消息的发布
public class ConfirmMessage 
    //消息数量
    public static final int MSG_CNT = 200;

    public static void main(String[] args) 
        //调用单个确认发布方法
        //confirmSingleMessage();//发布:200个单独确认消息,耗时:192毫秒
        confirmBatchMessage();
    

    public static void confirmSingleMessage() 
        try 
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start = System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) 
                String msg = "消息:" + i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                boolean flag = channel.waitForConfirms();
                if (flag) 
                    System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                 else 
                    System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                

            
            //记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    public static void confirmBatchMessage() 
        try 
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //批量确认消息数量
            int batchSize=20;
            //未确认消息数量
            int nackMessageCount=0;
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start = System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) 
                String msg = "消息:" + i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                //累加未确认的发布数量
                nackMessageCount++;
                //判断的未确认消息数量和批量确认消息的数量是否一致
                if (nackMessageCount==batchSize)
                    //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                    boolean flag = channel.waitForConfirms();
                    if (flag) 
                        System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                     else 
                        System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                    
                    //清空未确认发布消息个数
                    nackMessageCount=0;
                
            
            //为了确认剩下的是没有确认的消息,所以要再次进行确认
            if (nackMessageCount>0)
                //再次重新确认
                channel.waitForConfirms();
            
            //记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    




3.3异步确认发布模式

 //异步消息发布确认
    public static void publishMessageAsync() throws Exception 
        Channel channel = ConnectUtil.getChannel();
        //声明队列,此处使用UUID作为队列的名字
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        //开启发布确认模式
        channel.confirmSelect();
        //创建ConcurrentSkipListMap集合(跳表集合)
        ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
        //确认收到消息回调函数
        ConfirmCallback ackCallBack = new ConfirmCallback() 

            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException 
                //判断是否批量异步确认
                if (multiple) 
                    //把集合中没有被确认的消息添加到该集合中
                    ConcurrentNavigableMap<Long, String> confirmed = concurrentSkipListMap.headMap(deliveryTag, true);
                    //清除该部分没有被确认的消息
                    confirmed.clear();
                 else 
                    //只清除当前序列胡的消息
                    concurrentSkipListMap.remove(deliveryTag);

                
                System.out.println("确认的消息序列序号:" + deliveryTag);
            
        ;

        //未被确认消息的回调函数
        ConfirmCallback nackCallBack = new ConfirmCallback() 
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException 
                //获取没有被确认的消息
                String msg = concurrentSkipListMap.get(deliveryTag);
                System.out.println("发布的消息:" + msg + "未被确认,该消息序列号:" + deliveryTag);
            
        ;
        //添加异步确认监听器
        channel.addConfirmListener(ackCallBack, nackCallBack);
        //记录开始时间
        long start = System.currentTimeMillis();
        //循环发送消息
        for (int i = 0; i < MSG_CNT; i++) 
            //消息内容
            String message = "消息:" + i;
            //把未确认的消息放到集合中,通过序列号和消息进行关联
//            channel.getNextPublishSeqNo(); 获取下一个消息的序列号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
            //发送消息
            channel.basicPublish("", queueName, null, message.getBytes());

        
        //记录结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MSG_CNT+"个批量确认消息,一共耗时:"+(end-start)+"毫秒");


    

以上是关于RabbitMQ:发布确认模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ之持久化不公平分发 预取值发布确认模式

RabbitMQ:发布确认模式

消息队列RabbitMQ核心:简单(Hello World)模式队列(Work Queues)模式发布确认模式

如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?

RabbitMQ confirm的确认监听模式

RabbitMQ - 发布确认