RabbitMQ面试问题
Posted T_karine
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ面试问题相关的知识,希望对你有一定的参考价值。
1、MQ如何避免消息堆积,使MQ提高消费者的速度?
提高消费者速率(集群)、消费者批量获取消息。
2、MQ如何避免消费者重复消费(幂等问题)
基于Redis的setNx命令。
基于数据库表字段的唯一性约束。
3、MQ服务器宕机之后,如何保证消息不丢失?
持久化机制
4、MQ接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?
不会,消息确认机制。必须要消费者成功消费消息,再通知给MQ服务器端删除该消息。当消息在指定时间内,没有被消费掉,会将消息转移到死信交换机分发给死信队列,给死信消费者进行消费。
5、MQ如何保证消息顺序一致性?
绑定同一个消费者和队列
6、MQ推与取架构模型
MQ 服务器与消费者建立长连接后,MQ 服务器会主动推数据给到消费者
当消费者第一次启动的时候,会主动找MQ 服务器拉数据
7、MQ如何实现高并发思想?
MQ消费者根据自身能力情况 ,拉取mq服务器端消息消费。默认的情况下是取出一条消息,可根据能力设置一次获取的消息数量。
8、MQ死信队列原理?
8.1 死信队列产生的背景
1)消息投递到MQ中存放的消息已经过期 ,消费者没有及时的消费消息,即消息存放到MQ服务器中过期,会转移到(备胎)死信队列存放。
2)队列达到最大的长度 (队列容器已经满了),会转移到备胎死信队列存放。
3)消费者消费多次消息失败,就会转移存放到死信队列中。
8.2 死信队列的架构原理
死信队列和普通队列区别不是很大,都有自己独立的交换机和路由key、队列和消费者。
1)生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。
2)如果生产者投递消息到普通队列中时,出现8.1中的几种情况,会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机将消息分发给对应的死信(备胎)队列,再由对应的死信(备胎)消费者进行消费。
8.3 RabbitMQ应用场景
基于RabbitMQ设计订单30分钟超时处理
基于死信队列。消息生产者发送消息时,设置消息有效时间为30分钟,如果30分钟内消息没有被消费,消息将被转移到死信交换机,分发给死信队列,死信消费者对消息进行消费。
8.4 实现方式
1)配置文件application.yml
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /meiteVirtualHosts
server:
port: 8080
###模拟演示死信队列
mayikt:
dlx:
exchange: mayikt_dlx_exchange
queue: mayikt_order_dlx_queue
routingKey: dlx
###备胎交换机
order:
exchange: mayikt_order_exchange
queue: mayikt_order_queue
routingKey: mayikt.order
2)引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
3)配置类DeadLetterMQConfig.java
绑定死信交换机和普通队列。默认创建交换机和队列,不需要在RabbitMQ控制台创建。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class DeadLetterMQConfig
/**
* 订单交换机
*/
@Value("$mayikt.order.exchange")
private String orderExchange;
/**
* 订单队列
*/
@Value("$mayikt.order.queue")
private String orderQueue;
/**
* 订单路由key
*/
@Value("$mayikt.order.routingKey")
private String orderRoutingKey;
/**
* 死信交换机
*/
@Value("$mayikt.dlx.exchange")
private String dlxExchange;
/**
* 死信队列
*/
@Value("$mayikt.dlx.queue")
private String dlxQueue;
/**
* 死信路由
*/
@Value("$mayikt.dlx.routingKey")
private String dlxRoutingKey;
/**
* 声明死信交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange()
return new DirectExchange(dlxExchange);
/**
* 声明死信队列
*
* @return Queue
*/
@Bean
public Queue dlxQueue()
return new Queue(dlxQueue);
/**
* 声明订单业务交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange()
return new DirectExchange(orderExchange);
/**
* 声明订单队列
*
* @return Queue
*/
@Bean
public Queue orderQueue()
// 订单队列绑定我们的死信交换机
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange", dlxExchange);
arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, arguments);
/**
* 绑定死信队列到死信交换机
*
* @return Binding
*/
@Bean
public Binding binding()
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
/**
* 绑定订单队列到订单交换机
*
* @return Binding
*/
@Bean
public Binding orderBinding()
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
4)生产者
设置过期时间
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderProducer
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订单交换机
*/
@Value("$mayikt.order.exchange")
private String orderExchange;
/**
* 订单路由key
*/
@Value("$mayikt.order.routingKey")
private String orderRoutingKey;
@RequestMapping("/sendOrder")
public String sendOrder()
String msg = "牛逼";
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, message ->
// 设置消息过期时间 10秒过期
message.getMessageProperties().setExpiration("10000");
return message;
);
return "success";
5)死信消费者
订单30分钟超时,即无消费者消费。
package com.mayikt.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderDlxConsumer
/**
* 死信队列监听队列回调的方法
*
* @param msg
*/
@RabbitListener(queues = "mayikt_order_dlx_queue")
public void orderConsumer(String msg)
log.info(">死信队列消费订单消息:msg<<", msg);
9、RabbitMQ消息幂等性问题
9.1 在什么情况下消费者需要实现重试策略?
1)消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?
【调用第三方接口、网络波动问题、暂时调用不了、网络连接】
该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。需在配置文件中设置重试次数和重试时间间隔。
2)消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?
该情况下是不需要实现重试策略,因为代码原因抛出异常需要重新发布版本才能解决的,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期通过定时任务或者人工补偿形式。
9.2 RabbitMQ消息自动重试机制
当消费者执行业务代码的时候,如果抛出异常、消息未被及时消费的情况下,MQ会自动触发重试机制,默认的情况下RabbitMQ是无限次数的重试,需要人为指定重试次数(详见配置文件)。
9.3 开启重试机制(配置文件)
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /meite_rabbitmq
listener:
simple:
retry:
####开启消费者(程序出现异常的情况下会)进行重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
9.4 消费者如如何避免重复消费问题?
【Mq在重试的过程中,有可能会引发消费者重复消费的问题】
生产者在投递消息的时候,生成一个全局唯一id,放在消息中。消费者获取到我们该消息,可以根据该全局唯一id实现去重复。用全局id提前查询,如果存在的情况下,代码就无需往下执行。(不严谨、不推荐)
【限制重试次数】设置每次重试之间要有一定间隔时间
在数据库层面解决消息重复消费,保证数据唯一性,比如加上唯一id。
业务逻辑是做insert操作时,使用唯一主键约束
业务逻辑是做update操作时,使用乐观锁
代码实现
@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer
@Autowired
private OrderManager orderManager;
@Autowired
private OrderMapper orderMapper;
@RabbitHandler
public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException
// try
log.info(">>orderEntity:<<", orderEntity.toString());
String orderId = orderEntity.getOrderId();
if (StringUtils.isEmpty(orderId))
log.error(">>orderId is null<<");
return;
OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
if (dbOrderEntity != null)
log.info(">>该订单已经被消费过,无需重复消费!<<");
// 无需继续重试
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
int result = orderManager.addOrder(orderEntity);
log.info(">>插入数据库中数据成功<<");
if (result >= 0)
// 开启消息确认机制
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// int i = 1 / 0;
// catch (Exception e)
// // 将失败的消息记录下来,后期采用人工补偿的形式
//
以上是关于RabbitMQ面试问题的主要内容,如果未能解决你的问题,请参考以下文章