深入 Eureka 服务注册 源码分析
Posted sharedCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入 Eureka 服务注册 源码分析相关的知识,希望对你有一定的参考价值。
Eureka-Client注册服务
啥时候会注册
在两种情况下客户端会主动向服务端发送自己的注册信息
1.当客户端的instance信息发生改变时,Eureka-Client和Server端信息不一致时
2.当客户端刚刚启动的时候。
定时器注册
com.netflix.discovery.DiscoveryClient ,使用的@Inject //google guice 注入遵循 JSR-330规范
private void initScheduledTasks() {
//省略, 刷新缓存的定时器
if (clientConfig.shouldRegisterWithEureka()) {
//省略, 发送心跳的定时器
// 监听instance的状态变更
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注册监听
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启实例信息复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
initScheduledTasks() 方法是在DiscoverClient的构造函数初始化的时候被调用。
主要作用就是:
1.开启缓刷新定时器
2.开启发送心跳的定时器
3.开启实例instance状态变更监听
4.开启应用状态复制器(主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册)
//InstanceInfoReplicator.java
class InstanceInfoReplicator implements Runnable {
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
// 首次进来设置一下。
instanceInfo.setIsDirty(); // for initial register
// 开启定时线程 , 每停顿initialDelayMs秒执行一次该任务。 服务注册也是由这个任务完成
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
// show down
public void stop() {
scheduler.shutdownNow();
started.set(false);
}
// 这个方法主要是在上面提到的监听器里面被调用。
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
// 这个地方用来获取定时线程的执行Future,如果该线程还没有执行完毕,则取消掉,释放资源,因为下面也会执行run方法
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
public void run() {
try {
// 刷新实例信息。
discoveryClient.refreshInstanceInfo();
// 判断实例信息是否不一致
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 注册自己的服务
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
从上面可以看到InstanceInfoReplicator是一个负责服务注册的线程任务, 有两个地方可以执行这个任务
1.定时线程,每40秒执行一次。
2.当instance的状态发生变更(除去DOWN这个状态)的时候,会有statusChangeListener 这个监听器监听到
去执行服务注册。
PS: 现在网上有些文章,说服务启动后,要隔40秒才会去注册自己的服务,这个说法是错误的。 当应用刚刚启动的时候,注册服务不是依赖那个定时线程去跑的,而是在EurekaAutoServiceRegistration这个类里面
自动注册
在EurekaClientAutoConfiguration这个自动配置类里面,有下面这一段代码
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
上面这段代码,很简单,就是实例化了一个Bean,主要是这个Bean实现了SmartLifecycle, 有这个接口标识的在spring 容器加载完所有的Bean之后会执行该类的start方法, 下面可以详细看看这个代码。
//EurekaAutoServiceRegistration.java
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);
private AtomicBoolean running = new AtomicBoolean(false);
private int order = 0;
private AtomicInteger port = new AtomicInteger(0);
private ApplicationContext context;
private EurekaServiceRegistry serviceRegistry;
private EurekaRegistration registration;
public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry, EurekaRegistration registration) {
this.context = context;
this.serviceRegistry = serviceRegistry;
this.registration = registration;
}
@Override
public void start() {
// 端口配置
if (this.port.get() != 0 && this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
// 没有在运行
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 重点就这这里,这里是主动去注册。
this.serviceRegistry.register(this.registration);
// 发布 节点注册事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
// EurekaServiceRegistry.java
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// 设置当前实例的instanceStatus, 一旦这个实例的状态发生改变,
// 只要状态不是DOWN,那么就会被监听器监听到,最终执行服务注册
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 健康检查器不为空
if (reg.getHealthCheckHandler() != null) {
// 设置进去
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
总结: 服务注册分为两种,
第一种: 当应用启动的时候,如果应用开启了自动注册(默认开启), 那么在自动配置类加载的时候,会通过EurekaAutoServiceRegistration实例化的时候,去改变instance的status, 最终被监听器监听到,执行服务注册的代码
第二种: 主要应用于启动之后,当应用的信息发生改变之后,每40每秒执行一次的线程,检测到了,也会自动去注册一次。
DiscoveryClient.register()
//DiscoveryClient.java
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
// 发起HTTP请求
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
使用的Jersey框架来完成http的请求调用
//AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册。
Eureka-Server接收注册
ApplicationResource
接下来可以看一下,服务端是接收到请求之后是如何处理的。
程序入口: com.netflix.eureka.resources.ApplicationResource.addInstance()
//ApplicationResource.java
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 参数校验,不符合验证规则的,返回400状态码,此处不做详解
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 重点在这里
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
PeerAwareInstanceRegistryImpl
上面的register方法,最终调用的是PeerAwareInstanceRegistryImpl的方法
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 租约的过期时间,默认90秒,也就是说当服务端超过90秒没有收到客户端的心跳,则主动剔除该节点。
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
// 如果客户端自定义了,那么以客户端为准
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 节点注册
super.register(info, leaseDuration, isReplication);
// 复制到同等服务节点上去
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
由上面可以知道,调用的是父类的register方法, 其父类是AbstractInstanceRegistry ,在了解具体的注册方法之前,
需要先了解一下Lease这个对象,因为Eureka-Server最终处理注册信息的时候,都会转化为这个对象来处理。
Lease
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
public static final int DEFAULT_DURATION_IN_SECS = 90;
private T holder;
private long evictionTimestamp;
private long registrationTimestamp;
private long serviceUpTimestamp;
private volatile long lastUpdateTimestamp;
private long duration;
public Lease(T r, int durationInSecs) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
//durationInSecs为秒单位, 换算成毫秒
duration = (durationInSecs * 1000);
}
// 客户端续约时,更新最后的更新时间 , 用当前系统加上过期的时间
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
// 服务下线时,更新服务下线时间
public void cancel() {
if (evictionTimestamp <= 0) {
evictionTimestamp = System.currentTimeMillis();
}
}
public void serviceUp() {
if (serviceUpTimestamp == 0) {
serviceUpTimestamp = System.currentTimeMillis();
}
}
public void setServiceUpTimestamp(long serviceUpTimestamp) {
this.serviceUpTimestamp = serviceUpTimestamp;
}
public boolean isExpired() {
return isExpired(0l);
}
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
}
DEFAULT_DURATION_IN_SECS : 租约过期的时间常量,默认未90秒,也就说90秒没有心跳过来,那么这边将会自动剔除该节点holder :这个租约是属于谁的, 目前占用这个属性的是 instanceInfo,也就是客户端实例信息。evictionTimestamp : 租约是啥时候过期的,当服务下线的时候,会过来更新这个时间戳registrationTimestamp : 租约的注册时间
serviceUpTimestamp :服务启动时间 ,当客户端在注册的时候,instanceInfo的status 为UP的时候,则更新这个时间戳
public void serviceUp() {
if (serviceUpTimestamp == 0) {
serviceUpTimestamp = System.currentTimeMillis();
}
}
lastUpdateTimestamp :最后更新时间,每次续约的时候,都会更新这个时间戳,在判断实例
是否过期时,需要用到这个属性。
duration:过期时间,毫秒单位
AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 上只读锁
read.lock();
// 从本地MAP里面获取当前实例的信息。
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 增加注册次数到监控信息里面去。
REGISTER.increment(isReplication);
if (gMap == null) {
// 如果第一次进来,那么gMap为空,则创建一个ConcurrentHashMap放入到registry里面去
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// putIfAbsent方法主要是在向ConcurrentHashMap中添加键—值对的时候,它会先判断该键值对是否已经存在。
// 如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
// 如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
// 表明map中确实不存在,则设置gMap为最新创建的那个
gMap = gNewMap;
}
}
// 从MAP中查询已经存在的Lease信息 (比如第二次来)
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 当Lease的对象不为空时。
if (existingLease != null && (existingLease.getHolder() != null)) {
// 当instance已经存在是,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // client
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 这里只有当existinglease不存在时,才会进来。 像那种恢复心跳,信息过期的,都不会进入这里。
// Eureka-Server的自我保护机制做的操作,为每分钟最大续约数+2 ,同时重新计算每分钟最小续约数
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 构建一个最新的Lease信息
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 当原来存在Lease的信息时,设置他的serviceUpTimestamp, 保证服务开启的时间一直是第一次的那个
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 放入本地Map中
gMap.put(registrant.getId(), lease);
// 添加到最近的注册队列里面去,以时间戳作为Key, 名称作为value,主要是为了运维界面的统计数据。
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// 分析instanceStatus
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 得到instanceStatus,判断是否是UP状态,
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 设置注册类型为添加
registrant.setActionType(ActionType.ADDED);
// 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取、
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清理缓存 ,传入的参数为key
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
以上是关于深入 Eureka 服务注册 源码分析的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud技术专题「Eureka源码分析」从源码层面让你认识Eureka工作流程和运作机制(上)