Nacos Source Code Analysis: Service Heartbeat
Posted by IT-老牛
This article, compiled by the editors of 小常识网 (cha138.com), covers the service heartbeat mechanism in the Nacos source code.
1. Introduction
Nacos instances come in two kinds, ephemeral and persistent. The kind is set in the application's yaml file:
```yaml
spring:
  application:
    name: order-service
  cloud:
    nacos:
      discovery:
        ephemeral: false # Make this a persistent instance. true: ephemeral; false: persistent
        server-addr: 192.168.150.1:8845
```
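Ephemeral instances are health-checked through heartbeats sent by the client, whereas for persistent instances Nacos actively probes the instance's status.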
The heartbeat API that Nacos provides is:

Description: sends a heartbeat for an instance

Method: PUT

Path: /nacos/v1/ns/instance/beat

Request parameters:
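| Name | Type | Required | Description |
|---|---|---|---|
| serviceName | string | yes | service name |
| groupName | string | no | group name |
| ephemeral | boolean | no | whether the instance is ephemeral |
| beat | string | yes | heartbeat info as a JSON string |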
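Error codes:

| Error code | Description | Meaning |
|---|---|---|
| 400 | Bad Request | syntax error in the client request |
| 403 | Forbidden | no permission |
| 404 | Not Found | resource not found |
| 500 | Internal Server Error | internal server error |
| 200 | OK | success |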
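For illustration, the request described by the tables above can be assembled with the JDK's `java.net.http` client. This is a minimal sketch, assuming Java 11+, a hypothetical server address and parameter values, and that the parameters are passed in the query string. It builds the request without sending it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Sketch: build (but do not send) a PUT request for the Nacos v1 heartbeat API.
// Server address and parameter values used with it are hypothetical examples.
final class BeatRequestSketch {

    // URL-encode a single query parameter value
    static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    static HttpRequest buildBeatRequest(String serverAddr, String serviceName,
                                        String groupName, String beatJson) {
        String query = "serviceName=" + enc(serviceName)
                + "&groupName=" + enc(groupName)
                + "&ephemeral=true"
                + "&beat=" + enc(beatJson); // beat is a JSON string, so it must be encoded
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance/beat?" + query);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. The Nacos client itself also sends namespace, cluster, ip and port parameters, as `sendBeat` in section 2.4 shows.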
2. Client
In the section on service registration, we saw that the NacosNamingService class implements service registration. It also implements the service heartbeat:
```java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // Check whether this is an ephemeral instance
    if (instance.isEphemeral()) {
        // For an ephemeral instance, build the heartbeat info (BeatInfo)
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // Add the heartbeat task
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
```
2.1. BeatInfo
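The BeatInfo here holds everything a heartbeat needs, such as the service name, IP, port, weight, metadata, cluster, heartbeat period, and a stopped flag.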
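A simplified stand-in for BeatInfo, with fields inferred from the getters used in the code later in this article (`getServiceName`, `getIp`, `getPort`, `getWeight`, `getMetadata`, `getCluster`, `getPeriod`, `isStopped`). This is a hypothetical sketch, not the real Nacos class: the 5-second default period reflects the client default mentioned below, and the other defaults are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the client's BeatInfo (not the Nacos class).
final class SimpleBeatInfo {
    String serviceName;   // grouped service name, e.g. "DEFAULT_GROUP@@order-service"
    String cluster;
    String ip;
    int port;
    double weight = 1.0;  // assumed default weight
    Map<String, String> metadata = new HashMap<>();
    long period = 5000L;  // heartbeat period in ms (5 s client default)
    // Set to true when a newer BeatInfo replaces this one, so its task stops
    volatile boolean stopped;

    SimpleBeatInfo(String serviceName, String cluster, String ip, int port) {
        this.serviceName = serviceName;
        this.cluster = cluster;
        this.ip = ip;
        this.port = port;
    }
}
```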
2.2. BeatReactor
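BeatReactor, in turn, maintains a scheduled thread pool (executorService). When BeatReactor's addBeatInfo(groupedServiceName, beatInfo) method is called, a heartbeat task is scheduled on this pool: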
```java
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        // Stop the heartbeat that this new BeatInfo replaces
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // Schedule a BeatTask to run after beatInfo.getPeriod() ms; each BeatTask
    // schedules the next one, which makes the heartbeat periodic
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
```
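The replace-and-stop pattern in addBeatInfo can be sketched with plain JDK types. This is a minimal sketch under assumptions: the class and method names are hypothetical, the key format `serviceName#ip#port` is assumed to mirror `buildKey`, and the task is scheduled only once here (in Nacos, BeatTask reschedules itself).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one live heartbeat per service/ip/port key;
// re-adding a key marks the previous heartbeat as stopped.
final class MiniBeatReactor {

    static final class Beat {
        final String ip;
        final int port;
        final long periodMillis;
        volatile boolean stopped;

        Beat(String ip, int port, long periodMillis) {
            this.ip = ip;
            this.port = port;
            this.periodMillis = periodMillis;
        }
    }

    final Map<String, Beat> dom2Beat = new ConcurrentHashMap<>();
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "mini-beat");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });

    // Assumed key format: serviceName#ip#port
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    void addBeatInfo(String serviceName, Beat beat, Runnable sendBeat) {
        String key = buildKey(serviceName, beat.ip, beat.port);
        Beat existing = dom2Beat.remove(key);
        if (existing != null) {
            existing.stopped = true; // its pending task will see this and skip
        }
        dom2Beat.put(key, beat);
        // One-shot schedule; a real heartbeat task would reschedule itself
        executor.schedule(() -> {
            if (!beat.stopped) {
                sendBeat.run();
            }
        }, beat.periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

Marking the old entry as stopped, rather than cancelling its future, lets the old task exit on its own the next time it runs.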
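The default heartbeat period is defined in the com.alibaba.nacos.api.common.Constants class (DEFAULT_HEART_BEAT_INTERVAL) as 5 seconds, so by default the client sends a heartbeat every 5 seconds.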
2.3. BeatTask
The heartbeat task is wrapped in the BeatTask class, a Runnable whose run method is:
```java
@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    // Heartbeat period
    long nextTime = beatInfo.getPeriod();
    try {
        // Send the heartbeat
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        // A positive interval from the server overrides the local period
        if (interval > 0) {
            nextTime = interval;
        }
        // Check the heartbeat result
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            // The server does not know this instance: re-register it
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
                // Ignored; the next heartbeat will trigger another attempt
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    } catch (Exception unknownEx) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
    } finally {
        // Always schedule the next heartbeat, even after a failure
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
```
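Stripped of Nacos types, BeatTask's control flow is: send a beat, adopt the server's `clientBeatInterval` if it is positive, re-register on RESOURCE_NOT_FOUND, and always reschedule in `finally`. The sketch below models that under stated assumptions: a hypothetical `Supplier<BeatResponse>` stands in for `sendBeat`, a small hypothetical `Scheduler` interface stands in for the thread pool, and 404 is a simplified stand-in for `NamingResponseCode.RESOURCE_NOT_FOUND`. It uses a record, so Java 16+ is assumed.

```java
import java.util.function.Supplier;

// Hypothetical sketch of BeatTask's control flow, not the Nacos implementation.
final class SelfReschedulingBeat implements Runnable {

    /** Simplified heartbeat response. */
    record BeatResponse(int code, long clientBeatInterval) { }

    /** Abstraction over "run task after delayMillis". */
    interface Scheduler {
        void schedule(Runnable task, long delayMillis);
    }

    // Simplified stand-in for NamingResponseCode.RESOURCE_NOT_FOUND
    static final int RESOURCE_NOT_FOUND = 404;

    private final Scheduler scheduler;
    private final Supplier<BeatResponse> sendBeat;
    private final Runnable reRegister;
    private final long periodMillis;
    volatile boolean stopped;

    SelfReschedulingBeat(Scheduler scheduler, Supplier<BeatResponse> sendBeat,
                         Runnable reRegister, long periodMillis) {
        this.scheduler = scheduler;
        this.sendBeat = sendBeat;
        this.reRegister = reRegister;
        this.periodMillis = periodMillis;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // replaced or cancelled: do not reschedule
        }
        long nextTime = periodMillis;
        try {
            BeatResponse resp = sendBeat.get();
            if (resp.clientBeatInterval() > 0) {
                nextTime = resp.clientBeatInterval(); // server-suggested period wins
            }
            if (resp.code() == RESOURCE_NOT_FOUND) {
                reRegister.run(); // server no longer knows the instance
            }
        } catch (RuntimeException e) {
            // Nacos logs the failure; this sketch just keeps beating
        } finally {
            scheduler.schedule(this, nextTime);
        }
    }
}
```

With a real pool the scheduler can be `(task, delay) -> pool.schedule(task, delay, TimeUnit.MILLISECONDS)`, where `pool` is a `ScheduledExecutorService`. Because rescheduling happens in `finally`, a failed beat only delays the next one; only the `stopped` flag ends the loop.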
2.4. Sending the heartbeat
Ultimately, the heartbeat is sent by NamingProxy's sendBeat method:
```java
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    // Build the request parameters
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    // The full BeatInfo JSON is only sent when light beats are disabled
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // Send the request; the path is /v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}
```
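The only switch in sendBeat is `lightBeatEnabled`: a full beat carries the serialized BeatInfo in the body, while a light beat carries only the identifying parameters. A minimal sketch of that split, assuming the parameter names `namespaceId`, `serviceName` and `clusterName` as the string values of the `CommonParams` constants, with the beat JSON passed in pre-serialized; the class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how sendBeat splits data between parameters and body.
final class BeatParams {
    final Map<String, String> params = new HashMap<>(8);
    final Map<String, String> body = new HashMap<>(2);

    static BeatParams build(String namespaceId, String serviceName, String cluster,
                            String ip, int port, String beatJson, boolean lightBeatEnabled) {
        BeatParams p = new BeatParams();
        if (!lightBeatEnabled) {
            // Full beat: ship the whole BeatInfo so the server can rebuild the instance
            p.body.put("beat", beatJson);
        }
        // Identifying parameters are always sent
        p.params.put("namespaceId", namespaceId);
        p.params.put("serviceName", serviceName);
        p.params.put("clusterName", cluster);
        p.params.put("ip", ip);
        p.params.put("port", String.valueOf(port));
        return p;
    }
}
```

Since a light beat has no body, the server cannot rebuild an unknown instance from it; that is why, on the server side, such a beat is answered with RESOURCE_NOT_FOUND and the client re-registers.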
3. Server
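For ephemeral instances, the server-side code has two parts:

1) InstanceController exposes an endpoint that handles client heartbeat requests;
2) a scheduled task checks whether each instance's heartbeats arrive on time.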
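Part 2 is not shown in the code of this section. As a rough sketch of the idea, assuming the commonly used Nacos defaults of 15 seconds without a beat before an instance is marked unhealthy and 30 seconds before it is removed, a periodic checker could compare each instance's last beat against those thresholds. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a server-side heartbeat timeout check.
final class BeatChecker {
    static final long UNHEALTHY_AFTER_MS = 15_000; // assumed default
    static final long DELETE_AFTER_MS = 30_000;    // assumed default

    static final class Inst {
        final String id;
        long lastBeat;
        boolean healthy = true;

        Inst(String id, long lastBeat) {
            this.id = id;
            this.lastBeat = lastBeat;
        }
    }

    /** Marks stale instances unhealthy and returns the ids that should be removed. */
    static List<String> check(List<Inst> instances, long now) {
        List<String> toDelete = new ArrayList<>();
        for (Inst i : instances) {
            long silence = now - i.lastBeat;
            if (silence > UNHEALTHY_AFTER_MS) {
                i.healthy = false;
            }
            if (silence > DELETE_AFTER_MS) {
                toDelete.add(i.id);
            }
        }
        return toDelete;
    }
}
```

Running `check` on a timer, for example every few seconds on a `ScheduledExecutorService`, is what turns "the client stopped beating" into "the instance is unhealthy, then gone".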
3.1. InstanceController
As with service registration, the InstanceController class in the nacos-naming module defines a method that handles heartbeat requests:
```java
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    // Parse the heartbeat request parameters
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // Look up the instance in the Nacos registry
    // by namespaceId, serviceName, clusterName, ip and port
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // Not found: the instance is not (or no longer) registered
    if (instance == null) {
        if (clientBeat == null) {
            // Light beat without a body: cannot rebuild the instance, so tell the client to re-register
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // Re-register the instance from the beat data
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Look up the Service in the registry by namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    // If the service does not exist, fail with a SERVER_ERROR NacosException
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Everything checks out: process the heartbeat
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
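Finally, once it has confirmed that both the service and the instance for the heartbeat exist, the controller hands the request to the Service class by calling its processClientBeat method.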
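The decision logic of the beat endpoint can be reduced to a few branches. This is a minimal sketch under assumptions: the registry is a hypothetical `Map` keyed by `ip:port` holding the last beat time, the service-existence check is omitted, and 200/404 are simplified stand-ins for `NamingResponseCode.OK` and `RESOURCE_NOT_FOUND`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the branches in InstanceController#beat.
final class BeatEndpointSketch {
    static final int OK = 200;
    static final int RESOURCE_NOT_FOUND = 404;

    final Map<String, Long> registry = new HashMap<>(); // "ip:port" -> lastBeat

    /**
     * @param hasBeatBody false for a light beat that only carries ip/port
     */
    int beat(String ip, int port, boolean hasBeatBody, long now) {
        String key = ip + ":" + port;
        if (!registry.containsKey(key)) {
            if (!hasBeatBody) {
                return RESOURCE_NOT_FOUND; // client must re-register itself
            }
            registry.put(key, now); // "data compensation": rebuild from the beat body
        }
        registry.put(key, now); // stands in for processClientBeat: refresh lastBeat
        return OK;
    }
}
```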
3.2. Processing the heartbeat request
Let's look at Service's processClientBeat(clientBeat) method:
```java
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
```
The heartbeat info is wrapped in a ClientBeatProcessor and handed to HealthCheckReactor for processing. HealthCheckReactor is just a wrapper around a thread pool, so it needs no closer look.
The key business logic is in ClientBeatProcessor, a Runnable whose run method is:
```java
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    // Get the cluster
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Get all ephemeral instances in the cluster
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        // Find the instance this heartbeat belongs to
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Update the instance's last heartbeat time, lastBeat
            instance.setLastBeat(System.currentTimeMillis());
            // ... (the rest of run() is cut off in the source)
        }
    }
}
```
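The core of ClientBeatProcessor is a linear search of the cluster's instances by IP and port followed by a timestamp update. A minimal sketch of just that step, using a hypothetical `SimpleInstance` type in place of the Nacos `Instance` class and an explicit `now` instead of the system clock:

```java
import java.util.List;

// Hypothetical sketch of ClientBeatProcessor's lookup-and-refresh step.
final class BeatProcessorSketch {

    static final class SimpleInstance {
        final String ip;
        final int port;
        volatile long lastBeat;

        SimpleInstance(String ip, int port, long lastBeat) {
            this.ip = ip;
            this.port = port;
            this.lastBeat = lastBeat;
        }
    }

    /** Returns true if a matching instance was found and its lastBeat refreshed. */
    static boolean processBeat(List<SimpleInstance> clusterInstances, String ip, int port, long now) {
        for (SimpleInstance instance : clusterInstances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                return true;
            }
        }
        return false;
    }
}
```

In Nacos this work runs on HealthCheckReactor's pool via `scheduleNow`, so the HTTP thread that received the beat can return its response without waiting for the update.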