AbstractCommand执行命令前首先会判断是否开启了timeout。
if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); }
HystrixObservableTimeoutOperator的核心是内部又一个定时器,当达到timeout时间时,将命令状态设置成timeout,向eventNotifier发送timeout事件,取消监听,返回一个HystrixTimeoutException的onError。
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { .... @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { final CompositeSubscription s = new CompositeSubscription(); child.add(s); /* * Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext * of the calling thread which doesn‘t exist on the Timer thread. */ final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); TimerListener listener = new TimerListener() { @Override public void tick() { // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath // otherwise it means we lost a race and the run() execution completed or did not start if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // report timeout failure originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); // shut down the original request s.unsubscribe(); timeoutRunnable.run(); //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout } } @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } };
...
} }