RabbitMQ学习(下)——发布确认高级幂等性优先级惰性和RabbitMQ集群

Posted AC_Jobim

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习(下)——发布确认高级幂等性优先级惰性和RabbitMQ集群相关的知识,希望对你有一定的参考价值。

一、发布确认高级

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个confirmCallback
  • 消息从 exchange–>queue 投递失败则会返回一个returnCallback

我们将利用这两个 callback 控制消息的可靠性投递


消息的可靠投递小结:

  • 通过spring.rabbitmq.publisher-confirm-type属性设置发布确认的类型

    • NONE 值是禁用发布确认模式,是默认值
    • CORRELATED 值是发布消息成功到交换器后会触发回调方法
    • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

    使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  • 设置ConnectionFactory的publisher-returns=“true” 开启退回模式

    使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

1.1 confirm 确认模式

代码架构图:

  1. 配置文件,设置spring.rabbitmq.publisher-confirm-type=correlated

    spring.rabbitmq.host=192.168.2.4
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    # 发布消息成功到交换器后会触发回调方法
    spring.rabbitmq.publisher-confirm-type=correlated
    
  2. 添加配置类

    /**
     * 配置类,发布确认(高级)
     */
    @Configuration
    public class ConfirmConfig {
    
        //交换机
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
        //RoutingKey
        public static final String CONFIRM_ROUTING_KEY = "key1";
    
        //声明交换机
        @Bean
        public DirectExchange confirmExchange() {
            return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        }
    
        //声明队列
        @Bean
        public Queue confirmQueue() {
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }
    
        //绑定
        @Bean
        public Binding queueBindingExchange(
                @Qualifier("confirmQueue") Queue confirmQueue,
                @Qualifier("confirmExchange") DirectExchange confirmExchange) {
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    
    }
    
  3. 消息生产者

    /**
     * 开始发消息,测试确认
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ProducerController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        //发消息,http://localhost:8080/confirm/sendMessage/你好
        @RequestMapping("/sendMessage/{message}")
        public void sendMessage(@PathVariable("message") String message) {
            CorrelationData correlationData = new CorrelationData("1");
            String routingKey1 = "key1";
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    routingKey1,message,correlationData);
            log.info("发送消息内容:{},routingKey:{}", message,routingKey1);
    
            CorrelationData correlationData2 = new CorrelationData("2");
            String routingKey2 = "key2";
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    routingKey2,message,correlationData2);
            log.info("发送消息内容:{},routingKey:{}", message,routingKey2);
        }
    }
    
  4. 消息生产者的回调接口(重点)

    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            //给rabbitTemplate对象注入ConfirmCallback回调对象
            rabbitTemplate.setConfirmCallback(this);
        }
    
        /**
         * 交换机不管是否收到消息的一个回调方法
         * 1.发消息 交换机收到了
         *  1.1 correlationData 保存回调消息的ID及相关信息,注意该参数默认为null,其值需要发送方发送消息的时候进行设置
         *  1.2 交换机收到消息 true
         *  1.3 cause:null
         * 2.发消息,交换机接收失败了
         *  2.1 correlationData 保存回调消息的ID及相关信息
         *  2.2 交换机未收到消息 false
         *  2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if(ack){
                log.info("交换机已经收到 id 为:{}的消息",id);
            }else{
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
            }
        }
    }
    
  5. 执行测试

    访问:http://localhost:8080/confirm/sendMessage/你好

    可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了

    丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败


    将生产者的代码改成下面的样子:(设置发送到不存在的交换机)

    //发消息,http://localhost:8080/confirm/sendMessage/你好
    @RequestMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        CorrelationData correlationData = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"1",
                routingKey1,message,correlationData);
        log.info("发送消息内容:{},routingKey:{}", message,routingKey1);
    }
    

    访问:http://localhost:8080/confirm/sendMessage/你好

    可以看到当发送到错误的交换机时回调方法还是执行的。

1.2 return 退回模式

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

所以需要通过设置ConnectionFactory的publisher-returns=“true” 开启退回模式,就可以在当消息传递过程中不可达目的地时将消息返回给生产者。

  1. 配置文件,设置spring.rabbitmq.publisher-returns=true

    spring.rabbitmq.host=192.168.2.4
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    # 发布消息成功到交换器后会触发回调方法
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启时,当消息发送不出去的时候会回退消息
    spring.rabbitmq.publisher-returns=true
    
  2. 修改回调接口

    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        
    	//依赖注入 rabbitTemplate 之后再设置它的回调对象
        @PostConstruct
        public void init() {
            //给rabbitTemplate对象注入ConfirmCallback回调对象
            rabbitTemplate.setConfirmCallback(this);
            //给rabbitTemplate对象注入ReturnsCallback回调对象
            rabbitTemplate.setReturnsCallback(this);
        }
    
        /**
         * 交换机不管是否收到消息的一个回调方法
         * 1.发消息 交换机收到了
         *  1.1 correlationData 保存回调消息的ID及相关信息,注意该参数默认为null,其值需要发送方发送消息的时候进行设置
         *  1.2 交换机收到消息 true
         *  1.3 cause:null
         * 2.发消息,交换机接收失败了
         *  2.1 correlationData 保存回调消息的ID及相关信息
         *  2.2 交换机未收到消息 false
         *  2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if(ack){
                log.info("交换机已经收到 id 为:{}的消息",id);
            }else{
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
            }
        }
    
        //可以在当消息传递过程中不可达目的地时将消息返回给生产者
        //只有不可达目的地的时候,才进行回退
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
                    new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey());
        }
    }
    
  3. 测试执行

    访问:http://localhost:8080/confirm/sendMessage/你好

1.3 备份交换机

  • 设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

  • 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

  • 备份交换器是为了实现没有路由到队列的消息,声明交换机的时候添加属性alternate-exchange,声明一个备用交换机,为了方便使用一般声明为fanout类型,这样交换机收到路由不到队列的消息就会发送到备用交换机,进而发送到绑定的备份队列中。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码架构图:

  1. 在上面代码的基础上,修改配置类

    @Configuration
    public class ConfirmConfig {
    
        //交换机
        public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        //RoutingKey
        public static final String CONFIRM_ROUTING_KEY = "key1";
        //备份交换机
        public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
        //备份队列
        public static final String BACKUP_QUEUE_NAME = "backup_queue";
        //报警队列
        public static final String WARNING_QUEUE_NAME = "warning_queue";
    
        //声明交换机,并设置该交换机的备份交换机
        @Bean
        public DirectExchange confirmExchange() {
            return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)//设置该交换机的备份交换机
                    .build();
        }
    
        //声明队列
        @Bean
        public Queue confirmQueue() {
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }
    
        //绑定
        @Bean
        public Binding queueBindingExchange(
                @Qualifier("confirmQueue") Queue confirmQueue,
                @Qualifier("confirmExchange") DirectExchange confirmExchange) {
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    
    
        //备份交换机
        @Bean
        public FanoutExchange backupExchange() {
            return new FanoutExchange(BACKUP_EXCHANGE_NAME);
        }
    
        //备份队列
        @Bean
        public Queue backupQueue() {
            return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
        }
    
        //报警队列
        @Bean
        public Queue warningQueue() {
            return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
        }
    
        //绑定,备份队列和备份交换机绑定
        @Bean
        public Binding backupQueueBindingBackupExchange(
                @Qualifier("backupQueue") Queue backupQueue,
                @Qualifier("backupExchange") FanoutExchange backupExchange) {
            return BindingBuilder.bind(backupQueue).to(backupExchange);
        }
    
        //绑定,报警队列和备份交换机绑定
        @Bean
        public Binding warningQueueBindingBackupExchange(
                @Qualifier("warningQueue") Queue warningQueue,
                @Qualifier("backupExchange") FanoutExchange backupExchange) {
            return BindingBuilder.bind(warningQueue).to(backupExchange);
        }
    }
    
  2. 报警消费者

    /**
     * 报警消费者
     */
    @Slf4j
    @Component
    public class WarningConsumer {
    
        @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
        public void receiveWarningMsg(Message message) {
            String msg = new String(message.getBody());
            log.error("报警发现不可路由消息:{}", msg);
        }
    }
    
  3. 执行测试

访问: http://localhost:8080/confirm/sendMessage/你好,可以看到没有被路由到队列的消息被报警消费者消费了。

上面的代码mandatory 参数与备份交换机同时开启了。但最后的结果是备份交换机发现了不可路由的消息,而回退方法没有被调用。
即:备份交换机优先级高

二、幂等性、优先级、惰性

2.1 幂等性

  • 幂等性的实质是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响,而不是请求资源返回的结果。就是保证同一条消息不会重复或者重复消费了也不会对系统数据造成异常。

RabbitMQ的幂等性

  • 拿RabbitMQ来说的话,消费者在消费完成一条消息之后会向MQ回复一个ACK(可以配置自动ACK或者手动ACK) 来告诉MQ这条消息已经消费了。假如当消费者消费完数据后,准备回执ACK时,系统挂掉了,MQ是不知道该条消息已经被消费了。所以重启之后MQ会再次发送该条消息,导致消息被重复消费,如果此时没有做幂等性处理,可能就会导致数据错误等问题。

如何避免消息的重复消费问题?

全局唯一ID + Redis

  • 生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。

  • setnx命令,若给定的key不存在,执行set操作,返回1,若给定的Key已存在,不做任何操作,返回0。

生产者代码:

public void sendMessageIde() {
    MessageProperties properties = new MessageProperties();
    properties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message("Hello RabbitMQ".getBytes(), properties);
    rabbitTemplate.convertAndSend("durable-exchange", "rabbit.long.yuan", message);
}

消费者代码:

@RabbitListener(queues = "durable-queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {
 
    if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
        // 业务操作...
        System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
 
        // 手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

2.2 优先级

  • 在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。

  • 对于上面的情况,就需要使用到优先级队列,即具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

注意:实现优先级队列,队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费。

设置优先级队列:

  1. 通过RabbitMQ管理界面配置队列的优先级属性

  2. 通过代码去实现

    Map<String,Object> args = new HashMap<String,Object>();
    args.put("x-max-priority", 10);
    channel.queueDeclare("queue_priority", true, false, false, args);
    

配置了队列优先级的属性之后,可以在管理页面看到Pri的标记:

发送的消息中设置消息本身的优先级:

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

实战:发送10条消息,其中第5条为高优先级消息

  1. 生产者
public class PriorityProducer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        
        Map<String, Object> arguments = new HashMap<>();
        //官方允许0-255之间,此处设置10,表示允许优先级范围为0-10,不要设置过大,浪费CPU和内存
        arguments.put("x-max-priority",10);
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);

        for (int i = 1;i < 11 ;i++) {
            String message = "info" + i;
            if(i == 5) {
                //发送优先级为5的消息,数字越高,优先级越高
                AMQP.BasicProperties properties RabbitMQ消息中间件技术精讲10 高级篇三 幂等性保障不重复消费

rabbitmq 怎么保证幂等性,数据一致性问题

RabbitMQ--高级特性

php rabbitmq的开发体验

RabbitMQ消息幂等性问题

RabbitMQ的幂等性和集群负载均衡