@Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { // 强制打开熔断开关,所有请求都走fallback逻辑 return false ; } if (properties.circuitBreakerForceClosed().get()) { // we still want to allow isOpen() to perform it‘s calculations so we simulate normal behavior isOpen(); // 强制关闭熔断开关,所有都走正常流程 return true ; } //如果没有设置强制关闭和打开,走正常的熔断策略判断,如果满足流量探测的情况可以做一次流量放行 return !isOpen() || allowSingleTest(); } |
@Override public boolean isOpen() { if (circuitOpen.get()) { // 如果开关已经被打开,返回true return true ; } // 得到时间窗口内的健康统计 HealthCounts health = metrics.getHealthCounts(); // 如果在时间窗口之内的访问数小于阈值,直接返回为关闭状态,流量都走正常逻辑,默认值是10s内20个请求数 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we‘ll return false immediately and not calculate anything return false ; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false ; } else { // //如果在时间窗口内超过了RequestVolumeThreshold的阈值,并且错误率达到了上限,打开熔断开关,并设置打开的时间,方便后续健康探测(allowSingleTest) if (circuitOpen.compareAndSet( false , true )) { // if the previousValue was false then we want to set the currentTime // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true ; } else { return false ; } } } |
//简单的说就是当当前时间已经超过了上一次熔断打开的时间加上睡眠时间,那么就进行一次流量放行 public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) if the circuit is open // 2) and it‘s been longer than ‘sleepWindow‘ since we opened the circuit if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // We push the ‘circuitOpenedTime‘ ahead by ‘sleepWindow‘ since we have allowed one request to try. // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the ‘sleepWindow‘. if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { // if this returns true that means we set the time so we‘ll return true to allow the singleTest // if it returned false it means another thread raced us and allowed the singleTest before we did return true ; } } return false ; } //流量探测成功后会在run方法执行完后,关闭熔断开关,详见executeCommand()方法 public void markSuccess() { if (circuitOpen.get()) { // TODO how can we can do this without resetting the counts so we don‘t lose metrics of short-circuits etc? metrics.resetCounter(); // If we have been ‘open‘ and have a success then we want to close the circuit. This handles the ‘singleTest‘ logic circuitOpen.set( false ); } } |
public HealthCounts getHealthCounts() { // we put an interval between snapshots so high-volume commands don‘t // spend too much unnecessary time calculating metrics in very small time periods long lastTime = lastHealthCountsSnapshot.get(); long currentTime = System.currentTimeMillis(); if (currentTime - lastTime >= properties.metricsHealthSnapshotIntervalInMilliseconds().get() || healthCountsSnapshot == null ) { if (lastHealthCountsSnapshot.compareAndSet(lastTime, currentTime)) { // our thread won setting the snapshot time so we will proceed with generating a new snapshot // losing threads will continue using the old snapshot long success = counter.getRollingSum(HystrixRollingNumberEvent.SUCCESS); //得到时间窗口中成功的总数 long failure = counter.getRollingSum(HystrixRollingNumberEvent.FAILURE); // fallbacks occur on this long timeout = counter.getRollingSum(HystrixRollingNumberEvent.TIMEOUT); // fallbacks occur on this long threadPoolRejected = counter.getRollingSum(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); // fallbacks occur on this long semaphoreRejected = counter.getRollingSum(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); // fallbacks occur on this long shortCircuited = counter.getRollingSum(HystrixRollingNumberEvent.SHORT_CIRCUITED); // fallbacks occur on this long totalCount = failure + success + timeout + threadPoolRejected + shortCircuited + semaphoreRejected; long errorCount = failure + timeout + threadPoolRejected + shortCircuited + semaphoreRejected; int errorPercentage = 0 ; if (totalCount > 0 ) { errorPercentage = ( int ) (( double ) errorCount / totalCount * 100 ); } healthCountsSnapshot = new HealthCounts(totalCount, errorCount, errorPercentage); } } return healthCountsSnapshot; } |
从上面的代码可以看到，判断熔断状态时会先获取一个HealthCounts对象，其中包含请求总数（totalCount）、错误数（errorCount）和错误百分比（errorPercentage），这些值都是在当前统计时间窗口内计算出来的。同时还可以看到，以下几种情况会被计为错误并执行fallback逻辑：
- run方法抛出异常，执行失败（FAILURE）
- run方法执行超时（TIMEOUT）
- 线程池或信号量（semaphore）已满，请求被拒绝（THREAD_POOL_REJECTED / SEMAPHORE_REJECTED）
- 熔断开关已打开，请求被直接短路（SHORT_CIRCUITED）