xmemcached整合
Posted clypm
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了xmemcached整合相关的知识,希望对你有一定的参考价值。
简介
Xmemcached是一个高性能的基于java nio的memcached客户端。在经过三个RC版本后,正式发布1.10-final版本。
xmemcached特性一览:
1、高性能
2、支持完整的memcached文本协议,二进制协议将在1.2版本实现。
3、支持JMX,可以通过MBean调整性能参数、动态添加/移除server、查看统计等。
4、支持客户端统计
5、支持memcached节点的动态增减。
6、支持memcached分布:余数分布和一致性哈希分布。
7、更多的性能调整选项。
Spring整合memcached配置
<!--xmemcached config -->
<bean name="xmemcachedClient"
class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean"
destroy-method="shutdown">
<!-- multi service node -->
<property name="servers" value="${memcached.servers}" />
<property name="weights">
<!-- memcached Aggregate -->
<list>
<value>1</value>
</list>
</property>
<property name="connectionPoolSize" value="${memcached.connectionPoolSize}" />
<property name="sessionLocator">
<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator" />
</property>
<property name="transcoder">
<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" />
</property>
<property name="bufferAllocator">
<bean class="net.rubyeye.xmemcached.buffer.SimpleBufferAllocator" />
</property>
<property name="failureMode" value="true" />
</bean>
<!--memcached client -->
<bean name="memcachedClient" class="com.cl.common.tool.MemCachedUtil">
<property name="memcachedClient" ref="xmemcachedClient" />
</bean>
其中各参数的意义:
参数 | 含义 |
servers | 服务器列表,格式:ip:port |
weights | 主机映射:host1对应1号、host2对应2号.. |
sessionLocator | Session 分配器,有自带的,影响分布式 |
transcoder | 通信编码方式 |
bufferAllocator | 缓冲区分配器 |
注:
默认标准Hash, hash(key) mod server_count (余数分布)
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
MemcachedClient mc = builder.build();
可以改为Consistent Hash(一致性哈希):
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
MemcachedClient mc = builder.build();
使用实例
memcachedClient工具类
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
/**
* Memcached客户端工具类
*
* @author cl
*/
public class MemCachedUtil {
private static final Logger LOG = LoggerFactory
.getLogger(MemCachedUtil.class);
private MemcachedClient memcachedClient;
private static Long DEFAULT_OP_TIMEOUT = 10000L;// 10min
public static int defaultExpireTime = 86400;// 默认缓存1天:24*60*60=86400
/**
* Memcached是否可用
*
* @return
* @author cl
*/
public boolean isAvaliable() {
if (memcachedClient.isShutdown()) {
LOG.error("memcached客户端已关闭");
return false;
}
Map<InetSocketAddress, String> map = null;
try {
map = memcachedClient.getVersions();
} catch (Exception e) {
LOG.error("获取memcached server version时异常", e);
}
if (map == null || map.size() == 0) {
LOG.error("当前没有可用的memcached server");
return false;
}
return true;
}
/**
* @param key
* @return
* @author cl
*/
public Object getValue(String key) {
Object value = null;
Assert.notNull(key);
try {
value = memcachedClient.get(key);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
return value;
}
/**
* @param key
* @param t
* @return
* @author cl
*/
public <T> T getValue(String key, Class<T> t) {
T value = null;
Assert.notNull(key);
try {
value = this.memcachedClient.get(key);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
return value;
}
/**
* 在cache中保存value
*
* @param key
* @param value
* @author cl
*/
public void setValue(String key, Object value) {
Assert.notNull(key, "cache key is null.");
try {
memcachedClient.set(key, defaultExpireTime, value);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
}
/**
* 在cache中保存value
*
* @param key
* @param value
* @param exp
* 表示被保存的时长,单位:秒
* @author cl
*/
public void setValue(String key, int exp, Object value) {
Assert.notNull(key, "cache key is null.");
Assert.notNull(exp > 0, "exp must greate than zero.");
try {
memcachedClient.set(key, exp, value);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
}
/**
* 删除cache保存的value
*
* @param key
* @return
* @author cl
*/
public Boolean remove(String key) {
try {
return memcachedClient.delete(key, DEFAULT_OP_TIMEOUT);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
return Boolean.FALSE;
}
/**
* 删除cache保存的value,设置超时时间
*
* @param key
* @param opTimeout
* @return
* @author cl
*/
public Boolean remove(String key, Long opTimeout) {
try {
return memcachedClient.delete(key, opTimeout);
} catch (TimeoutException e) {
LOG.error("Cache TimeoutException", e);
} catch (InterruptedException e) {
LOG.error("Cache InterruptedException", e);
} catch (MemcachedException e) {
LOG.error("Cache MemcachedException", e);
}
return Boolean.FALSE;
}
public MemcachedClient getMemcachedClient() {
return memcachedClient;
}
public void setMemcachedClient(MemcachedClient memcachedClient) {
this.memcachedClient = memcachedClient;
}
}
使用总结
为了将N个前端数据同步,通过Memcached完成数据打通,但带来了一些新问题:
- 使用iBatis整合了Memcached,iBatis针对每台server生成了唯一标识,导致同一份数据sql会产生不同的key,造成重复缓存。——通过重写iBatis部分原码,终止了唯一标识的生成,同一个SQL产生同一个Key,同时对生成key做hash,控制长度,使得数据统一在Memcached。
- 为了迎合iBatis的架构,通过CacheModel模式,对缓存数据分组管理。最初通过Map实现CacheModel,就是简单的Key对应最终的Object。为了后台操作数据时,前台能及时响应,以CacheModel为基准点。后台操作数据时,做Flush,清空对应的CacheModel,可以及时同步数据。但,由于前后台Domain对象可能不一致,调用CacheModel(Map)反序列化时,发生ClassNotFonudException(CNF)。——将CacheModel的Map实现改为Set,CacheModel仅存需要Flush掉的key,Object按原有方式缓存。
- 以前一直用EhCache,也很少会把List<List>这样的重量级对象放进缓存里。即便如此,只要EhCache没有抛异常,我们恐怕也无感知。这次改用Memcached,没有注意到缓存List过大,导致“Cannot cache data larger than 1MB memcached”,即缓存对象体积不能超过1MB——使用Memcached数据压缩,优化SQL,可以暂时维持。
- Memcached的Key,要杜绝使用空格,且长度控制在250个字符。
- Memcached的Value,要控制体积,必须小于1MB,必要时进行使用压缩。
- 失效时间,0为永久有效,最大值不得超过30天(2592000s),否则重新计算可能缓存只有1秒
- Memcached仅支持LRU算法,完全适用你的需要。
- 尽量不要将List这种重体积对象扔到Memcached中,传输、存储都会产生瓶颈。
- 使用一致性哈希算法实现,提高多个Memcacehd Server利用率。
关于使用XMemcached实现时,参考如下实现:
rivate MemcachedClientBuilder createMemcachedClientBuilder(
Properties properties) {
String addresses = properties.getProperty(ADDRESSES).trim();
if (logger.isInfoEnabled()) {
logger.info("Configure Properties:[addresses = " + addresses + "]");
}
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(addresses));
// 使用二进制文件
builder.setCommandFactory(new BinaryCommandFactory());
// 使用一致性哈希算法(Consistent Hash Strategy)
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
// 使用序列化传输编码
builder.setTranscoder(new SerializingTranscoder());
// 进行数据压缩,大于1KB时进行压缩
builder.getTranscoder().setCompressionThreshold(1024);
return builder;
}
主要有以下几点参考:
- 使用二进制文件模式
- 使用一致性哈希算法
- 使用序列化编码
- 对数据进行压缩
关于SET&ADD
- SET&ADD都属于更新操作,都要先申请内存
- SET,会擦除这个键所对应的内存,不管原先是否有内容
- ADD,会先查看这个键对应的内存是否有内容,如果有,则等待;若没有,则获取锁,并更新内存。
缓存命中率,通常认为:缓存命中率低于95%的设计都是不合理的,存在设计缺陷的。
就对高并发的攻击
为了应对上述情况,做了如下调整:
- 更新数据时,先写Cache,然后写Database(双写),如果可以,写操作交给队列后续完成。
- 限制统一帐号,同一动作,同一秒钟并发次数,超过1次不做做动作,返回操作失败。
- 限制统一用户,每日动作次数,超限返回操作失败。
用Memcached的add方法,就可以很快速的解决问题。不需要很繁琐的开发,也不需要依赖数据库记录,完全内存操作。
以下实现一个判定冲突的方法:
**
* 冲突延时 1秒
*/
public static final int MUTEX_EXP = 1;
/**
* 冲突键
*/
public static final String MUTEX_KEY_PREFIX = "MUTEX_";
/**
* 冲突判定
*
* @param key
*/
public boolean isMutex(String key) {
return isMutex(key, MUTEX_EXP);
}
/**
* 冲突判定
*
* @param key
* @param exp
* @return true 冲突
*/
public boolean isMutex(String key, int exp) {
boolean status = true;
try {
if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) {
status = false;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return status;
}
做个说明:
选项 | 说明 |
add | 仅当存储空间中不存在键相同的数据时才保存 |
replace | 仅当存储空间中存在键相同的数据时才保存 |
set | 与add和replace不同,无论何时都保存 |
也就是说,如果add操作返回为true,则认为当前不冲突!
回归场景,恶意用户1秒钟操作6次,遇到上述这个方法,只有乖乖地1秒后再来。别小看这1秒钟,一个数据库操作不过几毫秒。1秒延迟,足以降低系统负载,增加恶意用户成本。
附我用到的基于XMemcached实现:
import net.rubyeye.xmemcached.MemcachedClient;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author Snowolf
* @version 1.0
* @since 1.0
*/
@Component
public class MemcachedManager {
/**
* 缓存时效 1天
*/
public static final int CACHE_EXP_DAY = 3600 * 24;
/**
* 缓存时效 1周
*/
public static final int CACHE_EXP_WEEK = 3600 * 24 * 7;
/**
* 缓存时效 1月
*/
public static final int CACHE_EXP_MONTH = 3600 * 24 * 30 * 7;
/**
* 缓存时效 永久
*/
public static final int CACHE_EXP_FOREVER = 0;
/**
* 冲突延时 1秒
*/
public static final int MUTEX_EXP = 1;
/**
* 冲突键
*/
public static final String MUTEX_KEY_PREFIX = "MUTEX_";
/**
* Logger for this class
*/
private static final Logger logger = Logger
.getLogger(MemcachedManager.class);
/**
* Memcached Client
*/
@Autowired
private MemcachedClient memcachedClient;
/**
* 缓存
*
* @param key
* @param value
* @param exp
* 失效时间
*/
public void cacheObject(String key, Object value, int exp) {
try {
memcachedClient.set(key, exp, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("Cache Object: [" + key + "]");
}
/**
* Shut down the Memcached Cilent.
*/
public void finalize() {
if (memcachedClient != null) {
try {
if (!memcachedClient.isShutdown()) {
memcachedClient.shutdown();
logger.debug("Shutdown MemcachedManager...");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
/**
* 清理对象
*
* @param key
*/
public void flushObject(String key) {
try {
memcachedClient.deleteWithNoReply(key);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("Flush Object: [" + key + "]");
}
/**
* 冲突判定
*
* @param key
*/
public boolean isMutex(String key) {
return isMutex(key, MUTEX_EXP);
}
/**
* 冲突判定
*
* @param key
* @param exp
* @return true 冲突
*/
public boolean isMutex(String key, int exp) {
boolean status = true;
try {
if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) {
status = false;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return status;
}
/**
* 加载缓存对象
*
* @param key
* @return
*/
public <T> T loadObject(String key) {
T object = null;
try {
object = memcachedClient.<T> get(key);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("Load Object: [" + key + "]");
return object;
}
}
PS:Redis的SETNX(即SET if Not eXists,类似于memcache的add)
以上是关于xmemcached整合的主要内容,如果未能解决你的问题,请参考以下文章