EhCache RMI 分布式缓存/缓存集群

Posted xlxxcc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EhCache RMI 分布式缓存/缓存集群相关的知识,希望对你有一定的参考价值。

EhCache 系统简介

EhCache 是一个纯 Java 的进程内缓存框架,具有快速、精干等特点。
EhCache 的主要特性有:

  1. 快速、精干
  2. 简单;
  3. 多种缓存策略;
  4. 缓存数据有两级:内存和磁盘,因此无需担心容量问题;
  5. 缓存数据会在虚拟机重启的过程中写入磁盘;
  6. 可以通过 RMI、可插入 API 等方式进行分布式缓存;
  7. 具有缓存和缓存管理器的侦听接口;
  8. 支持多缓存管理器实例,以及一个实例的多个缓存区域;
  9. 提供 Hibernate 的缓存实现;

EhCache集群解决的问题:
  由 于 EhCache 是进程中的缓存系统,一旦将应用部署在集群环境中,每一个节点维护各自的缓存数据,当某个节点对缓存数据进行更新,这些更新的数据无法在其它节点中共享, 这不仅会降低节点运行的效率,而且会导致数据不同步的情况发生。例如某个网站采用 A、B 两个节点作为集群部署,当 A 节点的缓存更新后,而 B 节点缓存尚未更新就可能出现用户在浏览页面的时候,一会是更新后的数据,一会是尚未更新的数据。
  所以就需要用到 EhCache 的集群解决方案。
  
EhCache集群方案:

• Terracotta
• RMI
• JMS : 依赖 ehcache-jmsreplication.jar
• JGroups : 依赖ehcache-jgroupsreplication.jar
• EhCache Server

  其中的三种最为常用集群方式,分别是 RMI、JGroups 以及 EhCache Server 。

EhCache集群疑问

• 你如何知道集群环境中的其他缓存?
• 分布式传送的消息是什么形式?
• 什么情况需要进行复制?增加(Puts),更新(Updates)或是失效(Expiries)?
• 采用什么方式进行复制?同步还是异步方式?

EhCache集群基本概念

1、正确的元素类型:只有可序列化的元素可以进行复制。一些操作,比如移除,只需要元素的键值而不用整个元素;在这样的操作中即使元素不是可序列化的但键值是可序列化的也可以被复制。
2、成员发现(Peer Discovery):Ehcache进行集群的时候有一个cache组的概念。每个cache都是其他cache的一个peer,没有主cache的存在。成员发现(Peer Discovery)正是用来解决 “你如何知道集群环境中的其他缓存?” 这个问题的。Ehcache提供了两种机制用来进行成员发现,即:自动成员发现和手动成员发现。要使用一个内置的成员发现机制要在ehcache的配置文件中指定cacheManagerPeerProviderFactory元素的class属性为
net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory。

  自动的发现方式用TCP广播机制来确定和维持一个广播组。它只需要一个简单的配置可以自动的在组中添加和移除成员。在集群中也不需要什么优化服务器的知识,这是默认推荐的。ehcache.xml配置示例代码如下:

cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=automatic, multicastGroupAddress=230.0.0.1,
multicastGroupPort=4446, timeToLive=32"/>
<!-- timeToLive
    0是限制在同一个服务器
    1是限制在同一个子网
    32是限制在同一个网站
    64是限制在同一个region
    128是限制在同一个大洲
    255是不限制-->

  手动的发现方式需要知道每个监听器的IP地址和端口。集群成员(也就是服务器)不能在运行时动态地添加和移除。ehcache.xml配置示例代码如下:

<!-- server1 -->
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=manual,
rmiUrls=//server2:40001/sampleCache11|//server2:40001/sampleCache12"/>

<!-- server2 -->
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
properties="peerDiscovery=manual,
rmiUrls=//server1:40001/sampleCache11|//server1:40001/sampleCache12"/>

CacheManagerPeerListener
  每个CacheManagerPeerListener监听从成员们发向当前CacheManager的消息。配置 CacheManagerPeerListener需要指定一个CacheManagerPeerListenerFactory,它以插件的机制实现, 用来创建CacheManagerPeerListener。
  Ehcache有一个内置的基于RMI的分布系统。它的监听器是RMICacheManagerPeerListener,这个监听器可以用RMICacheManagerPeerListenerFactory来配置。
  示例代码:

<cacheManagerPeerListenerFactory
class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"
properties="hostName=localhost, port=40001,
socketTimeoutMillis=2000"/>

  属性说明:

hostname (可选) – 运行监听器的服务器名称。标明了做为集群群组的成员的地址,同时也是你想要控制的从集群中接收消息的接口。
   在CacheManager初始化的时候会检查hostname是否可用。
   如果hostName不可用,CacheManager将拒绝启动并抛出一个连接被拒绝的异常。
  如果指定,hostname将用InetAddress.getLocalHost().getHostAddress()来得到。
port – 监听器监听的端口。
socketTimeoutMillis (可选) – Socket超时的时间。默认是2000ms。当你socket同步缓存请求地址比较远,不是本地局域网。你可能需要把这个时间配置大些,不然很可能延时导致同步缓存失败。

CacheReplicators
  每个要进行同步的cache都需要设置一个用来向CacheManager的成员复制消息的缓存事件监听器。这个工作要通过为每个cache的配置增加一个cacheEventListenerFactory元素来完成。
  代码:

<cache name="sampleCache2" maxElementsInMemory="10" eternal="false" timeToIdleSeconds="100" timeToLiveSeconds="100" overflowToDisk="false">
<cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
properties="replicateAsynchronously=true,replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=false, replicateRemovals=true "/>
</cache>

  cacheEventListenerFactory 支持以下属性

replicatePuts=true | false – 当一个新元素增加到缓存中的时候是否要复制到其他的peers. 默认是true。
replicateUpdates=true | false – 当一个已经在缓存中存在的元素被覆盖时是否要进行复制。默认是true。
replicateRemovals= true | false – 当元素移除的时候是否进行复制。默认是true。
replicateAsynchronously=true | false – 复制方式是异步的(指定为true时)还是同步的(指定为false时)。默认是true。
replicatePutsViaCopy=true | false – 当一个新增元素被拷贝到其他的cache中时是否进行复制指定为true时为复制,默认是true。

Cache属性说明:

<cache 
        name="userCache" 
        maxElementsInMemory="10000" 
        eternal="true" 
        overflowToDisk="true" 
        timeToIdleSeconds="0" 
        timeToLiveSeconds="0"
        diskPersistent="false" 
        diskExpiryThreadIntervalSeconds="120">
        <cacheEventListenerFactory 
           class="net.sf.ehcache.distribution.RMICacheReplicatorFactory" 
          properties="replicateAsynchronously=true,replicatePuts=true,replicateUpdates=true,replicateUpdatesViaCopy=false,replicateRemovals=true">  
        </cacheEventListenerFactory>
        <bootstrapCacheLoaderFactory
          class="net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory"
               properties="bootstrapAsynchronously=true"> 
        </bootstrapCacheLoaderFactory>
    </cache>

必须属性:
  name:缓存名称。
  maxElementsInMemory:缓存最大个数。
  eternal:对象是否永久有效,一但设置了,timeout将不起作用。
  overflowToDisk:当内存中对象数量达
  maxElementsInMemory时,Ehcache将会对象写到磁盘中。
   diskSpoolBufferSizeMB:这个参数设置DiskStore(磁盘缓存)的缓存区大小。默认是30MB。每个Cache都应该有自己的一个缓冲区。
  maxElementsOnDisk:硬盘最大缓存个数。
可选的属性:
  timeToIdleSeconds:设置对象在失效前的允许闲置时间(单位:秒)。仅当eternal=false对象不是永久有效时使用,可选属性,默认值是0,也就是可闲置时间无穷大。
  timeToLiveSeconds:设置对象在失效前允许存活时间(单位:秒)。最大时间介于创建时间和失效时间之间。仅当eternal=false对象不是永久有效时使用,默认是0.,也就是对象存活时间无穷大。
  diskPersistent:是否disk store在虚拟机启动时持久化. The default value is false.
  memoryStoreEvictionPolicy:当达到maxElementsInMemory限制时,Ehcache将会根据指定的策略去清理内存。默认策略是LRU(最近最少使用)。你可以设置为FIFO(先进先出)或是LFU(较少使用)。
  diskExpiryThreadIntervalSeconds:磁盘失效线程运行时间间隔,默认是120秒。
  clearOnFlush:内存数量最大时是否清除。
缓存子元素:
  cacheEventListenerFactory:注册相应的的缓存监听类,用于处理缓存事件,如put,remove,update,和expire
  bootstrapCacheLoaderFactory:指定相应的BootstrapCacheLoader,用于在初始化缓存,以及自动设置。

EhCache RMI 手动方式配置缓存

Tomcat1: 127.0.0.1:8080, Cache Server1: 127.0.0.1:40001
Tomcat2: 127.0.0.1:8088, Cache Server2: 127.0.0.1:40002

用到的架包:
ehcache-core-2.6.8.jar
log4j-1.2.17.jar
servlet-api.jar (示例中使用了@WebServlet,请确保servler-api的架包版本大于3.0)
slf4j-api-1.6.1.jar
slf4j-log4j12-1.6.1.jar

创建Web工程TestEhcache, 工程目录如下:
这里写图片描述

CacheManagerFactory.java

private CacheManager manager;
    private static CacheManagerFactory factory = new CacheManagerFactory();
    private final static String EHCACHEFILE = "/ehcache.xml";

    private CacheManagerFactory() {
    }

    public static CacheManagerFactory getInstance() {
        return factory;
    }

    public CacheManager getCacheManager() {
        if (manager == null) {
            InputStream is = this.getClass().getResourceAsStream(EHCACHEFILE);
            manager = CacheManager.create(is);
        }
        return manager;
    }

    public Cache getCache(String cache) {
        return getCacheManager().getCache(cache);
    }

    public void setCache(Cache cache) {
        getCacheManager().addCache(cache);
    }

    public void setCache(String cache) {
        getCacheManager().addCache(cache);
    }

    public Element getElement(String cacheName, String key) {
        if (getCache(cacheName) == null)
            setCache(cacheName);
        return getCache(cacheName).get(key);
    }

    public void setElement(String cache, Element element) {
        if (getCache(cache) == null)
            setCache(cache);
        getCache(cache).put(element);
    }

    public Boolean continaElementKey(String cacheName, String key) {
        if (getCache(cacheName) == null)
            setCache(cacheName);
        return getCache(cacheName).isKeyInCache(key);
    }

TesAction.java

@WebServlet("/test")
public class TesAction extends HttpServlet {

    private static final long serialVersionUID = 1L;

    CacheManagerFactory cmf = CacheManagerFactory.getInstance();

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
        doPost(request,response);
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
        String res = "";
        String key = request.getParameter("key");

        Element element = cmf.getElement("userCache", "map");
        if(element == null){
            Map<String, String> map = new HashMap<String, String>();
            map.put(key, key);
            cmf.setElement("userCache", new Element("map", map));
        }else{
            Map<String, String> map = (Map<String, String>) element.getValue();
            res = map.get(key);
            if(res == null){
                map.put(key, key);
                // 多次测试发现,存在同名Element是,重复put的是无法复制的,因此当遇到两个节点同步不上的时候,先remove后put。 
                cmf.getCache("userCache").remove("map");
                cmf.setElement("userCache", new Element("map", map));
                res = "0;null";
            }
        }

        response.setContentType("text/html;charset=UTF-8");
        response.setCharacterEncoding("UTF-8");
        PrintWriter out = response.getWriter();
        out.write(res);
        out.close();
    }

    @Override
    public void init() throws ServletException {
        super.init();
    }

}

ehcache.xml:

<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.sf.net/ehcache.xsd">

    <diskStore path="java.io.tmpdir" />

    <cacheManagerPeerProviderFactory 
        class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
        properties="peerDiscovery=manual,rmiUrls=//127.0.0.1:40002/userCache|//127.0.0.1:40002/resourceCache">
    </cacheManagerPeerProviderFactory>

    <cacheManagerPeerListenerFactory 
        class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"
        properties="hostName=127.0.0.1, port=40001, socketTimeoutMillis=2000">
    </cacheManagerPeerListenerFactory>

    <defaultCache maxElementsInMemory="10000" eternal="false" timeToIdleSeconds="30" timeToLiveSeconds="30" overflowToDisk="false"/>

    <cache 
        name="userCache" 
        maxElementsInMemory="10000" 
        eternal="true" 
        overflowToDisk="true" 
        timeToIdleSeconds="0" 
        timeToLiveSeconds="0"
        diskPersistent="false" 
        diskExpiryThreadIntervalSeconds="120">
        <cacheEventListenerFactory 
            class="net.sf.ehcache.distribution.RMICacheReplicatorFactory" 
            properties="replicateAsynchronously=true,replicatePuts=true,replicateUpdates=true,replicateUpdatesViaCopy=false,replicateRemovals=true">    
        </cacheEventListenerFactory>
        <bootstrapCacheLoaderFactory
            class="net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory"
            properties="bootstrapAsynchronously=true"> 
        </bootstrapCacheLoaderFactory>
    </cache>

    <cache 
        name="resourceCache" 
        maxElementsInMemory="10000" 
        eternal="true" 
        overflowToDisk="true" 
        timeToIdleSeconds="0" 
        timeToLiveSeconds="0"
        diskPersistent="false" 
        diskExpiryThreadIntervalSeconds="120">
        <cacheEventListenerFactory 
            class="net.sf.ehcache.distribution.RMICacheReplicatorFactory" 
            properties="replicateAsynchronously=true,replicatePuts=true,replicateUpdates=true,replicateUpdatesViaCopy=false,replicateRemovals=true">    
        </cacheEventListenerFactory>
        <bootstrapCacheLoaderFactory
            class="net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory"
            properties="bootstrapAsynchronously=true"> 
        </bootstrapCacheLoaderFactory>
    </cache>

</ehcache>

复制TestEhcache到 TestEhcache1, 修改TestEhcache1下的ehcache.xml ,只需要修改cacheManagerPeerProviderFactory和cacheManagerPeerListenerFactory修改为如下代码,其他不变

<cacheManagerPeerProviderFactory 
        class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
        properties="peerDiscovery=manual,rmiUrls=//127.0.0.1:40001/userCache|//127.0.0.1:40001/resourceCache">
    </cacheManagerPeerProviderFactory>

    <cacheManagerPeerListenerFactory 
        class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"
        properties="hostName=127.0.0.1, port=40002, socketTimeoutMillis=2000">
    </cacheManagerPeerListenerFactory>

为了区别,将TestEhcache1的TesAction.doPost()方法修改为如下代码,只get的缓存不put缓存,用于观察TestEhcache1是否能同步TestEhcache的数据:

String res = "0;null";
        Element element = cmf.getElement("userCache", "map");
        if(element != null){
            Map<String, String> map = (Map<String, String>) element.getValue();
            res = map.get(request.getParameter("key"));
            if(res == null){
                res = "0;null";
            }
        }
        response.setContentType("text/html;charset=UTF-8");
        response.setCharacterEncoding("UTF-8");
        PrintWriter out = response.getWriter();
        out.write(res);
        out.close();

将TestEhcache部署到tomcat1, TestEhcache1部署到Tomcat2。依次执行如下代码:
localhost:8080/TestEhcache/test?key=125
localhost:8080/TestEhcache/test?key=125
localhost:8088/TestEhcache1/test?key=125
如果输出内容分别如下,说明集群ok, 两节点数据同步没问题, 否者请仔细检查配置文件和TestAction代码:
0;null
125
125

EhCache RMI 自动方式配置缓存
将ehcache.xml的cacheManagerPeerProviderFactory代码改为:

<cacheManagerPeerProviderFactory
 class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
    properties="peerDiscovery=automatic, multicastGroupAddress=192.168.0.1,
    multicastGroupPort=40004, timeToLive=32"
/>

EhCache Jgroups 方式配置缓存

在TestEhcache和TestEhcache1工程中添加ehcache-jgroupsreplication-1.7.jar和jgroups-3.6.9.Final.jar。 使用该方式比RMI方式配置简单。

TestEhcache 的 ehcache.xml

<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.sf.net/ehcache.xsd">

    <diskStore path="java.io.tmpdir" />

    <cacheManagerPeerProviderFactory 
        class="net.sf.ehcache.distribution.jgroups.JGroupsCacheManagerPeerProviderFactory"
        properties="connect=TCP(bind_port=4001):
            TCPPING(initial_hosts=192.168.8.150[4001],192.168.8.150[4002];port_range=10;timeout=3000;num_initial_members=3):
            MERGE2(min_interval=3000;max_interval=5000):  
            FD_ALL(interval=5000;timeout=20000):  
            FD(timeout=5000;max_tries=48;):  
            VERIFY_SUSPECT(timeout=1500):  
            pbcast.NAKACK(retransmit_timeout=100,200,300,600,1200,2400,4800;discard_delivered_msgs=true):  
            pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;max_bytes=0):  
            pbcast.GMS(print_local_addr=true;join_timeout=5000)"
        propertySeparator="::">
    </cacheManagerPeerProviderFactory>

    <defaultCache 
        maxElementsInMemory="10000" 
        eternal="true" 
        overflowToDisk="true" 
        timeToIdleSeconds="0" 
        timeToLiveSeconds="0"
        diskPersistent="false" 
        diskExpiryThreadIntervalSeconds="120">
        <cacheEventListenerFactory 
            class="net.sf.ehcache.distribution.jgroups.JGroupsCacheReplicatorFactory"
            properties="replicateAsynchronously=true,replicatePuts=true,replicateUpdates=true,replicateUpdatesViaCopy=false,replicateRemovals=true"/>
        <bootstrapCacheLoaderFactory
            class="net.sf.ehcache.distribution.jgroups.JGroupsBootstrapCacheLoaderFactory"
            properties="bootstrapAsynchronously=true"> 
        </bootstrapCacheLoaderFactory>
    </defaultCache>
</ehcache>

复制TestEhcache的ehcache.xml到TestEhcache1,并且修改下列代码:

connect=TCP(bind_port=4002)

推荐文章:http://raychase.iteye.com/blog/1545906
http://blog.csdn.net/tang06211015/article/details/52281551

以上是关于EhCache RMI 分布式缓存/缓存集群的主要内容,如果未能解决你的问题,请参考以下文章

Spring-ehcache RMI形式的分布式缓存配置

Ehcache学习ehcache缓存共享

深入探讨在集群环境中使用 EhCache 缓存系统

EhCache 分布式缓存/缓存集群

java ehcache

Ehcache基础知识学习