hystrix源码小贴士之命令四种调用方式的异同

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hystrix源码小贴士之命令四种调用方式的异同相关的知识,希望对你有一定的参考价值。

  AbstractCommand有两种种调用方式,分别是observe,toObservable

  toObservable是hystrix创建的一个Observable,当向这个Observable监听时,它会执行run命令,并返回消息。 

 Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

  具体流程可以参见:hystrix源码之AbstractCommand

  observe方法使用ReplaySubject调用并监听了toObservable方法返回的Observable,然后返回该ReplaySubject。

public Observable<R> observe() {
        // us a ReplaySubject to buffer the eagerly subscribed-to Observable
        ReplaySubject<R> subject = ReplaySubject.create();
        // eagerly kick off subscription
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        // return the subject that can be subscribed to later while the execution has already started
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }

  HystrixObservableCommand继承AbstractCommand,没有提供额外执行方法。

  HystrixCommand继承AbstractCommand,额外提供了两个方法调用,分别为execute,queue。

  queue方法通过异步方式活动结果

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                     * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                     * issued by different threads, it‘s unclear about what value would be used by the time mayInterruptOnCancel is checked.
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
            
        };

  execute方法使用同步方式获取结果,本质是调用了queue

public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

以上是关于hystrix源码小贴士之命令四种调用方式的异同的主要内容,如果未能解决你的问题,请参考以下文章

hystrix源码小贴士之调用异常处理

hystrix源码小贴士之Yammer Publisher

hystrix源码小贴士之Servo Publisher

hystrix源码小贴士之之hystrix-metrics-event-stream

hystrix源码小贴士之中断

hystrix总结之命令执行