4Nacos 配置中心源码解析之 服务端启动

Posted carl-zhao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4Nacos 配置中心源码解析之 服务端启动相关的知识,希望对你有一定的参考价值。

上一篇文章中我们使用 -Dnacos.standalone=true本地启动了 Nacos 服务器,并且可以在 http://localhost:8848/nacos 通过 nacos/nacos 用户名密码就可以访问 nacos 控制页面。下面我们就来大体看一下 Nacos 在启动的时候干了哪些核心的事。

1、Nacos 认证服务

Nacos 中的 nacos-console 项目依赖了与配置中心相关的以下几个模块:

  • nacos-config:配置中心项目模块
  • nacos-plugin-default-impl:Nacos 插件模块:主要是用户认证授权相关操作,这里使用的 Spring Security 来做安全认证。相关的配置类为:NacosAuthConfig

1.1 内存数据库加载数据

JVM 启动参数添加 -Dnacos.standalone=true,这个时候在 DynamicDataSource#getDataSource 就会初始化 LocalDataSourceServiceImpl

DynamicDataSource#getDataSource()

    public synchronized DataSourceService getDataSource() 
        try 
            
            // Embedded storage is used by default in stand-alone mode
            // In cluster mode, external databases are used by default
            
            if (PropertyUtil.isEmbeddedStorage()) 
                if (localDataSourceService == null) 
                    localDataSourceService = new LocalDataSourceServiceImpl();
                    localDataSourceService.init();
                
                return localDataSourceService;
             else 
                if (basicDataSourceService == null) 
                    basicDataSourceService = new ExternalDataSourceServiceImpl();
                    basicDataSourceService.init();
                
                return basicDataSourceService;
            
         catch (Exception e) 
            throw new RuntimeException(e);
        
    

LocalDataSourceServiceImpl#initialize 会使用 derby 这个内存数据库来保存数据。保存数据地址为:$user.home/nacos/data/derby-data

然后调用 LocalDataSourceServiceImpl#reload 加载 console/resources/META-INF/schema.sql 里面的数据到内存数据库当中。其中就包括我们登陆使用的 nacos/nacos 用户名密码

1.2 nacos plugin 暴露用户认证

plugin-default-impl 模块中的com.alibaba.nacos.plugin.auth.impl.controller 包里面包含了以下几个 Http 服务:

  • PermissionController:权限操作服务:获取所有权限、添加角色权限、删除角色权限等接口
  • RoleController:角色操作服务:获取所有角色、查询角色、添加角色、删除角色等接口
  • UserController:用户操作服务:包括创建用户、删除用户、修改用户、获取所有用户、用户登陆等接口

Nacos 使用的 Spring Seurity 做的权限管理,默认支持 Nacos 类型权限也就是数据库管理权限以及 Ldap 权限管理。默认使用数据库 RBAC 权限管理。 Spring Security Web 配置类为:NacosAuthConfig。

1.3 用户登陆时序图


以上就是 Nacos 登陆认证时序图。用户的具体的信息会缓存到 ConcurrentHashMap 当中。通过 Spring 定时器 NacosUserDetailsServiceImpl#reload 定时从数据库刷新缓存中的数据。因为我们在初始化数据库的时候会调用以下 SQL 语句保存数据到内存数据库 derby 当中。

CREATE TABLE users (
	username varchar(50) NOT NULL PRIMARY KEY,
	password varchar(500) NOT NULL,
	enabled boolean NOT NULL DEFAULT true
);

INSERT INTO users (username, password, enabled) VALUES 
('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);

这条数据就是加载到NacosUserDetailsServiceImpl 的缓存 Map 中,这样我们就可以使用 nacos/nacos 用户名密码进行授权登陆了。

2、暴露 Http 服务给控制台

下面我们来讨论一下 console 服务启动之后暴露了哪些 http 服务给控制台,在这里我们排除 naming 服务只讨论配置服务相关的 rest 服务.

2.1 console 服务

我们先来看一下 console 服务中的 ConsoleConfig 这个配置类。

ConsoleConfig.java

@Component
@EnableScheduling
@PropertySource("/application.properties")
public class ConsoleConfig 
    
    @Autowired
    private ControllerMethodsCache methodsCache;
    
    /**
     * Init.
     */
    @PostConstruct
    public void init() 
        methodsCache.initClassMethod("com.alibaba.nacos.core.controller");
        methodsCache.initClassMethod("com.alibaba.nacos.naming.controllers");
        methodsCache.initClassMethod("com.alibaba.nacos.config.server.controller");
        methodsCache.initClassMethod("com.alibaba.nacos.console.controller");
    
    
    @Bean
    public CorsFilter corsFilter() 
        CorsConfiguration config = new CorsConfiguration();
        config.setAllowCredentials(true);
        config.addAllowedOrigin("*");
        config.addAllowedHeader("*");
        config.setMaxAge(18000L);
        config.addAllowedMethod("*");
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", config);
        return new CorsFilter(source);
    
    
    @Bean
    public XssFilter xssFilter() 
        return new XssFilter();
    
    
    @Bean
    public Jackson2ObjectMapperBuilderCustomizer jacksonObjectMapperCustomization() 
        return jacksonObjectMapperBuilder -> jacksonObjectMapperBuilder.timeZone(ZoneId.systemDefault().toString());
    

首先配置两个 Filter 过滤器:CorsFilter 是为了解决 console 前后端分享接口调用跨域问题;XssFilter 为了解决跨域攻击问题。

ConsoleConfig#init 方法是解析传入包里面的 Controller@RequestMapping 方法缓存 http 请求与 @ReqeustMapping 的方法映射。这样在 Filter 里面就可以过滤掉非法请求。

下面我们来分析一下 console 模块下的 Controller:

  • HealthController:Nacos 健康的操作,比如:服务健康检测,查看 Config 与 Naming 是否对外服务。
  • NamespaceController:Nacos 命名空间相关的操作,比如:获取所有命名空间信息、获取命名空间详情、创建命名空间、修改命名空间、删除命名空间等。
  • ServerStateController:Nacos 服务状态的操作,比如:查看服务状态

2.2 core 服务

下面我们来分析一下 core 模块下的 Controller:

  • NacosClusterController:Nacos 集群相关的操作,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址、查看当前节点是否健康等。
  • ServerLoaderController:Nacos 连接相关的操作,比如:获取当前服务的 Client 连接信息、获取当前服务的服务状态、当前服务重新连接客户端、获取当前服务指标等。
  • CoreOpsController:Nacos 基础操作,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别
  • CoreOpsV2Controller:Nacos 基础操作v2,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别
  • NacosClusterV2Controller:Nacos 集群相关的操作v2,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址等。

2.3 config 服务

下面我们来分析一下 config 模块下的 Controller:

  • CapacityController:获取租户的配置能力、修改租户的配置能力
  • ClientMetricsController:获取配置中心集群的指标信息、获取当前节点所有客户端的指标
  • CommunicationController:把指定的配置保存到本地文件当中、获取 grpc 连接信息、获取 http 长轮训状态
  • ConfigController:发布配置信息、获取配置信息、删除配置信息、查询配置信息等
  • ConfigOpsController:把当前所有配置保存到本地文件当中、设置日志级别、使用 derby 内存数据库动态传入 SQL 操作、通过文件动态上传配置
  • HealthController:判断当前节点配置中心的健康情况
  • HistoryController:分页查询配置的历史记录、ID查询配置的历史记录详情、ID查询配置的上一个历史记录详情、根据命名空间查询配置的历史记录
  • ListenerController:根据 IP 获取所有的订单信息

3、暴露 Grpc 服务给客户端

console 模块启动的时候会启动两个 GRPC 服务端:一个是 SDK 使用;一个 Cluster 使用。它们两个暴露的服务都是一样的只不过是暴露的端口不一样:

  • SDK 服务:暴露端口 9848,Nacos 的启动端口 8848 + 1000(GrpcSdkServer#rpcPortOffset),
  • Cluster 服务:暴露端口 9849,Nacos 的启动端口 8848 + 1001(GrpcClusterServer#rpcPortOffset),

GRPC 类结构如下:

  • GrpcClusterServer:Cluster 偏移端口、获取 Cluster RPC 线程池
  • GrpcSdkServer:Sdk 偏移端口、获取 Sdk RPC 线程池
  • BaseGrpcServer:服务启动的具体实现
  • BaseRpcServer:RPC 基类实现,注册 Payload(GRPC请求响应类)、RPC服务启动

3.1 注册 Payload

Nacos 暴露的 Grpc 服务的请求参数与响应参数都基类都是 Payload,在基类 BaseRpcServer 中通过 static 代码块注册 Payload。

BaseRpcServer.java

    static 
        PayloadRegistry.init();
    

通过 Java 的 SPI 机制分别在 client 模块加载到 ClientPayloadPackageProvider 以及 config 加载到 ConfigPayloadPackageProvider

client/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider

public class ClientPayloadPackageProvider implements PayloadPackageProvider 
    
    private final Set<String> scanPackage = new HashSet<>();
    
    
        scanPackage.add("com.alibaba.nacos.api.naming.remote.request");
        scanPackage.add("com.alibaba.nacos.api.remote.request");
        scanPackage.add("com.alibaba.nacos.api.config.remote.request");
        scanPackage.add("com.alibaba.nacos.api.naming.remote.response");
        scanPackage.add("com.alibaba.nacos.api.config.remote.response");
        scanPackage.add("com.alibaba.nacos.api.remote.response");
    
    
    @Override
    public Set<String> getScanPackage() 
        return scanPackage;
    

下面是 config 的配置:

config/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider

public class ConfigPayloadPackageProvider implements PayloadPackageProvider 
    
    private final Set<String> scanPackage = new HashSet<>();
    
    
        scanPackage.add("com.alibaba.nacos.api.remote.request");
        scanPackage.add("com.alibaba.nacos.api.remote.response");
        scanPackage.add("com.alibaba.nacos.api.config.remote.request");
        scanPackage.add("com.alibaba.nacos.api.config.remote.response");
    
    
    @Override
    public Set<String> getScanPackage() 
        return scanPackage;
    


然后把这两个类所定义扫描的包以类名以及类 Class 注册到 PayloadRegistry.REGISTRY_REQUEST 当中。当进行 GRPC 调用的时候提供给 GrpcUtils 进行请求响应对象转换。

3 .2 启动 grpc 服务

BaseRpcServer.java

    @PostConstruct
    public void start() throws Exception 
        String serverName = getClass().getSimpleName();
        Loggers.REMOTE.info("Nacos  Rpc server starting at port ", serverName, getServicePort());
        
        startServer();
    
        Loggers.REMOTE.info("Nacos  Rpc server started at port ", serverName, getServicePort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> 
            Loggers.REMOTE.info("Nacos  Rpc server stopping", serverName);
            try 
                BaseRpcServer.this.stopServer();
                Loggers.REMOTE.info("Nacos  Rpc server stopped successfully...", serverName);
             catch (Exception e) 
                Loggers.REMOTE.error("Nacos  Rpc server stopped fail...", serverName, e);
            
        ));

    

这个类里面的逻辑比较简单:

  • 定义了服务启动接口 startServer(),由子类实现
  • 注册一个服务停止后调用的钩子函数 stopServer()

BaseGrpcServer.java

    @Override
    public void startServer() throws Exception 
        final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
        
        // #1
        ServerInterceptor serverInterceptor = new ServerInterceptor() 
            @Override
            public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                    ServerCallHandler<T, S> next) 
                Context ctx = Context.current()
                        .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                        .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                        .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                        .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
                if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) 
                    Channel internalChannel = getInternalChannel(call);
                    ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
                
                return Contexts.interceptCall(ctx, call, headers, next);
            
        ;
        
        // #2
        addServices(handlerRegistry, serverInterceptor);
        
        // #3
        server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
                .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
                .compressorRegistry(CompressorRegistry.getDefaultInstance())
                .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
                .addTransportFilter(new ServerTransportFilter() 
                    @Override
                    public Attributes transportReady(Attributes transportAttrs) 
                        InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                                .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                        InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                                .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                        int remotePort = remoteAddress.getPort();
                        int localPort = localAddress.getPort();
                        String remoteIp = remoteAddress.getAddress().getHostAddress();
                        Attributes attrWrapper = transportAttrs.toBuilder()
                                .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                                .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                                .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                        String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                        Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId =  ", connectionId);
                        return attrWrapper;
                        
                    
                    
                    @Override
                    public void transportTerminated(Attributes transportAttrs) 
                        String connectionId = null;
                        try 
                            connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                         catch (Exception e) 
                            // Ignore
                        
                        if (StringUtils.isNotBlank(connectionId)) 
                            Loggers.REMOTE_DIGEST
                                    .info("Connection transportTerminated,connectionId =  ", connectionId);
                            connectionManager.unregister(connectionId);
                        
                    
                ).build();
        
        // #4
        server.start();
    
  • #1 其实是定义一个服务器拦截器,用于设置 connection id 等信息
  • #2 往 MutableHandlerRegistry 动态添加 GRPC 方法定义
  • #3 通过上面的参数构建 GRPC 的 Server。
  • #4 启动 GRPC 服务

下面我们来看一下 GRPC 是动态添加 GRPC 方法到 Server 里面的。

BaseGrpcServer.java

    private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) 
        
        // unary common call register.
        final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.UNARY)
                .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
        
        final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
                .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
        
        final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
                .addMethod(unaryPayloadMethod, payloadHandler).build();
        handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
        
        // bi stream register.
        final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
                (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
        
        final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                        .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload

以上是关于4Nacos 配置中心源码解析之 服务端启动的主要内容,如果未能解决你的问题,请参考以下文章

4Nacos 配置中心源码解析之 服务端启动

zookeeper源码之服务端启动模块

tars源码解析1--服务端启动

tars源码解析1--服务端启动

netty服务端启动--ServerBootstrap源码解析

Netty源码分析之服务端启动