怎样保证Rabbitmq的可靠性?

Posted 张子行的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎样保证Rabbitmq的可靠性?相关的知识,希望对你有一定的参考价值。

所思

最近在公司开发遇到这样一个需求:如何实现定时下单的?并且可以随时中断、重启、删除这个任务,我当时脑子里想到了如下几种技术实现

  1. @Scheduling注解+Cron 表达式这些技术了哈哈哈哈哈,可能很多小伙伴也会这样吧,但是后来仔细想想,这种方式只会定时的去执行任务但是不会中断呀,更别提去实现中断、重启、删除指定任务的需求了,直接 Pass
  2. 然后我又想到了可以借助 消息队列 来实现。用户新建一个定时任务,先对任务进行入库操作,然后异步发送任务消息(设置过期时间)给MQ的死信队列,死信队列中消息超时后会自动转发至其他队列中,监听其他队列的 Mq 监听器就可以消费到这条消息了,执行我们的业务逻辑。这样定时下单任务就实现了。定时任务的删除也好办,要删除任务时,直接干掉 Mq 中的消息就可以了,这样监听器就消费不到这条消息了。但是中断重启任务用 Mq 实现着实有点费劲,直接 Pass
  3. 借助 Redis 来实现,可以参考我早期写的这篇文章 redis实现延时队列
  4. 最终我还是妥协了,使用了一个专门处理任务的框架 Quatz 去实现上述需求

正文

我在利用 Rabbitmq 来实现开文中的需求,并写测试Demo时候思考了如下几个问题,这也是我为什么写下本文的原因,如果这三个问题在我们的项目中都有对应的处理机制,这个项目还是很健壮的

  1. 如果消息发送失败怎么办?
  2. 如果一条消息被消费失败了又怎么办?
  3. 如果 Rabbitmq 服务挂掉了怎么办?

MQ消息确认机制

首先就第一个问题展开讨论,假如我们再向MQ发送一条消息的时候,由于种种原因导致这条消息没有被MQ所接收,显然这个流程是非正常的。那么我们该怎么去保证我们所发的消息一定会到达消息队列呢?其实 Rabbitmq 给客户端提供了一种消息确认的机制,我们可以通过 ConfirmCallback、ReturnCallback
接口实现对消息的发送过程进行一个实时的监听,如果消息成功到达 MQ,MQ会对该条消息打一个标记: ACK = true 返回给我们(意味着消息发送成功),我们通过这个标记就可以对消息是否成功发送做后续的处理操作。

理论就到这里现在来实操一下吧,首先加入RabbitMq的依赖

   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

然后开启对相关回调函数的支撑

spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
    port: 5672
    listener:
      simple:
        #开启消息手动 ACK
        acknowledge-mode: manual
        #开启消息到达交换机没有到达队列会触发回调方法
    publisher-returns: true
    #消息到交换机会触发回调方法
    publisher-confirm-type: correlated

编写相关的测试代码,其中读者也可以自行将 ConfirmCallback、ReturnCallback接口单独实现成一个类,个人喜欢写在一个类里面。值得注意的是,下面这段代码具备发送消息失败会进行重试三次发送消息,并且会对重试发送消息的原因记录到日志里面,如果消息始终发送到MQ失败,修改库中的定时任务的状态为失效。各位读者可以根据自己的业务逻辑替换即可

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class NewsImpl implements RabbitTemplate.ConfirmCallback, NewsService, RabbitTemplate.ReturnCallback {
    private AtomicInteger i = new AtomicInteger(0);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void senMsgToRabbitMq(Order data) {
        String CorrelationData = JSONObject.toJSON(data).toString();
        rabbitTemplate.setExchange("cdTopicExchange");
        rabbitTemplate.setRoutingKey("user.dead");
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启消息未到达队列时,触发ReturnCallback
        rabbitTemplate.setMandatory(true);
        //指定消息到达交换机会触发的回调函数
        rabbitTemplate.setConfirmCallback(this);
        //指定消息到达交换机、但是没有到达队列时的回调函数
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.convertAndSend(data, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties msgProperties = message.getMessageProperties();
                msgProperties.setCorrelationId("2");
                //设置过期时间:4s
                msgProperties.setExpiration("4000");
                //设置持久化
                msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        }, new CorrelationData(CorrelationData));
    }

    @SneakyThrows
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println();
        if (ack) i.compareAndSet(i.get(), 0);
        else {
            if (i.get() < 3) {
                Thread.sleep(1000);
                Order data = JSONObject.parseObject(correlationData.getId(), Order.class);
                System.err.println(i.get()+"重新发送下单消息给MQ:" + data);
                this.senMsgToRabbitMq(data);
                i.compareAndSet(i.get(), i.get() + 1);
                System.err.println(cause);
            } else {
                System.err.println("rabbitmq始终接受不到消息,将错误日志发送给管理员");
                System.err.println("修改定时任务状态为:失效");
            }
        }
    }


    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息主体 message : " + message);
        System.out.println("消息主体 message : " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }
}

当MQ中交换机都没有的时候,测试如下触发ConfirmCallback回调函数,会重试三次发送消息

当MQ存在对应的交换机,但是对应的队列不存在的时候,测试结果如下,会触发ReturnCallback 回调函数

MQ消息消费确认

就第二个问题来讨论,关于消费消息失败的问题,其实 MQ 自带消息消费 ACK 机制,如果我们项目中没有对 MQ 额外的配置,默认是自动进行 ACK,所谓的 ACK 就是对消息的签收,下面来介绍常用的
API介绍。

channel.basicNack(deliveryTag, multiple, requeue);

参数一:当前消息的一个标记、类型Long(递增)
参数二:是否批量处理数据、(<=当前消息标记的数据)
参数三:是否需要重新放回队列

举个例子:
channel.basicNack(6, false, false); 的意思就是丢弃6消息
channel.basicNack(6, true, false); 的意思就是丢弃6以及6之前的消息
channel.basicNack(6, false, true); 的意思就是将6消息重新放入队列
channel.basicNack(6, true, true); 的意思就是将6以及6之前的消息重新放入队列

//可以选择是否签收当前消息、签收<=当前消息标记的所有消息
channel.basicAck(deliveryTag, multiple);
//阉割了批处理、其他和basicNack用法一致
channel.basicReject(deliveryTag, requeue);

实战写法:下单失败会进行重试三次下单、记得 finally 一定要对消息进行处理,不然会产生MQ消息堆砌的问题

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
@Data
@Slf4j
@Component
public class TopicCustomer {
    /**
     * 监听延时任务  NewsImpl.senMsgToRabbitMq(order.setPrice(100),200L);
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "cTopicQueue"),
                    exchange = @Exchange(value = "cTopicExchange", type = "topic"),
                    key = "user.live")
    })
    public void receivel(String msg, Channel channel, Message message) throws IOException {
        Boolean flag = false;
        try {
            for (int i = 0; i < 3; i++) {
                try {
                    System.err.println("第: " + i + "次下订单:" + msg);
                    flag = false;
                    if (flag) {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        System.err.println("第: " + i + "次下订单:" + msg + "成功");
                        break;
                    }
                    System.err.println("第: " + i + "次下订单:" + msg + "失败");
                } catch (Exception e) {
                    System.err.println("下单过程中出现异常。开始重试三次下单" + e.getCause());
                    continue;
                }
            }
        } catch (Exception e) {
            System.err.println("定时任务执行失败");
        } finally {
            if (!flag) channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.err.println("丢弃消息");
        }
    }

MQ消息持久化

就开篇第三个问题来分析,如果 Rabbitmq 服务挂掉了怎么办?其实服务挂了我们能做到最大的补救措施就是,快速重启服务、然后恢复生产数据。想明白了这点就很好办了。在初始化我们的交换机、队列、消息的时候通通设置持久化。

@Configuration
public class rabbitmqConfig {
    @Bean
    public TopicExchange cdTopicExchange() {
        TopicExchange topicExchange = new TopicExchange("cdTopicExchange", true, false, null);
        return topicExchange;
    }

    /**
     * 死信队列
     * 消息超时发送给cTopicExchange,相当于转发。指定路由key、以及交换机
     */
    @Bean
    public Queue cdTopicQueue() {
        HashMap<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", "cTopicExchange");
        params.put("x-dead-letter-routing-key", "user.live");
        Queue queue = new Queue("cdTopicQueue", true, false, false, params);
        return queue;
    }

    @Bean
    public Binding cdBind() {
        Binding binding = BindingBuilder.bind(cdTopicQueue()).to(cdTopicExchange()).with("user.dead");
        return binding;
    }


    @Bean
    public TopicExchange cTopicExchange() {
        TopicExchange topicExchange = new TopicExchange("cTopicExchange", true, false, null);
        return topicExchange;
    }

    @Bean
    public Queue cTopicQueue() {
        Queue queue = new Queue("cTopicQueue", true, false, false);
        return queue;
    }

    @Bean
    public Binding cBind() {
        Binding binding = BindingBuilder.bind(cTopicQueue()).to(cTopicExchange()).with("user.live");
        return binding;
    }
 }

全文 OVER -----------我现在是一名大四的学生啦哈哈哈哈哈,文章如有写得欠佳的地方欢迎大家斧正,

以上是关于怎样保证Rabbitmq的可靠性?的主要内容,如果未能解决你的问题,请参考以下文章

Java面试——RabbitMQ系列总结

Java面试——RabbitMQ系列总结

RabbitMQ学习总结(10)—— RabbitMQ如何保证消息的可靠性

如何保证消息队列的可靠性传输?

rabbitmq如何保证消息可靠性

RabbitMq高级之如何保证消息发送可靠性