4Nacos 配置中心源码解析之 服务端启动
Posted carl-zhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4Nacos 配置中心源码解析之 服务端启动相关的知识,希望对你有一定的参考价值。
上一篇文章中我们使用 -Dnacos.standalone=true
本地启动了 Nacos 服务器,并且可以在 http://localhost:8848/nacos
通过 nacos/nacos
用户名密码就可以访问 nacos 控制页面。下面我们就来大体看一下 Nacos 在启动的时候干了哪些核心的事。
1、Nacos 认证服务
Nacos 中的 nacos-console
项目依赖了与配置中心相关的以下几个模块:
- nacos-config:配置中心项目模块
- nacos-plugin-default-impl:Nacos 插件模块:主要是用户认证授权相关操作,这里使用的 Spring Security 来做安全认证。相关的配置类为:
NacosAuthConfig
1.1 内存数据库加载数据
JVM 启动参数添加 -Dnacos.standalone=true
,这个时候在 DynamicDataSource#getDataSource
就会初始化 LocalDataSourceServiceImpl
。
DynamicDataSource#getDataSource()
public synchronized DataSourceService getDataSource()
try
// Embedded storage is used by default in stand-alone mode
// In cluster mode, external databases are used by default
if (PropertyUtil.isEmbeddedStorage())
if (localDataSourceService == null)
localDataSourceService = new LocalDataSourceServiceImpl();
localDataSourceService.init();
return localDataSourceService;
else
if (basicDataSourceService == null)
basicDataSourceService = new ExternalDataSourceServiceImpl();
basicDataSourceService.init();
return basicDataSourceService;
catch (Exception e)
throw new RuntimeException(e);
LocalDataSourceServiceImpl#initialize
会使用 derby
这个内存数据库来保存数据。保存数据地址为:$user.home/nacos/data/derby-data
。
然后调用 LocalDataSourceServiceImpl#reload
加载 console/resources/META-INF/schema.sql 里面的数据到内存数据库当中。其中就包括我们登陆使用的 nacos/nacos
用户名密码
1.2 nacos plugin 暴露用户认证
在 plugin-default-impl
模块中的com.alibaba.nacos.plugin.auth.impl.controller
包里面包含了以下几个 Http 服务:
PermissionController
:权限操作服务:获取所有权限、添加角色权限、删除角色权限等接口RoleController
:角色操作服务:获取所有角色、查询角色、添加角色、删除角色等接口UserController
:用户操作服务:包括创建用户、删除用户、修改用户、获取所有用户、用户登陆等接口
Nacos 使用的 Spring Seurity 做的权限管理,默认支持 Nacos 类型权限也就是数据库管理权限以及 Ldap 权限管理。默认使用数据库 RBAC 权限管理。 Spring Security Web 配置类为:NacosAuthConfig。
1.3 用户登陆时序图
以上就是 Nacos 登陆认证时序图。用户的具体的信息会缓存到 ConcurrentHashMap
当中。通过 Spring 定时器 NacosUserDetailsServiceImpl#reload
定时从数据库刷新缓存中的数据。因为我们在初始化数据库的时候会调用以下 SQL 语句保存数据到内存数据库 derby
当中。
CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL DEFAULT true
);
INSERT INTO users (username, password, enabled) VALUES
('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);
这条数据就是加载到NacosUserDetailsServiceImpl
的缓存 Map 中,这样我们就可以使用 nacos/nacos
用户名密码进行授权登陆了。
2、暴露 Http 服务给控制台
下面我们来讨论一下 console
服务启动之后暴露了哪些 http 服务给控制台,在这里我们排除 naming
服务只讨论配置服务相关的 rest 服务.
2.1 console 服务
我们先来看一下 console
服务中的 ConsoleConfig
这个配置类。
ConsoleConfig.java
@Component
@EnableScheduling
@PropertySource("/application.properties")
public class ConsoleConfig
@Autowired
private ControllerMethodsCache methodsCache;
/**
* Init.
*/
@PostConstruct
public void init()
methodsCache.initClassMethod("com.alibaba.nacos.core.controller");
methodsCache.initClassMethod("com.alibaba.nacos.naming.controllers");
methodsCache.initClassMethod("com.alibaba.nacos.config.server.controller");
methodsCache.initClassMethod("com.alibaba.nacos.console.controller");
@Bean
public CorsFilter corsFilter()
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader("*");
config.setMaxAge(18000L);
config.addAllowedMethod("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", config);
return new CorsFilter(source);
@Bean
public XssFilter xssFilter()
return new XssFilter();
@Bean
public Jackson2ObjectMapperBuilderCustomizer jacksonObjectMapperCustomization()
return jacksonObjectMapperBuilder -> jacksonObjectMapperBuilder.timeZone(ZoneId.systemDefault().toString());
首先配置两个 Filter 过滤器:CorsFilter
是为了解决 console 前后端分享接口调用跨域问题;XssFilter
为了解决跨域攻击问题。
ConsoleConfig#init
方法是解析传入包里面的 Controller
中 @RequestMapping
方法缓存 http 请求与 @ReqeustMapping
的方法映射。这样在 Filter 里面就可以过滤掉非法请求。
下面我们来分析一下 console
模块下的 Controller:
HealthController
:Nacos 健康的操作,比如:服务健康检测,查看 Config 与 Naming 是否对外服务。NamespaceController
:Nacos 命名空间相关的操作,比如:获取所有命名空间信息、获取命名空间详情、创建命名空间、修改命名空间、删除命名空间等。ServerStateController
:Nacos 服务状态的操作,比如:查看服务状态
2.2 core 服务
下面我们来分析一下 core
模块下的 Controller:
NacosClusterController
:Nacos 集群相关的操作,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址、查看当前节点是否健康等。ServerLoaderController
:Nacos 连接相关的操作,比如:获取当前服务的 Client 连接信息、获取当前服务的服务状态、当前服务重新连接客户端、获取当前服务指标等。CoreOpsController
:Nacos 基础操作,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别CoreOpsV2Controller
:Nacos 基础操作v2,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别NacosClusterV2Controller
:Nacos 集群相关的操作v2,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址等。
2.3 config 服务
下面我们来分析一下 config
模块下的 Controller:
CapacityController
:获取租户的配置能力、修改租户的配置能力ClientMetricsController
:获取配置中心集群的指标信息、获取当前节点所有客户端的指标CommunicationController
:把指定的配置保存到本地文件当中、获取 grpc 连接信息、获取 http 长轮训状态ConfigController
:发布配置信息、获取配置信息、删除配置信息、查询配置信息等ConfigOpsController
:把当前所有配置保存到本地文件当中、设置日志级别、使用 derby 内存数据库动态传入 SQL 操作、通过文件动态上传配置HealthController
:判断当前节点配置中心的健康情况HistoryController
:分页查询配置的历史记录、ID查询配置的历史记录详情、ID查询配置的上一个历史记录详情、根据命名空间查询配置的历史记录ListenerController
:根据 IP 获取所有的订单信息
3、暴露 Grpc 服务给客户端
在 console
模块启动的时候会启动两个 GRPC 服务端:一个是 SDK 使用;一个 Cluster 使用。它们两个暴露的服务都是一样的只不过是暴露的端口不一样:
- SDK 服务:暴露端口 9848,Nacos 的启动端口 8848 + 1000(
GrpcSdkServer#rpcPortOffset
), - Cluster 服务:暴露端口 9849,Nacos 的启动端口 8848 + 1001(
GrpcClusterServer#rpcPortOffset
),
GRPC 类结构如下:
GrpcClusterServer
:Cluster 偏移端口、获取 Cluster RPC 线程池GrpcSdkServer
:Sdk 偏移端口、获取 Sdk RPC 线程池BaseGrpcServer
:服务启动的具体实现BaseRpcServer
:RPC 基类实现,注册 Payload(GRPC请求响应类)、RPC服务启动
3.1 注册 Payload
Nacos 暴露的 Grpc 服务的请求参数与响应参数都基类都是 Payload,在基类 BaseRpcServer
中通过 static 代码块注册 Payload。
BaseRpcServer.java
static
PayloadRegistry.init();
通过 Java 的 SPI 机制分别在 client
模块加载到 ClientPayloadPackageProvider
以及 config
加载到 ConfigPayloadPackageProvider
。
client/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider
public class ClientPayloadPackageProvider implements PayloadPackageProvider
private final Set<String> scanPackage = new HashSet<>();
scanPackage.add("com.alibaba.nacos.api.naming.remote.request");
scanPackage.add("com.alibaba.nacos.api.remote.request");
scanPackage.add("com.alibaba.nacos.api.config.remote.request");
scanPackage.add("com.alibaba.nacos.api.naming.remote.response");
scanPackage.add("com.alibaba.nacos.api.config.remote.response");
scanPackage.add("com.alibaba.nacos.api.remote.response");
@Override
public Set<String> getScanPackage()
return scanPackage;
下面是 config 的配置:
config/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider
public class ConfigPayloadPackageProvider implements PayloadPackageProvider
private final Set<String> scanPackage = new HashSet<>();
scanPackage.add("com.alibaba.nacos.api.remote.request");
scanPackage.add("com.alibaba.nacos.api.remote.response");
scanPackage.add("com.alibaba.nacos.api.config.remote.request");
scanPackage.add("com.alibaba.nacos.api.config.remote.response");
@Override
public Set<String> getScanPackage()
return scanPackage;
然后把这两个类所定义扫描的包以类名以及类 Class 注册到 PayloadRegistry.REGISTRY_REQUEST
当中。当进行 GRPC 调用的时候提供给 GrpcUtils
进行请求响应对象转换。
3 .2 启动 grpc 服务
BaseRpcServer.java
@PostConstruct
public void start() throws Exception
String serverName = getClass().getSimpleName();
Loggers.REMOTE.info("Nacos Rpc server starting at port ", serverName, getServicePort());
startServer();
Loggers.REMOTE.info("Nacos Rpc server started at port ", serverName, getServicePort());
Runtime.getRuntime().addShutdownHook(new Thread(() ->
Loggers.REMOTE.info("Nacos Rpc server stopping", serverName);
try
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos Rpc server stopped successfully...", serverName);
catch (Exception e)
Loggers.REMOTE.error("Nacos Rpc server stopped fail...", serverName, e);
));
这个类里面的逻辑比较简单:
- 定义了服务启动接口
startServer()
,由子类实现 - 注册一个服务停止后调用的钩子函数
stopServer()
BaseGrpcServer.java
@Override
public void startServer() throws Exception
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// #1
ServerInterceptor serverInterceptor = new ServerInterceptor()
@Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next)
Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName()))
Channel internalChannel = getInternalChannel(call);
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
return Contexts.interceptCall(ctx, call, headers, next);
;
// #2
addServices(handlerRegistry, serverInterceptor);
// #3
server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
.maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new ServerTransportFilter()
@Override
public Attributes transportReady(Attributes transportAttrs)
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
int remotePort = remoteAddress.getPort();
int localPort = localAddress.getPort();
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = ", connectionId);
return attrWrapper;
@Override
public void transportTerminated(Attributes transportAttrs)
String connectionId = null;
try
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
catch (Exception e)
// Ignore
if (StringUtils.isNotBlank(connectionId))
Loggers.REMOTE_DIGEST
.info("Connection transportTerminated,connectionId = ", connectionId);
connectionManager.unregister(connectionId);
).build();
// #4
server.start();
- #1 其实是定义一个服务器拦截器,用于设置
connection id
等信息 - #2 往 MutableHandlerRegistry 动态添加 GRPC 方法定义
- #3 通过上面的参数构建 GRPC 的 Server。
- #4 启动 GRPC 服务
下面我们来看一下 GRPC 是动态添加 GRPC 方法到 Server 里面的。
BaseGrpcServer.java
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor)
// unary common call register.
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload以上是关于4Nacos 配置中心源码解析之 服务端启动的主要内容,如果未能解决你的问题,请参考以下文章