前言
上一篇讲解了hystrix四个接口的关系和调用流程,这一节主要讲解一下在住流程中熔断策略是怎样的,是如何判断应用是否该进行熔断的
强制开关
在流量进来的时候,会经过这个方法来判断现在的熔断开关状态,如果为true则允许流量通过,如果为false则进入fallback阶段
@Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { // 强制打开熔断开关,所有请求都走fallback逻辑 return false ; } if (properties.circuitBreakerForceClosed().get()) { // we still want to allow isOpen() to perform it‘s calculations so we simulate normal behavior isOpen(); // 强制关闭熔断开关,所有都走正常流程 return true ; } //如果没有设置强制关闭和打开,走正常的熔断策略判断,如果满足流量探测的情况可以做一次流量放行 return !isOpen() || allowSingleTest(); } |
正常策略
在没有强制开关的前提下,会进入正常的熔断开关判断流程
@Override public boolean isOpen() { if (circuitOpen.get()) { // 如果开关已经被打开,返回true return true ; } // 得到时间窗口内的健康统计 HealthCounts health = metrics.getHealthCounts(); // 如果在时间窗口之内的访问数小于阈值,直接返回为关闭状态,流量都走正常逻辑,默认值是10s内20个请求数 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we‘ll return false immediately and not calculate anything return false ; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false ; } else { // //如果在时间窗口内超过了RequestVolumeThreshold的阈值,并且错误率达到了上限,打开熔断开关,并设置打开的时间,方便后续健康探测(allowSingleTest) if (circuitOpen.compareAndSet( false , true )) { // if the previousValue was false then we want to set the currentTime // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true ; } else { return false ; } } } |
流量探测
如果熔断开关打开的时间已经超过了睡眠时间大小,那么应该进行一次流量探测,探测后端服务是否已经恢复
//简单的说就是当当前时间已经超过了上一次熔断打开的时间加上睡眠时间,那么就进行一次流量放行 public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) if the circuit is open // 2) and it‘s been longer than ‘sleepWindow‘ since we opened the circuit if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // We push the ‘circuitOpenedTime‘ ahead by ‘sleepWindow‘ since we have allowed one request to try. // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the ‘sleepWindow‘. if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { // if this returns true that means we set the time so we‘ll return true to allow the singleTest // if it returned false it means another thread raced us and allowed the singleTest before we did return true ; } } return false ; } //流量探测成功后会在run方法执行完后,关闭熔断开关,详见executeCommand()方法 public void markSuccess() { if (circuitOpen.get()) { // TODO how can we can do this without resetting the counts so we don‘t lose metrics of short-circuits etc? metrics.resetCounter(); // If we have been ‘open‘ and have a success then we want to close the circuit. This handles the ‘singleTest‘ logic circuitOpen.set( false ); } } |
小结
1.可以设置强制关闭,和强制打开两个熔断开关,并且强制打开开关优先
2.强制关闭开关不影响熔断开关的开闭操作,即,就算是强制关闭开关,熔断开关也会正常的因为错误率和阈值等正常开闭,主要是不影响统计上报的数据
3.默认是错误超过50%且10秒内超过20个请求会进行熔断,具体统计见下小节
4.当熔断开关打开之后,在睡眠时间达到之后,会允许探测流量通过,在run方法执行成功之后会关闭熔断,然后正常服务
请求统计方式
metrics.rollingStats.timeInMilliseconds设置时间窗口大小,默认是10s
metrics.rollingStats.numBuckets设置窗口的桶的数量,默认是10个,且时间大小必须能够被桶数量整除
public HealthCounts getHealthCounts() { // we put an interval between snapshots so high-volume commands don‘t // spend too much unnecessary time calculating metrics in very small time periods long lastTime = lastHealthCountsSnapshot.get(); long currentTime = System.currentTimeMillis(); if (currentTime - lastTime >= properties.metricsHealthSnapshotIntervalInMilliseconds().get() || healthCountsSnapshot == null ) { if (lastHealthCountsSnapshot.compareAndSet(lastTime, currentTime)) { // our thread won setting the snapshot time so we will proceed with generating a new snapshot // losing threads will continue using the old snapshot long success = counter.getRollingSum(HystrixRollingNumberEvent.SUCCESS); //得到时间窗口中成功的总数 long failure = counter.getRollingSum(HystrixRollingNumberEvent.FAILURE); // fallbacks occur on this long timeout = counter.getRollingSum(HystrixRollingNumberEvent.TIMEOUT); // fallbacks occur on this long threadPoolRejected = counter.getRollingSum(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); // fallbacks occur on this long semaphoreRejected = counter.getRollingSum(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); // fallbacks occur on this long shortCircuited = counter.getRollingSum(HystrixRollingNumberEvent.SHORT_CIRCUITED); // fallbacks occur on this long totalCount = failure + success + timeout + threadPoolRejected + shortCircuited + semaphoreRejected; long errorCount = failure + timeout + threadPoolRejected + shortCircuited + semaphoreRejected; int errorPercentage = 0 ; if (totalCount > 0 ) { errorPercentage = ( int ) (( double ) errorCount / totalCount * 100 ); } healthCountsSnapshot = new HealthCounts(totalCount, errorCount, errorPercentage); } } return healthCountsSnapshot; } |
上面代码可以看到,当在判断熔断状态时得到一个HealthCount对象,这个对象中包含success error errorpercent。都是在目前时间窗口中算出来的。从上面的代码可以看到,fallback在哪些地方会执行,分别是:
- run方法抛出异常,执行失败
- run方法超时
- 线程池和semaphore两种方式请求数已满导致的拒绝
- 熔断开关打开
使用方可以自定义参数来调整时间和桶的大小来适配自己的业务需求,下面这张图简单的说明了熔断的流程