dubbo源码阅读-Filter默认实现之CacheFiler

Posted 意犹未尽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo源码阅读-Filter默认实现之CacheFiler相关的知识,希望对你有一定的参考价值。

API文档

http://dubbo.apache.org/zh-cn/docs/user/demos/result-cache.html

缓存使用例子

可参考:https://blog.csdn.net/hardworking0323/article/details/81293402

CacheFilter

/**
 * CacheFilter
 * group为consumer或者provider 同时 含有cache=的配置的时候 返回此过滤器
 */
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

    /**
     * CacheFactory$Adaptive 对象。
     * <p>
     * X * 通过 Dubbo SPI 机制,调用 {@link #setCacheFactory(CacheFactory)} 方法,进行注入
     */
    private CacheFactory cacheFactory;

    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //方法开启 Cache 功能 判断method是否配置了cache
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
            //通过cacheFactory获取cache对象
            Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);
            if (cache != null) {
                String key = StringUtils.toArgumentString(invocation.getArguments());
                Object value = cache.get(key);
                if (value != null) {
                    return new RpcResult(value);
                }
                Result result = invoker.invoke(invocation);
                //没发生异常 存入cache
                if (!result.hasException() && result.getValue() != null) {
                    cache.put(key, result.getValue());
                }
                return result;
            }
        }
        return invoker.invoke(invocation);
    }

}

CacheFactory

接口定义

//默认是lru
@SPI("lru")
public interface CacheFactory {

    //是否带SPI扩展根据cache参数获取
    @Adaptive("cache")
    Cache getCache(URL url, Invocation invocation);

}

AbstractCacheFactory

/**
 * AbstractCacheFactory
 */
public abstract class AbstractCacheFactory implements CacheFactory {

    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();

    /**
     * 模板方法模式 父类通过将cacheKey存入缓存 子类负责创建Cache
     * @param url
     * @param invocation
     * @return
     */
    @Override
    public Cache getCache(URL url, Invocation invocation) {
        url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName());
        //获取Key 并从缓存获取
        String key = url.toFullString();
        Cache cache = caches.get(key);
        if (cache == null) {
            //交给子类创建缓存
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }

    protected abstract Cache createCache(URL url);

}

Lru实现

基于最近最少使用原则删除多余缓存,保持最热的数据被缓存。

LruCacheFactory

/**
 * LruCacheFactory
 */
public class LruCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new LruCache(url);
    }

}

LruCache

public class LruCache implements Cache {

    /**
     * 缓存集合
     */
    private final Map<Object, Object> store;

    public LruCache(URL url) {
        // `"cache.size"` 配置项,设置缓存大小
        final int max = url.getParameter("cache.size", 1000);
        // 创建 LRUCache 对象 内部采用likendMap实现
        this.store = new LRUCache<Object, Object>(max);
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}

ThreadLocal实现

ThreadLocalCacheFactory

/**
 * ThreadLocalCacheFactory
 */
public class ThreadLocalCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }

}

ThreadLocalCache

/**
 * ThreadLocalCache
 */
public class ThreadLocalCache implements Cache {

    private final ThreadLocal<Map<Object, Object>> store;

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {
            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }
        };
    }

    @Override
    public void put(Object key, Object value) {
        store.get().put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get().get(key);
    }

}

JCache实现

与 JSR107 集成,可以桥接各种缓存实现。

https://www.cnblogs.com/MagicAsa/p/10756331.html

JCacheFactory

/**
 * JCacheFactory
 */
public class JCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new JCache(url);
    }

}

 

JCache

public class JCache implements com.alibaba.dubbo.cache.Cache {

    private final Cache<Object, Object> store;

    public JCache(URL url) {
        String method = url.getParameter(Constants.METHOD_KEY, "");
        String key = url.getAddress() + "." + url.getServiceKey() + "." + method;
        // jcache parameter is the full-qualified class name of SPI implementation
        String type = url.getParameter("jcache");

        CachingProvider provider = type == null || type.length() == 0 ? Caching.getCachingProvider() : Caching.getCachingProvider(type);
        CacheManager cacheManager = provider.getCacheManager();
        Cache<Object, Object> cache = cacheManager.getCache(key);
        if (cache == null) {
            try {
                //configure the cache
                MutableConfiguration config =
                        new MutableConfiguration<Object, Object>()
                                .setTypes(Object.class, Object.class)
                                .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, url.getMethodParameter(method, "cache.write.expire", 60 * 1000))))
                                .setStoreByValue(false)
                                .setManagementEnabled(true)
                                .setStatisticsEnabled(true);
                cache = cacheManager.createCache(key, config);
            } catch (CacheException e) {
                // concurrent cache initialization
                cache = cacheManager.getCache(key);
            }
        }

        this.store = cache;
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}

总结

我们可以通过JCCache扩展我们的缓存 比如redis缓存

以上是关于dubbo源码阅读-Filter默认实现之CacheFiler的主要内容,如果未能解决你的问题,请参考以下文章

dubbo源码阅读-Filter默认实现之CacheFiler

dubbo源码分析之过滤器Filter-12

dubbo源码阅读-服务订阅之远程订阅(dubbo)

dubbo源码阅读之SPI

Dubbo源码阅读系列之远程服务调用(上)

Dubbo Filter机制概述