Eureka Source Code Analysis (6): TimedSupervisorTask
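Earlier we analyzed how Eureka registers service instance information. Now let's look at lease renewal. When a lease comes due, one of two things happens: either it expires and Eureka Server takes the expired node offline, or it is renewed, which happens when Eureka Server finds that the node can still communicate normally. The node-status check is driven by the schedule method of ScheduledExecutorService, so how does the periodic check task actually run? The answer is TimedSupervisorTask. Let's first look at the fields TimedSupervisorTask has.
When TimedSupervisorTask runs, it submits the task to the executor for execution.
When the task completes normally, TimedSupervisorTask resubmits itself to the scheduler with a delay of timeoutMillis.
When the task times out, it recomputes the delay (which may not exceed maxDelay) and resubmits itself to the scheduler with that delay.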
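The rescheduling behaviour described above can be sketched in plain Java. This is an illustration, not the Netflix source: the class name SupervisorSketch and the helper nextDelay are hypothetical, and doubling the delay on timeout is an assumption, since the text only says the delay is recomputed and capped at maxDelay.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative sketch only; not the Netflix TimedSupervisorTask source. */
final class SupervisorSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelay;
    private volatile long delay;

    SupervisorSketch(ScheduledExecutorService scheduler, ExecutorService executor,
                     Runnable task, long timeoutMillis, long maxDelay) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = timeoutMillis;
        this.maxDelay = maxDelay;
        this.delay = timeoutMillis;
    }

    /** Assumption: back off by doubling on timeout, capped at maxDelay; reset on success. */
    static long nextDelay(long current, long timeoutMillis, long maxDelay, boolean timedOut) {
        return timedOut ? Math.min(maxDelay, current * 2) : timeoutMillis;
    }

    @Override
    public void run() {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait at most timeoutMillis
            delay = nextDelay(delay, timeoutMillis, maxDelay, false);
        } catch (TimeoutException e) {
            delay = nextDelay(delay, timeoutMillis, maxDelay, true);
        } catch (ExecutionException e) {
            // The task itself failed; this sketch keeps the current delay.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // stop rescheduling when interrupted
        } finally {
            future.cancel(true); // no effect if the task already finished
        }
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) {
        long delay = 1_000;
        for (int i = 0; i < 5; i++) {
            delay = nextDelay(delay, 1_000, 8_000, true);
            System.out.println("delay after timeout " + (i + 1) + ": " + delay + " ms");
        }
    }
}
```

Keeping the delay computation in a pure static method makes the backoff rule easy to check without starting any threads.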
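Next, look at how the run method is implemented.
A lease-renewal request for an application instance is mapped to the renewLease method of InstanceResource.
That calls the renew method of AbstractInstanceRegistry to renew the instance's lease.
That in turn calls the renew method of Lease, which sets the lease's last-update timestamp (the renewal itself).
The whole process only modifies the lease's expiry time, so even concurrent requests cannot leave the data inconsistent, and no lock is needed.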
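To make the no-lock argument concrete, here is a minimal sketch of a lease whose renewal is a single write to a volatile long. The class LeaseSketch and the injected clock are hypothetical and simplified; Eureka's own Lease class differs in detail.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Illustrative sketch only; Eureka's Lease class is more elaborate. */
final class LeaseSketch {
    private final long durationMillis;
    private final LongSupplier clock; // injected so the behaviour can be tested

    // A write to a volatile long is atomic and immediately visible to other threads.
    private volatile long lastUpdateTimestamp;

    LeaseSketch(long durationMillis, LongSupplier clock) {
        this.durationMillis = durationMillis;
        this.clock = clock;
        this.lastUpdateTimestamp = clock.getAsLong();
    }

    /** Renewal is one independent write: concurrent callers simply overwrite each other. */
    void renew() {
        lastUpdateTimestamp = clock.getAsLong();
    }

    boolean isExpired() {
        return clock.getAsLong() > lastUpdateTimestamp + durationMillis;
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        LeaseSketch lease = new LeaseSketch(100, now::get);
        now.set(90);
        lease.renew();
        now.set(150);
        System.out.println("expired at t=150: " + lease.isExpired());
        now.set(191);
        System.out.println("expired at t=191: " + lease.isExpired());
    }
}
```

Because each renewal writes a fresh timestamp that does not depend on the previous value, two racing renewals both leave a timestamp close to "now", whichever write lands last.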
That completes Eureka's lease renewal.
This concludes the analysis of TimedSupervisorTask.
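Eureka Client Source Code Analysis
Eureka is a service registry. Its main functions are service registration and service discovery, which are the foundational, core capabilities of a microservice framework.
For how to use Eureka, see:
Eureka server: Using Spring Cloud Eureka Server (registry),
Eureka client: Using Eureka Client,
Eureka server: Eureka high availability
Eureka is split into a client and a server. This article covers the client source code.
1. The Eureka client is enabled mainly through the @EnableDiscoveryClient annotation
1) The source of @EnableDiscoveryClient: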
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class) // this import is the key part
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     * @return - {@code true} if you want to automatically register.
     */
    boolean autoRegister() default true;
}
2) EnableDiscoveryClientImportSelector
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        // 1. Collect the classes that need to be registered with Spring
        String[] imports = super.selectImports(metadata);

        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        boolean autoRegister = attributes.getBoolean("autoRegister");

        // 2. autoRegister defaults to true, in which case
        //    AutoServiceRegistrationConfiguration is also registered with Spring
        if (autoRegister) {
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        }
        return imports;
    }
    ...
}
3) super.selectImports(metadata);
The selectImports method of SpringFactoryImportSelector:
public abstract class SpringFactoryImportSelector<T>
        implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        // 1. isEnabled() is true by default
        if (!isEnabled()) {
            return new String[0];
        }
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

        Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
                + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

        // The key step: find all possible auto configuration classes, filtering duplicates
        List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

        if (factories.isEmpty() && !hasDefaultFactory()) {
            throw new IllegalStateException("Annotation @" + getSimpleName()
                    + " found, but there are no implementations. Did you forget to include a starter?");
        }

        if (factories.size() > 1) {
            // there should only ever be one DiscoveryClient, but there might be more than
            // one factory
            this.log.warn("More than one implementation " + "of @" + getSimpleName()
                    + " (now relying on @Conditionals to pick one): " + factories);
        }

        return factories.toArray(new String[factories.size()]);
    }
}

// SpringFactoriesLoader.loadFactoryNames
public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
    // factoryTypeName is org.springframework.cloud.client.discovery.EnableDiscoveryClient
    String factoryTypeName = factoryType.getName();
    return (List)loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
}

// SpringFactoriesLoader.loadSpringFactories
private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
    MultiValueMap<String, String> result = (MultiValueMap)cache.get(classLoader);
    if (result != null) {
        return result;
    } else {
        try {
            // 1. Locate every META-INF/spring.factories file
            Enumeration<URL> urls = classLoader != null
                    ? classLoader.getResources("META-INF/spring.factories")
                    : ClassLoader.getSystemResources("META-INF/spring.factories");
            // Assign to the existing variable; redeclaring it here (as decompiled output does) would not compile
            result = new LinkedMultiValueMap<>();
            // 2. Iterate over every spring.factories file
            while(urls.hasMoreElements()) {
                URL url = (URL)urls.nextElement();
                UrlResource resource = new UrlResource(url);
                Properties properties = PropertiesLoaderUtils.loadProperties(resource);
                Iterator var6 = properties.entrySet().iterator();
                while(var6.hasNext()) {
                    Entry<?, ?> entry = (Entry)var6.next();
                    String factoryTypeName = ((String)entry.getKey()).trim();
                    String[] var9 = StringUtils.commaDelimitedListToStringArray((String)entry.getValue());
                    int var10 = var9.length;
                    for(int var11 = 0; var11 < var10; ++var11) {
                        String factoryImplementationName = var9[var11];
                        // 3. Record every key/value pair; loadFactoryNames then picks
                        //    the value list for the EnableDiscoveryClient key
                        result.add(factoryTypeName, factoryImplementationName.trim());
                    }
                }
            }
            cache.put(classLoader, result);
            return result;
        } catch (IOException var13) {
            throw new IllegalArgumentException("Unable to load factories from location [META-INF/spring.factories]", var13);
        }
    }
}
This returns the list of values stored in the properties under the EnableDiscoveryClient key. The values can be found in META-INF/spring.factories inside spring-cloud-netflix-eureka-client-x.x.x.RELEASE.jar.
The value is: org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration (in the listing below, that class appears under the EnableAutoConfiguration key).
The full contents of spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
In fact, the classes listed under the EnableAutoConfiguration key in spring.factories are registered with the Spring container when the Spring Boot application starts, and the key functionality of EurekaClient lives in EurekaClientAutoConfiguration.
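The lookup that loadSpringFactories performs (read a properties file, split each comma-separated value into class names, index by key) can be sketched with JDK classes alone. FactoriesSketch and the com.example names are hypothetical; Spring's real loader also merges every spring.factories on the classpath and caches the result per class loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Illustrative sketch only; not Spring's SpringFactoriesLoader. */
final class FactoriesSketch {

    /** Parses spring.factories-style text into key -> list of class names. */
    static Map<String, List<String>> parse(String content) {
        Properties props = new Properties();
        try {
            // Properties.load understands the trailing-backslash line continuations
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, List<String>> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            List<String> names = new ArrayList<>();
            for (String name : props.getProperty(key).split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
            result.put(key.trim(), names);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical file content, using the same layout as a real spring.factories
        String content = "com.example.Key=com.example.A,\\\n  com.example.B\n";
        System.out.println(parse(content).get("com.example.Key"));
    }
}
```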
2. What EurekaClientConfigServerAutoConfiguration does
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass({EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class})
public class EurekaClientConfigServerAutoConfiguration {

    @Autowired(required = false)
    private EurekaInstanceConfig instance;

    @Autowired(required = false)
    private ConfigServerProperties server;

    public EurekaClientConfigServerAutoConfiguration() {
    }

    @PostConstruct
    public void init() {
        if (this.instance != null && this.server != null) {
            String prefix = this.server.getPrefix();
            // Publish the config server prefix as the instance's configPath metadata,
            // unless a configPath has already been set
            if (StringUtils.hasText(prefix)
                    && !StringUtils.hasText((String) this.instance.getMetadataMap().get("configPath"))) {
                this.instance.getMetadataMap().put("configPath", prefix);
            }
        }
    }
}
The @ConditionalOnClass annotation tells us that EurekaClientConfigServerAutoConfiguration only takes effect when the three classes EurekaInstanceConfigBean, EurekaClient and ConfigServerProperties are all present on the classpath.
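The idea behind a class-presence condition can be sketched as a plain classpath probe. ClassPresenceSketch is a hypothetical helper; how Spring actually evaluates @ConditionalOnClass is more involved, so treat this only as an illustration of the principle.

```java
/** Illustrative sketch only; not Spring's condition evaluation. */
final class ClassPresenceSketch {

    /** Returns true if the named class can be loaded by the given class loader. */
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** A configuration guarded by several classes applies only if all of them are present. */
    static boolean allPresent(ClassLoader loader, String... classNames) {
        for (String name : classNames) {
            if (!isPresent(name, loader)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassPresenceSketch.class.getClassLoader();
        System.out.println(isPresent("java.util.List", loader));
        // Hypothetical class name that is not expected to exist
        System.out.println(allPresent(loader, "java.util.List", "com.example.DoesNotExist"));
    }
}
```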
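We focus on EurekaClient. Its source:
@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
    ...
}
It is just an interface, with DiscoveryClient declared as its default implementation. The interface defines the main functions of the Eureka client, including obtaining service URLs and registering the current service.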
3. DiscoveryClient (com.netflix.discovery.DiscoveryClient)
Eureka's main functions are: service registration, service renewal, service deregistration, and service invocation
1) Service registration (sending a registration request to the registry)
boolean register() throws Throwable {
    EurekaHttpResponse httpResponse;
    try {
        // The registration itself happens here; the real implementation is
        // AbstractJerseyEurekaHttpClient.register()
        httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
    } catch (Exception var3) {
        logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
        throw var3;
    }
    ...
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

// AbstractJerseyEurekaHttpClient.register()
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        // 1. Build an HTTP request
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        // 2. Send a POST request to serviceUrl, i.e. the defaultZone set in the configuration file
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        // 3. Return the response status
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
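The shape of the registration call (a POST to apps/{appName} under serviceUrl, treated as successful on HTTP 204) can be sketched with the JDK's own java.net.http API (Java 11+) in place of Jersey. RegisterRequestSketch, the example URL and the "{}" body are placeholders for illustration; the real payload is the serialized InstanceInfo, whose format is not shown here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative sketch only; Eureka itself uses a Jersey client, not java.net.http. */
final class RegisterRequestSketch {

    /** Builds (but does not send) the registration request for the given application. */
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** Mirrors the check in register(): only 204 No Content counts as success. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        // Example URL and empty JSON body are placeholders, not real instance data
        HttpRequest request = build("http://localhost:8761/eureka", "DEMO", "{}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a matter of passing it to an HttpClient; that step is left out so the sketch runs without a registry.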
2) Service renewal
In essence this just sends a heartbeat request for the current application
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 1. In essence, send a heartbeat request
        // 2. The real implementation is AbstractJerseyEurekaHttpClient.sendHeartBeat()
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // 3. If the server answers 404 (instance not found), register again
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

// AbstractJerseyEurekaHttpClient.sendHeartBeat()
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        // Send the current instance's status (e.g. UP) and lastDirtyTimestamp
        // to serviceUrl as query parameters of an HTTP PUT
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder =
                anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        // don't try and deserialize random html errors from the server
        if (response.hasEntity() && !HTML.equals(response.getType().getSubtype())) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
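The decision logic of renew() can be isolated from the HTTP transport. In this sketch RenewSketch and its two IntSupplier parameters are hypothetical stand-ins for the heartbeat and registration calls, each returning an HTTP status code.

```java
import java.util.function.IntSupplier;

/** Illustrative sketch only; the transport calls are replaced by status-code suppliers. */
final class RenewSketch {

    /**
     * @param heartbeat returns the HTTP status of the heartbeat PUT
     * @param register  returns the HTTP status of the registration POST
     */
    static boolean renew(IntSupplier heartbeat, IntSupplier register) {
        try {
            int status = heartbeat.getAsInt();
            if (status == 404) {
                // The server no longer knows this instance: register again
                return register.getAsInt() == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            // As in the original, a transport failure means "not renewed"
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("known instance:   " + renew(() -> 200, () -> 204));
        System.out.println("unknown instance: " + renew(() -> 404, () -> 204));
    }
}
```

Note that registration is attempted only on a 404; any other non-200 status simply reports failure and leaves the retry to the next scheduled heartbeat.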
3) Service invocation
In essence this fetches the provider instance information (IP, port, and so on) for the service name being called
@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) {
    return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion());
}

@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region) {
    if (vipAddress == null) {
        throw new IllegalArgumentException(
                "Supplied VIP Address cannot be null");
    }
    Applications applications;
    // 1. If the provider is in the current region, read directly from localRegionApps
    if (instanceRegionChecker.isLocalRegion(region)) {
        applications = this.localRegionApps.get();
    }
    // 2. Otherwise read from the remote region
    else {
        applications = remoteRegionVsApps.get(region);
        if (null == applications) {
            logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                    + "address {}.", region, vipAddress);
            return Collections.emptyList();
        }
    }
    // Look up the list of instances for the service name in applications
    if (!secure) {
        return applications.getInstancesByVirtualHostName(vipAddress);
    } else {
        return applications.getInstancesBySecureVirtualHostName(vipAddress);
    }
}
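The local-versus-remote lookup can be sketched with plain collections (Java 9+ for Map.of and List.of). RegionLookupSketch, its string-based instance lists and the region names are hypothetical simplifications of Applications and InstanceInfo; treating a null region as local is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch only; instances are plain "host:port" strings here. */
final class RegionLookupSketch {
    private final String localRegion;
    // Local registry snapshot, swapped atomically on refresh
    private final AtomicReference<Map<String, List<String>>> localApps =
            new AtomicReference<>(Map.of());
    // One registry snapshot per remote region
    private final Map<String, Map<String, List<String>>> remoteApps = new ConcurrentHashMap<>();

    RegionLookupSketch(String localRegion) {
        this.localRegion = localRegion;
    }

    void setLocal(Map<String, List<String>> apps) {
        localApps.set(apps);
    }

    void putRemote(String region, Map<String, List<String>> apps) {
        remoteApps.put(region, apps);
    }

    List<String> instancesByVip(String vipAddress, String region) {
        if (vipAddress == null) {
            throw new IllegalArgumentException("Supplied VIP Address cannot be null");
        }
        // Assumption: a null region is treated as the local region
        boolean local = region == null || region.equals(localRegion);
        Map<String, List<String>> apps = local ? localApps.get() : remoteApps.get(region);
        if (apps == null) {
            return Collections.emptyList(); // unknown remote region
        }
        return apps.getOrDefault(vipAddress, Collections.emptyList());
    }

    public static void main(String[] args) {
        RegionLookupSketch lookup = new RegionLookupSketch("us-east-1");
        lookup.setLocal(Map.of("orders", List.of("10.0.0.1:8080")));
        lookup.putRemote("eu-west-1", Map.of("orders", List.of("10.1.0.1:8080")));
        System.out.println(lookup.instancesByVip("orders", "us-east-1"));
        System.out.println(lookup.instancesByVip("orders", "eu-west-1"));
    }
}
```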
4) Service deregistration
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

// AbstractJerseyEurekaHttpClient.cancel()
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        // In essence, send a DELETE request to the registry
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}