RabbitMQ面试问题

Posted T_karine

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ面试问题相关的知识,希望对你有一定的参考价值。

1、MQ如何避免消息堆积,使MQ提高消费者的速度?

  提高消费者速率(集群)、消费者批量获取消息。

2、MQ如何避免消费者重复消费(幂等问题)

    基于Redis的setNx命令。

    基于数据库表字段的唯一性约束。

3、MQ服务器宕机之后,如何保证消息不丢失?

  持久化机制

4、MQ接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?

  不会,消息确认机制。必须要消费者成功消费消息,再通知给MQ服务器端删除该消息。当消息在指定时间内,没有被消费掉,会将消息转移到死信交换机分发给死信队列,给死信消费者进行消费。

5、MQ如何保证消息顺序一致性?

  绑定同一个消费者和队列

6、MQ推与取架构模型 

  MQ 服务器与消费者建立长连接后,MQ 服务器会主动推数据给到消费者

  当消费者第一次启动的时候,会主动找MQ 服务器拉数据 

7、MQ如何实现高并发思想?

  MQ消费者根据自身能力情况 ,拉取mq服务器端消息消费。默认的情况下是取出一条消息,可根据能力设置一次获取的消息数量。

8、MQ死信队列原理?

8.1 死信队列产生的背景

       1)消息投递到MQ中存放的消息已经过期 ,消费者没有及时的消费消息,即消息存放到MQ服务器中过期,会转移到(备胎)死信队列存放。
  2)队列达到最大的长度 (队列容器已经满了),会转移到备胎死信队列存放。
  3)消费者消费多次消息失败,就会转移存放到死信队列中。

8.2 死信队列的架构原理

        死信队列和普通队列区别不是很大,都有自己独立的交换机和路由key、队列和消费者。
        1)生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。
        2)如果生产者投递消息到普通队列中时,出现8.1中的几种情况,会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机将消息分发给对应的死信(备胎)队列,再由对应的死信(备胎)消费者进行消费。

8.3 RabbitMQ应用场景

        基于RabbitMQ设计订单30分钟超时处理

        基于死信队列。消息生产者发送消息时,设置消息有效时间为30分钟,如果30分钟内消息没有被消费,消息将被转移到死信交换机,分发给死信队列,死信消费者对消息进行消费。

8.4 实现方式

1)配置文件application.yml

spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: guest
    ####密码
    password: guest
    ### 地址
    virtual-host: /meiteVirtualHosts
server:
  port: 8080
 
###模拟演示死信队列
mayikt:
  dlx:
    exchange: mayikt_dlx_exchange
    queue: mayikt_order_dlx_queue
    routingKey: dlx
  ###备胎交换机
  order:
    exchange: mayikt_order_exchange
    queue: mayikt_order_queue
    routingKey: mayikt.order

 2)引入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
 
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

3)配置类DeadLetterMQConfig.java

 绑定死信交换机和普通队列。默认创建交换机和队列,不需要在RabbitMQ控制台创建。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
 
@Component
public class DeadLetterMQConfig 
    /**
     * 订单交换机
     */
    @Value("$mayikt.order.exchange")
    private String orderExchange;
 
    /**
     * 订单队列
     */
    @Value("$mayikt.order.queue")
    private String orderQueue;
 
    /**
     * 订单路由key
     */
    @Value("$mayikt.order.routingKey")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("$mayikt.dlx.exchange")
    private String dlxExchange;
 
    /**
     * 死信队列
     */
    @Value("$mayikt.dlx.queue")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("$mayikt.dlx.routingKey")
    private String dlxRoutingKey;
 
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() 
        return new DirectExchange(dlxExchange);
    
 
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() 
        return new Queue(dlxQueue);
    
 
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() 
        return new DirectExchange(orderExchange);
    
 
    /**
     * 声明订单队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() 
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    
 
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() 
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    
 
 
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() 
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    
      

4)生产者

设置过期时间

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
 
@RestController
public class OrderProducer 
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 订单交换机
     */
    @Value("$mayikt.order.exchange")
    private String orderExchange;
    /**
     * 订单路由key
     */
    @Value("$mayikt.order.routingKey")
    private String orderRoutingKey;
 
    @RequestMapping("/sendOrder")
    public String sendOrder() 
        String msg = "牛逼";
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, message -> 
            // 设置消息过期时间 10秒过期
            message.getMessageProperties().setExpiration("10000");
            return message;
        );
        return "success";
    

5)死信消费者

  订单30分钟超时,即无消费者消费。

package com.mayikt.consumer;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class OrderDlxConsumer 
 
    /**
     * 死信队列监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "mayikt_order_dlx_queue")
    public void orderConsumer(String msg) 
        log.info(">死信队列消费订单消息:msg<<", msg);
    

9、RabbitMQ消息幂等性问题

9.1 在什么情况下消费者需要实现重试策略?

        1)消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?

  【调用第三方接口、网络波动问题、暂时调用不了、网络连接】

   该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。需在配置文件中设置重试次数和重试时间间隔。

        2)消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?

  该情况下是不需要实现重试策略,因为代码原因抛出异常需要重新发布版本才能解决的,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期通过定时任务或者人工补偿形式。

9.2 RabbitMQ消息自动重试机制

  当消费者执行业务代码的时候,如果抛出异常、消息未被及时消费的情况下,MQ会自动触发重试机制,默认的情况下RabbitMQ是无限次数的重试,需要人为指定重试次数(详见配置文件)。

9.3 开启重试机制(配置文件)

spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: guest
    ####密码
    password: guest
    ### 地址
    virtual-host: /meite_rabbitmq
    listener:
      simple:
        retry:
          ####开启消费者(程序出现异常的情况下会)进行重试
          enabled: true
          ####最大重试次数
          max-attempts: 5
          ####重试间隔次数
          initial-interval: 3000

9.4 消费者如如何避免重复消费问题?

     【Mq在重试的过程中,有可能会引发消费者重复消费的问题】

  生产者在投递消息的时候,生成一个全局唯一id,放在消息中。消费者获取到我们该消息,可以根据该全局唯一id实现去重复。用全局id提前查询,如果存在的情况下,代码就无需往下执行。(不严谨、不推荐)

  【限制重试次数】设置每次重试之间要有一定间隔时间

  在数据库层面解决消息重复消费,保证数据唯一性,比如加上唯一id。

          业务逻辑是做insert操作时,使用唯一主键约束

          业务逻辑是做update操作时,使用乐观锁

代码实现

@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer 

    @Autowired
    private OrderManager orderManager;
    @Autowired
    private OrderMapper orderMapper;

    @RabbitHandler
    public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException 
//        try 
        log.info(">>orderEntity:<<", orderEntity.toString());
        String orderId = orderEntity.getOrderId();
        if (StringUtils.isEmpty(orderId)) 
            log.error(">>orderId is null<<");
            return;
        
        OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
        if (dbOrderEntity != null) 
            log.info(">>该订单已经被消费过,无需重复消费!<<");
            // 无需继续重试
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        
        int result = orderManager.addOrder(orderEntity);

        log.info(">>插入数据库中数据成功<<");
        if (result >= 0) 
            // 开启消息确认机制
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        
//        int i = 1 / 0;
//         catch (Exception e) 
//            // 将失败的消息记录下来,后期采用人工补偿的形式
//        
    

以上是关于RabbitMQ面试问题的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 中的消息会过期吗?

RabbitMQ 中的消息会过期吗?

RabbitMQ ACK、NACK、Type、TTL、死信

SpringBoot+RabbitMQ 死信队列

RabbitMQ项目使用之死信队列

Rabbitmq消费失败死信队列