Spring Cloud Technical Series, "Eureka Source Code Analysis": Understanding Eureka's Workflow and Operating Mechanism from the Source Code (Part 2)
Posted 洛神灬殇
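This article continues from Part 1 of the Eureka series. For the earlier material, see [Spring Cloud Technical Series, "Eureka Source Code Analysis": Understanding Eureka's Workflow and Operating Mechanism from the Source Code (Part 1)].
Recap of the Principles
- Eureka Server provides the service registry. Each node registers with Eureka Server when it starts, so the server's registry holds information about all available service nodes, and that information can be viewed directly in the web UI.
- Eureka Client is a Java client that simplifies interaction with Eureka Server. It also has a built-in load balancer that uses a round-robin algorithm.
- After an application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods (by default 3 periods = 90 seconds), it removes that node from the registry.
- In a high-availability deployment, the Eureka Servers synchronize their data with each other through replication.
- Eureka Client caches registry information, so even if every Eureka Server goes down, clients can still use the cached information to call other services' APIs.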
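The heartbeat and eviction rule in the recap can be illustrated with a small, dependency-free model. This is only a sketch under stated assumptions: `LeaseSketch`, its methods, and the instance ids are hypothetical names invented for illustration and are not Eureka classes; only the 30-second heartbeat and 90-second lease values come from the text above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of heartbeat-based eviction; hypothetical, not Eureka's actual classes. */
class LeaseSketch {
    static final long HEARTBEAT_INTERVAL_MS = 30_000; // default heartbeat period from the text
    static final long LEASE_DURATION_MS = 3 * HEARTBEAT_INTERVAL_MS; // 90 s = three missed heartbeats

    // instanceId -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Records a heartbeat (or the initial registration) for an instance. */
    void heartbeat(String instanceId, long nowMs) {
        lastHeartbeat.put(instanceId, nowMs);
    }

    /** Removes every instance whose last heartbeat is older than the lease duration. */
    int evictExpired(long nowMs) {
        int before = lastHeartbeat.size();
        lastHeartbeat.values().removeIf(last -> nowMs - last > LEASE_DURATION_MS);
        return before - lastHeartbeat.size();
    }

    boolean isRegistered(String instanceId) {
        return lastHeartbeat.containsKey(instanceId);
    }

    public static void main(String[] args) {
        LeaseSketch registry = new LeaseSketch();
        registry.heartbeat("order-service:8080", 0);
        registry.heartbeat("user-service:8081", 0);
        registry.heartbeat("user-service:8081", 60_000); // user-service keeps renewing
        int evicted = registry.evictExpired(100_000);     // order-service has been silent for 100 s
        System.out.println("evicted=" + evicted);
    }
}
```

The real server tracks leases per instance in a richer structure and evicts on a timer; the model only shows the time comparison that decides whether a node is dropped.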
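Analysis of the EurekaServer Startup Flow
How EurekaServer Handles Service Registration and Cluster Data Replication
How Does EurekaClient Register with EurekaServer?
Earlier we set a breakpoint on every method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry, and EurekaServer is now running in debug mode. So let's pick any microservice annotated with @EnableEurekaClient and simply Run it.
- Once it starts, it is bound to call the register method, so let's read on and see.
Instance Registration Mechanism
The breakpoint in InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) is hit.
- Following the stack trace upward from InstanceRegistry.register, we see that it was called by ApplicationResource.addInstance, so let's analyze addInstance.
The ApplicationResource Class
Its main job is to receive and handle HTTP service requests.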
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }
    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build(); // 204 to be backwards compatible
}
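- This looks a little different from the RESTful style of the Controllers we wrote before. On closer inspection, it is Jersey, a production-grade framework for RESTful services and clients. Like Struts, it can be integrated with Hibernate and Spring.
- The call registry.register(info, "true".equals(isReplication)) is where registration happens. After a EurekaClient starts, it sends an HTTP(S) request that lands directly in ApplicationResource.addInstance; anything related to registration goes through this method.
- Next, let's step into registry.register(info, "true".equals(isReplication)).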
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}
- First, the handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) method:
private void handleRegistration(InstanceInfo info, int leaseDuration,
        boolean isReplication) {
    log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
            + ", leaseDuration " + leaseDuration + ", isReplication "
            + isReplication);
    publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
            isReplication));
}
- It publishes a EurekaInstanceRegisteredEvent (the service registration event) through the ApplicationContext. You can register a listener for EurekaInstanceRegisteredEvent and run your own business logic at that moment.
- Next, let's look at super.register(info, isReplication), which is a method of PeerAwareInstanceRegistryImpl, the parent class of InstanceRegistry.
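The "publish an event, then delegate to the parent class" shape of InstanceRegistry.register can be modelled without any framework. The following is a hedged sketch: `RegistrationEventSketch`, `RegisteredEvent`, `BaseRegistry`, and `EventPublishingRegistry` are hypothetical stand-ins invented here, and the `Consumer` list merely imitates what Spring's event mechanism does. In a real Spring application you would instead listen for EurekaInstanceRegisteredEvent with an ApplicationListener or an @EventListener method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Framework-free model of "publish an event, then delegate to the parent registry". */
class RegistrationEventSketch {

    /** Hypothetical stand-in for EurekaInstanceRegisteredEvent. */
    static final class RegisteredEvent {
        final String appName;
        final int leaseDuration;
        final boolean replication;

        RegisteredEvent(String appName, int leaseDuration, boolean replication) {
            this.appName = appName;
            this.leaseDuration = leaseDuration;
            this.replication = replication;
        }
    }

    /** Hypothetical stand-in for the parent registry that actually stores the instance. */
    static class BaseRegistry {
        final List<String> registered = new ArrayList<>();

        void register(String appName, boolean isReplication) {
            registered.add(appName);
        }
    }

    /** Plays the role of InstanceRegistry: notify listeners first, then call super. */
    static class EventPublishingRegistry extends BaseRegistry {
        private final List<Consumer<RegisteredEvent>> listeners = new ArrayList<>();

        void addListener(Consumer<RegisteredEvent> listener) {
            listeners.add(listener);
        }

        @Override
        void register(String appName, boolean isReplication) {
            RegisteredEvent event = new RegisteredEvent(appName, 90, isReplication);
            listeners.forEach(l -> l.accept(event)); // user hooks run here
            super.register(appName, isReplication);  // then the real registration
        }
    }

    public static void main(String[] args) {
        EventPublishingRegistry registry = new EventPublishingRegistry();
        registry.addListener(e -> System.out.println("registered: " + e.appName));
        registry.register("ORDER-SERVICE", false);
    }
}
```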
Service Registration Mechanism
Step into the register(final InstanceInfo info, final boolean isReplication) method of the PeerAwareInstanceRegistryImpl class:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Note: lease duration; the default is the constant value of 90 seconds
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // Note: the lease duration can also come from the instance's own lease info, so it can be customized through configuration
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Note: write the registrant's information into the EurekaServer registry; the parent class is AbstractInstanceRegistry
    super.register(info, leaseDuration, isReplication);
    // Note: synchronize data between EurekaServer nodes by replicating to the other peers
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
Step into super.register(info, leaseDuration, isReplication) to see how the entry is written into the EurekaServer registry, that is, the AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) method.
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Note: the registry variable is the so-called registry table; it is kept in memory
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
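- The method is fairly long. On a quick read, besides writing the new lease (with its timestamps) into the registry, it also updates the recently-changed queue, invalidates the cache, and so on. Readers who are interested can study the method in more depth.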
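Three ideas from AbstractInstanceRegistry.register are easier to see in isolation: the two-level in-memory map filled with putIfAbsent, the rule that a stale registration does not overwrite a newer dirty timestamp, and the renewal threshold arithmetic. The sketch below is a simplified model under stated assumptions: `RegistrySketch` and its accessors are hypothetical, the map stores only a timestamp instead of a Lease, the 0.85 renewal percentage is an assumed value for illustration, and the `expectedNumberOfRenewsPerMin > 0` guard from the real method is dropped so the counter can start at zero.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of the in-memory registry; hypothetical, not Eureka's classes. */
class RegistrySketch {
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85; // assumed value for illustration

    // appName -> (instanceId -> last dirty timestamp); the real map stores Lease<InstanceInfo>
    private final ConcurrentHashMap<String, Map<String, Long>> registry = new ConcurrentHashMap<>();
    private final Object lock = new Object();
    private int expectedRenewsPerMin = 0;
    private int renewsPerMinThreshold = 0;

    void register(String appName, String instanceId, long lastDirtyTimestamp) {
        Map<String, Long> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Long> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value, or null if our map won the race
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        if (!instances.containsKey(instanceId)) {
            synchronized (lock) {
                // a new instance heartbeats every 30 s, so it adds 2 expected renewals per minute
                expectedRenewsPerMin += 2;
                renewsPerMinThreshold = (int) (expectedRenewsPerMin * RENEWAL_PERCENT_THRESHOLD);
            }
        }
        // keep whichever dirty timestamp is newer, so a stale registration cannot win
        instances.merge(instanceId, lastDirtyTimestamp, Long::max);
    }

    long lastDirtyTimestamp(String appName, String instanceId) {
        return registry.get(appName).get(instanceId);
    }

    int expectedRenewsPerMin() {
        return expectedRenewsPerMin;
    }

    int renewsPerMinThreshold() {
        return renewsPerMinThreshold;
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register("ORDER-SERVICE", "i-1", 100L);
        r.register("ORDER-SERVICE", "i-2", 100L);
        r.register("ORDER-SERVICE", "i-1", 50L); // stale: the stored timestamp stays at 100
        System.out.println(r.lastDirtyTimestamp("ORDER-SERVICE", "i-1"));
        System.out.println(r.expectedRenewsPerMin() + " / " + r.renewsPerMinThreshold());
    }
}
```

With two distinct instances the model expects 4 renewals per minute and, at the assumed 0.85 ratio, a threshold of 3.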
Replication Between Cluster Nodes
Now for the replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) method.
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Note: if this request is itself a replication, do not replicate it again
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all nodes in the Eureka Server cluster and replicate to each
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Not yet replicated: apply the action to each peer node in turn (cancel, register, heartbeat, status update, and so on)
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
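The two guards in replicateToPeers (skip when the request is already a replication, and skip your own URL) are what keep a registration from bouncing around the cluster forever. A minimal, hedged model follows: `ReplicationSketch`, `Node`, `cluster`, and the URLs are hypothetical names invented here, and a direct method call stands in for the HTTP request that a real peer node would send.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of peer replication with a loop guard; hypothetical, not Eureka's classes. */
class ReplicationSketch {

    static class Node {
        final String url;
        final List<Node> peers = new ArrayList<>();      // configured peer list, may include this node
        final List<String> registry = new ArrayList<>(); // registered instance ids
        int registerCalls = 0;

        Node(String url) {
            this.url = url;
        }

        void register(String instanceId, boolean isReplication) {
            registerCalls++;
            if (!registry.contains(instanceId)) {
                registry.add(instanceId); // update the local registry first
            }
            replicateToPeers(instanceId, isReplication);
        }

        private void replicateToPeers(String instanceId, boolean isReplication) {
            // a replicated request is never forwarded again, otherwise peers would echo it endlessly
            if (peers.isEmpty() || isReplication) {
                return;
            }
            for (Node peer : peers) {
                if (peer.url.equals(url)) {
                    continue; // do not replicate to yourself
                }
                peer.register(instanceId, true); // mark the forwarded call as a replication
            }
        }
    }

    /** Builds a fully meshed cluster in which every node lists all nodes, itself included. */
    static List<Node> cluster(String... urls) {
        List<Node> nodes = new ArrayList<>();
        for (String url : urls) {
            nodes.add(new Node(url));
        }
        for (Node node : nodes) {
            node.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = cluster("http://peer1", "http://peer2", "http://peer3");
        nodes.get(0).register("order-service:8080", false); // a client registers with peer1
        for (Node node : nodes) {
            System.out.println(node.url + " calls=" + node.registerCalls + " registry=" + node.registry);
        }
    }
}
```

Each node ends up handling the registration exactly once: peer1 from the client, and peer2 and peer3 from peer1's replication.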
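- On every registration request, EurekaServer first updates its own registry and then synchronizes the information to the other EurekaServer nodes.
- Next, let's see how replication to a node is carried out by stepping into the replicateInstanceActionsToPeers method.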
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            // ... (the remaining cases are cut off in the original listing)