How Hystrix Executes a Command

Posted by 程序员泥瓦匠


Prelude

Typical Hystrix usage

 
   
   
 
    @Test
    public void test_run() {
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

What happens when we `new` our command? How does execute() actually run? And how does a failed or timed-out execute() fall back?

1. PREPARE: initialization

When we call new XXCommand(), most of the work happens in the AbstractCommand constructor:

 
   
   
 
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

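The constructor makes the sequence clear: it loads the command properties, loads the thread-pool properties and creates the thread pool, and initializes the metrics collector and the circuit breaker, among other things.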

 
   
   
 
    // HystrixCommandMetrics
    ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

    // HystrixThreadPool.Factory (the cache read by getInstance)
    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    // com.netflix.hystrix.HystrixCircuitBreaker.Factory
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

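Apart from the HystrixCommand itself, which has to be created anew for every call, almost everything else is kept as a singleton: the properties, circuit breaker, and metrics are cached per commandKey, while thread pools are cached per threadPoolKey.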

Let's look at how Hystrix manages its thread pools. A pool is obtained by calling HystrixThreadPool.Factory.getInstance:

 
   
   
 
    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
        String key = threadPoolKey.name();

        // this should find it for all but the first time
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }

The pool is looked up in the cache by threadPoolKey; if it is not there yet, a new one is created with new HystrixThreadPoolDefault(...).
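The lookup-then-create pattern can be sketched with plain JDK classes. This is only an illustration, not Hystrix code: `PoolRegistry` and `poolFor` are hypothetical names, a fixed-size `ExecutorService` stands in for `HystrixThreadPool`, and `computeIfAbsent` replaces the explicit `synchronized` double check used in the original.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical registry (not part of Hystrix): one shared pool per key name.
final class PoolRegistry {
    private static final ConcurrentHashMap<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    // Returns the cached pool for this key, creating it atomically on first use.
    static ExecutorService poolFor(String key, int size) {
        return POOLS.computeIfAbsent(key, k -> Executors.newFixedThreadPool(size));
    }

    public static void main(String[] args) {
        ExecutorService a = poolFor("orders", 4);
        ExecutorService b = poolFor("orders", 8);   // size is ignored: the pool already exists
        ExecutorService c = poolFor("payments", 4);
        System.out.println("same key, same pool: " + (a == b));
        System.out.println("different key, same pool: " + (a == c));
        a.shutdown();
        c.shutdown();
    }
}
```

As in the original, the settings passed on later calls for an existing key have no effect on the cached pool; only the first caller's values are used at creation time.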

 
   
   
 
    public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
        this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
        HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        this.queueSize = properties.maxQueueSize().get();

        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                properties);
        this.threadPool = this.metrics.getThreadPool();
        this.queue = this.threadPool.getQueue();

        /* strategy: HystrixMetricsPublisherThreadPool */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
    }

Note this line:

this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties);

The important part is concurrencyStrategy.getThreadPool: HystrixConcurrencyStrategy is the strategy object Hystrix uses to create thread pools.

The pool itself is actually created in HystrixConcurrencyStrategy#getThreadPool:

 
   
   
 
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        // ... various configuration lookups omitted here ...

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

In other words, the pool is created with the JDK's own java.util.concurrent.ThreadPoolExecutor.
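The sizing rule in getThreadPool can be reduced to a few lines. The sketch below is an assumption-laden illustration rather than Hystrix code: `PoolSizing` and `newPool` are hypothetical names, and a `SynchronousQueue` is chosen here only to keep the example small.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Hystrix code) mirroring the core/maximum rule shown above.
final class PoolSizing {
    static ThreadPoolExecutor newPool(int coreSize, int maximumSize, boolean allowDiverge, int keepAliveMinutes) {
        // maximum may differ from core only when divergence is allowed,
        // and it is never allowed to drop below core
        int effectiveMax = allowDiverge ? Math.max(coreSize, maximumSize) : coreSize;
        return new ThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor clamped = newPool(10, 5, true, 1);    // maximum < core: clamped up to core
        ThreadPoolExecutor diverged = newPool(10, 20, true, 1);  // maximum > core: kept as configured
        ThreadPoolExecutor fixed = newPool(10, 20, false, 1);    // divergence off: maximum = core
        System.out.println(clamped.getMaximumPoolSize());
        System.out.println(diverged.getMaximumPoolSize());
        System.out.println(fixed.getMaximumPoolSize());
        clamped.shutdown();
        diverged.shutdown();
        fixed.shutdown();
    }
}
```

Clamping instead of passing the raw values matters because the `ThreadPoolExecutor` constructor throws `IllegalArgumentException` when the maximum is smaller than the core size.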

2. A medley of Observables

Hystrix execution is built on RxJava: many Observables are composed into a single Observable, which is more concise than a traditional call chain.

3. What each Observable does

3.1. Command state flags


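  1. toObservable: the first Observable. Before handing over to the next link in the chain, it sets the HystrixCommand state to OBSERVABLE_CHAIN_CREATED.

  2. toObservable doOnTerminate: when termination is detected, it sets the HystrixCommand state to TERMINAL.

  3. executeCommandWithSpecifiedIsolation: when execution starts, it sets the HystrixCommand state to USER_CODE_EXECUTED.

  4. toObservable doOnUnsubscribe: when an unsubscribe is detected, it sets the HystrixCommand state to UNSUBSCRIBED.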

3.2. executeCommandWithSpecifiedIsolation

Assigns the execution thread and maintains the thread state.

 
   
   
 
    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // ... code omitted for brevity ...
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }

                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        try {
                            // ... code omitted for brevity ...

                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // ... code omitted for brevity ...
        }
    }

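The logic in detail:

  1. Check the isolation strategy. With SEMAPHORE the command runs on the calling thread; otherwise (THREAD) it goes through the thread-allocation logic below.

  2. Change the HystrixCommand state to USER_CODE_EXECUTED.

  3. Check the command's timeout status; if it has already timed out, return an error Observable.

  4. Change the command's thread state from NOT_USING_THREAD to STARTED.

  5. Call getUserExecutionObservable to run the actual logic.

  6. doOnTerminate: once the Observable has terminated (the HystrixCommand may have failed or succeeded), the thread state is either STARTED or NOT_USING_THREAD, and it is changed to TERMINAL.

  7. doOnUnsubscribe: when the Observable is unsubscribed, the thread state is changed to UNSUBSCRIBED.

  8. subscribeOn: specifies the scheduler. Hystrix implements its own scheduler, whose worker targets the thread pool, and the thread-pool configuration is reloaded before the scheduler is handed out. (This is RxJava machinery; for now it is enough to think of it as choosing a thread pool and handing the task to it.)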

 
   
   
 
    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

    // allow us to change things via fast-properties by setting it each time
    private void touchConfig() {
        final int dynamicCoreSize = properties.coreSize().get();
        final int configuredMaximumSize = properties.maximumSize().get();
        int dynamicMaximumSize = properties.actualMaximumSize();
        final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        boolean maxTooLow = false;

        if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
            //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
            dynamicMaximumSize = dynamicCoreSize;
            maxTooLow = true;
        }

        // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
        if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
            if (maxTooLow) {
                logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                        dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            }
            threadPool.setCorePoolSize(dynamicCoreSize);
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
        }

        threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
    }

touchConfig is what actually applies the thread-pool parameter adjustments.
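Resizing a live pool can be tried out with the JDK alone. The following is a sketch under stated assumptions, not the Hystrix implementation: `PoolResizer` and `resize` are hypothetical names, and the order of the two setter calls is a choice made here (the excerpt above always calls setCorePoolSize first) so that core never exceeds maximum at any intermediate step.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Hystrix code): applies new sizes to a running pool.
final class PoolResizer {
    // Returns true if the pool was changed, false if the sizes were already in place.
    static boolean resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        int max = Math.max(newCore, newMax);          // keep the invariant core <= maximum
        if (pool.getCorePoolSize() == newCore && pool.getMaximumPoolSize() == max) {
            return false;                             // nothing changed, skip the setters
        }
        if (max >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max);             // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);            // shrinking: lower core first
            pool.setMaximumPoolSize(max);
        }
        return true;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        System.out.println(resize(pool, 6, 8) + " core=" + pool.getCorePoolSize() + " max=" + pool.getMaximumPoolSize());
        System.out.println(resize(pool, 6, 8) + " (second call with the same sizes)");
        pool.shutdown();
    }
}
```

Calling a helper like this before every task submission is cheap when nothing changed, which is the same idea as touchConfig being invoked from getScheduler on each execution.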

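As the walkthrough above shows, this Observable is also where the thread state is maintained. The transitions, which the original post illustrates with a figure, are: NOT_USING_THREAD to STARTED when execution begins, then STARTED to TERMINAL or UNSUBSCRIBED; a command that never started goes straight from NOT_USING_THREAD to TERMINAL or UNSUBSCRIBED.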
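These transitions rely on compareAndSet so that, whichever of terminate and unsubscribe arrives first, cleanup runs only once. A minimal sketch of that idea follows; `ThreadStateTracker` is a hypothetical class, the enum values are taken from the code above, and a counter stands in for handleThreadEnd.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical tracker (not Hystrix code) for the thread-state transitions described above.
final class ThreadStateTracker {
    enum ThreadState { NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL }

    private final AtomicReference<ThreadState> state = new AtomicReference<>(ThreadState.NOT_USING_THREAD);
    private final AtomicInteger cleanups = new AtomicInteger(); // stands in for handleThreadEnd

    // Succeeds only if the command has not been started, terminated, or unsubscribed yet.
    boolean start() {
        return state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED);
    }

    void onTerminate() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
            cleanups.incrementAndGet();               // the thread was in use, so clean up
        } else {
            state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL);
        }
    }

    void onUnsubscribe() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
            cleanups.incrementAndGet();
        } else {
            state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED);
        }
    }

    ThreadState current() { return state.get(); }

    int cleanupCount() { return cleanups.get(); }

    public static void main(String[] args) {
        ThreadStateTracker t = new ThreadStateTracker();
        t.start();
        t.onTerminate();
        t.onUnsubscribe();                            // arrives late: state and cleanup count stay as they are
        System.out.println(t.current() + ", cleanups=" + t.cleanupCount());
    }
}
```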

3.3. getUserExecutionObservable

Runs the actual business logic.

 
   
   
 
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            // the run() method is a user provided implementation so can throw instead of using Observable.onError
            // so we catch it here and turn it into Observable.error
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }

getExecutionObservable(), which supplies userObservable, is implemented by HystrixCommand itself:

 
   
   
 
    // HystrixCommand
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }

Seeing run() here should make everything click: it is our own business code, the method that CommandHelloWorld implements.

3.4. getFallbackOrThrowException

This Observable is triggered when executeCommandWithSpecifiedIsolation emits an error. To see how the fallback in getFallbackOrThrowException ends up being executed, start with executeCommandAndObserve:

 
   
   
 
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // ... code omitted for brevity ...
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            // ... code omitted for brevity ...
        };

        // ... code omitted for brevity ...

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

When the execution emits an error, onErrorResumeNext hands it to handleFallback, which eventually reaches getFallbackOrThrowException.
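For readers who do not know RxJava, the run-with-timeout-then-fall-back behaviour can be approximated with plain `java.util.concurrent` types. This is an analogy only: it uses a `Future` instead of Observables, and `FallbackOnError` and `execute` are hypothetical names that do not exist in Hystrix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper (not Hystrix code): run on a pool, fall back on failure or timeout.
final class FallbackOnError {
    static <R> R execute(ExecutorService pool, Callable<R> run, long timeoutMillis, Function<Throwable, R> fallback) {
        Future<R> future = pool.submit(run);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);                      // interrupt the worker, like interrupt-on-timeout
            return fallback.apply(e);
        } catch (ExecutionException e) {
            return fallback.apply(e.getCause());      // run() threw: pass the original exception on
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // preserve the caller's interrupt flag
            future.cancel(true);
            return fallback.apply(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            String ok = execute(pool, () -> "Hello Bob!", 1000, ex -> "fallback");
            String failed = execute(pool, () -> { throw new IllegalStateException("boom"); }, 1000, ex -> "fallback");
            String slow = execute(pool, () -> { Thread.sleep(5000); return "late"; }, 100, ex -> "fallback");
            System.out.println(ok + " / " + failed + " / " + slow);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The blocking `get` makes this closer to execute() than to the asynchronous Observable chain; what carries over is that both a thrown exception and a timeout are routed to the same fallback function.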

 
   
   
 
    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        // ... code omitted for brevity ...

        if (isUnrecoverable(originalException)) {
            // ... code omitted for brevity ...
        } else {
            // ... code omitted for brevity ...

            if (properties.fallbackEnabled().get()) {

                // ... code omitted for brevity ...

                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                    return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }

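The main steps here are:

  1. Check whether the exception is one that fallback can handle; if not, a HystrixRuntimeException is raised.

  2. Check whether fallback is enabled in the configuration. If it is, go into getFallbackObservable(), which is implemented by HystrixCommand and calls the getFallback() method of the user's command. If the caller has not overridden that method, HystrixCommand's own getFallback() runs and throws an exception saying that no fallback is available.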

 
   
   
 
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    @Override
    final protected Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    // the caller's fallback logic
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        });
    }
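One detail in getFallbackOrThrowException is easy to overlook: the fallback only runs after a permit has been acquired from fallbackSemaphore, and the permit is released when the fallback finishes. A minimal sketch of that guard follows, assuming `java.util.concurrent.Semaphore` as a stand-in for Hystrix's TryableSemaphore; `GuardedFallback` is a hypothetical class.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical guard (not Hystrix code): limits how many fallbacks may run at the same time.
final class GuardedFallback<R> {
    private final Semaphore permits;
    private final Supplier<R> fallback;

    GuardedFallback(int maxConcurrentFallbacks, Supplier<R> fallback) {
        this.permits = new Semaphore(maxConcurrentFallbacks);
        this.fallback = fallback;
    }

    R run() {
        if (!permits.tryAcquire()) {
            // no permit left: reject immediately instead of queueing behind other fallbacks
            throw new IllegalStateException("fallback rejected: no permits available");
        }
        try {
            return fallback.get();
        } finally {
            permits.release();                        // always hand the permit back
        }
    }

    public static void main(String[] args) {
        GuardedFallback<String> guarded = new GuardedFallback<>(1, () -> "Hello Fallback");
        System.out.println(guarded.run());
        System.out.println(guarded.run());            // works again because the permit was released
    }
}
```

Rejecting instead of blocking keeps a slow fallback from tying up callers, which is the reason for using tryAcquire rather than acquire in this kind of guard.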

