Redis之Pipeline使用注意事项

Posted 小徐xfg

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis之Pipeline使用注意事项相关的知识,希望对你有一定的参考价值。

参考内容:http://www.redis.cn/topics/pipelining.html


重要说明: 使用管道发送命令时,服务器将被迫回复一个队列答复,占用很多内存。所以,如果你需要发送大量的命令,最好是把他们按照合理数量分批次的处理,例如10K的命令,读回复,然后再发送另一个10k的命令,等等。这样速度几乎是相同的,但是在回复这10k命令队列需要非常大量的内存用来组织返回数据内容。


Jedis jedis = poolFactory.getjedisResourcePool().getResource();
Pipeline pl = jedis.pipelined();
Pipeline 的特点:
1、Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性
2、jedis.pipelined()方法会先创建一个pipeline的链接对象,详细的步骤如下:
A)、创建一个新的Pipeline对象
public Pipeline pipelined()
pipeline = new Pipeline(); pipeline.setClient(client); return pipeline;
B)、获取一个连接并hset操作
public Response<Long> hset(String key, String field, String value)
getClient(key).hset(key, field, value); return getResponse(BuilderFactory.LONG);
C)、把数据作为安全数据,进行操作
public void hset(final String key, final String field, final String value)
hset(SafeEncoder.encode(key), SafeEncoder.encode(field), SafeEncoder.encode(value));
D)、把指令与数据一块
public void hset(final byte[] key, final byte[] field, final byte[] value)
sendCommand(HSET, key, field, value);
E)、发送流
Protocol.sendCommand(outputStream, cmd, args);
F)、做具体的数据操作
public static void sendCommand(final RedisOutputStream os, final Command command,
final byte[]... args)
sendCommand(os, command.raw, args);




Pipeline 使用以及地从实现原理


1、Pipeline 是以流的形式进行储存数据,Connection类中有加载拼接的命令,详细如下:
protected Connection sendCommand(final Command cmd, final byte[]... args)
try

connect(); Protocol.sendCommand(outputStream, cmd, args); pipelinedCommands++; return this;

catch (JedisConnectionException ex)
/*

  • When client send request which formed by invalid protocol, Redis send back error message
  • before close connection. We try to read it to provide reason of failure.
    */
    try Unknown macro: String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) ex = new JedisConnectionException(errorMessage, ex.getCause());

    catch (Exception e)

    /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */

    // Any other exceptions related to connection?
    broken = true;
    throw ex;

在Protocol类中有把数据拼接成数据,其中args是拼接数据的参数
private static void sendCommand(final RedisOutputStream os, final byte[] command,
final byte[]... args)
try
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args)

os.write(DOLLAR_BYTE); os.writeIntCrLf(arg.length); os.write(arg); os.writeCrLf();

catch (IOException e)

throw new JedisConnectionException(e);

2、关于Pipeline 同步数据的问题
A)、Pipeline 有与redis形同的操作,但是在数据落盘的时候需要在执行的方法后添加sync()方法,如果insert时有多条数据,在数据拼接完之后,在执行sync()方法,这样可以提高效率。
B)、如果在hget()时没有sync()时会报,没有在hget()同步数据
C)、如果在hset(),hdel(),hget()获取数据时都没有执行sync()方法,但是在最后执行了pl.close()方法,Pipeline 同样会执行sync()方法,详细的代码如下:
Pipeline类下的close()方法中的clear(),有sync()方法,
@Override
public void close() throws IOException

clear();

public void clear()
if (isInMulti())

discard();

sync();

可以看出client.getAll()获取到了所有的数据,去执行sync()方法
public void sync()
if (getPipelinedResponseLength() > 0)
List<Object> unformatted = client.getAll();
for (Object o : unformatted)

generateResponse(o);


3、Pipeline 的默认的同步的个数为53个,也就是说arges中累加到53条数据时会把数据提交

以上是关于Redis之Pipeline使用注意事项的主要内容,如果未能解决你的问题,请参考以下文章

redis之管道——pipeline

redis管道命令pipeline的使用

redis性能提升之pipeline

redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

使用pipeline管道执行redis命令

使用redis pipeline打包执行多条任务