Nacos源码分析.黑马跟学笔记

Posted 心向阳光的天域

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码分析.黑马跟学笔记相关的知识,希望对你有一定的参考价值。

Nacos源码分析

1.下载Nacos源码并运行

要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行。

1.1.下载Nacos源码

Nacos的GitHub地址:https://github.com/alibaba/nacos

课前资料中已经提供了下载好的1.4.2版本的Nacos源码:

如果需要研究其他版本的同学,也可以自行下载:

大家找到其release页面:https://github.com/alibaba/nacos/tags,找到其中的1.4.2.版本:

点击进入后,下载Source code(zip):

1.2.导入Demo工程

我们的课前资料提供了一个微服务Demo,包含了服务注册、发现等业务。

导入该项目后,查看其项目结构:

结构说明:

  • cloud-source-demo:项目父目录
    • cloud-demo:微服务的父工程,管理微服务依赖
      • order-service:订单微服务,业务中需要访问user-service,是一个服务消费者
      • user-service:用户微服务,对外暴露根据id查询用户的接口,是一个服务提供者

1.3.导入Nacos源码

将之前下载好的Nacos源码解压到cloud-source-demo项目目录中:

然后,使用IDEA将其作为一个module来导入:

1)选择项目结构选项:

然后点击导入module:

在弹出窗口中,选择nacos源码目录:

然后选择maven模块,finish:

最后,点击OK即可:

导入后的项目结构:

1.4.proto编译

Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化。并将对应的proto文件定义在了consistency这个子模块中:

我们需要先将proto文件编译为对应的Java代码。

1.4.1.什么是protobuf

protobuf的全称是Protocol Buffer,是Google提供的一种数据序列化协议,这是Google官方的定义:

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

可以简单理解为,是一种跨语言、跨平台的数据传输格式。与json的功能类似,但是无论是性能,还是数据大小都比json要好很多。

protobuf的之所以可以跨语言,就是因为数据定义的格式为.proto格式,需要基于protoc编译为对应的语言。

1.4.2.安装protoc

Protobuf的GitHub地址:https://github.com/protocolbuffers/protobuf/releases

我们可以下载windows版本的来使用:

另外,课前资料也提供了下载好的安装包:

解压到任意非中文目录下,其中的bin目录中的protoc.exe可以帮助我们编译:

然后将这个bin目录配置到你的环境变量path中,可以参考JDK的配置方式:

1.4.3.编译proto

进入nacos-1.4.2的consistency模块下的src/main目录下:

然后打开cmd窗口,运行下面的两个命令:

protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto

如图:

会在nacos的consistency模块中编译出这些java代码:

1.5.运行

nacos服务端的入口是在console模块中的Nacos类:

我们需要让它单机启动:

然后新建一个SpringBootApplication:

然后填写应用信息:

然后运行Nacos这个main函数:

将order-service和user-service服务启动后,可以查看nacos控制台:

2.服务注册

服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:

首先最外层是一个Map,结构为:Map<String, Map<String, Service>>

  • key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group
  • value:又是一个Map<String, Service>,代表分组及组内的服务。一个组内可以有多个服务
    • key:代表group分组,不过作为key时格式是group_name:service_name
    • value:分组下的某一个服务,例如userservice,用户服务。类型为Service,内部也包含一个Map<String,Cluster>,一个服务下可以有多个集群
      • key:集群名称
      • value:Cluster类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>,就是该集群下的实例的集合
        • Instance:实例信息,包含实例的IP、Port、健康状态、权重等等信息

每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。

2.1.服务注册接口

Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。

**接口说明:**注册一个实例到Nacos服务。

请求类型POST

请求路径/nacos/v1/ns/instance

请求参数

名称类型是否必选描述
ip字符串服务实例IP
portint服务实例port
namespaceId字符串命名空间ID
weightdouble权重
enabledboolean是否上线
healthyboolean是否健康
metadata字符串扩展信息
clusterName字符串集群名
serviceName字符串服务名
groupName字符串分组名
ephemeralboolean是否临时实例

错误编码

错误代码描述语义
400Bad Request客户端请求中的语法错误
403Forbidden没有权限
404Not Found无法找到资源
500Internal Server Error服务器内部错误
200OK正常

2.2.客户端

首先,我们需要找到服务注册的入口。

2.2.1.NacosServiceRegistryAutoConfiguration

因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:

spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar

这个包中找到Nacos自动装配信息:

可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。

可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:

2.2.2.NacosAutoServiceRegistration

NacosAutoServiceRegistration源码如图:

可以看到在初始化时,其父类AbstractAutoServiceRegistration也被初始化了。

AbstractAutoServiceRegistration如图:

可以看到它实现了ApplicationListener接口,监听Spring容器启动过程中的事件。

在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法。

其中的bind方法如下:

public void bind(WebServerInitializedEvent event) 
    // 获取 ApplicationContext
    ApplicationContext context = event.getApplicationContext();
    // 判断服务的 namespace,一般都是null
    if (context instanceof ConfigurableWebServerApplicationContext) 
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                                .getServerNamespace())) 
            return;
        
    
    // 记录当前 web 服务的端口
    this.port.compareAndSet(0, event.getWebServer().getPort());
    // 启动当前服务注册流程
    this.start();

其中的start方法流程:

public void start() 
		if (!isEnabled()) 
			if (logger.isDebugEnabled()) 
				logger.debug("Discovery Lifecycle disabled. Not starting");
			
			return;
		

		// 当前服务处于未运行状态时,才进行初始化
		if (!this.running.get()) 
            // 发布服务开始注册的事件
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
            // ☆☆☆☆开始注册☆☆☆☆
			register();
			if (shouldRegisterManagement()) 
				registerManagement();
			
            // 发布注册完成事件
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
            // 服务状态设置为运行状态,基于AtomicBoolean
			this.running.compareAndSet(false, true);
		

	

其中最关键的register()方法就是完成服务注册的关键,代码如下:

protected void register() 
    this.serviceRegistry.register(getRegistration());

此处的this.serviceRegistry就是NacosServiceRegistry:

2.2.3.NacosServiceRegistry

NacosServiceRegistry是Spring的ServiceRegistry接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。

NacosServiceRegistryregister的实现如下:

@Override
public void register(Registration registration) 
	// 判断serviceId是否为空,也就是spring.application.name不能为空
    if (StringUtils.isEmpty(registration.getServiceId())) 
        log.warn("No service to register for nacos client...");
        return;
    
    // 获取Nacos的命名服务,其实就是注册中心服务
    NamingService namingService = namingService();
    // 获取 serviceId 和 Group
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
	// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
    Instance instance = getNacosInstanceFromRegistration(registration);

    try 
        // 开始注册服务
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry,   : register finished", group, serviceId,
                 instance.getIp(), instance.getPort());
    
    catch (Exception e) 
        if (nacosDiscoveryProperties.isFailFast()) 
            log.error("nacos registry,  register failed...,", serviceId,
                      registration.toString(), e);
            rethrowRuntimeException(e);
        
        else 
            log.warn("Failfast is false.  register failed...,", serviceId,
                     registration.toString(), e);
        
    

可以看到方法中最终是调用NamingService的registerInstance方法实现注册的。

而NamingService接口的默认实现就是NacosNamingService。

2.2.4.NacosNamingService

NacosNamingService提供了服务注册、订阅等功能。

其中registerInstance就是注册服务实例,源码如下:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException 
    // 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
    NamingUtils.checkInstanceIsLegal(instance);
    // 拼接得到新的服务名,格式为:groupName@@serviceId
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否为临时实例,默认为 true。
    if (instance.isEphemeral()) 
        // 如果是临时实例,需要定时向 Nacos 服务发送心跳
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    
    // 发送注册服务实例的请求
    serverProxy.registerService(groupedServiceName, groupName, instance);

最终,由NacosProxy的registerService方法,完成服务注册。

代码如下:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException 

    NAMING_LOGGER.info("[REGISTER-SERVICE]  registering service  with instance: ", namespaceId, serviceName,
                       instance);
	// 组织请求参数
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
	// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);


这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:

  • namespace_id:环境
  • service_name:服务名称
  • group_name:组名称
  • cluster_name:集群名称
  • ip: 当前实例的ip地址
  • port: 当前实例的端口

而在NacosNamingService的registerInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。

2.2.5.客户端注册的流程图

如图:

2.3.服务端

在nacos-console的模块中,会引入nacos-naming这个模块:

模块结构如下:

其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController类中:

2.3.1.InstanceController

进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception 
	// 尝试获取namespaceId
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 尝试获取serviceName,其格式为 group_name@@service_name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
	// 解析出实例信息,封装为Instance对象
    final Instance instance = parseInstance(request);
	// 注册实例
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";

这里,进入到了serviceManager.registerInstance()方法中。

2.3.2.ServiceManager

ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:

而其中的registerInstance方法就是注册服务实例的方法:

/**
     * Register an instance to a service in AP mode.
     *
     * <p>This method creates service or cluster silently if they don't exist.
     *
     * @param namespaceId id of namespace
     * @param serviceName service name
     * @param instance    instance to register
     * @throws Exception any error occurred in the process
     */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException 
	// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
    // 此时不包含实例信息
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // 拿到创建好的service
    Service service = getService(namespaceId, serviceName);
    // 拿不到则抛异常
    if (service == null) 
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    
    // 添加要注册的实例到service中
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

创建好了服务,接下来就要添加实例到服务中:

/**
     * Add instance to service.
     *
     * @param namespaceId namespace
     * @param serviceName service name
     * @param ephemeral   whether instance is ephemeral
     * @param ips         instances
     * @throws NacosException nacos exception
     */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException 
	// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务
    Service service = getService(namespaceId, serviceName);
    // 同步锁,避免并发修改的安全问题
    synchronized (service) 
        // 1)获取要更新的实例列表
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
		// 2)封装实例列表到Instances对象
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
		// 3)完成 注册表更新 以及 Nacos集群的数据同步
        consistencyService.put(key, instances);
    

该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:

  • 1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
  • 2)然后将更新后的数据封装到Instances对象中,后面更新注册表时使用
  • 3)最后,调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。

注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。

1)更服务列表

我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException 
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);

继续进入updateIpAddresses方法:

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    throws NacosException

SpringCloud微服务技术栈.黑马跟学

SpringCloud微服务技术栈.黑马跟学 十二

今日目标

服务异步通信-高级篇

消息队列在使用过程中,面临着很多实际问题需要思考:

1.消息可靠性

消息从发送,到消费者接收,会经理多个过程:

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

下面我们就通过案例来演示每一个步骤。
首先,导入课前资料提供的demo工程:

项目结构如下:

用docker启动即可

docker start mq

要创建一个队列起名simple.queue

然后在交换机中把amq.topic交换机,和上面创建的队列simple.queue绑定,我们手动配置

进入amq.topic交换机后,绑定队列

绑定后如图:

1.1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。


注意:

1.1.1.修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated⭐:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.1.2.定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> 
            // 投递失败,记录日志
            log.info("消息发送失败,应答码,原因,交换机,路由键,消息",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        );
    

1.1.3.定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

public void testSendMessage2SimpleQueue() throws InterruptedException 
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> 
            if(result.isAck())
                // 3.1.ack,消息成功
                log.debug("消息发送成功, ID:", correlationData.getId());
            else
                // 3.2.nack,消息失败
                log.error("消息发送失败, ID:, 原因",correlationData.getId(), result.getReason());
            
        ,
        ex -> log.error("消息发送异常, ID:, 原因",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);

全部配置完后,运行测试类SpringAmqpTest.java,这说明消息发送成功

然后呢,我们来一个消息发送失败的情况,我们故意填错交换机的名字

调用后,后台打印日志如下:

然后我们尝试填错,routingKey看一下

报错信息如下:

之后我们恢复代码,都保证正确即可

总结:
SpringAMQP中处理消息确认的几种情况:
● publisher-comfirm:

  • 消息成功发送到exchange,返回ack
  • 消息发送失败,没有到达交换机,返回nack
  • 消息发送过程中出现异常,没有收到回执

● 消息成功发送到exchange, 但没有路由到queue,

  • 调用ReturnCallback

1.2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

1.2.1.交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

我们通过命令
重启mq

docker restart mq

然后查看队列、交换机的情况,比如我们创建的是持久化队列

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange()
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

1.2.2.队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:

我们可以先去mq图形化界面把simple.queue删除

@Bean
public Queue simpleQueue()
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

这些做完后,我们启动ConsumerApplication.java,然后查看mq的图形化界面
交换机是持久的

队列是持久的

1.2.3.消息持久化

首先把consumer服务停了,不要消费我们的消息
我们在mq的图形化界面,点击simple.queue队列,然后编辑消息,点击发送

查看有1条消息

然后我们重启docker中的mq

docker restart mq

然后再回来看mq的图形化界面,发现队列还在,但是消息没了

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代码指定:

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
运行测试类SpringAmqpTest.java之后,查看mq的图形化界面

查看一下具体消息

然后我们重启一下docker的mq容器

docker restart mq

注意:AMQP中创建的交换机、队列、消息默认都是持久的
交换机:

队列:

消息:

1.3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto⭐:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

1.3.1.演示none模式

修改consumer服务的application.yml文件,添加下面内容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
修改SpringRabbitListener.java

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) 
    log.info("消费者接收到simple.queue的消息:【】", msg);
    // 模拟异常
    System.out.println(1 / 0);
    log.debug("消息处理完成!");

测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
dubug启动Consumer
发现消息还没接收呢,直接就没了


也就是说,消费者虽然接收到了消息,但是假如消费者还没有读取,发生了报错或者宕机,这个消息就会丢失

1.3.2.演示auto模式

再次把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 关闭ack

我们去mq的图形化界面创建消息

发送后,我们看到图形化界面中有1条消息

IDEA后台因为我们认为写了1/0的错误算数运算,导致IDEA不停重发请求重试消息的推送,这显然也不符合我们的要求

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):

抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

1.4.消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

怎么办呢?

1.4.1.本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

修改SpringRabbitListener.java
修改为日志打印的形式

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) 
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    

重启consumer服务,重复之前的测试。可以发现:

  • 在重试4次后,SpringAMQP会抛出异常


AmqpRejectAndDontRequeueException,说明本地重试触发了

  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

1.4.2.失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机⭐

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange()
    return new DirectExchange("error.direct");

@Bean
public Queue errorQueue()
    return new Queue("error.queue", true);

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange)
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

完整代码:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
public class ErrorMessageConfig 
    @Bean
    public DirectExchange errorMessageExchange()
        return new DirectExchange("error.direct");
    
    @Bean
    public Queue errorQueue()
        return new Queue("error.queue", true);
    
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange)
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    

以上配置完之后,我们再重复步骤发送消息

发送后我们看到失败交换机有了

队列也有了

看一下IDEA的后台

看一下error.queue中的消息,很清晰把错误栈都输出了

1.5.总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

2.死信交换机

2.1.初识死信交换机

2.1.1.什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

2.1.2.利用死信交换机接收死信(拓展)

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

我们在consumer服务中,定义一组死信交换机、死信队列:

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2()
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();

// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange()
    return new DirectExchange("dl.direct", true, false);

// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue()
    return new Queue("dl.queue", true);

// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding()
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");

2.1.3.总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

2.2.TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

2.2.1.接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange SpringCloud微服务技术栈.黑马跟学

SpringCloud微服务技术栈.黑马跟学

微服务架构 *2.5 Nacos 长轮询定时机制的源码分析

微服务架构 *2.3 Spring Cloud 启动及加载配置文件源码分析(以 Nacos 为例)

微服务架构 | *2.5 Nacos 长轮询定时机制的源码分析

微服务架构 | *2.4 Nacos 获取配置与事件订阅机制的源码分析

(c)2006-2024 SYSTEM All Rights Reserved IT常识