Eureka 客户端已经成功下线,为啥还会有请求流入?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eureka 客户端已经成功下线,为啥还会有请求流入?相关的知识,希望对你有一定的参考价值。

参考技术A 在微服务内部互相调用时,你是否遇到过,服务明明已经成功下线,在 Eureka UI 界面也看到服务注销了,但还是会有请求流入到下线的服务,短时间内出现服务调用失败。

通过下面这张图来解释下为什么会出现上述问题。

为了保证高可用和高性能,Eureka Server 中设计了三级缓存。

在 Eureka UI 页面看到的信息避开了响应缓存 readOnlyCacheMap,直接从 registry 对象获取的,所以我们能够在 Eureka UI 页面实时的看到注册的新服务。

但是 Eureka Client 拉取的数据是从响应缓存 readOnlyCacheMap 中获取到的,这个数据不是实时拉取的,而是默认每 30 秒更新一次,所以就会出现 UI 页面看到服务已经注册好了,但是调用时却出现没有有效服务的错误。

服务下线也存在同样的问题,服务已经下线了,但是还是有客户端在调用已经下线的服务,这时就会出现连接拒绝的错误。

Spring Cloud 通过负载均衡器 Ribbon 从 Eureka Client 中获取被调用服务实例的信息,然后通过获取到的实例来调用对应的服务,Ribbon 从 Eureka Client 中获取到的服务列表也不是实时的,默认 30s 更新一次。

我们来计算下一个服务成功下线后,极端情况下多久才能被客户端感知到。

极端情况,在服务下线后的 90s 内,流入的请求都会调用失败。这是在服务 graceful shutdown 的前提下,如果服务异常终止或者被 kill -9 强制杀死,这个时间会更长。

在非 graceful shutdown 情况下,客户端不会调用 Eureka API 来更新 registry 注册列表,而是只能等 Eureka Server 的 evict 线程定时清理无效节点,这个周期默认是 60s,客户端默认的续约超时时间是 90s。

续约周期是 30s,在连续 3 次丢失心跳后会被 Eureka Server 的 evict 线程清理,也就是说服务下线后,可能需要延迟 180s 之后,Eureka Server 中的 registry 对象才会被更新。

总的加起来,在非 graceful shutdown 情况下,Ribbon 中的缓存需要 4 分钟左右才会感知到下线的服务,这个情况在生产环境将是非常严重的。我们可以通过参数的调优来缩短这个时间:

下面是服务端和客户端参数的默认值。

Eureka Server 维护了一个最近注销实例的 CircularQueue 循环队列:recentCanceledQueue,和最近注册实例的 CircularQueue 循环队列:recentRegisteredQueue,容量均为 1000,可以在 UI 界面展示:

~ END ~。

Eureka服务下线源码解析

我们知道,在Eureka中,可以使用如下方法使Eureka主动下线,那么本篇文章就来分析一下子这个下线的流程

public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

主要做了这么几件事:

  1. 解除状态监听器
  2. 取消心跳、刷新线程
private void cancelScheduledTasks() {
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.stop();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
  1. 向服务端发起下线通知
 void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }

    @Override
    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder.delete(ClientResponse.class);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
  1. 停止各个监听器
服务端接受下线消息

下线消息的处理在InstanceResource类中

@DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }
 public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
          //往集群同步下线信息
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }

先看具体的下线逻辑,与租约过期清除的处理逻辑是一致的

   protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
              //删除租约信息
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
          //删除客户端状态信息
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

其中invalidateCache则是删除当前服务中与该实例相关的缓存

集群的同步下线信息则跟集群信息注册的逻辑差不多

原文地址

以上是关于Eureka 客户端已经成功下线,为啥还会有请求流入?的主要内容,如果未能解决你的问题,请参考以下文章

Eureka总结笔记

EUREKA 删除 or 强制下线/上线 实例

Eureka服务下线后快速感知配置

.net framework已经是4.0了,网站为啥还会有iis短文件名泄漏漏洞

Eureka 参数调优

Eureka剔除和恢复服务