hystrix总结之请求批量执行

Posted zhangwanhua

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hystrix总结之请求批量执行相关的知识,希望对你有一定的参考价值。

  hystrix可以将同一个命令的多次执行合并到一起执行。

public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> {
    private String name;
    public HelloWorldCommandCollapser(String name){
        this.name = name;
    }
    @Override
    public String getRequestArgument() {
        return name;
    }
    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
        return new BatchHystrixCommand(collapsedRequests);
    }
    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
        int i =0;
        for(CollapsedRequest collapsedRequest:collapsedRequests){
            collapsedRequest.setResponse(batchResponse.get(i));
            i++;
        }
    }
    private class BatchHystrixCommand extends HystrixCommand{
        private Collection<CollapsedRequest<String, String>> collapsedRequests;
        public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.collapsedRequests =collapsedRequests;
        }
        @Override
        protected Object run() throws Exception {
            List<String> result = new ArrayList<String>();
            for(CollapsedRequest collapsedRequest:collapsedRequests){
                result.add("helloworld:"+collapsedRequest.getArgument());
            }
            return result;
        }
    }

  方法调用

HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try{
            String result1 = new HelloWorldCommandCollapser("one").execute();
            String result2 = new HelloWorldCommandCollapser("two").execute();
            String result3 = new HelloWorldCommandCollapser("three").execute();
            String result4 = new HelloWorldCommandCollapser("four").execute();
            String result5 = new HelloWorldCommandCollapser("five").execute();
            String result6 = new HelloWorldCommandCollapser("six").execute();
            System.out.println(result1);
            System.out.println(result2);
            System.out.println(result3);
            System.out.println(result4);
            System.out.println(result5);
            System.out.println(result6);
        }finally {
            context.shutdown();
        }

  继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。

  getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。

  createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。

  mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。

  本质原理如下:

  当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。

public Observable<ResponseType> toObservable(Scheduler observeOn) {
        return Observable.defer(new Func0<Observable<ResponseType>>() {
            @Override
            public Observable<ResponseType> call() {
               ...
                RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
                Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
...
                return response;
            }
        });
    }

  RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。  

  当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。

public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
        ...
        while (true) {
            final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
            ...final Observable<ResponseType> response;
            if (arg != null) {
                response = b.offer(arg);
            } else {
                response = b.offer( (RequestArgumentType) NULL_SENTINEL);
            }
            //如果到达一定数量,respose返回null
            if (response != null) {
                return response;
            } else {
                //执行批量
                createNewBatchAndExecutePreviousIfNeeded(b);
            }
        }
    }

  RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。  

private class CollapsedTask implements TimerListener {
        final Callable<Void> callableWithContextOfParent;
        CollapsedTask() {
            callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ...
            RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
            if (currentBatch != null && currentBatch.getSize() > 0) {
              createNewBatchAndExecutePreviousIfNeeded(currentBatch);
            }
            ... } }); } @Override public void tick() { ...
        callableWithContextOfParent.call();
       ...
} @Override public int getIntervalTimeInMilliseconds() { return properties.timerDelayInMilliseconds().get(); } }

  批量执行

public void executeBatchIfNotAlreadyStarted() {
     ...
            try {
              Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                    try {
                        Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果
              //批量执行结果映射到执行请求中
                        commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
               ...
}).doOnCompleted(new Action0() {                ... }).subscribe(); } catch (Exception e) { ... } } } catch (Exception e) { ... } finally { batchLock.writeLock().unlock(); } } }

 










以上是关于hystrix总结之请求批量执行的主要内容,如果未能解决你的问题,请参考以下文章

hystrix总结之异常降级

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的缓存请求执行运作原理

SpringCloud(Hoxton.SR3)基础篇:第五章Hystrix-request collapsing(请求合并)

hystrix文档翻译之工作原理

hystrix熔断器之熔断实现

深入浅出SpringCloud原理及实战「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的请求合并机制实现原理分析