使用 Reactive Lettuce 流水线 Redis 命令
Posted
技术标签:
【中文标题】使用 Reactive Lettuce 流水线 Redis 命令【英文标题】:Pipeline Redis commands with Reactive Lettuce 【发布时间】:2019-01-22 18:52:20 【问题描述】:我正在使用spring boot webflux
+ project reactor
+ lettuce
以非阻塞方式连接和查询 Redis。
我已经用LettuceConnectionFactory
配置了ReactiveRedisTemplate
。 spring 文档指出,使用ReactiveRedisTemplate
的管道的唯一方法是使用execute(<RedisCallback>)
方法。在非反应式RedisTemplate
中,我看到有一个executePipelined(<RedisCallback>)
方法可以在执行回调之前打开/关闭管道。但是对于ReactiveRedisTemplate.execute
方法,它使用LettuceReactiveRedisConnection
并且Spring ReactiveRedisConnection
和Lettuce
都没有引用管道。
所以我的问题是,是否可以在使用 Spring ReactiveRedisTemplate
+ ReactiveLettuceConnection
时对您的命令进行管道化?
我还注意到,将 ReactiveRedisTemplate.execute
与具有多个 Redis 命令的 RedisCallback
一起使用比单独调用命令要慢。
带有 ReactiveRedisTemplate 的管道示例代码:
reactiveRedisTemplate.execute(connection -> keys.flatMap(key ->
connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes()))))
.map(Map.Entry::getValue)
.map(ByteUtils::getBytes)
.map(b ->
try
return mapper.readValue(b, Value.class);
catch (IOException e1)
return null;
)
.collectList();
没有管道的代码:
keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
.map(Map.Entry::getValue)
.cast(Value.class)
.collectList();
谢谢!
【问题讨论】:
我遇到了同样的问题,已经谷歌搜索了 1 周没有结果,您有什么解决方案吗? @kriver。您找到问题的答案了吗?请更新答案 【参考方案1】:我认为不可能使用 RedisReactiveTemplate 或生菜的反应式 API。事实上,当你构建你的反应链时,它的某些部分会被延迟评估。
getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()
例如,在此示例中,只有当 getAsyncValue 返回值时才会触发 doSomething。
现在,如果我们获取您的 RedisCallback 样本并假设我们在连接对象中有一个 flushAll 方法。你在哪里/什么时候叫它?
tpl.execute(connection ->
Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes())));
connection.fluxAll();
return results;
)
这样不会将命令刷新到服务器,因为不会触发任何 hashCommands。
现在让我们看看我们拥有的所有信号回调:
doOnNext doOnError doOnCancel 先做 doOnSubscribe doOnRequest doOnTerminate doAfterTerminate 最后做 doOnCompletedoOnError 或 doOnCancel 不会帮助我们。但我们可以考虑使用 doFinally、doOnTerminate、doAfterTerminate:
tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
.hGetAll(ByteBuffer.wrap(key.getBytes())))
.doFinally(s -> connection.flushAll()))
但是 htGetAll 在命令刷新到服务器之前不会完成,因此不会调用 doFinnaly,因此我们将无法刷新....
我能想到的唯一解决方法是直接使用生菜的异步 api。有一个sample in the documentation 告诉你怎么做。
您的代码可能看起来像(未经测试):
// client is a RedisClient from lettuce
StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
.collectList()
.doOnNext(f -> command.flushCommands())
.flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))
【讨论】:
以上是关于使用 Reactive Lettuce 流水线 Redis 命令的主要内容,如果未能解决你的问题,请参考以下文章
R Shiny Reactive 值,dplyr 过滤器错误?