nacos源码分析-服务注册(服务端)
Posted 墨家巨子@俏如来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nacos源码分析-服务注册(服务端)相关的知识,希望对你有一定的参考价值。
安装Nacos源码
上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3 ,
解压之后使用IDEA工具导入即可。
但是编译过后发现代码会报错,主要是缺少实体类,比如:
安装protobuf
这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:
- 下载之后解压
- 然后需要配置环境变量
- 找到consistency模块,进入src/main
- 进入main目录,执行cmd命令
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto
效果如下:
启动Nacos
找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误
- 然后指定为单机启动,指定VM参数
- 启动成功
- 访问 http://localhost:8848/nacos/index.html 进入控制台
- 到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server
- 查看控制台,nacos-client成功注册到服务端
服务注册
在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance
,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController
接口中。源码如下
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController
@Autowired
private SwitchDomain switchDomain;
@Autowired
private PushService pushService;
@Autowired
private ServiceManager serviceManager;
...省略...
/**
注册一个新的实例
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
//request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等
public String register(HttpServletRequest request) throws Exception
//拿到注册的服务的:namespaceId,默认是public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-client
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//检查服务名的格式:groupName@@serviceName
NamingUtils.checkServiceNameFormat(serviceName);
//解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等
final Instance instance = parseInstance(request);
//使用ServiceManger注册服务实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
//解析要注册的服务实例
private Instance parseInstance(HttpServletRequest request) throws Exception
//拿到服务名 DEFAULT_GROUP@@nacos-client
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//拿到app,没配置就是:unknown
String app = WebUtils.optional(request, "app", "DEFAULT");
//拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象
Instance instance = getIpAddress(request);
instance.setApp(app);
instance.setServiceName(serviceName);
// Generate simple instance id first. This value would be updated according to
// 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client
instance.setInstanceId(instance.generateInstanceId());
//设置最后的心跳时间为当前时间
instance.setLastBeat(System.currentTimeMillis());
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
if (StringUtils.isNotEmpty(metadata))
instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
//验证实例
instance.validate();
return instance;
register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码
缓存和初始化serivce
@Component
public class ServiceManager implements RecordListener<Service>
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
...省略部分代码...
//注册服务实例
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException
//1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,
// 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中
// 服务注册表的结构是Map<String,Map<String,Service>>
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从注册表中获取服务,注册表是一个Map<String,Map<String,Service>>结构,
// 先根据namespaceId取得到Map<String,Service>,然后再根据serviceName取Service
Service service = getService(namespaceId, serviceName);
//参数无效,没有找到服务
if (service == null)
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
//添加 instance 服务实例到注册表
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
...省略部分代码...
//二.创建service,并初始化
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException
Service service = getService(namespaceId, serviceName);
//如果服务不存在就创建一个service
if (service == null)
Loggers.SRV_LOG.info("creating empty service :", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null)
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
service.validate();
//保存service和初始化service
putServiceAndInit(service);
if (!local)
addOrReplaceService(service);
//保存service和初始化service
private void putServiceAndInit(Service service) throws NacosException
//保存service
putService(service);
service = getService(service.getNamespaceId(), service.getName());
//初始化service
service.init();
//consistencyService.listen实现数据一致性监听
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] ", service.toJson());
//保存service到注册表中
public void putService(Service service)
if (!serviceMap.containsKey(service.getNamespaceId()))
synchronized (putServiceLock)
if (!serviceMap.containsKey(service.getNamespaceId()))
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
//把注册的服务存储到Map中
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
registerInstance做了三个事情
-
通过putService()方法将服务缓存到内存
-
service.init()建立心跳机制
-
consistencyService.listen实现数据一致性监听
registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。
该Map是一个ConcurrentHashMap,结构是Map<String,Map<String,Service>>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client
这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.
注意:service和instance的关系是,一个service中包含一个 Map<String, Cluster> , 一个Cluster中包含一个 Set。
- service代表一个服务:比如用户服务
- Cluster代表服务集群,比如2个用户服务形成一个集群
- 而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例
除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码
public void init()
//clientBeatCheckTask 是一个Runnable,它持有service,它的作用是
//检查并更新临时实例的状态,如果它们已过期,则将其删除
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet())
entry.getValue().setService(this);
entry.getValue().init();
//定时任务:定时检查服务的健康状况,5S一次
public static void scheduleCheck(ClientBeatCheckTask task)
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除
下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码
public void run()
try
if (!getDistroMapper().responsible(service.getName()))
return;
if (!getSwitchDomain().isHealthCheckEnabled())
return;
//拿到服务中的所有实例
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances)
//当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut())
if (!instance.isMarked())
if (instance.isHealthy())
//健康状态设置为false
instance.setHealthy(false);
Loggers.EVT_LOG
.info("POS IP-DISABLED valid: :@@, region: , msg: client timeout after , last beat: ",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
//发布时间:服务状态改变
getPushService().serviceChanged(service);
//发布时间:服务实例心跳超时事件
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
if (!getGlobalConfig().isExpireInstance())
return;
// then remove obsolete instances:
for (Instance instance : instances)
if (instance.isMarked())
continue;
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout())
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: , ip: ", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
catch (Exception e)
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件
getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent
public void onApplicationEvent(ServiceChangeEvent event)
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
//使用定时任务 1s 一次
Future future = GlobalExecutor.scheduleUdpSender(() ->
try
//服务改变,添加到 push队列
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients))
return;
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values())
if (client.zombie())
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
以上是关于nacos源码分析-服务注册(服务端)的主要内容,如果未能解决你的问题,请参考以下文章