xmemcached整合

Posted clypm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xmemcached整合相关的知识,希望对你有一定的参考价值。

简介

Xmemcached是一个高性能的基于java nio的memcached客户端。在经过三个RC版本后,正式发布1.10-final版本。
xmemcached特性一览:
1、高性能
2、支持完整的memcached文本协议,二进制协议将在1.2版本实现。
3、支持JMX,可以通过MBean调整性能参数、动态添加/移除server、查看统计等。
4、支持客户端统计
5、支持memcached节点的动态增减。
6、支持memcached分布:余数分布和一致性哈希分布。
7、更多的性能调整选项。


Spring整合memcached配置

<!--xmemcached config -->
	<bean name="xmemcachedClient"
		class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean"
		destroy-method="shutdown">
		<!-- multi service node -->
		<property name="servers" value="${memcached.servers}" />
		<property name="weights">
			<!-- memcached Aggregate -->
			<list>
				<value>1</value>
			</list>
		</property>
		<property name="connectionPoolSize" value="${memcached.connectionPoolSize}" />
		<property name="sessionLocator">
			<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator" />
		</property>
		<property name="transcoder">
			<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" />
		</property>
		<property name="bufferAllocator">
			<bean class="net.rubyeye.xmemcached.buffer.SimpleBufferAllocator" />
		</property>
		<property name="failureMode" value="true" />
	</bean>

	<!--memcached client -->
	<bean name="memcachedClient" class="com.cl.common.tool.MemCachedUtil">
		<property name="memcachedClient" ref="xmemcachedClient" />
	</bean>

其中各参数的意义:

参数

含义

servers

服务器列表,格式:ip:port

weights

主机映射:host1对应1号、host2对应2号..

sessionLocator

Session 分配器,有自带的,影响分布式

transcoder

通信编码方式

bufferAllocator

缓冲区分配器

       注:

 默认标准Hash, hash(key) mod server_count (余数分布)

MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
MemcachedClient mc = builder.build();

可以改为Consistent Hash(一致性哈希):

MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
MemcachedClient mc = builder.build();


使用实例


memcachedClient工具类

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * Memcached客户端工具类
 * 
 * @author cl
 */
public class MemCachedUtil {
	
	private static final Logger LOG = LoggerFactory
			.getLogger(MemCachedUtil.class);

	private MemcachedClient memcachedClient;

	private static Long DEFAULT_OP_TIMEOUT = 10000L;// 10min

	public static int defaultExpireTime = 86400;// 默认缓存1天:24*60*60=86400

	/**
	 * Memcached是否可用
	 * 
	 * @return
	 * @author cl
	 */
	public boolean isAvaliable() {
		if (memcachedClient.isShutdown()) {
			LOG.error("memcached客户端已关闭");
			return false;
		}
		Map<InetSocketAddress, String> map = null;
		try {
			map = memcachedClient.getVersions();
		} catch (Exception e) {
			LOG.error("获取memcached server version时异常", e);
		}
		if (map == null || map.size() == 0) {
			LOG.error("当前没有可用的memcached server");
			return false;
		}
		return true;
	}

	/**
	 * @param key
	 * @return
	 * @author cl
	 */
	public Object getValue(String key) {
		Object value = null;
		Assert.notNull(key);
		try {
			value = memcachedClient.get(key);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
		return value;
	}

	/**
	 * @param key
	 * @param t
	 * @return
	 * @author cl
	 */
	public <T> T getValue(String key, Class<T> t) {
		T value = null;
		Assert.notNull(key);
		try {
			value = this.memcachedClient.get(key);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
		return value;
	}

	/**
	 * 在cache中保存value
	 * 
	 * @param key
	 * @param value
	 * @author cl
	 */
	public void setValue(String key, Object value) {
		Assert.notNull(key, "cache key is null.");
		try {
			memcachedClient.set(key, defaultExpireTime, value);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
	}

	/**
	 * 在cache中保存value
	 * 
	 * @param key
	 * @param value
	 * @param exp
	 *            表示被保存的时长,单位:秒
	 * @author cl
	 */
	public void setValue(String key, int exp, Object value) {
		Assert.notNull(key, "cache key is null.");
		Assert.notNull(exp > 0, "exp must greate than zero.");
		try {
			memcachedClient.set(key, exp, value);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
	}

	/**
	 * 删除cache保存的value
	 * 
	 * @param key
	 * @return
	 * @author cl
	 */
	public Boolean remove(String key) {
		try {
			return memcachedClient.delete(key, DEFAULT_OP_TIMEOUT);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
		return Boolean.FALSE;
	}

	/**
	 * 删除cache保存的value,设置超时时间
	 * 
	 * @param key
	 * @param opTimeout
	 * @return
	 * @author cl
	 */
	public Boolean remove(String key, Long opTimeout) {
		try {
			return memcachedClient.delete(key, opTimeout);
		} catch (TimeoutException e) {
			LOG.error("Cache TimeoutException", e);
		} catch (InterruptedException e) {
			LOG.error("Cache InterruptedException", e);
		} catch (MemcachedException e) {
			LOG.error("Cache MemcachedException", e);
		}
		return Boolean.FALSE;
	}

	public MemcachedClient getMemcachedClient() {
		return memcachedClient;
	}

	public void setMemcachedClient(MemcachedClient memcachedClient) {
		this.memcachedClient = memcachedClient;
	}

}

使用总结

为了将N个前端数据同步,通过Memcached完成数据打通,但带来了一些新问题: 

  • 使用iBatis整合了Memcached,iBatis针对每台server生成了唯一标识,导致同一份数据sql会产生不同的key,造成重复缓存。——通过重写iBatis部分原码,终止了唯一标识的生成,同一个SQL产生同一个Key,同时对生成key做hash,控制长度,使得数据统一在Memcached。
  • 为了迎合iBatis的架构,通过CacheModel模式,对缓存数据分组管理。最初通过Map实现CacheModel,就是简单的Key对应最终的Object。为了后台操作数据时,前台能及时响应,以CacheModel为基准点。后台操作数据时,做Flush,清空对应的CacheModel,可以及时同步数据。但,由于前后台Domain对象可能不一致,调用CacheModel(Map)反序列化时,发生ClassNotFonudException(CNF)。——将CacheModel的Map实现改为Set,CacheModel仅存需要Flush掉的key,Object按原有方式缓存。
  • 以前一直用EhCache,也很少会把List<List>这样的重量级对象放进缓存里。即便如此,只要EhCache没有抛异常,我们恐怕也无感知。这次改用Memcached,没有注意到缓存List过大,导致“Cannot cache data larger than 1MB memcached”,即缓存对象体积不能超过1MB——使用Memcached数据压缩,优化SQL,可以暂时维持。
说了这么多,简要总结如下: 
  • Memcached的Key,要杜绝使用空格,且长度控制在250个字符。
  • Memcached的Value,要控制体积,必须小于1MB,必要时进行使用压缩。
  • 失效时间,0为永久有效,最大值不得超过30天(2592000s),否则重新计算可能缓存只有1秒
  • Memcached仅支持LRU算法,完全适用你的需要。
  • 尽量不要将List这种重体积对象扔到Memcached中,传输、存储都会产生瓶颈。
  • 使用一致性哈希算法实现,提高多个Memcacehd Server利用率。




关于使用XMemcached实现时,参考如下实现: 


rivate MemcachedClientBuilder createMemcachedClientBuilder(  
        Properties properties) {  
    String addresses = properties.getProperty(ADDRESSES).trim();  
  
    if (logger.isInfoEnabled()) {  
        logger.info("Configure Properties:[addresses = " + addresses + "]");  
    }  
    MemcachedClientBuilder builder = new XMemcachedClientBuilder(  
            AddrUtil.getAddresses(addresses));  
  
    // 使用二进制文件  
    builder.setCommandFactory(new BinaryCommandFactory());  
    // 使用一致性哈希算法(Consistent Hash Strategy)  
    builder.setSessionLocator(new KetamaMemcachedSessionLocator());  
    // 使用序列化传输编码  
    builder.setTranscoder(new SerializingTranscoder());  
    // 进行数据压缩,大于1KB时进行压缩  
    builder.getTranscoder().setCompressionThreshold(1024);  
  
    return builder;  
}  

主要有以下几点参考: 
  • 使用二进制文件模式
  • 使用一致性哈希算法
  • 使用序列化编码
  • 对数据进行压缩


关于SET&ADD 
  • SET&ADD都属于更新操作,都要先申请内存
  • SET,会擦除这个键所对应的内存,不管原先是否有内容
  • ADD,会先查看这个键对应的内存是否有内容,如果有,则等待;若没有,则获取锁,并更新内存。



缓存命中率,通常认为:缓存命中率低于95%的设计都是不合理的,存在设计缺陷的。 

 

就对高并发的攻击

为了应对上述情况,做了如下调整:

 

  1. 更新数据时,先写Cache,然后写Database(双写),如果可以,写操作交给队列后续完成。
  2. 限制统一帐号,同一动作,同一秒钟并发次数,超过1次不做做动作,返回操作失败。
  3. 限制统一用户,每日动作次数,超限返回操作失败。

用Memcached的add方法,就可以很快速的解决问题。不需要很繁琐的开发,也不需要依赖数据库记录,完全内存操作。

以下实现一个判定冲突的方法:

** 
 * 冲突延时 1秒 
 */  
public static final int MUTEX_EXP = 1;  
/** 
 * 冲突键 
 */  
public static final String MUTEX_KEY_PREFIX = "MUTEX_";  
  
/** 
 * 冲突判定 
 *  
 * @param key 
 */  
public boolean isMutex(String key) {  
    return isMutex(key, MUTEX_EXP);  
}  
  
/** 
 * 冲突判定 
 *  
 * @param key 
 * @param exp 
 * @return true 冲突 
 */  
public boolean isMutex(String key, int exp) {  
    boolean status = true;  
    try {  
        if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) {  
            status = false;  
        }  
    } catch (Exception e) {  
        logger.error(e.getMessage(), e);  
    }  
    return status;  
}  

做个说明:

 

选项 说明
add 仅当存储空间中不存在键相同的数据时才保存
replace 仅当存储空间中存在键相同的数据时才保存
set 与add和replace不同,无论何时都保存

也就是说,如果add操作返回为true,则认为当前不冲突!

 

回归场景,恶意用户1秒钟操作6次,遇到上述这个方法,只有乖乖地1秒后再来。别小看这1秒钟,一个数据库操作不过几毫秒。1秒延迟,足以降低系统负载,增加恶意用户成本。

 

附我用到的基于XMemcached实现:

import net.rubyeye.xmemcached.MemcachedClient;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 
 * @author Snowolf
 * @version 1.0
 * @since 1.0
 */
@Component
public class MemcachedManager {

	/**
	 * 缓存时效 1天
	 */
	public static final int CACHE_EXP_DAY = 3600 * 24;

	/**
	 * 缓存时效 1周
	 */
	public static final int CACHE_EXP_WEEK = 3600 * 24 * 7;

	/**
	 * 缓存时效 1月
	 */
	public static final int CACHE_EXP_MONTH = 3600 * 24 * 30 * 7;

	/**
	 * 缓存时效 永久
	 */
	public static final int CACHE_EXP_FOREVER = 0;

	/**
	 * 冲突延时 1秒
	 */
	public static final int MUTEX_EXP = 1;
	/**
	 * 冲突键
	 */
	public static final String MUTEX_KEY_PREFIX = "MUTEX_";
	/**
	 * Logger for this class
	 */
	private static final Logger logger = Logger
			.getLogger(MemcachedManager.class);

	/**
	 * Memcached Client
	 */
	@Autowired
	private MemcachedClient memcachedClient;

	/**
	 * 缓存
	 * 
	 * @param key
	 * @param value
	 * @param exp
	 *            失效时间
	 */
	public void cacheObject(String key, Object value, int exp) {
		try {
			memcachedClient.set(key, exp, value);
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
		logger.info("Cache Object: [" + key + "]");
	}

	/**
	 * Shut down the Memcached Cilent.
	 */
	public void finalize() {
		if (memcachedClient != null) {
			try {
				if (!memcachedClient.isShutdown()) {
					memcachedClient.shutdown();
					logger.debug("Shutdown MemcachedManager...");
				}
			} catch (Exception e) {
				logger.error(e.getMessage(), e);
			}
		}
	}

	/**
	 * 清理对象
	 * 
	 * @param key
	 */
	public void flushObject(String key) {
		try {
			memcachedClient.deleteWithNoReply(key);
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
		logger.info("Flush Object: [" + key + "]");
	}

	/**
	 * 冲突判定
	 * 
	 * @param key
	 */
	public boolean isMutex(String key) {
		return isMutex(key, MUTEX_EXP);
	}

	/**
	 * 冲突判定
	 * 
	 * @param key
	 * @param exp
	 * @return true 冲突
	 */
	public boolean isMutex(String key, int exp) {
		boolean status = true;
		try {
			if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) {
				status = false;
			}
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
		return status;
	}

	/**
	 * 加载缓存对象
	 * 
	 * @param key
	 * @return
	 */
	public <T> T loadObject(String key) {
		T object = null;
		try {
			object = memcachedClient.<T> get(key);
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
		logger.info("Load Object: [" + key + "]");
		return object;
	}

}

PS:Redis的SETNX(即SET if Not eXists,类似于memcache的add)


以上是关于xmemcached整合的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolExecutor各参数之意义

spring aop + xmemcached 配置service层缓存策略

SpringBoot 整合线程池及各参数详解

XMemcached的基本使用

TensorFlow中与卷积核有关的各参数的意义

Xmemcached学习笔记二(简单使用)