hystrix可以将同一个命令的多次执行合并到一起执行。
public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> { private String name; public HelloWorldCommandCollapser(String name){ this.name = name; } @Override public String getRequestArgument() { return name; } @Override protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) { return new BatchHystrixCommand(collapsedRequests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) { int i =0; for(CollapsedRequest collapsedRequest:collapsedRequests){ collapsedRequest.setResponse(batchResponse.get(i)); i++; } } private class BatchHystrixCommand extends HystrixCommand{ private Collection<CollapsedRequest<String, String>> collapsedRequests; public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){ super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.collapsedRequests =collapsedRequests; } @Override protected Object run() throws Exception { List<String> result = new ArrayList<String>(); for(CollapsedRequest collapsedRequest:collapsedRequests){ result.add("helloworld:"+collapsedRequest.getArgument()); } return result; } }
方法调用
HystrixRequestContext context = HystrixRequestContext.initializeContext(); try{ String result1 = new HelloWorldCommandCollapser("one").execute(); String result2 = new HelloWorldCommandCollapser("two").execute(); String result3 = new HelloWorldCommandCollapser("three").execute(); String result4 = new HelloWorldCommandCollapser("four").execute(); String result5 = new HelloWorldCommandCollapser("five").execute(); String result6 = new HelloWorldCommandCollapser("six").execute(); System.out.println(result1); System.out.println(result2); System.out.println(result3); System.out.println(result4); System.out.println(result5); System.out.println(result6); }finally { context.shutdown(); }
继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。
getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。
createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。
mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。
本质原理如下:
当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。
public Observable<ResponseType> toObservable(Scheduler observeOn) { return Observable.defer(new Func0<Observable<ResponseType>>() { @Override public Observable<ResponseType> call() { ... RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument()); ... return response; } }); }
RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。
当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。
public Observable<ResponseType> submitRequest(final RequestArgumentType arg) { ... while (true) { final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get(); ...final Observable<ResponseType> response; if (arg != null) { response = b.offer(arg); } else { response = b.offer( (RequestArgumentType) NULL_SENTINEL); } //如果到达一定数量,respose返回null if (response != null) { return response; } else { //执行批量 createNewBatchAndExecutePreviousIfNeeded(b); } } }
RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。
private class CollapsedTask implements TimerListener { final Callable<Void> callableWithContextOfParent; CollapsedTask() { callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() { @Override public Void call() throws Exception { ...
RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
if (currentBatch != null && currentBatch.getSize() > 0) {
createNewBatchAndExecutePreviousIfNeeded(currentBatch);
}
... } }); } @Override public void tick() { ...
callableWithContextOfParent.call();
...
} @Override public int getIntervalTimeInMilliseconds() { return properties.timerDelayInMilliseconds().get(); } }
批量执行
public void executeBatchIfNotAlreadyStarted() { ... try { Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values()); for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) { try { Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果 //批量执行结果映射到执行请求中 commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
... }).doOnCompleted(new Action0() { ... }).subscribe(); } catch (Exception e) { ... } } } catch (Exception e) { ... } finally { batchLock.writeLock().unlock(); } } }