批量消费消息 - RabbitMQ

Posted

技术标签:

【中文标题】批量消费消息 - RabbitMQ【英文标题】:Consume messages in batches - RabbitMQ 【发布时间】:2016-12-15 03:18:39 【问题描述】:

我能够使用上述代码使用多个生产者发送到具有不同路由密钥的同一交易所的多条消息,并且能够将每条消息插入数据库。

但这会消耗过多的资源,因为消息会一个接一个地插入 DB。所以我决定去批量插入,我发现我可以设置BasicQos

在BasicQos中设置消息限制为10条后,我的预期是Console.WriteLine必须写10条消息,但并不像预期的那样。

我的期望是从队列中消耗 N 条消息并进行批量插入并成功发送 ACK 否则没有 ACK

这是我使用的一段代码。

using (var connection = factory.CreateConnection())

    using (var channel = connection.CreateModel())
    
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);

        consumer.Received += (model, ea) =>
        
            try
            
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                // Insert into Database

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Recevier Ack  " + ea.DeliveryTag);
            
            catch (Exception e)
            
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Recevier No Ack  " + ea.DeliveryTag);
            
        ;

        Console.ReadLine();
    

【问题讨论】:

【参考方案1】:

BasicQos = 10 表示客户端一次只获取 10 条消息,但是当您使用它时,您一次只能看到一条消息。 在这里阅读:https://www.rabbitmq.com/consumer-prefetch.html

AMQP 指定basic.qos 方法允许你限制数量 消费时通道(或连接)上未确认的消息 (又名“预取计数”)。

对于您的范围,您必须下载消息,将其放入临时列表中,然后插入数据库。

然后你可以使用:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

void basicAck()

参数: deliveryTag - 收到的标签 AMQP.Basic.GetOk 或 AMQP.Basic.Deliver

multiple - true 表示确认 直到并包括提供的交付标签的所有消息;假 只确认提供的交货标签。

示例

final List<String> myMessagges = new ArrayList<String>();
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) 

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                myMessagges.add(new String(body));
                System.out.println("Received...");

                if (myMessagges.size() >= 10) 
                    System.out.println("insert into DB...");
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    myMessagges.clear();
                


            
        );

【讨论】:

谢谢。这里“ea”的范围在事件consumer.Received中,那么在将所有消息插入DB后如何进行ACK? 好吧,只是不要为每条消息执行basicAck,而是使用标志multiple = true在每x条消息中执行它 如果我理解正确的话,在consumer.Received的范围内,如果我设置'multiple = true',那么只有在BasicQos中设置的10条消息全部消费完后才会发送ACK。很抱歉再次打扰。如果您能给我一些代码示例,将会很有帮助。提前致谢! @Gabriele 如果 10 条消息中的任何一条在插入数据库时​​出错,会发生什么?你将如何处理? 您将如何处理此模型的错误?如果没有错误,ack where multiple = true 很好。但是,如果我不能只为某些人做一个 ack 并为其他人做一个 nack,我看不出我将如何同步它。【参考方案2】:

可以使用 channel.basicQos() 来完成基于批量大小的消耗。

Channel channel = connection.createChannel();
channel.basicQos(10);

它指定在不发送ACK的情况下获取的最大消息数。

使用 DefaultConsumer 类并覆盖其方法。

Consumer batchConsumer = new DefaultConsumer(channel) 

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
 
  

  @Override
  public void handleCancelOk(String consumerTag) 
 
  
;

使用 channel.basicConsume() 消费 10 条消息

channel.basicConsume(QUEUE_NAME, false, batchConsumer);

调用 channel.basicConsume() 时,它将获取一批 10 条消息。 'false' 设置为禁用自动确认,并且在消耗整个批次后仅发送一次 ACK。

channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);

这里的“真”表示我们正在为多条消息发送 ACK。

详细解释见

RabbitMQ Batch Consumption

【讨论】:

以上是关于批量消费消息 - RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ(06)——消息的批量发送和消费

RabbitMQ拉模式批量消费消息

RabbitMQ中有大批量的消息,此时多个消费者同时访问消息队列是怎样取里面的消息的?

RocketMQ(06)——消息的批量发送和消费

RocketMQ-quickstart(批量消费)

rabbitmq redis