mq消费者数量如何定义

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mq消费者数量如何定义相关的知识,希望对你有一定的参考价值。

参考技术A mq 消费者数量是监听器监听的,我们就可以在不重启项目时,调整消费者数量。 通过web动态设置消费者数量 @Resource RabbitListenerEndpointRegistry

Rabbit MQ 消息确认 ACK 解析,消息确认详解 Rabbit MQ

Rabbit MQ 消息确认 ACK 解析,消息确认详解 Rabbit MQ


如果觉得本文对你有帮助,可以一键三连支持,谢谢

相关阅读

Related Reading Docker - rabbitmq 镜像详解,以及rabbitmq 的运行指令
Related Reading Docker - rabbitmq常用的管理脚本
Related Reading Docker - 搭建rabbitmq 集群
Related Reading Docker - 创建并运行rabbitmq镜像

传送门


Portal Rabbit MQ 官网
Portal Rabbit MQ 官方文档

Process 如何确认发送的 MQ 消息是否发送成功


可以通过实现 ConfirmCallback 接口来实现
消息会在发送到 Broker 后触发回调,可以确认消息是否正确到达 Exchange

我们编写接口实现

    public static final RabbitTemplate.ConfirmCallback CONFIRM_CALLBACK = (correlationData, ack, cause) ->
    {
        if (ack) {
            log.info("MQ 发送消息 [SUCCESS],消息唯一标志 {}", correlationData);
        } else {
            log.error("MQ 发送消息 [FAIL],消息唯一标志 {} , 原因 {}", correlationData, cause);
        }
    };

然后在配置文件中开启配置

spring:
  rabbitmq:
    publisher-confirms: true

然后增加配置

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new SimpleMessageConverter());
        template.setConfirmCallback(CONFIRM_CALLBACK);
        template.setMandatory(true);
        return template;
    }

通过实现 ReturnCallback 可以启动消息失败时触发回调

然后在配置中新增

spring:
  rabbitmq:
    publisher-returns: true

然后增加配置

template.setConfirmCallback(RETURN_CALLBACK);

Process 消息的消费者如何通知 MQ 消息消费成功


我们可以通过 ACK 机制通知 MQ 消息消费成功

ACK : acknowledged

ACK 机制有三种模式 ,自动确认模式 AcknowledgeMode.NONE ,根据情况确认模式 AcknowledgeMode.AUTO ,手动确认模式 AcknowledgeMode.MANUAL

我们可以通知 MQ 消息的几种形式, 确认 ack ,否认 nack 拒绝 reject

nack 的消息会重新发送和消费,reject 掉的消息会被丢弃

Spring 默认 ACK 模式为 NONE ,我们可以自己设置为手动,方式如下

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

或者我们通过 Bean 来设置开启

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
    return factory;
}

Process 如何通知 MQ 消息已经被成功消费



    /**
     * ACK
     *
     * @param channel 通道
     * @param message 消息
     */
    default void basicAck(Channel channel, Message message) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

其中 deliveryTag 为唯一标准 ID, 详细如下

当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver
方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel
投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

multiple ,是否批处理

为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag
小于等于传入值的所有消息

Process MQ 消息确认模式中的自动确认细节


AUTO 模式会根据方法的执行情况来决定是否确认还是拒绝,就是决定消息到底是成功消费并删除还是
重入队列

1、如果消费过程中没有抛出异常,则自动确认
2、如果抛出 `AmqpRejectAndDontRequeueException ` 异常,则 reject 掉该消息,消息被删除
3、如果抛出 `ImmediateAcknowledgeAmqpException ` 异常,则消费者被确认
4、如果抛出其它异常,则消息被退回队列,然后继续发送给消费者,如果只有一个消费者,则会进入死循环,
有多个消费者也会导致资源极度浪费,可以通过 `setDefaultRequeueRejected` 设置进行避免

消息可靠性上面,需要注意如下的内容
对于持久化,需要注意 exchange 交换机的持久化, queue 队列的持久化, message 消息的持久化
对于消息确认,通过开启消费返回,生产者就可以知道哪些消息没有发出去,生产者
需要注意和 Broker 之间的消息确认,也需要注意消费者与 Broker 之间的消息确认

Broker 是 MQ 服务端(Server) 的


参考资料

Reference Resources Rabbit MQ 官方文档

本文地址 https://wretchant.blog.csdn.net/article/details/117999669
博客地址 https://wretchant.blog.csdn.net/

以上是关于mq消费者数量如何定义的主要内容,如果未能解决你的问题,请参考以下文章

mq中如何保证消费者顺序消费

消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]

消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]

RabbitMQ面试问题

2020-12-25:MQ中,如何保证消息的顺序性?

3、rabbitmq如何保证消息不被重复消费