二十二.SpringCloud源码剖析-Hystrix降级

Posted 墨家巨子@俏如来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了二十二.SpringCloud源码剖析-Hystrix降级相关的知识,希望对你有一定的参考价值。

前言

这篇文章是接上一篇《SpringCloud源码剖析-Hystrix初始化》,继续完成Hystrix未完成的执行流程

HystrixCommand

上回说到,Hystrix通过HystrixCommandApsect 环绕通知@Around切到@HystrixCommand注解的方法。然后会创建 HystrixCommand 去执行Hystrix,在创建HystrixCommand的时候会初始化熔断器和线程池。HystrixCommand创建好之后,如果没有配置observable模式,默认会交给CommandExecutor去执行,我们接着上一张继续看Hystrix的执行流程

HystrixCommandAspect#methodsAnnotatedWithHystrixCommand:

...省略...
 try {
 		//目标方法是否是采用异步回调,默认false
        if (!metaHolder.isObservable()) {
              result = CommandExecutor.execute(invokable, executionType, metaHolder);
          } else {
              result = executeObservable(invokable, executionType, metaHolder);
          }
        } 

跟下去.HystrixCommandAspect#methodsAnnotatedWithHystrixCommand

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
		//ExecutionType有三种类型,同步执行,异步执行,OBSERVABLE异步回调,默认走SYNCHRONOUS
        switch (executionType) {
        	//同步
            case SYNCHRONOUS: {
            	//把 HystrixInvokable  转成 HystrixExecutable 后执行execute
                return castToExecutable(invokable, executionType).execute();
            }
            //异步执行
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            //OBSERVABLE 模式
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
	//转发了一下类型HystrixInvokable  转成 HystrixExecutable
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }

该方法会根据传入的ExecutionType判断执行的方式,ExecutionType中有三种类型

  • ASYNCHRONOUS:异步执行,
  • SYNCHRONOUS:同步执行,
  • OBSERVABLE:异步回调,你可去百度一下Java的Observer,它其实是观察者的实现

默认走同步执行SYNCHRONOUS,先把 HystrixInvokable 转成 HystrixExecutable 后,执行execute 继续跟下去,代码来到:com.netflix.hystrix.HystrixCommand#execute

public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

 public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    	
       ...省略...
        return f;
    }


execute方法中调用了queue方法,queue方法中用到了观察者模式,在toObservable方法中可以看到hystrix的降级触发流程,在toFuture方法中最终会通过GennericCommand调用CommandAction去执行我们的方法,这里我关注toObservable源码如下;

 public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        ...省略...
        //【第二步】回调
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                //【第三步】
                return applyHystrixSemantics(_cmd);
            }
        };
		
		return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                 ...省略...
				//这里走缓存
                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
				//【第一步】
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                                ...省略...

Hystrix大量使用到了类似于JS的回调函数式编程,applyHystrixSemantics 就是一个回调,通过 Observable.defer(applyHystrixSemantics) 去执行。applyHystrixSemantics 中又调用了 applyHystrixSemantics(_cmd);跟进com.netflix.hystrix.AbstractCommand#applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
		//是否允许执行熔断器,方法中判断电路是否打开,方法中会
		//判断circuitBreakerForceOpen强制打开熔断器 和 circuitBreakerForceClosed强制关闭熔断器
		
        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
        	//信号量
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            //信号量释放的回调
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
			//尝试获取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    //创建一个执行结果对象
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    //执行command和监听
                    return executeCommandAndObserve(_cmd)
                    		//当出现错误
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }

这里确定电路未打开的情况下,获取信号量,然后调用com.netflix.hystrix.AbstractCommand#executeCommandAndObserve方法,继续跟下去:


private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        ... 省略部分代码...
		//【重要】这个是处理降级的回调,处理降级的核心流程
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
            //【第二步】降级
            	//标记未成功
                circuitBreaker.markNonSuccess();
                //拿到异常对象
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                //判断错误类型拒绝执行,超时等,做不同的处理
                if (e instanceof RejectedExecutionException) {
               		 //线程池拒绝
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                	//超时
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                	//请求错误
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
					//处理失败
                    return handleFailureViaFallback(e);
                }
            }
        };
        
  return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                //【第一步】这里执行错误,触发降级回调
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);

该方法中 Observable 去执行,如果出现错误会触发handleFallback降级回调,handleFallback#call方法中拿到异常后,进行处理,我们继续跟踪一下:com.netflix.hystrix.AbstractCommand#handleFailureViaFallback方法

private Observable<R> handleFailureViaFallback(Exception underlying) {
        /**
         * All other error handling
         */
        logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

        // report failure
        eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

        // record the exception
        executionResult = executionResult.setException(underlying);
		//获取fallback或者抛出异常
        return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
    }

继续跟进 com.netflix.hystrix.AbstractCommand#getFallbackOrThrowException

...省略部分代码...
 // acquire a permit
 				//拿到信号量
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            //【重要】这里在获取降级的执行链
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }
					//执行降级
                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                   return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }

这里通过 getFallbackObservable()方法拿到降级执行链,getFallbackObservable方法最终调用com.netflix.hystrix.contrib.javanica.command.GenericCommand#getFallback获取降级

 @Override
    protected Object getFallback() {
    //降级方法的CommandAction
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        //执行降级方法
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }

最终通过 commandAction.executeWithArgs 执行降级方法,底层使用反射调用降级方法,然后把结果返回。

文章结束,最后想吐槽一下,Hystrix真的是我看过最难看的源码,到处用到回调编程,代码可读性是真的差,看来停更是有原因的呀。。。。。。。最后。。。喜欢的话给个好评吧。

以上是关于二十二.SpringCloud源码剖析-Hystrix降级的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud Alibaba微服务实战二十二 - 整合Dubbo

企业分布式微服务云SpringCloud SpringBoot mybatis (二十二)Restdoc生成api文档

SpringCloud - Spring Cloud Alibaba 之 Seata分布式事务服务;Seata TC Server集群部署(二十二)

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)二十二(下单和微信支付)

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)二十二(下单和微信支付)

Spring源码分析(二十二)功能扩展