深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的超时机制的原理和实现分析

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的超时机制的原理和实现分析相关的知识,希望对你有一定的参考价值。

[每日一句]

也许你度过了很糟糕的一天,但这并不代表你会因此度过糟糕的一生。

[温馨提示]

承接第一篇文章🏹【深入浅出SpringCloud原理及实战】「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析

[背景介绍]

  • 分布式系统的规模和复杂度不断增加,随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中,【熔断、隔离、降级、限流】是经常被使用的。而相关的技术,Hystrix本身早已算不上什么新技术,但它却是最经典的技术体系!。

  • Hystrix以实现熔断降级的设计,从而提高了系统的可用性。

  • Hystrix是一个在调用端上,实现断路器模式,以及隔舱模式,通过避免级联故障,提高系统容错能力,从而实现高可用设计的一个Java服务组件库。

  • Hystrix实现了资源隔离机制

前提介绍

Hystrix的超时检测本质上通过启动单独线程去检测的,线程的执行的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。

Hystrix超时后会抛出一个HystrixTimeoutException的异常。

超时检测逻辑

Hystrix的超时包括注册过程和执行过程两个,注册过程如下:

  • 执行lift(new HystrixObservableTimeoutOperator(_cmd))关联超时检测任务

  • 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务

    • 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行

Hystrix的超时执行过程如下:

  1. 在超时后执行listener.tick()方法后执行类TimerListener的tick方法

  2. 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法

  3. 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时

  4. executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> 

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) 
            this.originalCommand = originalCommand;
        

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) 
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);
            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = 
							HystrixRequestContext.getContextForCurrentThread();
            TimerListener listener = new TimerListener() 
                @Override
                public void tick() 
                  if(originalCommand.isCommandTimedOut
						.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) 
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, 
							originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
                                originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() 
                            @Override
                            public void run() 
                                child.onError(new HystrixTimeoutException());
                            
                        );
                        timeoutRunnable.run();
                    
                
                @Override
                public int getIntervalTimeInMilliseconds() 
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                
            ;
            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);
            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() 
                @Override
                public void onCompleted() 
                    if (isNotTimedOut()) 
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    
                
                @Override
                public void onError(Throwable e) 
                    if (isNotTimedOut()) 
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    
                
                @Override
                public void onNext(R v) 
                    if (isNotTimedOut()) 
                        child.onNext(v);
                    
                
                private boolean isNotTimedOut() 
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
 				originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, 
                                                                             TimedOutStatus.COMPLETED);
                
            ;

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);
            return parent;
        
    
    public Reference<TimerListener> addTimerListener(final TimerListener listener) 
        startThreadIfNeeded();
        // add the listener
        Runnable r = new Runnable() 
            @Override
            public void run() 
                try 
                    listener.tick();
                 catch (Exception e) 
                    logger.error("Failed while ticking TimerListener", e);
                
            
        ;
        //这里直接简单粗暴的scheduleAtFixedRate以超时时间作为周期去判断是否执行完成
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, 
                listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), 
				TimeUnit.MILLISECONDS);
        	return new TimerReference(listener, f);
    

	public class HystrixContextRunnable implements Runnable 
	    private final Callable<Void> actual;
    	private final HystrixRequestContext parentThreadState;
	    public HystrixContextRunnable(Runnable actual) 
    	    this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    	
	    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable 
			actual) 
        	this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    	
	    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, 
                                  final HystrixRequestContext hystrixRequestContext, final Runnable actual) 
    	    this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() 
            @Override
            public Void call() throws Exception 
                actual.run();
                return null;
            

        );
        this.parentThreadState = hystrixRequestContext;
    

    @Override
    public void run() 
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try 
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try 
                actual.call();
             catch (Exception e) 
                throw new RuntimeException(e);
            
         finally 
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        
    

以上是关于深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的超时机制的原理和实现分析的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出SpringCloud原理及实战「Netflix系列之Fegin」打开Fegin之RPC技术的开端,你会使用原生态的Fegin吗?(下)

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的超时机制的原理和实现分析

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析

深入浅出Dubbo3原理及实战「SpringCloud-Alibaba系列」基于Nacos作为注册中心进行发布SpringCloud-alibaba生态的RPC接口实战

深入浅出SpringCloud原理及实战「Netflix系列之Ribbon」针对于负载均衡组件Ribbon的基本参数和实现原理介绍分析

深入浅出SpringCloud原理及实战「SpringCloud-Gateway系列」微服务API网关服务的Gateway全流程开发实践指南(入门篇)