深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的故障切换的运作流程原理分析

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的故障切换的运作流程原理分析相关的知识,希望对你有一定的参考价值。

背景介绍

目前对于一些非核心操作,如增减库存后保存操作日志发送异步消息时(具体业务流程),一旦出现MQ服务异常时,会导致接口响应超时,因此可以考虑对非核心操作引入服务降级、服务隔离。

Hystrix说明

官方文档

Hystrix是Netflix开源的一个容灾框架,解决当外部依赖故障时拖垮业务系统、甚至引起雪崩的问题。

为什么需要Hystrix?

  • 在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo等),在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。

  • 当依赖阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性,在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。

例如:一个依赖30个SOA服务的系统,每个服务99.99%可用。

99.99%的30次方 ≈ 99.7%

0.3% 意味着一亿次请求 会有 3,000,00次失败

换算成时间大约每月有2个小时服务不稳定.

随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.

解决问题方案:对依赖做隔离。

Hystrix设计理念

想要知道如何使用,必须先明白其核心设计理念,Hystrix基于命令模式,通过UML图先直观的认识一下这一设计模式

  • 可见,Command是在ReceiverInvoker之间添加的中间层,Command实现了对Receiver的封装

  • API既可以是Invoker又可以是reciever,通过继承Hystrix核心类HystrixCommand来封装这些API(例如,远程接口调用,数据库查询之类可能会产生延时的操作)

  • 就可以为API提供弹性保护了。

Hystrix如何解决依赖隔离

  1. Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。

  2. 可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可。当调用超时时,直接返回或执行fallback逻辑。

  3. 为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队,加速失败判定时间。

  4. 依赖调用结果分,成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。

  5. 提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行

  6. 提供近实时依赖的统计和监控

Hystrix流程结构解析

流程说明:

  1. 每次调用构建HystrixCommand或者HystrixObservableCommand对象,把依赖调用封装在run()方法中.

  2. 结果是否有缓存如果没有执行execute()/queue做sync或async调用,对应真正的run()/construct()

  3. 判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.

  4. 判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.

  5. 使用HystrixObservableCommand.construct()还是HystrixCommand.run(),运行依赖逻辑

  6. 依赖逻辑调用超时,进入步骤8

  7. 判断逻辑是否调用成功

    • 6a 返回成功调用结果

    • 6b 调用出错,进入步骤8.

  8. 计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

  9. getFallback()降级逻辑.
    a. 没有实现getFallback的Command将直接抛出异常

    b. fallback降级逻辑调用成功直接返回

    c. 降级逻辑调用失败抛出异常

  10. 返回执行成功结果

以下四种情况将触发getFallback调用:

  1. run()方法抛出非HystrixBadRequestException异常

  2. run()方法调用超时

  3. 熔断器开启短路调用

  4. 线程池/队列/信号量是否跑满

熔断器:Circuit Breaker

每个熔断器默认维护10个bucket,每秒一个bucket,每个bucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断短路。

Hystrix隔离分析

Hystrix隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.

线程隔离

  • 执行依赖代码的线程与请求线程(如:jetty线程)分离,请求线程可以自由控制离开的时间(异步过程)。

  • 通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。

  • 线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。

实际案例:

Netflix公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。Netflix 内部API 每天100亿的HystrixCommand依赖请求使用线程隔,每个应用大约40多个线程池,每个线程池大约5-20个线程。

信号隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。

信号量的大小可以动态调整, 线程池大小不可以。

线程隔离与信号隔离区别如下图:

fallback故障切换降级机制

有兴趣的小伙伴可以看看:官方参考文档

源码分析

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

executeCommandAndObserve
    /**
     * This decorates "Hystrix" functionality around the run() Observable.
     * @return R
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) 
        //......
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable,
			Observable<R>>() 
            @Override
            public Observable<R> call(Throwable t) 
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) 
                    return handleThreadPoolRejectionViaFallback(e);
                 else if (t instanceof HystrixTimeoutException) 
                    return handleTimeoutViaFallback();
                 else if (t instanceof HystrixBadRequestException) 
                    return handleBadRequestByEmittingError(e);
                 else 
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain
					 * HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) 
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    
                    return handleFailureViaFallback(e);
                
            
        ;
        //......
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) 
            execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
         else 
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    

使用Observable的onErrorResumeNext,里头调用了handleFallback,handleFallback中区分不同的异常来调用不同的fallback。

  • RejectedExecutionException调用handleThreadPoolRejectionViaFallback

  • HystrixTimeoutException调用handleTimeoutViaFallback

  • 非HystrixBadRequestException的调用handleFailureViaFallback

applyHystrixSemantics
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) 
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) 
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() 
                @Override
                public void call() 
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) 
                        executionSemaphore.release();
                    
                
            ;
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() 
                @Override
                public void call(Throwable t) 
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                
            ;
            if (executionSemaphore.tryAcquire()) 
                try 
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                 catch (RuntimeException e) 
                    return Observable.error(e);
                
             else 
                return handleSemaphoreRejectionViaFallback();
            
         else 
            return handleShortCircuitViaFallback();
        
    
  • applyHystrixSemantics方法针对executionSemaphore.tryAcquire()没通过的调用

  • handleSemaphoreRejectionViaFallback

  • applyHystrixSemantics方法针对circuitBreaker.attemptExecution()没通过的调用handleShortCircuitViaFallback()

ViaFallback方法
    private Observable<R> handleSemaphoreRejectionViaFallback() 
        Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
        executionResult = executionResult.setExecutionException(semaphoreRejectionException);
        eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
        logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
        // retrieve a fallback or throw an exception if no fallback available
        return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                "could not acquire a semaphore for execution", semaphoreRejectionException);
    

    private Observable<R> handleShortCircuitViaFallback() 
        // record that we are returning a short-circuited fallback
        eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
        // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
        Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
        executionResult = executionResult.setExecutionException(shortCircuitException);
        try 
            return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                    "short-circuited", shortCircuitException);
         catch (Exception e) 
            return Observable.error(e);
        
    

    private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) 
        eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
        threadPool.markThreadRejection();
        // use a fallback instead (or throw exception if not implemented)
        return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
    

    private Observable<R> handleTimeoutViaFallback() 
        return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
    

    private Observable<R> handleFailureViaFallback(Exception underlying) 
        /**
         * All other error handling
         */
        logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

        // report failure
        eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

        // record the exception
        executionResult = executionResult.setException(underlying);
        return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
    
  • handleSemaphoreRejectionViaFallback、handleShortCircuitViaFallback、handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleFailureViaFallback这几个方法调用了getFallbackOrThrowException
  • 其eventType分别是SEMAPHORE_REJECTED、SHORT_CIRCUITED、THREAD_POOL_REJECTED、TIMEOUT、FAILURE
  • AbstractCommand.getFallbackOrThrowException

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java


    /**
     * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions.
     * <p>
     * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract.
     * <p>
     * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
     * all threads to pile up and block.
     *
     * @return K
     * @throws UnsupportedOperationException
     *             if getFallback() not implemented
     * @throws HystrixRuntimeException
     *             if getFallback() fails (throws an Exception) or is rejected by the semaphore
     */
    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) 
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        // record the executionResult
        // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
        executionResult = executionResult.addEvent((int) latency, eventType);

        if (isUnrecoverable(originalException)) 
            logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

            /* executionHook for all errors */
            Exception e = wrapWithOnErrorHook(failureType, originalException);
            return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
         else 
            if (isRecoverableError(originalException)) 
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            

            if (properties.fallbackEnabled().get()) 
                /* fallback behavior is permitted so attempt */

                final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() 
                    @Override
                    public void call(Notification<? super R> rNotification) 
                        setRequestContextIfNeeded(requestContext);
                    
                ;

                final Action1<R> markFallbackEmit = new Action1<R>() 
                    @Override
                    public void call(R r) 
                        if (shouldOutputOnNextEvents()) 
                            executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                        
                    
                ;
                final Action0 markFallbackCompleted = new Action0() 
                    @Override
                    public void call() 
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, 
						HystrixEventType.FALLBACK_SUCCESS);
                    
                ;
                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() 
                    @Override
                    public Observable<R> call(Throwable t) 
                        /* executionHook for all errors */
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);

                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;

                        if (fe instanceof UnsupportedOperationException) 
                            logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                         else 
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int以上是关于深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的故障切换的运作流程原理分析的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出SpringCloud原理及实战「Netflix系列之Fegin」打开Fegin之RPC技术的开端,你会使用原生态的Fegin吗?(下)

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的超时机制的原理和实现分析

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析

深入浅出Dubbo3原理及实战「SpringCloud-Alibaba系列」基于Nacos作为注册中心进行发布SpringCloud-alibaba生态的RPC接口实战

深入浅出SpringCloud原理及实战「Netflix系列之Ribbon」针对于负载均衡组件Ribbon的基本参数和实现原理介绍分析

深入浅出SpringCloud原理及实战「SpringCloud-Gateway系列」微服务API网关服务的Gateway全流程开发实践指南(入门篇)