RabbitMQ应用Demo:支持多个消费者实例热插拔
Posted Neo Yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ应用Demo:支持多个消费者实例热插拔相关的知识,希望对你有一定的参考价值。
需求背景
有这样一些原因需要在项目中部署多个消费者实例:
1. 基于系统的可靠性提升要求。特别是基于微服务架构的容器部署方案,微服务多实例部署是保证系统可靠性的基本要求。
2. 系统负载分担的诉求。通过多实例部署提升系统整体响应效率。
应对场景包括:1)同时启动多个消费者实例能并行处理MQ分派的消息。 2) 当其中一个消费者实例挂了后重启,能继续处理消息队列中的消息。3)动态部署增加了一个消费者实例,能立即投入到接收消息,处理消息的过程中。
方案分析
要满足上面描述的应用场景,有两点需要处理:1)自动确认消息修改为主动确认。2)“消息预取”数量设置为1.
1. 自动确认消息修改为主动确认
消息确认是消费者告知MQ服务端,收到的消息已处理完。如果收到消息就确认,消费者在处理过程中挂了,这些消息就丢失了,不会再分配给其它消费者。
修改为消费者处理完所有业务逻辑后再向MQ服务主动发送确认消息, 能保证消息不丢失。
2. “消息预取”数量设置为1
MQ服务器将收到的消息“公平”的分配给监听同一消息队列的消费者,并不知道消息处理需要的资源消耗情况。这样就可能导致一个消费者实例一直处理重任务,另一个一直处理轻任务。累的累死,闲的闲死。
“消息预取数量设置为1” 意味者消费者处理完一个任务后才会分派下一个任务,大家都有机会挑大梁。千金重担大家挑。
Demo样例代码
- 消费消息方法
/**
* 消费消息
*/
public void consumeMessage()
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.*.*.*");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("******");
try
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 一次只预取一个消息处理
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// 不自动确认,在消费完成后主动确认.
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new MQMessageHandler(channel), tag ->
);
catch (Exception e)
LOGGER.error("Consume message exception.", e);
- 消息处理类
package com.elon.consumer.service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* MQ消息消费处理类
*
* @author elon
* @since 2022-03-06
*/
public class MQMessageHandler implements DeliverCallback
private static final Logger LOGGER = LoggerFactory.getLogger(MQMessageHandler.class);
private Channel channel;
public MQMessageHandler(Channel channel)
this.channel = channel;
@Override
public void handle(String s, Delivery delivery) throws IOException
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
LOGGER.info("Receive message from queue:", message);
try
Thread.sleep(3 * 1000);
long tag = delivery.getEnvelope().getDeliveryTag();
// 处理完任务后再主动确认
channel.basicAck(tag, false);
catch (InterruptedException e)
LOGGER.error("Handle error.", e);
以上是关于RabbitMQ应用Demo:支持多个消费者实例热插拔的主要内容,如果未能解决你的问题,请参考以下文章