手动的进行消息应答

Posted 杀手不太冷!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手动的进行消息应答相关的知识,希望对你有一定的参考价值。

消息应答

概念

现在有很多消费者线程同时访问RabbitMQ消息队列,如果其中的一个消费者取到RabbitMQ消息队列的一个消息之后,突然挂掉了,那么RabbitMQ消息队列中的这条消息是不是就丢失了呢?我们该怎么解决这个问题呢?

我们引入了消息应答机制,当消费者从RabbitMQ消息队列中取出消息并且成功的处理了之后,消费者会告诉RabbitMQ消息队列,说“我已经成功的处理了这条消息,你的这条消息可以删除了。”然后RabbitMQ消息队列就会把它里面的这条对应的消息给删除掉。

消息应答的方法

Channel.basicAck()调用该方法之后,消费者就告诉了RabbitMQ消息队列说“你的消息我已经成功处理了,现在你可以把这条信息给删除了”。

Channel.basicNack()和Channel.basicReject()这两个方法的作用都是告诉RabbitMQ消息队列说"你的消息我没有成功处理,所以你不要把这条信息给删除"。

应答方式分为两种:一种是自动应答,另外一种是手动应答。

如果是自动应答,可能会出现bug,为什么呢?因为自动应答就表示只要消费者一取到消息队列中的消息,就表示处理消息成功,RabbitMQ消息队列中就会删除掉这条消息。但是问题是,消费者取到消息之后,后面的代码才是处理消息的代码,如果后面的代码出现异常,那么这条消息就丢失了,所以我们工作中要使用非自动应答的方式,也即是手动应答的方式,即什么时候消费者处理消息成功,然后手动的告诉RabbitMQ消息队列“我已经处理信息成功了”。

"如果手动进行消息应答RabbitMQ中的消息不会丢失"的测试思路

首先需要一个生产者,这个生产者可以通过控制台连续的往RabbitMQ的消息队列中存放信息。然后需要两个消费者线程,一个消费者线程从RabbitMQ消息队列中取到消息的速度比较快是1s,另外一个消费者线程从RabbitMQ消息队列中取到消息的速度比较慢是100s,如果我们这这100s之内,把这个消费者线程宕机,那么这条消息就会被另外一个消费者取到,因为RabbitMQ消息队列中没有删除这条消息。

生产者代码

/**
 * @Date 2021/11/10 19:14
 * @Author 望轩
 *
 *
 */
public class Task02 {
    //队列名称
    public static final String QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //从控制台当中接收信息
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()){
            String message=scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送消息完成"+message);
        }
    }
}

消费者代码

/**
 * @Date 2021/11/10 19:21
 * @Author 望轩
 */
public class Worker02 {
    //队列名称
    public static final String QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("182.92.210.39");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("C2等待接收消息处理时间较长");

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("消费者接收到的消息:"+new String(message.getBody()));
            //手动应答
            /**
             * 1.消息的标记tag,也就是我们是手动应答的消息队列中的哪个消息
             * 2.是否批量应答,false:不批量应答消息队列中的消息  true:批量应答消息队列中的消息
             *
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println(consumerTag+"消息消费被中断");
        };

        //采用手动应答:这里的第二个参数为false表示是使用手动应答的方式
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

    }
}

上面是执行的获取消息比较快的消费者线程,另外一个线程也好获取,只要改上面代码的两个地方就行了,一个地方是把C2改成C1,另外一个地方是把睡眠时间从100s改成1s,然后重新启动一下这个main方法就能重新启动一个线程了,但是有条件,我们必须要给Worker02勾选一个允许多个线程同时运行的选项,如下图:

测试

启动生产者

首先启动生产者,如下图:

生产者启动之后,在RabbitMQ消息队列中会多出一个ack_queue队列,如下图:

启动消费者

然后启动两个消费者线程,一个消费者获取信息比较快是1s,另外一个消费者获取信息比较慢是100s,如下图:

生产者生产两个消息看看消费者的接收情况

在生产者生产两条消息到RabbitMQ消息队列,按道理来说这两条消息会轮询地发送到两个消费者线程中,但是我们故意在一个消费者线程处理这条消息的时候,把这个消费者线程down机,那么这两条消息全部都会发送给同一个消费者线程。如下图:

以上是关于手动的进行消息应答的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ消息手动应答(结果成功)

RabbitMQ:工作队列模式

RabbitMQ 消息队列学习

RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例

RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例

RabbitMQ消息队列笔记