Eureka 源码

Posted chuliang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eureka 源码相关的知识,希望对你有一定的参考价值。

client:

@EnableEurekaClient--->@EnableDiscoveryClient--->@Import EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector--->

spring-cloud-netflix-eureka-client包中有

org.springframework.cloud.client.discovery.EnableDiscoveryClient=

org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration实例化的时候要求注入EurekaClient。

实现类是com.netflix.discovery.DiscoveryClient,其中@Inject注解的构造方法中

fetchRegistry方法发出/apps请求获取全量

initScheduledTasks启动两个主要的定时任务,CacheRefreshThread发出/apps/update请求获取增量,HeartbeatThread发出/apps/appName/appId请求发起心跳。

另外,initScheduledTasks方法还向applicationInfoManager注册了一个监听器statusChangeListener,里面调用instanceInfoReplicator.onDemandUpdate()方法。

CacheRefreshThread和HeartbeatThread这两个定时任务都是延迟30秒执行的,而在这之前,spring工厂在refresh方法中的最后的finishRefresh方法中,对实现lifecycle的bean进行回调,

EurekaDiscoveryClientConfiguration的start方法会让applicationInfoManager通知所有监听器,其中就有上面说的statusChangeListener,在instanceInfoReplicator.onDemandUpdate方法中,

InstanceInfoReplicator.this.run()方法中,discoveryClient.register(),发起/apps/appName的post请求,并在请求体中附上当前服务的InstanceInfo信息,注册自己。


Server:
通常的tomcat+spring的请求处理是:tomcat poller取到数据-----CoyoteAdapter.service-----Engine,Host,Context,Wrapper的Valve依次invoke,最后调用FilterChain的filter链,链的结尾调用servlet,requestMapping获取controller,处理请求。

EurekaServer接受请求跟普通请求的区别在于,不经过servlet,而是在filter链中添加一个自己的filter:ServletContainer,匹配/eureka/**的请求,直接在filter里面处理request和response。

@EnableEurekaServer--->@Import(EurekaServerConfiguration)-->@Bean 返回name为jerseyFilterRegistration的FilterRegistrationBean,jerseyFilterRegistration是一个ServletContextInitializer,springboot的beanFactory在refresh的时候,

会同时启动一个tomcat,生成tomcat的时候,会借助TomcatStarter获取工厂所有的ServletContextInitializer,在tomcat启动之后Engine,Host,Context,Wrapper的startInternal方法依次初始化,其中就会在Context的startInternal方法中,

调用所有ServletContextInitializer的onStartup方法,向ServletContext中注册jerseyFilterRegistration初始化的时候传入的ServletContainer,这个ServletContainer就是处理eurekaServer请求的filter,在doFilter中调用service方法,并且除非满足

特定条件,并不向下调用chain.doFilter方法,也就是不会调用最后的dispatchServlet。

存储服务信息,用的是三个Map或者类似Map的结构:readOnlyCacheMap,readWriteCacheMap,registry。服务名---<appId,instance>。

当服务获取的请求到来的时候(以全量为例),进的是ApplicationsResource的getContainers方法,调用responseCache.get(key),

(注意EurekaServer的存储是分3*2的六种类型,服务名对应/全部服务/增量服务 * zip和没压缩过的full,一共是六种key。而这其中服务名对应的key应该是不存在的,并没有被使用,ribbon获取单独服务的信息,

是先获取全部服务,自己再筛选)

这时候用的是全量+zip的key,先取readOnlyCacheMap中取,取不到去readWriteCacheMap中取。

上面说的是服务获取,再看服务注册,进的是AbstractInstanceRegistry的register方法,在registry中取出服务名对应的map,根据服务id取出相应的租约,如果有了说明已经注册了忽略,如果没有,新生成一个lease。

现在既然多了一个服务信息,那么这种增加要让其他服务知道,所以在register方法中快结束的地方有invalidateCache方法,主要就是把readWriteCacheMap中,代表当前appName和全量,增量的key全部清除。

ResponseCacheImpl在给readWriteCacheMap属性赋初始值的时候,定时执行getCacheUpdateTask,逻辑是对比readWriteCacheMap和readOnlyCacheMap相同的key对应的value,如有不同就把前者的value

替换掉后者的value,上面说过,key有六种,但是appName对应的key并没有被使用,只有全量/增量* zip/full,2*2四种。

数据在两个map(readOnlyCacheMap--RO,readWriteCacheMap--RW)和一个registry数据的流向是这样的:eurekaServer刚启动的时候,RO、RW、registry中都是没有数据的,第一个服务启动,先获取全部服务,

结果为空,再通过注册listener的方式发起注册请求(见Client源码),这个时候registry中是有一条服务数据的,30秒后,client会通过定时任务发起获取服务

(还是全量,因为此时applications.getRegisteredApplications().size() == 0)的请求,对应的key在RO中是没有的,所以拿着key去RW中找,RW中也没有,

顺着responseCache.get--->getValue--->readWriteCacheMap.get--->localCache.getOrLoad--->

loadSync--->loader.load(这个loader是readWriteCacheMap在初始化的时候传入的,load方法中从registry中取出数据)。而3分钟后,getCacheUpdateTask的定时任务执行,对比RW和RO,RO中有的而RW

中没有的(可能是因为register引起的invalidate),重新赋值。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
eureka剔除过期服务逻辑在AbstractInstanceRegistry中的evict方法

@EnableEurekaServer--->@Import(EurekaServerConfiguration.class)--->@Import(EurekaServerInitializerConfiguration.class)

--->实现SmartLifecycle接口,start方法中调用eurekaServerBootstrap.contextInitialized--->initEurekaServerContext()--->this.registry.openForTraffic--->AbstractInstanceRegistry.init

--->evictionTimer.schedule(EvictionTask)--->evict(compensationTimeMs)

进入方法,isLeaseExpirationEnabled方法中,判断eureka.server.enableSelfPreservation是否为true,默认为true,if (!isSelfPreservationModeEnabled()) 这个判断不会进入,会判断

getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold,numberOfRenewsPerMinThreshold是每分钟心跳数的阈值,每次有新服务注册的时候都会计算一次,(每15分钟也有个定时任务计算)

由于expectedNumberOfRenewsPerMin根据默认值计算是2(defaultOpenForTrafficCount * 2),所以注册一次再加二就是4,再乘以0.85的系数取整就是3,而此时只有一个服务,30秒一次的话是每分钟2次心跳,

不满足getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold,这样的话在服务数比较少的时候,大于号是不会成立的,但是如果服务数量比较多,比如6个,那么心跳每分钟12次,

numberOfRenewsPerMinThreshold = (6+1)* 2 * 0.85 取整 = 11,大于号就会成立

服务端遵循的原则就是,谨慎的删除服务(服务比较多的时候,删除比例也有上限),因为删除一个服务,就要重新生成缓存,比较消耗性能,而一个服务的短暂过期可能是因为网络暂时的抖动。

这样就会保留连接不上的服务,客户端那边就要进行处理,比如自己过滤过期服务








 

以上是关于Eureka 源码的主要内容,如果未能解决你的问题,请参考以下文章

注册中心 Eureka 源码解析 —— 调试环境搭建(含源码)

Eureka源码分析-环境构建篇

Eureka源码分析-环境构建篇

Wireshark抓包分析Eureka注册发现协议

源码学习-eurekaIdea导入eureka源码

Eureka源码解析:eureka-client启动原理分析