RabbitMQ应用Demo:支持多个消费者实例热插拔

Posted Elon.Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ应用Demo:支持多个消费者实例热插拔相关的知识,希望对你有一定的参考价值。

需求背景

有这样一些原因需要在项目中部署多个消费者实例:
1. 基于系统的可靠性提升要求。特别是基于微服务架构的容器部署方案,微服务多实例部署是保证系统可靠性的基本要求。
2. 系统负载分担的诉求。通过多实例部署提升系统整体响应效率。

应对场景包括:1)同时启动多个消费者实例能并行处理MQ分派的消息。 2) 当其中一个消费者实例挂了后重启,能继续处理消息队列中的消息。3)动态部署增加了一个消费者实例,能立即投入到接收消息,处理消息的过程中。

方案分析

要满足上面描述的应用场景,有两点需要处理:1)自动确认消息修改为主动确认。2)“消息预取”数量设置为1.

1. 自动确认消息修改为主动确认
消息确认是消费者告知MQ服务端,收到的消息已处理完。如果收到消息就确认,消费者在处理过程中挂了,这些消息就丢失了,不会再分配给其它消费者。

修改为消费者处理完所有业务逻辑后再向MQ服务主动发送确认消息, 能保证消息不丢失。

2. “消息预取”数量设置为1
MQ服务器将收到的消息“公平”的分配给监听同一消息队列的消费者,并不知道消息处理需要的资源消耗情况。这样就可能导致一个消费者实例一直处理重任务,另一个一直处理轻任务。累的累死,闲的闲死。

“消息预取数量设置为1” 意味者消费者处理完一个任务后才会分派下一个任务,大家都有机会挑大梁。千金重担大家挑

Demo样例代码

  1. 消费消息方法
    /**
     * 消费消息
     */
    public void consumeMessage() 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.*.*.*");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("******");

        try 
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 一次只预取一个消息处理
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);

            // 不自动确认,在消费完成后主动确认.
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new MQMessageHandler(channel), tag -> 
            );
         catch (Exception e) 
            LOGGER.error("Consume message exception.", e);
        
    
  1. 消息处理类
package com.elon.consumer.service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * MQ消息消费处理类
 *
 * @author elon
 * @since 2022-03-06
 */
public class MQMessageHandler implements DeliverCallback 
    private static final Logger LOGGER = LoggerFactory.getLogger(MQMessageHandler.class);

    private Channel channel;

    public MQMessageHandler(Channel channel) 
        this.channel = channel;
    

    @Override
    public void handle(String s, Delivery delivery) throws IOException 
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        LOGGER.info("Receive message from queue:", message);
        try 
            Thread.sleep(3 * 1000);
            long tag = delivery.getEnvelope().getDeliveryTag();

            // 处理完任务后再主动确认
            channel.basicAck(tag, false);
         catch (InterruptedException e) 
            LOGGER.error("Handle error.", e);
        
    


以上是关于RabbitMQ应用Demo:支持多个消费者实例热插拔的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ应用Demo:支持多个消费者实例热插拔

RabbitMQ 跨多个队列的多个消费者 - 消息延迟处理

springboot rabbitMQ demo

rabbitmq入门

RabbitMQ消费者demo

rabbitMq与spring boot搭配实现监听