源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理相关的知识,希望对你有一定的参考价值。
源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理
我们了解到Skywalking的agent在进行启动初始化的时候会对服务进行注册,对应的逻辑是ServiceAndEndpointRegisterClient的run()方法
服务注册
ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceRegister()方法进行注册
RegisterServiceHandler的doServiceRegister()方法:
@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver)
ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
request.getServicesList().forEach(service ->
String serviceName = service.getServiceName();
if (logger.isDebugEnabled())
logger.debug("Register service, service code: ", serviceName);
int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);
if (serviceId != Const.NONE)
KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
builder.addServices(value);
);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
- 从请求中获取服务名
- 调用serviceInventoryRegister的getOrCreate()方法根据服务名获取服务的id
- 将服务id返回给Agent
获取服务ID
serviceInventoryRegister的getOrCreate()
@Override public int getOrCreate(String serviceName, JsonObject properties)
int serviceId = getServiceInventoryCache().getServiceId(serviceName);
if (serviceId == Const.NONE)
ServiceInventory serviceInventory = new ServiceInventory();
serviceInventory.setName(serviceName);
serviceInventory.setAddressId(Const.NONE);
serviceInventory.setIsAddress(BooleanUtils.FALSE);
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingServiceId(Const.NONE);
serviceInventory.setLastUpdateTime(now);
serviceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInventory);
return serviceId;
- 获取ServiceInventoryCache缓存实例,调用ServiceInventoryCache对象的getServiceId()方法获取serviceId,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInventoryCacheEsDAO的get()方法,也就是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
- 如果serviceId为空,创建ServiceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成serviceId中,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建serviceId。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
从ServiceAndEndpointRegisterClient的run()方法的逻辑我们知道如果serviceId返回0的话,会创建线程不断重试,直到服务Id都不会空
服务实例注册
ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceInstanceRegister()方法进行服务实例注册
RegisterServiceHandler的doServiceInstanceRegister()方法:
- 遍历请求的服务实例的集合,根据请求中的服务ID从缓存中获取ServiceInventory对象,这个对象是服务的相关信息
- 整合request中的host_name、os_name、ip地址、语言、进程ID等参数,放入JsonObject对象中
- 根据服务名、进程ID和主机名构建服务实例名称
- 根据请求参数调用ServiceInstanceInventoryRegister的getOrCreate()方法获取服务实例的id
- 将服务实例Id返回给Agent
获取服务实例的ID
ServiceInstanceInventoryRegister的getOrCreate()方法:
@Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime,
JsonObject properties)
if (logger.isDebugEnabled())
logger.debug("Get or create service instance by service instance name, service id: , service instance name: ,uuid: , registerTime: ", serviceId, serviceInstanceName, uuid, registerTime);
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, uuid);
if (serviceInstanceId == Const.NONE)
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
serviceInstanceInventory.setServiceId(serviceId);
serviceInstanceInventory.setName(serviceInstanceName);
serviceInstanceInventory.setInstanceUUID(uuid);
serviceInstanceInventory.setIsAddress(BooleanUtils.FALSE);
serviceInstanceInventory.setAddressId(Const.NONE);
serviceInstanceInventory.setMappingServiceInstanceId(Const.NONE);
serviceInstanceInventory.setRegisterTime(registerTime);
serviceInstanceInventory.setHeartbeatTime(registerTime);
serviceInstanceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
return serviceInstanceId;
- 获取ServiceInstanceInventoryCache缓存实例,调用ServiceInstanceInventoryCache对象的getServiceInstanceId()方法获取服务实例ID,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInstanceInventoryCacheDAO的get()方法,通过是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
- 如果serviceId为空,创建ServiceInstanceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成服务实例ID,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建服务实例ID。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。
同样的,我们看ServiceAndEndpointRegisterClient的run()方法中的逻辑,如果服务实例ID为空的话会一直进行重试,直到服务实例ID不为空
总结
这篇文章主要介绍了服务端对服务注册和服务实例注册的处理逻辑,这两个方法的整体逻辑是非相似,获取服务Id或者服务实例ID都是先从缓存中获取,缓存中没有就从es数据库中查询,查询到放入缓存,如果es数据库中也没有就会创建异步操作的InventoryStreamProcessor创建服务ID或服务实例ID
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话:
- 欢迎关注我❤️,点赞
以上是关于源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理的主要内容,如果未能解决你的问题,请参考以下文章
源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理