key/value存储系统-MemcachedRedisTair
Posted 07H_JH
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了key/value存储系统-MemcachedRedisTair相关的知识,希望对你有一定的参考价值。
每个产品的可配置参数繁多,涉及缓存策略、分布算法、序列化方式、数据压缩技术、通信方式、并发、超时等诸多方面因素,都会对测试结果产生影响,单纯的性能对比存在非常多的局限性和不合理性,所以不能作为任何评估依据,仅供参考。
1、尽管 Memcached 和 Redis 都标识为Distribute,但从Server端本身而言它们并不提供分布式的解决方案,需要Client端实现一定的分布算法将数据存储到各个节点,从而实现分布式存储,两者都提供了Replication功能(Master-Slave)保障可靠性。
2、Tair 则本身包含 Config Server 和 Data Server 采用一致性哈希算法分布数据存储,由ConfigSever来管理所有数据节点,理论上服务器端节点的维护对前端应用不会产生任何影响,同时数据能按指定复制到不同的DataServer保障可靠性,从Cluster角度来看属于一个整体Solution
环境
Memcached | Memcached 1.4.21 | Xmemcached 2.0.0 |
Redis | Redis 2.8.19 | Jedis 2.8.5 |
Tair | Tair 2.3 | Tair Client 2.3.1 |
测试
从数据库读取一组数据缓存(SET)到每个缓存服务器,其中对于每个Server的写入数据是完全一致的,不设置过期时间,进行如下测试。
1)单线程进行1次写入
2)单线程进行500次写入
3)单线程进行2000次写入
4)并行500个线程,每个线程进行1次写入
5)并行500个线程,每个线程进行5次写入
6)并行2000个线程,每个线程进行1次写入
2、分别从每个缓存服务器读取(GET)数据,其中对于每个Server的读取数据大小是完全一致的,进行如下测试。
1)单线程进行1次读取
2)单线程进行500次读取
3)单线程进行2000次读取
4)并行500个线程,每个线程进行1次读取
5)并行500个线程,每个线程进行5次读取
6)并行2000个线程,每个线程进行1次读取
四、单线程测试
1、缓存Model对象(OrderInfo)的定义参照tbOrder表(包括单据号、制单日期、商品、数量等字段)
2、单线程的读写操作对于代码的要求相对较低,不需要考虑Pool,主要代码如下:
1)Memcached单线程读写,使用二进制方式序列化,不启用压缩。
1 public static void putItems2Memcache(List<OrderInfo> orders) throws Exception { 2 MemcachedClient memcachedClient = null; 3 try { 4 MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000")); 5 builder.setCommandFactory(new BinaryCommandFactory()); 6 memcachedClient = builder.build(); 7 8 for (OrderInfo order : orders) { 9 boolean isSuccess = memcachedClient.set("order_" + order.BillNumber, 0, order); 10 if (!isSuccess) { 11 System.out.println("put: order_" + order.BillNumber + " " + isSuccess); 12 } 13 } 14 } catch (Exception ex) { 15 ex.printStackTrace(); 16 } finally { 17 memcachedClient.shutdown(); 18 } 19 } 20 21 public static void getItemsFromMemcache(List<String> billNumbers) throws Exception { 22 MemcachedClient memcachedClient = null; 23 try { 24 MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000")); 25 builder.setCommandFactory(new BinaryCommandFactory()); 26 memcachedClient = builder.build(); 27 28 for (String billnumber : billNumbers) { 29 OrderInfo result = memcachedClient.get(billnumber); 30 31 if (result == null) { 32 System.out.println(" get failed : " + billnumber + " not exist "); 33 } 34 } 35 } catch (Exception ex) { 36 ex.printStackTrace(); 37 } finally { 38 memcachedClient.shutdown(); 39 } 40 }View Code
2)Redis单线程读写,由于Jedis Client 不支持对象的序列化,需要自行实现对象序列化(本文使用二进制方式)。
1 public static void putItems2Redis(List<OrderInfo> orders) { 2 Jedis jedis = new Jedis("10.129.221.70", 6379); 3 4 try { 5 jedis.connect(); 6 7 for (OrderInfo order : orders) { 8 String StatusCode = jedis.set(("order_" + order.BillNumber).getBytes(), SerializeUtil.serialize(order)); 9 if (!StatusCode.equals("OK")) { 10 System.out.println("put: order_" + order.BillNumber + " " + StatusCode); 11 } 12 } 13 } catch (Exception ex) { 14 ex.printStackTrace(); 15 } finally { 16 jedis.close(); 17 } 18 } 19 20 public static void getItemsFromRedis(List<String> billNumbers) { 21 Jedis jedis = new Jedis("10.129.221.70", 6379); 22 23 try { 24 jedis.connect(); 25 26 for (String billnumber : billNumbers) { 27 byte[] result = jedis.get(billnumber.getBytes()); 28 if (result.length > 0) { 29 OrderInfo order = (OrderInfo) SerializeUtil.unserialize(result); 30 if (order == null) { 31 System.out.println(" unserialize failed : " + billnumber); 32 } 33 } else { 34 System.out.println(" get failed : " + billnumber + " not exist "); 35 } 36 } 37 } catch (Exception ex) { 38 ex.printStackTrace(); 39 } finally { 40 jedis.close(); 41 } 42 }View Code
序列化代码
1 package common; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ByteArrayOutputStream; 5 import java.io.ObjectInputStream; 6 import java.io.ObjectOutputStream; 7 8 public class SerializeUtil { 9 10 /** 11 * 序列化 12 * @param object 13 * @return 14 */ 15 public static byte[] serialize(Object object) { 16 ObjectOutputStream oos = null; 17 ByteArrayOutputStream baos = null; 18 19 try { 20 baos = new ByteArrayOutputStream(); 21 oos = new ObjectOutputStream(baos); 22 oos.writeObject(object); 23 byte[] bytes = baos.toByteArray(); 24 return bytes; 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 return null; 29 } 30 31 /** 32 * 反序列化 33 * @param bytes 34 * @return 35 */ 36 public static Object unserialize(byte[] bytes) { 37 ByteArrayInputStream bais = null; 38 try { 39 bais = new ByteArrayInputStream(bytes); 40 ObjectInputStream ois = new ObjectInputStream(bais); 41 return ois.readObject(); 42 } catch (Exception e) { 43 e.printStackTrace(); 44 } 45 46 return null; 47 } 48 }View Code
3)Tair单线程读写,使用Java序列化,默认压缩阀值为8192字节,但本文测试的每个写入项都不会超过这个阀值,所以不受影响。
1 public static void putItems2Tair(List<OrderInfo> orders) { 2 try { 3 List<String> confServers = new ArrayList<String>(); 4 confServers.add("10.129.221.70:5198"); 5 //confServers.add("10.129.221.70:5200"); 6 7 DefaultTairManager tairManager = new DefaultTairManager(); 8 tairManager.setConfigServerList(confServers); 9 tairManager.setGroupName("group_1"); 10 tairManager.init(); 11 12 for (OrderInfo order : orders) { 13 ResultCode result = tairManager.put(0, "order_" + order.BillNumber, order); 14 if (!result.isSuccess()) { 15 System.out.println("put: order_" + order.BillNumber + " " + result.isSuccess() + " code:" + result.getCode()); 16 } 17 } 18 } catch (Exception ex) { 19 ex.printStackTrace(); 20 } 21 } 22 23 public static void getItemsFromTair(List<String> billNumbers) { 24 try { 25 List<String> confServers = new ArrayList<String>(); 26 confServers.add("10.129.221.70:5198"); 27 //confServers.add("10.129.221.70:5200"); 28 29 DefaultTairManager tairManager = new DefaultTairManager(); 30 tairManager.setConfigServerList(confServers); 31 tairManager.setGroupName("group_1"); 32 tairManager.init(); 33 34 for (String billnumber : billNumbers) { 35 Result<DataEntry> result = tairManager.get(0, billnumber); 36 if (result.isSuccess()) { 37 DataEntry entry = result.getValue(); 38 if (entry == null) { 39 System.out.println(" get failed : " + billnumber + " not exist "); 40 } 41 } else { 42 System.out.println(result.getRc().getMessage()); 43 } 44 } 45 } catch (Exception ex) { 46 ex.printStackTrace(); 47 } 48 }
3、测试结果,每项重复测试取平均值
五、多线程测试
1、除了多线程相关代码外的公共代码和单线程基本一致,多线程测试主要增加了Client部分代码对ConnectionPool、TimeOut相关设置,池策略、大小都会对性能产生很大影响,为了达到更高的性能,不同的使用场景下都需要有科学合理的测算。
2、主要测试代码
1)每个读写测试线程任务完成后统一调用公共Callback,在每批测试任务完成后记录消耗时间
1 package common; 2 3 public class ThreadCallback {
以上是关于key/value存储系统-MemcachedRedisTair的主要内容,如果未能解决你的问题,请参考以下文章
Apache Samza流处理框架介绍——kafka+LevelDB的Key/Value数据库来存储历史消息+?