源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理相关的知识,希望对你有一定的参考价值。

源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理

我们了解到Skywalking的agent在进行启动初始化的时候会对服务进行注册,对应的逻辑是ServiceAndEndpointRegisterClient的run()方法

服务注册

ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceRegister()方法进行注册

RegisterServiceHandler的doServiceRegister()方法:

@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver) 
        ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
        request.getServicesList().forEach(service -> 
            String serviceName = service.getServiceName();
            if (logger.isDebugEnabled()) 
                logger.debug("Register service, service code: ", serviceName);
            
            int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);

            if (serviceId != Const.NONE) 
                KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
                builder.addServices(value);
            
        );

        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    
  1. 从请求中获取服务名
  2. 调用serviceInventoryRegister的getOrCreate()方法根据服务名获取服务的id
  3. 将服务id返回给Agent

获取服务ID

serviceInventoryRegister的getOrCreate()

@Override public int getOrCreate(String serviceName, JsonObject properties) 
        int serviceId = getServiceInventoryCache().getServiceId(serviceName);

        if (serviceId == Const.NONE) 
            ServiceInventory serviceInventory = new ServiceInventory();
            serviceInventory.setName(serviceName);
            serviceInventory.setAddressId(Const.NONE);
            serviceInventory.setIsAddress(BooleanUtils.FALSE);

            long now = System.currentTimeMillis();
            serviceInventory.setRegisterTime(now);
            serviceInventory.setHeartbeatTime(now);
            serviceInventory.setMappingServiceId(Const.NONE);
            serviceInventory.setLastUpdateTime(now);
            serviceInventory.setProperties(properties);

            InventoryStreamProcessor.getInstance().in(serviceInventory);
        
        return serviceId;
    
  1. 获取ServiceInventoryCache缓存实例,调用ServiceInventoryCache对象的getServiceId()方法获取serviceId,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInventoryCacheEsDAO的get()方法,也就是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
  2. 如果serviceId为空,创建ServiceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成serviceId中,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建serviceId。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。

从ServiceAndEndpointRegisterClient的run()方法的逻辑我们知道如果serviceId返回0的话,会创建线程不断重试,直到服务Id都不会空

服务实例注册

ServiceAndEndpointRegisterClient的run()方法中会调用RegisterServiceHandler的doServiceInstanceRegister()方法进行服务实例注册

RegisterServiceHandler的doServiceInstanceRegister()方法:

  1. 遍历请求的服务实例的集合,根据请求中的服务ID从缓存中获取ServiceInventory对象,这个对象是服务的相关信息
  2. 整合request中的host_name、os_name、ip地址、语言、进程ID等参数,放入JsonObject对象中
  3. 根据服务名、进程ID和主机名构建服务实例名称
  4. 根据请求参数调用ServiceInstanceInventoryRegister的getOrCreate()方法获取服务实例的id
  5. 将服务实例Id返回给Agent

获取服务实例的ID

ServiceInstanceInventoryRegister的getOrCreate()方法:

@Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime,
        JsonObject properties) 
        if (logger.isDebugEnabled()) 
            logger.debug("Get or create service instance by service instance name, service id: , service instance name: ,uuid: , registerTime: ", serviceId, serviceInstanceName, uuid, registerTime);
        

        int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, uuid);

        if (serviceInstanceId == Const.NONE) 
            ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
            serviceInstanceInventory.setServiceId(serviceId);
            serviceInstanceInventory.setName(serviceInstanceName);
            serviceInstanceInventory.setInstanceUUID(uuid);
            serviceInstanceInventory.setIsAddress(BooleanUtils.FALSE);
            serviceInstanceInventory.setAddressId(Const.NONE);
            serviceInstanceInventory.setMappingServiceInstanceId(Const.NONE);

            serviceInstanceInventory.setRegisterTime(registerTime);
            serviceInstanceInventory.setHeartbeatTime(registerTime);

            serviceInstanceInventory.setProperties(properties);

            InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
        
        return serviceInstanceId;
    
  1. 获取ServiceInstanceInventoryCache缓存实例,调用ServiceInstanceInventoryCache对象的getServiceInstanceId()方法获取服务实例ID,这个方法中会先从缓存中查找,找到直接返回,没有找到就会调用ServiceInstanceInventoryCacheDAO的get()方法,通过是通过ElasticSearch客户端查询根据指定的Document Id进行查询sequence,返回结果并放入缓存,下次从缓存中直接获取到。
  2. 如果serviceId为空,创建ServiceInstanceInventory对象,设置相关参数,调用InventoryStreamProcessor实例的in()方法对该服务生成服务实例ID,放入缓存中下次直接能取到,底层其实是调用的DataCarrier的produce()方法将数据放入缓存中,等待消费者消费创建服务实例ID。InventoryStreamProcessor是异步的过程,不会阻塞当前方法。

同样的,我们看ServiceAndEndpointRegisterClient的run()方法中的逻辑,如果服务实例ID为空的话会一直进行重试,直到服务实例ID不为空

总结

这篇文章主要介绍了服务端对服务注册和服务实例注册的处理逻辑,这两个方法的整体逻辑是非相似,获取服务Id或者服务实例ID都是先从缓存中获取,缓存中没有就从es数据库中查询,查询到放入缓存,如果es数据库中也没有就会创建异步操作的InventoryStreamProcessor创建服务ID或服务实例ID

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞

    以上是关于源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理的主要内容,如果未能解决你的问题,请参考以下文章

    源码角度了解Skywalking之服务端OAP对服务注册与服务实例注册的处理

    源码角度了解Skywalking之建立连接与服务注册

    源码角度了解Skywalking之SPI在SKywalking中应用

    源码角度了解Skywalking之告警机制是怎么实现的

    源码角度了解Skywalking之Trace可以跨线程吗

    源码角度了解Skywalking之@Trace注解的原理