SpringCloud Tech Series "Eureka Source Code Analysis": Understanding Eureka's Workflow and Operating Mechanisms from the Source Code (Part 2)

Posted 洛神灬殇


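This post continues from Part 1 of the Eureka series; see [SpringCloud Tech Series "Eureka Source Code Analysis": Understanding Eureka's Workflow and Operating Mechanisms from the Source Code (Part 1)].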

Review of the principles

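  1. Eureka Server provides the service registry. Each node registers with Eureka Server after it starts, so the registry in Eureka Server holds information about every available service node, and that information can be viewed directly in the web dashboard.
  2. Eureka Client is a Java client that simplifies interaction with Eureka Server. It also ships with a built-in load balancer that uses a round-robin algorithm.
  3. After an application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods (3 periods = 90 seconds by default), it removes that node from the registry.
  4. In a high-availability setup, Eureka Server nodes keep their data in sync by replicating to one another.
  5. Eureka Client caches registry information. Even if every Eureka Server goes down, the client can still use the cached information to call other services' APIs.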
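Point 3 can be made concrete with a minimal, simplified model of heartbeat-based expiry. It assumes the default 30-second renewal interval and 90-second lease duration. `SimpleLeaseTable` and its methods are hypothetical names, not Eureka's API. The model also ignores self-preservation mode and the server's periodic eviction task. It simply evicts any entry whose last renewal is more than 90 seconds old.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical, simplified model of heartbeat-based expiry (not Eureka's API):
 * instances renew every 30 s, and an entry not renewed for more than 90 s is evicted.
 */
final class SimpleLeaseTable {
    static final long RENEW_INTERVAL_MS = 30_000;  // default heartbeat period
    static final long LEASE_DURATION_MS = 90_000;  // 3 missed heartbeats

    // instance id -> timestamp (ms) of the last register/renew
    private final Map<String, Long> lastRenewal = new HashMap<>();

    void register(String instanceId, long nowMs) {
        lastRenewal.put(instanceId, nowMs);
    }

    /** Returns false for an unknown instance (the real server replies 404 in that case). */
    boolean renew(String instanceId, long nowMs) {
        if (!lastRenewal.containsKey(instanceId)) {
            return false;
        }
        lastRenewal.put(instanceId, nowMs);
        return true;
    }

    /** Removes every instance whose last renewal is older than the lease duration; returns the count removed. */
    int evictExpired(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastRenewal.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > LEASE_DURATION_MS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean contains(String instanceId) {
        return lastRenewal.containsKey(instanceId);
    }
}
```

An instance that keeps renewing every 30 seconds never gets close to the 90-second limit. An instance that goes silent is dropped at the first eviction check made more than 90 seconds after its last heartbeat.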

EurekaServer startup flow analysis

How EurekaServer handles service registration and cluster data replication

How does an EurekaClient register with EurekaServer?

Earlier we set a breakpoint on every method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry, and EurekaServer is now running in debug mode. Let's pick any microservice annotated with @EnableEurekaClient and just run it.

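  • Once it starts, the register method is bound to be called, so let's keep going and see.

Instance registration mechanism

The breakpoint in InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) is hit.
  • Following the stack trace upward from InstanceRegistry.register, we see that ApplicationResource.addInstance is the caller, so let's analyze addInstance.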

The ApplicationResource class

This class mainly handles incoming HTTP service requests.

```java
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
```
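The required-field checks at the top of addInstance form a simple early-return chain. The sketch below mirrors the first four checks: instance id, host name, app name, and whether the app name matches the application in the URL. It uses hypothetical stand-in types (`InstanceStub` for InstanceInfo, `Result` for the JAX-RS Response) and leaves out the DataCenterInfo checks. None of these names are part of Eureka's API.

```java
/**
 * Hypothetical mirror of the first checks in ApplicationResource.addInstance.
 * InstanceStub stands in for Eureka's InstanceInfo; Result stands in for a JAX-RS Response.
 */
final class RegistrationValidator {

    static final class InstanceStub {
        final String id;
        final String hostName;
        final String appName;

        InstanceStub(String id, String hostName, String appName) {
            this.id = id;
            this.hostName = hostName;
            this.appName = appName;
        }
    }

    static final class Result {
        final int status;
        final String message;

        Result(int status, String message) {
            this.status = status;
            this.message = message;
        }
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** appNameFromPath plays the role of the resource's appName (the application named in the request URL). */
    static Result validate(String appNameFromPath, InstanceStub info) {
        if (isBlank(info.id)) {
            return new Result(400, "Missing instanceId");
        } else if (isBlank(info.hostName)) {
            return new Result(400, "Missing hostname");
        } else if (isBlank(info.appName)) {
            return new Result(400, "Missing appName");
        } else if (!appNameFromPath.equals(info.appName)) {
            return new Result(400, "Mismatched appName, expecting " + appNameFromPath + " but was " + info.appName);
        }
        // All checks passed: the real method would now call registry.register(...) and answer 204
        return new Result(204, "");
    }
}
```

Because the chain returns on the first failed check, each 400 response names exactly one problem.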

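  • This looks a little different from the Spring MVC Controller style of RESTful endpoints we are used to. On closer inspection, it uses the Jersey RESTful framework, a production-grade framework for RESTful services and clients and the reference implementation of JAX-RS. Like Struts, it can be integrated with frameworks such as Hibernate and Spring.

  • Note the call registry.register(info, "true".equals(isReplication)): this is where registration happens. After an EurekaClient starts, it sends an HTTP(S) request that is routed straight to ApplicationResource.addInstance. Every registration goes through this method, including registrations that other EurekaServer nodes replicate to this one (in that case the PeerEurekaNode.HEADER_REPLICATION header is "true").

  • Next, let's step into registry.register(info, "true".equals(isReplication)).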

```java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}
```

  • The handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) method:

```java
private void handleRegistration(InstanceInfo info, int leaseDuration,
        boolean isReplication) {
    log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
            + ", leaseDuration " + leaseDuration + ", isReplication "
            + isReplication);
    publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
            isReplication));
}
```

  • It then publishes an EurekaInstanceRegisteredEvent (the instance-registered event) through the ApplicationContext. You can add a listener for EurekaInstanceRegisteredEvent to run your own business logic at this point.
  • Next, let's look at super.register(info, isReplication). It is defined in PeerAwareInstanceRegistryImpl, the parent class of InstanceRegistry.
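The event published by handleRegistration is a plain publish/subscribe hook. The framework-free sketch below shows the idea. The hypothetical `SimpleEventBus` stands in for the ApplicationContext, and `RegisteredEvent` stands in for EurekaInstanceRegisteredEvent. In a real Spring Cloud application you would typically write a bean method annotated with Spring's `@EventListener` that takes an EurekaInstanceRegisteredEvent parameter instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical observer-pattern sketch of handleRegistration's event hook:
 * publish() hands each "instance registered" event to every listener in order.
 */
final class SimpleEventBus {

    /** Stand-in for EurekaInstanceRegisteredEvent (fields chosen for illustration). */
    static final class RegisteredEvent {
        final String appName;
        final String instanceId;
        final int leaseDuration;
        final boolean replication;

        RegisteredEvent(String appName, String instanceId, int leaseDuration, boolean replication) {
            this.appName = appName;
            this.instanceId = instanceId;
            this.leaseDuration = leaseDuration;
            this.replication = replication;
        }
    }

    private final List<Consumer<RegisteredEvent>> listeners = new ArrayList<>();

    void addListener(Consumer<RegisteredEvent> listener) {
        listeners.add(listener);
    }

    /** Synchronously delivers the event to all registered listeners. */
    void publish(RegisteredEvent event) {
        for (Consumer<RegisteredEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}
```

Replicated registrations also pass through InstanceRegistry.register, so every server in a cluster publishes this event for the same instance. A listener that should act only once can skip events whose replication flag is true.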

Service registration mechanism

Step into the register(final InstanceInfo info, final boolean isReplication) method of PeerAwareInstanceRegistryImpl:

```java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Lease duration: the default is the constant 90 seconds
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // The registering instance can send its own lease duration, so the value is configurable
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Write the registrant into EurekaServer's registry (parent class: AbstractInstanceRegistry)
    super.register(info, leaseDuration, isReplication);
    // Synchronize data between EurekaServer nodes: replicate to the other peers
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
```
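The lease-duration rule at the top of this method amounts to "use the instance's own positive value, otherwise 90 seconds". Below is a minimal sketch of that rule. `LeaseDurations` and `resolve` are hypothetical names, and a null argument stands for an instance that sent no LeaseInfo. On the client side, Spring Cloud exposes this value as `eureka.instance.lease-expiration-duration-in-seconds`.

```java
/**
 * Hypothetical helper mirroring the lease-duration lines of
 * PeerAwareInstanceRegistryImpl.register.
 */
final class LeaseDurations {
    /** Same value as Eureka's Lease.DEFAULT_DURATION_IN_SECS. */
    static final int DEFAULT_DURATION_IN_SECS = 90;

    private LeaseDurations() {
    }

    /**
     * @param clientDurationInSecs duration sent by the instance, or null if it sent no LeaseInfo
     * @return the client's value when it is positive, otherwise the 90-second default
     */
    static int resolve(Integer clientDurationInSecs) {
        if (clientDurationInSecs != null && clientDurationInSecs > 0) {
            return clientDurationInSecs;
        }
        return DEFAULT_DURATION_IN_SECS;
    }
}
```

Zero and negative values fall back to the default, just as the `> 0` check in the original does.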

To see how the registrant is written into EurekaServer's registry, step into super.register(info, leaseDuration, isReplication), i.e. AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication):

```java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // The `registry` field is what we call the registry; it is held in memory
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1 for 30 seconds, 2 for a minute)
                    // Note: the comment above was copied from the cancel path; this is a new
                    // registration, so the expected renew count is increased by 2
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
```

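  • This method is fairly long. Skimming it, it does more than put a new Lease into the registry, which is a two-level map from app name to instance id to lease. It also resolves conflicts using lastDirtyTimestamp and raises the expected renewal count and threshold for new instances. It then applies overridden statuses, records the change in recentlyChangedQueue, and invalidates the response cache. Interested readers can study the method in more depth.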
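Three ideas in this method are easy to lose in the detail:

- The registry is a two-level map (app name -> instance id -> lease).
- A registration carrying an older lastDirtyTimestamp does not replace a newer entry.
- Each brand-new instance adds 2 to the expected renewals per minute (one heartbeat every 30 seconds), and the threshold is then recomputed as expected renewals × renewalPercentThreshold.

The sketch below is a simplified, hypothetical model of just those three points. `MiniRegistry` is not an Eureka class, and it stores only timestamps rather than InstanceInfo. The 0.85 used in the usage check is Eureka's default renewal-percent-threshold.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of parts of AbstractInstanceRegistry.register. */
final class MiniRegistry {

    /** Stand-in for Lease<InstanceInfo>: only the registrant's lastDirtyTimestamp is kept. */
    static final class Lease {
        final long lastDirtyTimestamp;

        Lease(long lastDirtyTimestamp) {
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> lease), the same shape as Eureka's in-memory registry
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();
    private final Object lock = new Object();
    private final double renewalPercentThreshold;
    private int expectedNumberOfRenewsPerMin;
    private int numberOfRenewsPerMinThreshold;

    MiniRegistry(int initialExpectedRenewsPerMin, double renewalPercentThreshold) {
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.expectedNumberOfRenewsPerMin = initialExpectedRenewsPerMin;
        this.numberOfRenewsPerMinThreshold = (int) (initialExpectedRenewsPerMin * renewalPercentThreshold);
    }

    void register(String appName, String instanceId, long lastDirtyTimestamp) {
        // computeIfAbsent does the job of the putIfAbsent block in the original
        Map<String, Lease> gMap = registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        Lease existing = gMap.get(instanceId);
        long effective = lastDirtyTimestamp;
        if (existing != null) {
            // Keep the newer data: a stale registration must not overwrite it
            if (existing.lastDirtyTimestamp > lastDirtyTimestamp) {
                effective = existing.lastDirtyTimestamp;
            }
        } else {
            // New instance: it renews twice a minute, so expect 2 more renewals per minute
            synchronized (lock) {
                if (expectedNumberOfRenewsPerMin > 0) {
                    expectedNumberOfRenewsPerMin += 2;
                    numberOfRenewsPerMinThreshold =
                            (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
                }
            }
        }
        gMap.put(instanceId, new Lease(effective));
    }

    long lastDirtyTimestamp(String appName, String instanceId) {
        return registry.get(appName).get(instanceId).lastDirtyTimestamp;
    }

    int expectedRenewsPerMin() {
        synchronized (lock) {
            return expectedNumberOfRenewsPerMin;
        }
    }

    int renewsThreshold() {
        synchronized (lock) {
            return numberOfRenewsPerMinThreshold;
        }
    }
}
```

The threshold matters because Eureka compares it with the renewals actually received in the last minute. When renewals drop below it, the server enters self-preservation and stops evicting expired instances.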

Replication within the cluster

Now let's look at replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication):

```java
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Note: a request that is itself a replication is never forwarded again
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over every node in the Eureka Server cluster and replicate to it
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Forward the action to this peer: cancel, register, heartbeat, status update, etc.
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
```

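The guard clauses in replicateToPeers keep replication from looping. A node forwards only requests it received directly from a client, never ones that arrived as replications (the PeerEurekaNode.HEADER_REPLICATION flag seen in addInstance), and it skips its own URL. The hypothetical `PeerReplicator` below models just that fan-out rule and records "action -> url" strings instead of making HTTP calls. The real PeerEurekaNode dispatches its requests asynchronously in batches, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the fan-out rule in replicateToPeers:
 * replicated requests are not forwarded, and a node never sends to its own URL.
 */
final class PeerReplicator {
    private final String myUrl;
    private final List<String> peerUrls;
    // Records what would have been sent, as "action -> url"
    final List<String> sent = new ArrayList<>();

    PeerReplicator(String myUrl, List<String> peerUrls) {
        this.myUrl = myUrl;
        this.peerUrls = peerUrls;
    }

    void replicateToPeers(String action, boolean isReplication) {
        // Forwarding a replicated request again would bounce it around the cluster
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String url : peerUrls) {
            if (url.equals(myUrl)) {
                continue; // do not replicate to yourself
            }
            sent.add(action + " -> " + url);
        }
    }
}
```

With three servers, a client registration on peer1 produces exactly one request to peer2 and one to peer3. Those two servers receive it flagged as a replication and stop there.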
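  • Whenever a registration request arrives, EurekaServer first updates its own registry and then synchronizes the information to the other EurekaServer nodes.

  • Next, let's see how replication to each node is carried out by stepping into replicateInstanceActionsToPeers.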

```java
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            // ... (the remaining cases and the rest of this method are cut off in the source)
```
