Eureka源码分析(六) TimedSupervisorTask

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eureka源码分析(六) TimedSupervisorTask相关的知识,希望对你有一定的参考价值。

参考技术A 之前我们分析了eureka的注册服务实例信息,下面我们来分析下eureka的续租。当一个租约到期后,就有两种情况,一种是过期,EurekaServer将下线过期的节点,一种是续租,当EurekaServer检测到节点还能正常通信时,将执行续租的操作。我们知道,检测节点状态是ScheduledExecutorService的schedule方法,那么定时检测节点状态的任务是怎么执行的呢,答案就是TimedSupervisorTask。我们先来看下TimedSupervisorTask都有哪些属性

TimedSupervisorTask 执行时,提交 task 到 executor 执行任务。
当 task 执行正常,TimedSupervisorTask 再次提交自己到scheduler 延迟 timeoutMillis 执行。
当 task 执行超时,重新计算延迟时间( 不允许超过 maxDelay ),再次提交自己到scheduler 延迟执行。
再来看下run方法的具体实现

续租应用实例信息的请求,映射 InstanceResource的renewLease方法,看下具体的实现

调用 AbstractInstanceRegistry的renew方法,续租应用实例信息,看下具体的实现

调用 Lease的renew方法,设置租约最后更新时间( 续租 ),看下具体的实现

整个过程修改的租约的过期时间,即使并发请求,也不会对数据的一致性产生不一致的影响,因此不需要加锁。
Eureka续租的操作就完成了。
TimedSupervisorTask的分析就到这里了。

Eureka 客户端源码分析

Eureka作为服务注册中心,主要的功能是服务注册和服务发现,是微服务框架的基础功能和核心功能。

Eureka的使用可参考:

Eureka服务端:Spring Cloud Eureka Server使用(注册中心)

Eureka客户端:Eureka Client的使用

Eureka服务端:Eureka的高可用

 

Eureka分为客户端和服务端,这里主要介绍客户端源码

1、Eureka客户端主要使用EnableDiscoveryClient注解

1)@EnableDiscoveryClient源码如下

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class) //主要是这个注解
public @interface EnableDiscoveryClient {

	/**
	 * If true, the ServiceRegistry will automatically register the local server.
	 * @return - {@code true} if you want to automatically register.
	 */
	boolean autoRegister() default true;

}

  

2)、EnableDiscoveryClientImportSelector

@Order(Ordered.LOWEST_PRECEDENCE - 100)

public class EnableDiscoveryClientImportSelector
 extends SpringFactoryImportSelector<EnableDiscoveryClient> {
 
    
@Override
    
public String[] selectImports(AnnotationMetadata metadata) {
 
        
	// 1.获取需要注册到Spring的类
        
	String[] imports = super.selectImports(metadata);    
	AnnotationAttributes attributes = AnnotationAttributes.fromMap(
 metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
 
        
	boolean autoRegister = attributes.getBoolean("autoRegister");
 
        
	// 2.autoRegister默认为true,同时则注册AutoServiceRegistrationConfiguration类到Spring中
        
	if (autoRegister) {
            
	List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            
	importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            
	imports = importsList.toArray(new String[0]);
       
 	}
 
        
	return imports;
   
 }
	
...

}

  

3)、 super.selectImports(metadata);

 

SpringFactoryImportSelector的selectImports方法
public abstract class SpringFactoryImportSelector<T>
		implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
	@Override
	public String[] selectImports(AnnotationMetadata metadata) {
		//1、默认isEnabled()为true
		if (!isEnabled()) {
			return new String[0];
		}
		AnnotationAttributes attributes = AnnotationAttributes.fromMap(
				metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

		Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
				+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

		//主要是这里 Find all possible auto configuration classes, filtering duplicates
		List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
				.loadFactoryNames(this.annotationClass, this.beanClassLoader)));

		if (factories.isEmpty() && !hasDefaultFactory()) {
			throw new IllegalStateException("Annotation @" + getSimpleName()
					+ " found, but there are no implementations. Did you forget to include a starter?");
		}

		if (factories.size() > 1) {
			// there should only ever be one DiscoveryClient, but there might be more than
			// one factory
			this.log.warn("More than one implementation " + "of @" + getSimpleName()
					+ " (now relying on @Conditionals to pick one): " + factories);
		}

		return factories.toArray(new String[factories.size()]);
	}
}

//SpringFactoriesLoader.loadFactoryNames
 public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
	//factoryTypeName 值为org.springframework.cloud.client.discovery.EnableDiscoveryClient
        String factoryTypeName = factoryType.getName();
        return (List)loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
    }

//SpringFactoriesLoader.loadSpringFactories
    private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
        MultiValueMap<String, String> result = (MultiValueMap)cache.get(classLoader);
        if (result != null) {
            return result;
        } else {
            try {
		//1、获取所有META-INF/spring.factories文件
                Enumeration<URL> urls = classLoader != null ? classLoader.getResources("META-INF/spring.factories") : ClassLoader.getSystemResources("META-INF/spring.factories");
                LinkedMultiValueMap result = new LinkedMultiValueMap();
		//遍历所有spring.factories 文件
                while(urls.hasMoreElements()) {
                    URL url = (URL)urls.nextElement();
                    UrlResource resource = new UrlResource(url);
                    Properties properties = PropertiesLoaderUtils.loadProperties(resource);
                    Iterator var6 = properties.entrySet().iterator();

                    while(var6.hasNext()) {
                        Entry<?, ?> entry = (Entry)var6.next();
                        String factoryTypeName = ((String)entry.getKey()).trim();
                        String[] var9 = StringUtils.commaDelimitedListToStringArray((String)entry.getValue());
                        int var10 = var9.length;

                        for(int var11 = 0; var11 < var10; ++var11) {
                            String factoryImplementationName = var9[var11];
                            //3、获取properties中key为EnableDiscoveryClient对应的value值列表。
			    result.add(factoryTypeName, factoryImplementationName.trim());
                        }
                    }
                }

                cache.put(classLoader, result);
                return result;
            } catch (IOException var13) {
                throw new IllegalArgumentException("Unable to load factories from location [META-INF/spring.factories]", var13);
            }
        }
    }

  获取properties中key为EnableDiscoveryClient对应的value值列表。对应的值可以在spring-cloud-netflix-eureka-client-x.x.x.RELEASE.jar中META-INF的spring.factories。

值为:org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

spring.factories的详情如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration  

其实,spring.factories中EnableAutoConfiguration对应的value值列表的类会在SpringBoot项目启动的时候注册到Spring容器中,EurekaClient的关键功能就在EurekaClientAutoConfiguration

 

2、EurekaClientConfigServerAutoConfiguration功能解析

@Configuration( proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass({EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class})
public class EurekaClientConfigServerAutoConfiguration {
    @Autowired( required = false)
    private EurekaInstanceConfig instance;
    @Autowired( required = false)
    private ConfigServerProperties server;

    public EurekaClientConfigServerAutoConfiguration() {
    }

    @PostConstruct
    public void init() {
        if (this.instance != null && this.server != null) {
            String prefix = this.server.getPrefix();
            if (StringUtils.hasText(prefix) && !StringUtils.hasText((String)this.instance.getMetadataMap().get("configPath"))) {
                this.instance.getMetadataMap().put("configPath", prefix);
            }

        }
    }
}

 通过注解@ConditionalOnClass可知, EurekaClientConfigServerAutoConfiguration 类的产生需要EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class 这三个类先产生。

 

我们重点关注EurekaClient ,源码如下

@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {  

 就是一个接口,并定义了默认实现类DiscoveryClient,该接口定义了Eureka客户端主要功能,包括获取服务URL、注册当前服务等功能。

 

3、DiscoveryClient(com.netflix.discovery.DiscoveryClient)

Eureka主要功能有: 服务注册、服务续约、服务下线、服务调用

1) 服务注册(发送注册请求到注册中心)

    boolean register() throws Throwable {
        EurekaHttpResponse httpResponse;
        try {
	    //主要的注册功能在这里,真正的实现在AbstractJerseyEurekaHttpClient.register()
            httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
        } catch (Exception var3) {
            logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
            throw var3;
        }
  	...
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }


//AbstractJerseyEurekaHttpClient.register()
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
	    //1、构造一个HTTP请求
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
	    //2、发送post请求到serviceUrl,serviceUrl即我们在配置文件中配置的defaultZone
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
           //3、返回响应状态 
	   return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

  

 2)服务续约

本质就是发送当前应用的心跳请求

  boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
	    //1、本质就是发送心跳请求
	    //2、真正实现为AbstractJerseyEurekaHttpClient.sendHeartBeat() 
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
	    //3、如果请求失败,则调用注册服务请求
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }


//AbstractJerseyEurekaHttpClient.sendHeartBeat() 
@Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + ‘/‘ + id;
        ClientResponse response = null;
        try {
	    //将当前实例的元信息(InstanceInfo)以及状态(UP)通过HTTP请求发送到serviceUrl
            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity() &&
                    !HTML.equals(response.getType().getSubtype())) { //don‘t try and deserialize random html errors from the server
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

  

3)服务调用

本质就是获取调用服务名所对应的服务提供者实例信息,包括IP、port等

 

   @Override
    public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) {
        return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion());
    }

    @Override
    public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
                                                       @Nullable String region) {
        if (vipAddress == null) {
            throw new IllegalArgumentException(
                    "Supplied VIP Address cannot be null");
        }
        Applications applications;
	//1、判断服务提供方是否是当前region,若是的话直接从localRegionApp中获取
        if (instanceRegionChecker.isLocalRegion(region)) {
            applications = this.localRegionApps.get();
        } 
	//2、否则的话从远程region获取
	else {
            applications = remoteRegionVsApps.get(region);
            if (null == applications) {
                logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                        + "address {}.", region, vipAddress);
                return Collections.emptyList();
            }
        }
        //从applications中获取服务名称对应的实例名称列表
        if (!secure) {
            return applications.getInstancesByVirtualHostName(vipAddress);
        } else {
            return applications.getInstancesBySecureVirtualHostName(vipAddress);

        }

    }

  

4)服务下线

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }


  //AbstractJerseyEurekaHttpClient.cancel()
    @Override
    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + ‘/‘ + id;
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
	    //本质就是发送delete请求到注册中心
            response = resourceBuilder.delete(ClientResponse.class);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

  

 

以上是关于Eureka源码分析(六) TimedSupervisorTask的主要内容,如果未能解决你的问题,请参考以下文章

Eureka 系列(02)服务发现源码分析

Eureka 客户端源码分析

Eureka源码解析:入口分析

Eureka源码分析

Eureka 系列(02)客户端源码分析

Spring Cloud Eureka 源码分析 —— Client端