你好Hystrix八:Hystrix执行流程分析-toObservable

Posted 你好JAVA

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了你好Hystrix八:Hystrix执行流程分析-toObservable相关的知识,希望对你有一定的参考价值。

前言

我们这一节的内容就是对Hystrix的请求流程做一个分析,把前面的内容串联起来。

HystrixObservable

该接口提供了两个方法,这两个方法大致提供了一致的功能,都是返回一个Observable对象。

//提供饥饿模式的Observable 立马回执行 HystrixCommand#queue()/execute()命令
Observable<R> observe();
//提供lazy/defer延迟模式的Observable实例,只有在订阅了Observable之后,才惰性的开始执行命令
//所有执行方法的基石 一切都源于它
Observable<R> toObservable();

toObservable

AbstractCommand继承HystrixObservable 所以自然也就实现了HystrixObservable的两个方法。

public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// ...由于代码太长所以省略了变量的定义.....
//下面这个Func0表达式是整个执行的核心 这里只是定义
final Func0<Observable<R>> applyHystrixSemantics = () -> {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
};

//构造一个Observable
return Observable.defer(() -> {
/**
* commandState 初始状态是NOT_STARTED 将commandState改成OBSERVABLE_CHAIN_CREATED 设置失败就抛出异常
*/

if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
//记录命令开始执行时间戳
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
//可以通过hystrix.command.default.requestCache.enabled来配置请求缓存
//这个可以对请求结果按照一定的策略缓存 具体的逻辑可以在子类中定义
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
//如果开启缓存(开启的条件不仅要配置 而且子类要重写getCacheKey方法)
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
//如果缓存不为空就返回缓存的值
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//下面这个表达式是Hystrix执行的核心的
//applyHystrixSemantics在前面定义的一个Func0(返回一个Observable)
//wrapWithAllOnNextHooks 执行钩子方法 onComplete 和 onEmit 这些方法是在
//HystrixCommandExecutionHook中定义的。
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
//到这里 hystrixObservable就是包装之后的流对象了 下面的逻辑就是对
//hystrixObservable进行包装缓存 为了下次命中缓存直接返回
Observable<R> afterCache;
if (requestCacheEnabled && cacheKey != null) {
//这里就会通过 HystrixCachedObservable 来包装结果 包装成一个
//HystrixCommandResponseFromCache 这部分内容我们在 缓存的地方有介绍
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
//这里的doOnTerminate、doOnUnsubscribe、doOnCompleted在前面都有定义
//主要是对各种状态的更改 以及钩子方法的应用 主要的逻辑还是在applyHystrixSemantics中
//到这里的Observable已经是构造好了的对象
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
});
}

上面的方法的核心逻辑在applyHystrixSemantics方法里面。

applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//执行命令开始执行的钩子方法 可能有人会问 前面绑定了那么多的钩子方法 这里怎么才开始
//start 因为前面绑定但是并没有执行。 当有订阅者订阅 这里才是开始执行的代码逻辑
executionHook.onStart(_cmd);
/**
* 尝试执行 如果断路器是打开的状态 通过这个操作可以将其变为半开的状态
* 如果断路器打开 这里直接进入else 进入 fallback
*/

if (circuitBreaker.attemptExecution()) {
/**
* 如果是信号量隔离 返回TryableSemaphoreActual 根据设置的并发量来判断是否
* 能执行 如果不能执行 进入fallback
* 如果是线程池隔离 返回TryableSemaphoreNoOp 直接返回true没有任何操作
*
*/

final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};

final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};

//如果是线程池隔离这里永远是 true
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
/**
* executeCommandAndObserve 方法是核心方法
*/

return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//信号量执行的时候并发太大直接回退
return handleSemaphoreRejectionViaFallback();
}
} else {
//熔断开启直接回退
return handleShortCircuitViaFallback();
}
}

继续跟进到 executeCommandAndObserve方法中:

executeCommandAndObserve

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();


//主要是来对HystrixCommand和HystrixObservableCommand记录的事件是不同的
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
//HystrixCommand返回的是false 所以不会记录onNext产生的事件流
//HystrixObservableCommand则相反
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
//HystrixCommand 对commandIsScalar的实现返回的是true
//所以在执行完onNext之后 断路器就会记录成功的状态 发送成功的事件流
//而HystrixObservableCommand则相反 因为onNext方法执行完成之后
//需要等到onComplete才会记录成功状态 发送成功事件流
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};

//这个变量就是 onComplete需要执行的 所以这里的判断和markEmits是相反的 主要是为了
//兼容HystrixObservableCommand和HystrixCommand对成功行为的判断标准
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};

//执行失败的逻辑定义
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
//熔断器标记失败
circuitBreaker.markNonSuccess();
//根据异常类型来进入具体的fallback逻辑
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};

//这个操作也是为了父子线程公用一份HystrixRequestContext
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};

Observable<R> execution;
/**
* 如果超时开启 使用HystrixObservableTimeoutOperator来对Observable做超时处理
* 所以不管是信号量隔离还是线程池隔离都会走该逻辑进行超时控制
*/

if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}

return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

上面的方法主要是定义执行成功的处理逻辑和执行失败的处理逻辑以及超时的处理逻辑。

总结

大致的流程主要是上面所述的三个方法toObservableapplyHystrixSemanticsexecuteCommandAndObserve。在分析源码的时候紧靠着三个方法就能把整个流程理顺,并且能把之前的知识点联系起来。


以上是关于你好Hystrix八:Hystrix执行流程分析-toObservable的主要内容,如果未能解决你的问题,请参考以下文章

聊聊Hystrix 命令执行流程

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的Command创建和执行实现原理分析

四.Hystrix执行流程

你好Hystrix六:Hystrix结果缓存-HystrixCachedObservable

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的故障切换的运作流程原理分析

深入 Hystrix 执行时内部原理