Request caching is enabled by overriding the getCacheKey method of HystrixCommand or HystrixObservableCommand.
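    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;

    public class CommandUsingRequestCache extends HystrixCommand<Boolean> {

        private final int value;

        protected CommandUsingRequestCache(int value) {
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.value = value;
        }

        @Override
        protected Boolean run() {
            // true for 0 and for every even number
            return value == 0 || value % 2 == 0;
        }

        @Override
        protected String getCacheKey() {
            // commands that return the same key share one result within a request
            return String.valueOf(value);
        }
    }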
Executing the command
    @Test
    public void testWithoutCacheHits() {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            assertTrue(new CommandUsingRequestCache(2).execute());
            assertFalse(new CommandUsingRequestCache(1).execute());
            assertTrue(new CommandUsingRequestCache(0).execute());
            assertTrue(new CommandUsingRequestCache(58672).execute());
        } finally {
            context.shutdown();
        }
    }
Hystrix uses the value returned by getCacheKey to look up results in the cache. A cached value lives only for the duration of one request.
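In essence, toObservable wraps the command with cache logic: before executing, it looks the key up in the cache; on a miss, it wraps the result Observable and puts it into the cache.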
    public Observable<R> toObservable() {
        ...
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ...
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache =
                            (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache =
                            (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    ...
                } else {
                    afterCache = hystrixObservable;
                }
                ...
            }
        });
    }
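The lookup-then-putIfAbsent flow above can be sketched without Hystrix or RxJava. The following is only a simplified illustration under stated assumptions: the class LookupThenPutSketch, its execute method, and the use of CompletableFuture in place of an Observable are all hypothetical stand-ins, and a null cache key is taken to mean "do not cache".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the cache flow in toObservable(); not Hystrix code.
final class LookupThenPutSketch {
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> cache = new ConcurrentHashMap<>();
    final AtomicInteger executions = new AtomicInteger(); // counts real executions

    CompletableFuture<Boolean> execute(String cacheKey, Supplier<Boolean> work) {
        boolean cachingEnabled = cacheKey != null; // assumption: null key disables caching

        // Step 1: try the cache first.
        if (cachingEnabled) {
            CompletableFuture<Boolean> fromCache = cache.get(cacheKey);
            if (fromCache != null) {
                return fromCache;
            }
        }

        // Step 2: publish a placeholder; if another caller won the race, reuse its entry.
        CompletableFuture<Boolean> toCache = new CompletableFuture<>();
        if (cachingEnabled) {
            CompletableFuture<Boolean> existing = cache.putIfAbsent(cacheKey, toCache);
            if (existing != null) {
                return existing;
            }
        }

        // Step 3: only the caller that owns the entry runs the work.
        executions.incrementAndGet();
        try {
            toCache.complete(work.get());
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache;
    }

    public static void main(String[] args) {
        LookupThenPutSketch sketch = new LookupThenPutSketch();
        sketch.execute("2", () -> true).join();
        sketch.execute("2", () -> true).join(); // served from the cache
        System.out.println(sketch.executions.get()); // prints 1
    }
}
```

The second lookup via putIfAbsent matters because two callers can both miss on get; only the one whose putIfAbsent returns null goes on to execute.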
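Internally, HystrixRequestCache builds its cache key from three parts: type (the kind of key: 1 for HystrixCommandKey, 2 for HystrixCollapserKey), the name of the command key, and the HystrixConcurrencyStrategy. This RequestCacheKey is then combined with the value returned by getCacheKey to form a ValueCacheKey.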
    private static class RequestCacheKey {
        private final short type;
        private final String key;
        private final HystrixConcurrencyStrategy concurrencyStrategy;
        ...
    }

    private static class ValueCacheKey {
        private final RequestCacheKey rvKey;
        private final String valueCacheKey;
        ...
    }
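To see why the key has two levels, here is a minimal sketch of a composite key with value-based equals and hashCode. All names (CompositeKeySketch, OwnerKey, ValueKey) are hypothetical, and the concurrency strategy component is left out for brevity; this mirrors the shape of the key, not the Hystrix implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical names throughout; a simplified mirror of the two-level key, not Hystrix code.
final class CompositeKeySketch {
    static final short COMMAND = 1;
    static final short COLLAPSER = 2;

    // Identifies which command (or collapser) a cache entry belongs to.
    static final class OwnerKey {
        final short type;
        final String name;

        OwnerKey(short type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof OwnerKey)) return false;
            OwnerKey other = (OwnerKey) o;
            return type == other.type && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, name);
        }
    }

    // Adds the per-invocation cache key (the getCacheKey value) to the owner.
    static final class ValueKey {
        final OwnerKey owner;
        final String valueCacheKey;

        ValueKey(OwnerKey owner, String valueCacheKey) {
            this.owner = owner;
            this.valueCacheKey = valueCacheKey;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ValueKey)) return false;
            ValueKey other = (ValueKey) o;
            return owner.equals(other.owner) && valueCacheKey.equals(other.valueCacheKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(owner, valueCacheKey);
        }
    }

    public static void main(String[] args) {
        Map<ValueKey, String> cache = new HashMap<>();
        cache.put(new ValueKey(new OwnerKey(COMMAND, "GetUser"), "42"), "from command");
        cache.put(new ValueKey(new OwnerKey(COLLAPSER, "GetUser"), "42"), "from collapser");
        System.out.println(cache.size()); // prints 2: same name and key, different type
    }
}
```

With this shape, two different commands can return the same getCacheKey value without overwriting each other's cached results.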
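Each cached value is an Observable wrapped in a HystrixCachedObservable. The wrapper subscribes a ReplaySubject to the original Observable, so every later subscriber replays the same emissions instead of running the command again.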
    protected HystrixCachedObservable(final Observable<R> originalObservable) {
        ReplaySubject<R> replaySubject = ReplaySubject.create();
        this.originalSubscription = originalObservable
                .subscribe(replaySubject);

        this.cachedObservable = replaySubject
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions--;
                        if (outstandingSubscriptions == 0) {
                            originalSubscription.unsubscribe();
                        }
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions++;
                    }
                });
    }
HystrixRequestCache keeps its entries in a Map. That Map is stored in a HystrixRequestVariableHolder, so its lifetime is limited to a single request.
    private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache =
            new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(
                    new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
                        ... // lifecycle callbacks elided
                    });
    <T> HystrixCachedObservable<T> get(String cacheKey) {
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance =
                    requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            /* look for the stored value */
            return (HystrixCachedObservable<T>) cacheInstance.get(key);
        }
        return null;
    }
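The request-scoped behavior of get can be illustrated with a plain ThreadLocal. This is a rough sketch under a simplifying assumption: the request is bound to one thread, whereas Hystrix's own context handling is more involved. RequestScopeSketch and its methods are hypothetical names chosen for this example.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, thread-bound approximation of a request-scoped cache; not Hystrix code.
final class RequestScopeSketch {
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> CURRENT = new ThreadLocal<>();

    // Start of a request: create a fresh, empty cache.
    static void initializeContext() {
        CURRENT.set(new ConcurrentHashMap<>());
    }

    // End of a request: drop the cache and everything in it.
    static void shutdown() {
        CURRENT.remove();
    }

    static Object get(String cacheKey) {
        if (cacheKey == null) {
            return null; // no key means no caching
        }
        return requireCache().get(cacheKey);
    }

    static Object putIfAbsent(String cacheKey, Object value) {
        return requireCache().putIfAbsent(cacheKey, value);
    }

    private static ConcurrentHashMap<String, Object> requireCache() {
        ConcurrentHashMap<String, Object> cache = CURRENT.get();
        if (cache == null) {
            throw new IllegalStateException("Request cache is not available; initialize the context first");
        }
        return cache;
    }

    public static void main(String[] args) {
        initializeContext();
        try {
            putIfAbsent("2", Boolean.TRUE);
            System.out.println(get("2")); // prints true
        } finally {
            shutdown();
        }
        initializeContext();
        try {
            System.out.println(get("2")); // prints null: a new request starts empty
        } finally {
            shutdown();
        }
    }
}
```

As in the get method above, a lookup outside an initialized context fails fast rather than silently skipping the cache.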
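Are exceptions cached as well?
Yes. An exception thrown by the fallback is cached too: what is stored is the replaying Observable rather than a plain value, so its terminal error is replayed to later callers.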
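The effect of caching a failure can be sketched in plain Java. This is an illustration only: CachedFailureSketch is a hypothetical class, a CompletableFuture stands in for the cached Observable, and the always-failing computation stands in for a fallback that throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a cached failure; not Hystrix code.
final class CachedFailureSketch {
    final AtomicInteger runs = new AtomicInteger(); // how often the failing work really ran
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    CompletableFuture<String> call(String key) {
        // The future itself is cached, so its failure is cached along with it.
        return cache.computeIfAbsent(key, k -> {
            runs.incrementAndGet();
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("fallback failed for key " + k));
            return failed;
        });
    }

    public static void main(String[] args) {
        CachedFailureSketch sketch = new CachedFailureSketch();
        for (int i = 0; i < 2; i++) {
            try {
                sketch.call("1").join();
            } catch (CompletionException e) {
                // Both iterations see the same cached exception.
                System.out.println(e.getCause().getMessage());
            }
        }
        System.out.println(sketch.runs.get()); // prints 1
    }
}
```

A practical consequence is that retrying with the same cache key inside one request will not re-run the command.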
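What is the scope of the cache?
A single request, that is, the span between HystrixRequestContext.initializeContext() and context.shutdown().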