SpringCloud源码分析 (Eureka-Server-入口分析和处理Client状态请求)
Posted 959_1x
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud源码分析 (Eureka-Server-入口分析和处理Client状态请求)相关的知识,希望对你有一定的参考价值。
文章目录
1.自动配置入口
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
1.1 @EnableEurekaServer
可以看到该类是一个配置类,在其中创建了Marker实例。
通过上面分析我们知道通过@EnableEurekaServer注解,会创建Marker实例,从而让EurekaServerAutoConfiguration这个自动配置类生效
1.2 EurekaServerAutoConfiguration
该配置类具有一个条件注解:要求必须要有一个EurekaServerMarkerConæguration.Marker实例这个配置类才会起作用。
在这个Eureka Server自动配置类启动后,其会创建一个很重要的实例,InstanceRegistry。该实例及其父类中的很多方法是我们后面要调用到的。
1.3 PeerEurekaNodes
服务器之间拷贝用得到, 代表的是服务端Eureka集群
2.处理客户端状态修改请求
现在我们分析第一个请求,先找到处理器,我们知道SpringCloud用的是jersey框架,与SpringMVC框架不同的是SpringMVC用的是Controller作为处理器,而jersey用的是Resource,所以我们找到InstanceResource:
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp)
try
if (registry.getInstanceByAppAndId(app.getName(), id) == null)
logger.warn("Instance not found: /", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));
if (isSuccess)
logger.info("Status updated: - - ", app.getName(), id, newStatus);
return Response.ok().build();
else
logger.warn("Unable to update status: - - ", app.getName(), id, newStatus);
return Response.serverError().build();
catch (Throwable e)
logger.error("Error updating instance for status ", id,
newStatus);
return Response.serverError().build();
2.1 根据微服务名称和instanceId从注册表中获取实例
先看registry.getInstanceByAppAndId方法,根据微服务名称和instanceId查询注册表中的实例:
//AbstractInstanceRegistry.java
public InstanceInfo getInstanceByAppAndId(String appName, String id)
return this.getInstanceByAppAndId(appName, id, true);
//AbstractInstanceRegistry.java
public InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions)
//registry就是我们服务端本地的注册表,双层map
//外层map,key是微服务名称,value是内层map
//内层map,key是InstanceInfo的Id,value是Lease续约对象,包装了InstanceInfo
//先根据微服务名获取内层map
Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (leaseMap != null)
//再根据instanceId获取续约对象
lease = leaseMap.get(id);
if (lease != null
&& (!isLeaseExpirationEnabled() || !lease.isExpired()))
//lease不空,并且:
//isLeaseExpirationEnabled:续约过期是否开启,底层其实是判断是否开启了
// 自我保护机制,如果开启了自我保护机制,这里就会返回false,关闭续约过期
// 则!isLeaseExpirationEnabled()值就是true,不会进行第二个条件判断了
// 也就意味着只要注册表中有这个实例信息,就直接返回,无论它是否过期
//相反如果没有开启自我保护机制,则会开启续约过期,就需要检查当前实例
//的续约信息有没有过期,只会返回没有过期的实例信息
//把lease对象包装成InstanceInfo返回
return decorateInstanceInfo(lease);
else if (includeRemoteRegions)
//上面只要没有找到,如果includeRemoteRegions为true
//则会尝试去远程region的注册表中查找
//从远程Region中获取有没有该实例的InstanceInfo
for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values())
//regionNameVSRemoteRegistry也是一个map
//key是regionName,value是RemoteRegionRegistry,远程注册表
//根据微服务名称获取Application
Application application = remoteRegistry.getApplication(appName);
if (application != null)
//根据instanceId获取对应的InstanceInfo
return application.getByInstanceId(id);
return null;
服务端的注册表
双重Map, 外部的key为微服务名称, value为Map, 内层Map的key为InstanceId, Value为lease, 续约对象
2.2 处理Client状态改变
- 本地状态
- 同步到其他Server
statusUpdate
本地状态改变
//AbstractInstanceRegistry.java
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication)
try
read.lock();//更新操作为什么上读锁?
//后面服务端很多流程分析,都会看到修改操作,加上了读锁
//先简单提一下,流程全部分析完,最后还会单独说一下
//读锁对应有一个写锁,读锁之间不互斥,写锁之间、读写锁之间是互斥的。
//如果一个线程持有读锁,其他线程再去获取读锁都是可以的立即获取到的
// 但是如果获取写锁就会阻塞了
//相反,如果一个线程持有写锁,其他线程无论想获取读锁还是写锁都会阻塞。
//所以这里修改操作用读锁,而读取操作却用写锁
//首要原因是因为读的操作只有一个地方,而写的操作有很多地方
//其次是希望写的操作之间不会互斥,可以同时进行,而读的操作进行的时
// 候是不允许其他线程写的,保证读的时候数据的稳定性。
//写的操作可以多个线程同时进行,所以它本身肯定需要保证线程安全
//状态更新计数器+1
STATUS_UPDATE.increment(isReplication);
//根据微服务名称,从注册表中获取内层map
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null)
//根据instanceId获取续约对象
lease = gMap.get(id);
if (lease == null)
return false;
else
//刷新续约时间
//既然服务端已经收到了客户的请求,本身也相当于一次续约心跳
lease.renew();
//获取持有者 instanceInfo
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null)
logger.error("Found Lease without a holder for instance id ", id);
if ((info != null) && !(info.getStatus().equals(newStatus)))
// 状态不一样才进行更新
// Mark service as UP if needed
if (InstanceStatus.UP.equals(newStatus))
// 状态是UP,如果是第一次启动,则记录一下启动的时间戳
lease.serviceUp();
// This is NAC overriden status
// 保存到一个维护覆盖状态的map,key是instanceId
overriddenInstanceStatusMap.put(id, newStatus);
// Set it for transfer of overridden status to replica on
// replica start up 设置它以便在副本启动时将覆盖状态转移到副本
// 为覆盖状态赋值
info.setOverriddenStatus(newStatus);
//复制时间戳
long replicaDirtyTimestamp = 0;
//直接修改status状态,而不标记dirty
info.setStatusWithoutDirty(newStatus);
//如果客户端传过来的lastDirtyTimestamp不为空(客户端的最新修改时间)
if (lastDirtyTimestamp != null)
//赋值给replicaDirtyTimestamp
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
// If the replications dirty timestamp is more than the existing one, just update
// it to the replicas.
// 比较一下客户端传过来的最新修改时间是不是比我服务端记录的还新
// 保存更新的
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp())
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
//设置操作类型为修改(增量更新时看到过)
info.setActionType(ActionType.MODIFIED);
//将其加入到最近更新队列!!!这是一个线程安全的先进先出的队列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//更新服务端的最新修改时间
//lastUpdatedTimestamp时间戳,这个时间戳是服务端专门用的
info.setLastUpdatedTimestamp();
//让一些缓存失效,不看了
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
return true;
finally
//最后锁释放
read.unlock();
状态更新操作同步给集群中其他Server
replicateInstanceActionsToPeers()
//PeerAwareInstanceRegistryImpl.java
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
* 将所有实例更改复制到对等eureka节点,但复制到该节点的流量除外。
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node)
//此时action = Action.StatusUpdate
try
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action)
case Cancel:
//下架
node.cancel(appName, id);
break;
case Heartbeat:
//心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
//注册
node.register(info);
break;
case StatusUpdate:
//状态更新
//从注册表获取instanceInfo信息,之前跟过
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
//给节点同步状态更新操作
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
//状态删除
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
catch (Throwable t)
logger.error("Cannot replicate information to for action ", node.getServiceUrl(), action.name(), t);
节点同步状态更新操作
/**
* Send the status update of the instance.
*/
public void statusUpdate(final String appName, final String id,
final InstanceStatus newStatus, final InstanceInfo info)
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
//batchingDispatcher执行器,底层是将任务放入队列,有专门的后台线程循环从队列取任务执行
batchingDispatcher.process(
//参数一:任务ID
taskId("statusUpdate", appName, id),
//参数二:真正要处理的任务
new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false)
@Override
public EurekaHttpResponse<Void> execute()
//直接看statusUpdate方法
return replicationClient.statusUpdate(appName, id, newStatus, info);
,
//参数三:到期时间
expiryTime
);
以上是关于SpringCloud源码分析 (Eureka-Server-入口分析和处理Client状态请求)的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud gateway RequestRateLimiter 源码串联分析
《微服务专题》SpringCloud-Eureka源码分析解读
SpringCloud----Ribbon服务调用,源码分析