Eureka中RetryableClientQuarantineRefreshPercentage参数探秘

Posted 程序猿DD

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eureka中RetryableClientQuarantineRefreshPercentage参数探秘相关的知识,希望对你有一定的参考价值。

原文:http://www.spring4all.com/article/180

前言

我们知道Eureka分为两部分,Eureka Server和Eureka Client。Eureka Server充当注册中心的角色,Eureka Client相对于Eureka Server来说是客户端,需要将自身信息注册到注册中心。本文主要介绍的就是在Eureka Client注册到Eureka Server时 RetryableClientQuarantineRefreshPercentage参数的使用技巧。

Eureka Client注册过程分析

 
   
   
 
  1. eureka.client.service-url.defaultZone=

  2. http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka

  • Eureka Clinent注册机制是怎样的? 源码面前一目了然,带着这两个问题我们通过源码来解答这两个问题。Eureka Client在启动的时候注册源码如下: RetryableEurekaHttpClient中的 execut方法

 
   
   
 
  1.   @Override

  2.    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {

  3.        List<EurekaEndpoint> candidateHosts = null;

  4.        int endpointIdx = 0;

  5.        for (int retry = 0; retry < numberOfRetries; retry++) {

  6.            EurekaHttpClient currentHttpClient = delegate.get();

  7.            EurekaEndpoint currentEndpoint = null;

  8.            if (currentHttpClient == null) {

  9.                if (candidateHosts == null) {

  10.                    candidateHosts = getHostCandidates();

  11.                    if (candidateHosts.isEmpty()) {

  12.                        throw new TransportException("There is no known eureka server; cluster server list is empty");

  13.                    }

  14.                }

  15.                if (endpointIdx >= candidateHosts.size()) {

  16.                    throw new TransportException("Cannot execute request on any known server");

  17.                }

  18.                currentEndpoint = candidateHosts.get(endpointIdx++);

  19.                currentHttpClient = clientFactory.newClient(currentEndpoint);

  20.            }

  21.            try {

  22.                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);

  23.                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {

  24.                    delegate.set(currentHttpClient);

  25.                    if (retry > 0) {

  26.                        logger.info("Request execution succeeded on retry #{}", retry);

  27.                    }

  28.                    return response;

  29.                }

  30.                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());

  31.            } catch (Exception e) {

  32.                logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace

  33.            }

  34.            // Connection error or 5xx from the server that must be retried on another server

  35.            delegate.compareAndSet(currentHttpClient, null);

  36.            if (currentEndpoint != null) {

  37.                quarantineSet.add(currentEndpoint);

  38.            }

  39.        }

  40.        throw new TransportException("Retry limit reached; giving up on completing the request");

  41.    }

按照我的理解,代码精简后内容如下:

 
   
   
 
  1. int endpointIdx = 0;

  2. //用来保存所有Eureka Server信息(8761、8762、8763、8764)

  3. List<EurekaEndpoint> candidateHosts = null;

  4. //numberOfRetries的值代码写死默认为3次

  5. for (int retry = 0; retry < numberOfRetries; retry++) {

  6.    /**

  7.     *首次进入循环时,获取全量的Eureka Server信息(8761、8762、8763、8764)

  8.     */

  9.    if (candidateHosts == null) {

  10.        candidateHosts = getHostCandidates();

  11.    }

  12.    /**

  13.     *通过endpointIdx自增,依次获取Eureka Server信息,然后发送

  14.     *注册的Post请求.

  15.     */

  16.    currentEndpoint = candidateHosts.get(endpointIdx++);

  17.    currentHttpClient = clientFactory.newClient(currentEndpoint);

  18.    try {

  19.       /**

  20.        *发送注册的Post请求动作,注意如果成功,则跳出循环,如果失败则

  21.        *根据endpointIdx依次获取下一个Eureka Server.

  22.        */

  23.        response = requestExecutor.execute(currentHttpClient);

  24.        return respones;

  25.    } catch (Exception e) {

  26.        //向注册中心(Eureka Server)发起注册的post出现异常时,打印日志...

  27.    }

  28.    //如果此次注册动作失败,将当前的信息保存到quarantineSet中(一个Set集合)

  29.    if (currentEndpoint != null) {

  30.        quarantineSet.add(currentEndpoint);

  31.    }

  32. }

  33. //如果都失败,则以异常形式抛出...

  34. throw new TransportException("Retry limit reached; giving up on completing the request");

上面代码中还有一个方法很重要就是 List<EurekaEndpoint>candidateHosts=getHostCandidates();接下来看下 getHostCandidates()方法源码

 
   
   
 
  1.    private List<EurekaEndpoint> getHostCandidates() {

  2.        List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();

  3.        quarantineSet.retainAll(candidateHosts);

  4.        // If enough hosts are bad, we have no choice but start over again

  5.        int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());

  6.        if (quarantineSet.isEmpty()) {

  7.            // no-op

  8.        } else if (quarantineSet.size() >= threshold) {

  9.            logger.debug("Clearing quarantined list of size {}", quarantineSet.size());

  10.            quarantineSet.clear();

  11.        } else {

  12.            List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());

  13.            for (EurekaEndpoint endpoint : candidateHosts) {

  14.                if (!quarantineSet.contains(endpoint)) {

  15.                    remainingHosts.add(endpoint);

  16.                }

  17.            }

  18.            candidateHosts = remainingHosts;

  19.        }

  20.        return candidateHosts;

  21.    }

按照我的理解,将代码精简下,只包括关键逻辑,内容如下:

 
   
   
 
  1. private List<EurekaEndpoint> getHostCandidates() {

  2.    /**

  3.     * 获取所有defaultZone配置的注册中心信息(Eureka Server),

  4.     * 在本文例子中代表4个(8761、8762、8763、8764)Eureka Server

  5.     */

  6.    List candidateHosts = clusterResolver.getClusterEndpoints();

  7.    /**

  8.     * quarantineSet这个Set集合中保存的是不可用的Eureka Server

  9.     * 此处是拿不可用的Eureka Server与全量的Eureka Server取交集

  10.     */

  11.    quarantineSet.retainAll(candidateHosts);

  12.    /**

  13.     * 根据RetryableClientQuarantineRefreshPercentage参数计算阈值

  14.     * 该阈值后续会和quarantineSet中保存的不可用的Eureka Server个数

  15.     * 作比较,从而判断是否返回全量的Eureka Server还是过滤掉不可用的

  16.     * Eureka Server。

  17.     */

  18.    int threshold =

  19.       (int) (

  20.        candidateHosts.size()

  21.              *

  22.        transportConfig.getRetryableClientQuarantineRefreshPercentage()

  23.        );

  24.    if (quarantineSet.isEmpty()) {

  25.        /**

  26.         * 首次进入的时候,此时quarantineSet为空,直接返回全量的

  27.         * Eureka Server列表

  28.         */

  29.    } else if (quarantineSet.size() >= threshold) {

  30.        /**

  31.         * 将不可用的Eureka Server与threshold值相比较,如果不可

  32.         * 用的Eureka Server个数大于阈值,则将之间保存的Eureka

  33.         * Server内容直接清空,并返回全量的Eureka Server列表。

  34.         */

  35.        quarantineSet.clear();

  36.    } else {

  37.        /**

  38.         * 通过quarantineSet集合保存不可用的Eureka Server来过滤

  39.         * 全量的EurekaServer,从而获取此次Eureka Client要注册要

  40.         */

  41.        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());

  42.        for (EurekaEndpoint endpoint : candidateHosts) {

  43.            if (!quarantineSet.contains(endpoint)) {

  44.                remainingHosts.add(endpoint);

  45.            }

  46.        }

  47.        candidateHosts = remainingHosts;

  48.    }

  49.    return candidateHosts;

  50. }

通过源码分析,我们现在初步知道,当Eureka Client向Eureka Server发起注册请求的时候(根据defaultZone寻找Eureka Server列表),如果有一次请求注册成功,那么后续就不会在向其他Eureka Server发起注册请求。以本文为例,注册中心有四个(8761、8762、8763、8764)。如果8761对应的Eureka Server服务的状态是UP,那么Eureka Client向该注册中心注册成功后,不会再向(8762、8763、8764)对应的Eureka Server发起注册请求(对应程序是在for循环中直接return respones)。

说到这里又引出来另外一个问题,如果8761这个Eureka Server是down掉的呢? 根据源码我们可知Eureka Client首次会向8761这个Server发起注册请求,如果该Server的状态是down,那么它会将该Server保存到quarantineSet这个Set集合中,然后再次访问8762这个Eureka Server,如果8762这个Server的状态依旧是down,它也会把这个Server保存到quarantineSet这个Set集合中,然后继续访问8763这个Server,如果8763这个Server的状态依旧是down,此时除了会将其保存到quarantineSet这个Set集合中之外,还会跳出本次循环。从而结束此次注册过程。

说道这里有人要问接下来会不会向8764这个Server发起注册,答案是否定的,因为循环的次数默认是3次。所以即使8764这个Server的状态是UP,它也不会接收到来自Eureka Client发起的注册信息。

Eureka Client向Eureka Server发起注册信息的过程除了在Eureka Client启动的时候触发,还有另外一种方式,就是后台定时任务。 假设我们上面描述的场景是在Eureka Client启动的时候,因为在启动的时候注册这个过程全部失败了,当后台定时任务执行时,还会进入该注册流程。注意此时quarantineSet的值为3(8761、8762、8763之前注册失败的Eureka Server)。 所以当程序再次进入 getHostCandidates()方法时, if(quarantineSet.isEmpty())这个方法是不满足的,接下来会走 elseif(quarantineSet.size()>=threshold)这个判断,如果这个判断成立,那么会将quarantineSet集合清空,同时返回全量的Eureka Server列表,如果这个判断不成立,会拿quarantineSet集合中保存的内容去过滤Eureka Server的全量列表。以本文为例:

  • quarantineSet中保存的是(8761、8762、8763)三个Eureka Server

  • Eureka Server全量列表的内容是(8761、8762、8763、8764)四个Eureka Server 过滤后返回的结果为8764这个Eureka Server。

在本文的例子中8761、8762、8763这三个Eureka Server的状态是down而8764这个Eureka Server的状态是UP,我们其实是想走到最后的else分支,从而完成过滤操作,并最终得到8764这个Server,遗憾的是它并不会走到这个分支,而是被上面的 elseif(quarantineSet.size()>=threshold)这个分支所拦截,返回的依旧是全量的Eureka Server列表。这样造成的后果就是Eureka Client依旧会依次向(8761、8762、8763)这三个down的Eureka Server发起注册请求。 那么问题的关键在哪里呢?问题的关键就是threshold这个值的由来,因为此时quarantineSet.size()的值为3,而3这个值大于threshold,从而导致,会将quarantineSet集合清空,返回全量的Server列表。 我们知道threshold这个值是根据全量的Eureka Server列表乘以一个可配置的参数计算出来的,在本文的例子当中,我的properties文件中除了defaultZone之外并没有配置这个参数,那么也就是说这个参数是有默认值的,通过源码我们了解到,这个默认值是0.66。具体源码如下:

 
   
   
 
  1. final class PropertyBasedTransportConfigConstants {

  2.    /**

  3.     *省略部分源码

  4.     */

  5.    static class Values {

  6.        static final int SESSION_RECONNECT_INTERVAL = 20*60;

  7.        //默认值为0.66

  8.        static final double QUARANTINE_REFRESH_PERCENTAGE = 0.66;

  9.        static final int DATA_STALENESS_TRHESHOLD = 5*60;

  10.        static final int ASYNC_RESOLVER_REFRESH_INTERVAL = 5*60*1000;

  11.        static final int ASYNC_RESOLVER_WARMUP_TIMEOUT = 5000;

  12.        static final int ASYNC_EXECUTOR_THREADPOOL_SIZE = 5;

  13.    }

  14. }

 
   
   
 
  1. /**

  2. *@return the percentage of the full endpoints set above which the  

  3. *quarantine set is cleared in the range [0, 1.0]

  4. */

  5. double getRetryableClientQuarantineRefreshPercentage();

看到这里就不难理解了,因为这个值是0.66而此时全量的Eureka Server值为4。计算之后的值为2,而由于注册的for循环为3次,所以当第二次发起注册流程的时候quarantineSet的值始终大于threshold。这样就会导致一个问题,就是如果8761、8762、8763一直是down即使8764一直是好的,那么Eureka Client也不会注册成功。而且这个参数值的区间为0到1.

既然通过源码分析我们找到了问题根源,其实对应的我们也找到了解决这个问题的办法,就是对应把这个参数值调大些。 这个值在properties中对应的写法如下:

 
   
   
 
  1. eureka.client.transport.retryableClientQuarantineRefreshPercentage = xxx

接下来我们修改下properties文件,修改后的内容如下:

 
   
   
 
  1. eureka.client.service-url.defaultZone=

  2. http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka

  3. eureka.client.transport.retryableClientQuarantineRefreshPercentage=1

接下来按照这个配置再次回顾下上面的流程:

  • Eureka Client启动时进行注册(8761、8762、8763的状态是down),所以此时quarantineSet的值为3.

  • 接下来在定时任务中又触发注册事件,此时因为参数的值从0.66调整为1。所以计算出的threshold的值为4。而此时quarantineSet的值为3。所以不会进入到 elseif(quarantineSet.size()>=threshold)分支,而是会进入最后的esle分支。

  • 在else分支中会完成过滤功能,最终返回的list中的结果只有一个就是8764这个Eureka Server。

  • Eureka Client向8764这个Eureka Server发起注册请求,得到成功相应,并返回。

遗留问题

说道这里我们感觉好像是解决了这个问题,那么问一个问题,这个参数值可以设置的无限大吗? 比如我将这个参数值设置为10,虽然javaDoc中说明这个参数值的范围在0-1之间,但是并没有说明如果将这个参数调整大于1会出现什么情况。接下来按照上面的流程我们分析下: 之前我们分析的流程中的前提是8761、8762、8763这三台Server的状态是down而8764这个server的状态是up,现在我们修改下这个前提。 假设一开始8761、8762、8763、8764这四台Eureka Server的状态都是down。 -Eureka Client启动时进行注册(8761、8762、8763的状态是down),所以此时quarantineSet的值为3.

  • 接下来在定时任务中又触发注册事件,此时因为参数的值从0.66调整为10。所以计算出的threshold的值为40。而此时quarantineSet的值为3。所以不会进入到 elseif(quarantineSet.size()>=threshold)分支,而是会进入最后的esle分支。

  • 在else分支中会完成过滤功能,最终返回的list中的结果只有一个就是8764这个Eureka Server。

  • Eureka Client向8764这个Eureka Server发起注册请求,因为此时8764的状态也是down导致注册失败,此时quarantineSet中的内容是(8761、8762、8763、8764)

  • 当定时任务再次触发时 if(quarantineSet.isEmpty())这个分支不会进入,因为此时quarantineSet的值为4

  • elseif(quarantineSet.size()>=threshold)这分支也不会进入因为threshold的值为40

  • 最终会进入else分支,这个分支原本的含义是想通过quarantineSet来充当过滤器,从全量的Eureka Server中过滤掉之前状态为down的Eureka Server,但是由于quarantineSet的值现在已经是全量,导致过滤后的结果返回的是一个空的list。即使此时Eureka Server列表(8761、8762、8763、8764)任何一个Server的状态变为UP,该Eureka Client也不可能完成注册事件。

解决办法

上面出现的那个问题,根本原因个人认为是由于 eureka.client.transport.retryableClientQuarantineRefreshPercentage参数过大而源码中没有校验,从而导致没有进入 elseif(quarantineSet.size()>=threshold)的逻辑分支,因为此时如果quarantineSet中的值已经达到了所有Eureka Server列表,那么此时我们希望的是将这个Set集合清空,从而再次返回全量的Eureka Server列表,也就是说再重新来一次注册流程。 所以基于上面的分析,个人认为在源码的 getHostCandidates增加下校验,具体代码如下:

 
   
   
 
  1. private List<EurekaEndpoint> getHostCandidates() {

  2.    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();

  3.    quarantineSet.retainAll(candidateHosts);

  4.    // If enough hosts are bad, we have no choice but start over again

  5.    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());

  6.    /**

  7.     * 增加判断如果threshold的值过大,即超过Eureka Server

  8.     * 列表的数量,那么将其再次赋值,赋值的内容为Eureka Server

  9.     * 列表的数量。

  10.     */

  11.    if (threshold > candidateHosts.size()) {

  12.          threshold = candidateHosts.size();

  13.    }

  14.    if (quarantineSet.isEmpty()) {

  15.        // no-op

  16.    } else if (quarantineSet.size() >= threshold) {

  17.        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());

  18.        quarantineSet.clear();

  19.    } else {

  20.        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());

  21.        for (EurekaEndpoint endpoint : candidateHosts) {

  22.            if (!quarantineSet.contains(endpoint)) {

  23.                remainingHosts.add(endpoint);

  24.            }

  25.        }

  26.        candidateHosts = remainingHosts;

  27.    }

  28.    return candidateHosts;

  29. }

以上内容就是个人对 eureka.client.transport.retryableClientQuarantineRefreshPercentage的理解,由于本人知识水平有限,对此问题也可能理解不正确,还请大家多多留言讨论。

这个问题本人也在Eureka官方gitHub提交iussue,具体内容如下:https://github.com/Netflix/eureka/issues/1012

最后感谢Spring4all社区提供这个平台,能让大家交流学习Spring相关知识。

推荐阅读






以上是关于Eureka中RetryableClientQuarantineRefreshPercentage参数探秘的主要内容,如果未能解决你的问题,请参考以下文章

spring boot 1.5.2中eureka客户端如何找到eureka server?

服务注册组件eureka

Spring Cloud中,Eureka常见问题总结

Eureka学习:搭建eureka-server将user-service注册到eureka-server中

Eureka搭建

Eureka与zookeeper