Nacos源码分析.黑马跟学笔记
Posted 心向阳光的天域
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码分析.黑马跟学笔记相关的知识,希望对你有一定的参考价值。
Nacos源码分析
1.下载Nacos源码并运行
要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行。
1.1.下载Nacos源码
Nacos的GitHub地址:https://github.com/alibaba/nacos
课前资料中已经提供了下载好的1.4.2版本的Nacos源码:
如果需要研究其他版本的同学,也可以自行下载:
大家找到其release页面:https://github.com/alibaba/nacos/tags,找到其中的1.4.2.版本:
点击进入后,下载Source code(zip):
1.2.导入Demo工程
我们的课前资料提供了一个微服务Demo,包含了服务注册、发现等业务。
导入该项目后,查看其项目结构:
结构说明:
- cloud-source-demo:项目父目录
- cloud-demo:微服务的父工程,管理微服务依赖
- order-service:订单微服务,业务中需要访问user-service,是一个服务消费者
- user-service:用户微服务,对外暴露根据id查询用户的接口,是一个服务提供者
- cloud-demo:微服务的父工程,管理微服务依赖
1.3.导入Nacos源码
将之前下载好的Nacos源码解压到cloud-source-demo项目目录中:
然后,使用IDEA将其作为一个module来导入:
1)选择项目结构选项:
然后点击导入module:
在弹出窗口中,选择nacos源码目录:
然后选择maven模块,finish:
最后,点击OK即可:
导入后的项目结构:
1.4.proto编译
Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化。并将对应的proto文件定义在了consistency这个子模块中:
我们需要先将proto文件编译为对应的Java代码。
1.4.1.什么是protobuf
protobuf的全称是Protocol Buffer,是Google提供的一种数据序列化协议,这是Google官方的定义:
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
可以简单理解为,是一种跨语言、跨平台的数据传输格式。与json的功能类似,但是无论是性能,还是数据大小都比json要好很多。
protobuf的之所以可以跨语言,就是因为数据定义的格式为.proto
格式,需要基于protoc编译为对应的语言。
1.4.2.安装protoc
Protobuf的GitHub地址:https://github.com/protocolbuffers/protobuf/releases
我们可以下载windows版本的来使用:
另外,课前资料也提供了下载好的安装包:
解压到任意非中文目录下,其中的bin目录中的protoc.exe可以帮助我们编译:
然后将这个bin目录配置到你的环境变量path中,可以参考JDK的配置方式:
1.4.3.编译proto
进入nacos-1.4.2的consistency模块下的src/main目录下:
然后打开cmd窗口,运行下面的两个命令:
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto
如图:
会在nacos的consistency模块中编译出这些java代码:
1.5.运行
nacos服务端的入口是在console模块中的Nacos类:
我们需要让它单机启动:
然后新建一个SpringBootApplication:
然后填写应用信息:
然后运行Nacos这个main函数:
将order-service和user-service服务启动后,可以查看nacos控制台:
2.服务注册
服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:
首先最外层是一个Map,结构为:Map<String, Map<String, Service>>
:
- key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group
- value:又是一个
Map<String, Service>
,代表分组及组内的服务。一个组内可以有多个服务- key:代表group分组,不过作为key时格式是group_name:service_name
- value:分组下的某一个服务,例如userservice,用户服务。类型为
Service
,内部也包含一个Map<String,Cluster>
,一个服务下可以有多个集群- key:集群名称
- value:
Cluster
类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>
,就是该集群下的实例的集合- Instance:实例信息,包含实例的IP、Port、健康状态、权重等等信息
每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。
2.1.服务注册接口
Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。
**接口说明:**注册一个实例到Nacos服务。
请求类型:POST
请求路径:/nacos/v1/ns/instance
请求参数:
名称 | 类型 | 是否必选 | 描述 |
---|---|---|---|
ip | 字符串 | 是 | 服务实例IP |
port | int | 是 | 服务实例port |
namespaceId | 字符串 | 否 | 命名空间ID |
weight | double | 否 | 权重 |
enabled | boolean | 否 | 是否上线 |
healthy | boolean | 否 | 是否健康 |
metadata | 字符串 | 否 | 扩展信息 |
clusterName | 字符串 | 否 | 集群名 |
serviceName | 字符串 | 是 | 服务名 |
groupName | 字符串 | 否 | 分组名 |
ephemeral | boolean | 否 | 是否临时实例 |
错误编码:
错误代码 | 描述 | 语义 |
---|---|---|
400 | Bad Request | 客户端请求中的语法错误 |
403 | Forbidden | 没有权限 |
404 | Not Found | 无法找到资源 |
500 | Internal Server Error | 服务器内部错误 |
200 | OK | 正常 |
2.2.客户端
首先,我们需要找到服务注册的入口。
2.2.1.NacosServiceRegistryAutoConfiguration
因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:
spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar
这个包中找到Nacos自动装配信息:
可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。
可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:
2.2.2.NacosAutoServiceRegistration
NacosAutoServiceRegistration
源码如图:
可以看到在初始化时,其父类AbstractAutoServiceRegistration
也被初始化了。
AbstractAutoServiceRegistration
如图:
可以看到它实现了ApplicationListener
接口,监听Spring容器启动过程中的事件。
在监听到WebServerInitializedEvent
(web服务初始化完成)的事件后,执行了bind
方法。
其中的bind方法如下:
public void bind(WebServerInitializedEvent event)
// 获取 ApplicationContext
ApplicationContext context = event.getApplicationContext();
// 判断服务的 namespace,一般都是null
if (context instanceof ConfigurableWebServerApplicationContext)
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace()))
return;
// 记录当前 web 服务的端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 启动当前服务注册流程
this.start();
其中的start方法流程:
public void start()
if (!isEnabled())
if (logger.isDebugEnabled())
logger.debug("Discovery Lifecycle disabled. Not starting");
return;
// 当前服务处于未运行状态时,才进行初始化
if (!this.running.get())
// 发布服务开始注册的事件
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
// ☆☆☆☆开始注册☆☆☆☆
register();
if (shouldRegisterManagement())
registerManagement();
// 发布注册完成事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
// 服务状态设置为运行状态,基于AtomicBoolean
this.running.compareAndSet(false, true);
其中最关键的register()方法就是完成服务注册的关键,代码如下:
protected void register()
this.serviceRegistry.register(getRegistration());
此处的this.serviceRegistry就是NacosServiceRegistry:
2.2.3.NacosServiceRegistry
NacosServiceRegistry
是Spring的ServiceRegistry
接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。
而NacosServiceRegistry
对register
的实现如下:
@Override
public void register(Registration registration)
// 判断serviceId是否为空,也就是spring.application.name不能为空
if (StringUtils.isEmpty(registration.getServiceId()))
log.warn("No service to register for nacos client...");
return;
// 获取Nacos的命名服务,其实就是注册中心服务
NamingService namingService = namingService();
// 获取 serviceId 和 Group
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
Instance instance = getNacosInstanceFromRegistration(registration);
try
// 开始注册服务
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, : register finished", group, serviceId,
instance.getIp(), instance.getPort());
catch (Exception e)
if (nacosDiscoveryProperties.isFailFast())
log.error("nacos registry, register failed...,", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
else
log.warn("Failfast is false. register failed...,", serviceId,
registration.toString(), e);
可以看到方法中最终是调用NamingService的registerInstance方法实现注册的。
而NamingService接口的默认实现就是NacosNamingService。
2.2.4.NacosNamingService
NacosNamingService提供了服务注册、订阅等功能。
其中registerInstance就是注册服务实例,源码如下:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException
// 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
NamingUtils.checkInstanceIsLegal(instance);
// 拼接得到新的服务名,格式为:groupName@@serviceId
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否为临时实例,默认为 true。
if (instance.isEphemeral())
// 如果是临时实例,需要定时向 Nacos 服务发送心跳
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
// 发送注册服务实例的请求
serverProxy.registerService(groupedServiceName, groupName, instance);
最终,由NacosProxy的registerService方法,完成服务注册。
代码如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException
NAMING_LOGGER.info("[REGISTER-SERVICE] registering service with instance: ", namespaceId, serviceName,
instance);
// 组织请求参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:
- namespace_id:环境
- service_name:服务名称
- group_name:组名称
- cluster_name:集群名称
- ip: 当前实例的ip地址
- port: 当前实例的端口
而在NacosNamingService的registerInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。
2.2.5.客户端注册的流程图
如图:
2.3.服务端
在nacos-console的模块中,会引入nacos-naming这个模块:
模块结构如下:
其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController
类中:
2.3.1.InstanceController
进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception
// 尝试获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 尝试获取serviceName,其格式为 group_name@@service_name
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 解析出实例信息,封装为Instance对象
final Instance instance = parseInstance(request);
// 注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
这里,进入到了serviceManager.registerInstance()方法中。
2.3.2.ServiceManager
ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:
而其中的registerInstance方法就是注册服务实例的方法:
/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException
// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
// 此时不包含实例信息
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 拿到创建好的service
Service service = getService(namespaceId, serviceName);
// 拿不到则抛异常
if (service == null)
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
// 添加要注册的实例到service中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
创建好了服务,接下来就要添加实例到服务中:
/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral whether instance is ephemeral
* @param ips instances
* @throws NacosException nacos exception
*/
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException
// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取服务
Service service = getService(namespaceId, serviceName);
// 同步锁,避免并发修改的安全问题
synchronized (service)
// 1)获取要更新的实例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 2)封装实例列表到Instances对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 3)完成 注册表更新 以及 Nacos集群的数据同步
consistencyService.put(key, instances);
该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:
- 1)先获取要更新的实例列表,
addIpAddresses(service, ephemeral, ips);
- 2)然后将更新后的数据封装到
Instances
对象中,后面更新注册表时使用 - 3)最后,调用
consistencyService.put()
方法完成Nacos集群的数据同步,保证集群一致性。
注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。
1)更服务列表
我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);
:
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
继续进入updateIpAddresses
方法:
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosExceptionSpringCloud微服务技术栈.黑马跟学
SpringCloud微服务技术栈.黑马跟学 十二
今日目标
服务异步通信-高级篇
消息队列在使用过程中,面临着很多实际问题需要思考:
1.消息可靠性
消息从发送,到消费者接收,会经理多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
下面我们就通过案例来演示每一个步骤。
首先,导入课前资料提供的demo工程:
项目结构如下:
用docker启动即可
docker start mq
要创建一个队列起名simple.queue
然后在交换机中把amq.topic交换机,和上面创建的队列simple.queue绑定,我们手动配置
进入amq.topic交换机后,绑定队列
绑定后如图:
1.1.生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:
1.1.1.修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
simple
:同步等待confirm结果,直到超时correlated
⭐:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
1.1.2.定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
// 投递失败,记录日志
log.info("消息发送失败,应答码,原因,交换机,路由键,消息",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
);
1.1.3.定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
public void testSendMessage2SimpleQueue() throws InterruptedException
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result ->
if(result.isAck())
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:", correlationData.getId());
else
// 3.2.nack,消息失败
log.error("消息发送失败, ID:, 原因",correlationData.getId(), result.getReason());
,
ex -> log.error("消息发送异常, ID:, 原因",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
全部配置完后,运行测试类SpringAmqpTest.java,这说明消息发送成功
然后呢,我们来一个消息发送失败的情况,我们故意填错交换机的名字
调用后,后台打印日志如下:
然后我们尝试填错,routingKey看一下
报错信息如下:
之后我们恢复代码,都保证正确即可
总结:
SpringAMQP中处理消息确认的几种情况:
● publisher-comfirm:
- 消息成功发送到exchange,返回ack
- 消息发送失败,没有到达交换机,返回nack
- 消息发送过程中出现异常,没有收到回执
● 消息成功发送到exchange, 但没有路由到queue,
- 调用ReturnCallback
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
1.2.1.交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
我们通过命令
重启mq
docker restart mq
然后查看队列、交换机的情况,比如我们创建的是持久化队列
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange()
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D
的标示:
1.2.2.队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
我们可以先去mq图形化界面把simple.queue删除
@Bean
public Queue simpleQueue()
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D
的标示:
这些做完后,我们启动ConsumerApplication.java,然后查看mq的图形化界面
交换机是持久的
队列是持久的
1.2.3.消息持久化
首先把consumer服务停了,不要消费我们的消息
我们在mq的图形化界面,点击simple.queue队列,然后编辑消息,点击发送
查看有1条消息
然后我们重启docker中的mq
docker restart mq
然后再回来看mq的图形化界面,发现队列还在,但是消息没了
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代码指定:
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
运行测试类SpringAmqpTest.java之后,查看mq的图形化界面
查看一下具体消息
然后我们重启一下docker的mq容器
docker restart mq
注意:AMQP中创建的交换机、队列、消息默认都是持久的
交换机:
队列:
消息:
1.3.消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto⭐:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
1.3.1.演示none模式
修改consumer服务的application.yml文件,添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
修改SpringRabbitListener.java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg)
log.info("消费者接收到simple.queue的消息:【】", msg);
// 模拟异常
System.out.println(1 / 0);
log.debug("消息处理完成!");
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
dubug启动Consumer
发现消息还没接收呢,直接就没了
也就是说,消费者虽然接收到了消息,但是假如消费者还没有读取,发生了报错或者宕机,这个消息就会丢失
1.3.2.演示auto模式
再次把确认机制修改为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 关闭ack
我们去mq的图形化界面创建消息
发送后,我们看到图形化界面中有1条消息
IDEA后台因为我们认为写了1/0的错误算数运算,导致IDEA不停重发请求重试消息的推送,这显然也不符合我们的要求
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
1.4.消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
怎么办呢?
1.4.1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 4 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
修改SpringRabbitListener.java
修改为日志打印的形式
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg)
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
重启consumer服务,重复之前的测试。可以发现:
- 在重试4次后,SpringAMQP会抛出异常
AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
1.4.2.失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机⭐
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange()
return new DirectExchange("error.direct");
@Bean
public Queue errorQueue()
return new Queue("error.queue", true);
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange)
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
完整代码:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig
@Bean
public DirectExchange errorMessageExchange()
return new DirectExchange("error.direct");
@Bean
public Queue errorQueue()
return new Queue("error.queue", true);
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange)
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
以上配置完之后,我们再重复步骤发送消息
发送后我们看到失败交换机有了
队列也有了
看一下IDEA的后台
看一下error.queue中的消息,很清晰把错误栈都输出了
1.5.总结
如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
2.死信交换机
2.1.初识死信交换机
2.1.1.什么是死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
如图,一个消息被消费者拒绝了,变成了死信:
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
2.1.2.利用死信交换机接收死信(拓展)
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
我们在consumer服务中,定义一组死信交换机、死信队列:
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2()
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange()
return new DirectExchange("dl.direct", true, false);
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue()
return new Queue("dl.queue", true);
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding()
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
2.1.3.总结
什么样的消息会成为死信?
- 消息被消费者reject或者返回nack
- 消息超时未消费
- 队列满了
死信交换机的使用场景是什么?
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
2.2.TTL
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
2.2.1.接收超时死信的死信交换机
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange SpringCloud微服务技术栈.黑马跟学
微服务架构 *2.3 Spring Cloud 启动及加载配置文件源码分析(以 Nacos 为例)