RabbitMQ消费方式汇总

Posted duanjt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消费方式汇总相关的知识,希望对你有一定的参考价值。

在学习本章节前,请先学习之前的章节:
Java访问RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.html
RabbitMQ消息发布时的权衡:https://www.cnblogs.com/duanjt/p/10075308.html


一、推送Consume


前面我们使用到的都是这种模式,注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。这种方式效率最高最及时。
核心代码如下:

// 接收消息,第二个参数表示是否自动应答
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
    }
});

 

 

二、拉取Get


属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,会获得一个表示空的回复。总的来说,这种方式性能比较差,很明显,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用程序何时会发出请求。
核心代码如下:

while(true){
    //如果没有消息,将返回null
    GetResponse getResponse = channel.basicGet(queueName, true);    
    if(null!=getResponse){
        System.out.println("received["+getResponse.getEnvelope().getRoutingKey()+"]"+new String(getResponse.getBody()));
    }
    Thread.sleep(1000);
}

 

 

三、自动确认


方法channel.basicConsume和方法channel.basicGet表示同步或异步获取消息,第二个参数都表示是否自动确认。前面我们都设置为了true。这个时候我们只需要处理逻辑,将自动向RabbitMQ进行确认。
当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。

 

四、手动确认

 

// 接收消息手动确认,第二个参数表示是否自动应答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
        //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

五、QoS预取模式


在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)
核心代码:

//参数1表示限制条数,参数2 true=channel,false=消费者
channel.basicQos(100, true);

// 接收消息手动确认,第二个参数表示是否自动应答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
        //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

注意:
1.消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global)














以上是关于RabbitMQ消费方式汇总的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合RabbitMQ实现死信队列

从rabbitmq消费消息并通过其客户端连接转发它们的“扭曲”方式是啥?

-RabbitMQ之Spring客户端源码

Kafka如何保证消息顺序消费

RabbitMQ笔记08消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认消费者消息确认消息持久化)

Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题