Eureka 客户端已经成功下线,为啥还会有请求流入?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eureka 客户端已经成功下线,为啥还会有请求流入?相关的知识,希望对你有一定的参考价值。
参考技术A 在微服务内部互相调用时,你是否遇到过,服务明明已经成功下线,在 Eureka UI 界面也看到服务注销了,但还是会有请求流入到下线的服务,短时间内出现服务调用失败。通过下面这张图来解释下为什么会出现上述问题。
为了保证高可用和高性能,Eureka Server 中设计了三级缓存。
在 Eureka UI 页面看到的信息避开了响应缓存 readOnlyCacheMap,直接从 registry 对象获取的,所以我们能够在 Eureka UI 页面实时的看到注册的新服务。
但是 Eureka Client 拉取的数据是从响应缓存 readOnlyCacheMap 中获取到的,这个数据不是实时拉取的,而是默认每 30 秒更新一次,所以就会出现 UI 页面看到服务已经注册好了,但是调用时却出现没有有效服务的错误。
服务下线也存在同样的问题,服务已经下线了,但是还是有客户端在调用已经下线的服务,这时就会出现连接拒绝的错误。
Spring Cloud 通过负载均衡器 Ribbon 从 Eureka Client 中获取被调用服务实例的信息,然后通过获取到的实例来调用对应的服务,Ribbon 从 Eureka Client 中获取到的服务列表也不是实时的,默认 30s 更新一次。
我们来计算下一个服务成功下线后,极端情况下多久才能被客户端感知到。
极端情况,在服务下线后的 90s 内,流入的请求都会调用失败。这是在服务 graceful shutdown 的前提下,如果服务异常终止或者被 kill -9 强制杀死,这个时间会更长。
在非 graceful shutdown 情况下,客户端不会调用 Eureka API 来更新 registry 注册列表,而是只能等 Eureka Server 的 evict 线程定时清理无效节点,这个周期默认是 60s,客户端默认的续约超时时间是 90s。
续约周期是 30s,在连续 3 次丢失心跳后会被 Eureka Server 的 evict 线程清理,也就是说服务下线后,可能需要延迟 180s 之后,Eureka Server 中的 registry 对象才会被更新。
总的加起来,在非 graceful shutdown 情况下,Ribbon 中的缓存需要 4 分钟左右才会感知到下线的服务,这个情况在生产环境将是非常严重的。我们可以通过参数的调优来缩短这个时间:
下面是服务端和客户端参数的默认值。
Eureka Server 维护了一个最近注销实例的 CircularQueue 循环队列:recentCanceledQueue,和最近注册实例的 CircularQueue 循环队列:recentRegisteredQueue,容量均为 1000,可以在 UI 界面展示:
~ END ~。
Eureka服务下线源码解析
我们知道,在Eureka中,可以使用如下方法使Eureka主动下线,那么本篇文章就来分析一下子这个下线的流程
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
主要做了这么几件事:
- 解除状态监听器
- 取消心跳、刷新线程
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
- 向服务端发起下线通知
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
- 停止各个监听器
服务端接受下线消息
下线消息的处理在InstanceResource
类中
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}
}
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
//往集群同步下线信息
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
先看具体的下线逻辑,与租约过期清除的处理逻辑是一致的
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
//删除租约信息
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
//删除客户端状态信息
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
其中invalidateCache
则是删除当前服务中与该实例相关的缓存
集群的同步下线信息则跟集群信息注册的逻辑差不多
以上是关于Eureka 客户端已经成功下线,为啥还会有请求流入?的主要内容,如果未能解决你的问题,请参考以下文章