mq消费者数量如何定义
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mq消费者数量如何定义相关的知识,希望对你有一定的参考价值。
参考技术A mq 消费者数量是监听器监听的,我们就可以在不重启项目时,调整消费者数量。 通过web动态设置消费者数量 @Resource RabbitListenerEndpointRegistryRabbit MQ 消息确认 ACK 解析,消息确认详解 Rabbit MQ
文章目录
Rabbit MQ 消息确认 ACK 解析,消息确认详解 Rabbit MQ
如果觉得本文对你有帮助,可以一键三连支持,谢谢
相关阅读
Related Reading
Docker - rabbitmq 镜像详解,以及rabbitmq 的运行指令
Related Reading
Docker - rabbitmq常用的管理脚本
Related Reading
Docker - 搭建rabbitmq 集群
Related Reading
Docker - 创建并运行rabbitmq镜像
传送门
Portal
Rabbit MQ 官网
Portal
Rabbit MQ 官方文档
Process
如何确认发送的 MQ 消息是否发送成功
可以通过实现 ConfirmCallback
接口来实现
消息会在发送到 Broker
后触发回调,可以确认消息是否正确到达 Exchange
中
我们编写接口实现
public static final RabbitTemplate.ConfirmCallback CONFIRM_CALLBACK = (correlationData, ack, cause) ->
{
if (ack) {
log.info("MQ 发送消息 [SUCCESS],消息唯一标志 {}", correlationData);
} else {
log.error("MQ 发送消息 [FAIL],消息唯一标志 {} , 原因 {}", correlationData, cause);
}
};
然后在配置文件中开启配置
spring:
rabbitmq:
publisher-confirms: true
然后增加配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new SimpleMessageConverter());
template.setConfirmCallback(CONFIRM_CALLBACK);
template.setMandatory(true);
return template;
}
通过实现 ReturnCallback
可以启动消息失败时触发回调
然后在配置中新增
spring:
rabbitmq:
publisher-returns: true
然后增加配置
template.setConfirmCallback(RETURN_CALLBACK);
Process
消息的消费者如何通知 MQ 消息消费成功
我们可以通过 ACK 机制通知 MQ 消息消费成功
ACK : acknowledged
ACK 机制有三种模式 ,自动确认模式 AcknowledgeMode.NONE
,根据情况确认模式 AcknowledgeMode.AUTO
,手动确认模式 AcknowledgeMode.MANUAL
我们可以通知 MQ 消息的几种形式, 确认 ack
,否认 nack
拒绝 reject
nack
的消息会重新发送和消费,reject
掉的消息会被丢弃
Spring 默认 ACK 模式为 NONE ,我们可以自己设置为手动,方式如下
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
或者我们通过 Bean 来设置开启
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
Process
如何通知 MQ 消息已经被成功消费
/**
* ACK
*
* @param channel 通道
* @param message 消息
*/
default void basicAck(Channel channel, Message message) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
其中 deliveryTag
为唯一标准 ID, 详细如下
当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver
方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel
投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
multiple
,是否批处理
为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag
小于等于传入值的所有消息
Process
MQ 消息确认模式中的自动确认细节
AUTO
模式会根据方法的执行情况来决定是否确认还是拒绝,就是决定消息到底是成功消费并删除还是
重入队列
1、如果消费过程中没有抛出异常,则自动确认
2、如果抛出 `AmqpRejectAndDontRequeueException ` 异常,则 reject 掉该消息,消息被删除
3、如果抛出 `ImmediateAcknowledgeAmqpException ` 异常,则消费者被确认
4、如果抛出其它异常,则消息被退回队列,然后继续发送给消费者,如果只有一个消费者,则会进入死循环,
有多个消费者也会导致资源极度浪费,可以通过 `setDefaultRequeueRejected` 设置进行避免
消息可靠性上面,需要注意如下的内容
对于持久化,需要注意 exchange
交换机的持久化, queue
队列的持久化, message
消息的持久化
对于消息确认,通过开启消费返回,生产者就可以知道哪些消息没有发出去,生产者
需要注意和 Broker
之间的消息确认,也需要注意消费者与 Broker
之间的消息确认
Broker
是 MQ 服务端(Server) 的
参考资料
Reference Resources
Rabbit MQ 官方文档
本文地址 https://wretchant.blog.csdn.net/article/details/117999669
博客地址 https://wretchant.blog.csdn.net/
以上是关于mq消费者数量如何定义的主要内容,如果未能解决你的问题,请参考以下文章
消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]