使用 Reactive Lettuce 流水线 Redis 命令

Posted

技术标签:

【中文标题】使用 Reactive Lettuce 流水线 Redis 命令【英文标题】:Pipeline Redis commands with Reactive Lettuce 【发布时间】:2019-01-22 18:52:20 【问题描述】:

我正在使用spring boot webflux + project reactor + lettuce 以非阻塞方式连接和查询 Redis。 我已经用LettuceConnectionFactory 配置了ReactiveRedisTemplate。 spring 文档指出,使用ReactiveRedisTemplate 的管道的唯一方法是使用execute(<RedisCallback>) 方法。在非反应式RedisTemplate 中,我看到有一个executePipelined(<RedisCallback>) 方法可以在执行回调之前打开/关闭管道。但是对于ReactiveRedisTemplate.execute 方法,它使用LettuceReactiveRedisConnection 并且Spring ReactiveRedisConnectionLettuce 都没有引用管道。

所以我的问题是,是否可以在使用 Spring ReactiveRedisTemplate + ReactiveLettuceConnection 时对您的命令进行管道化?

我还注意到,将 ReactiveRedisTemplate.execute 与具有多个 Redis 命令的 RedisCallback 一起使用比单独调用命令要慢。

带有 ReactiveRedisTemplate 的管道示例代码:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key -> 
                                connection.hashCommands()
                                .hGetAll(ByteBuffer.wrap(key.getBytes()))))
                    .map(Map.Entry::getValue)
                    .map(ByteUtils::getBytes)
                    .map(b -> 
                        try 
                        return mapper.readValue(b, Value.class);
                         catch (IOException e1) 
                        return null;
                        
                    )
                    .collectList();

没有管道的代码:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
            .map(Map.Entry::getValue)
            .cast(Value.class)
            .collectList();

谢谢!

【问题讨论】:

我遇到了同样的问题,已经谷歌搜索了 1 周没有结果,您有什么解决方案吗? @kriver。您找到问题的答案了吗?请更新答案 【参考方案1】:

我认为不可能使用 RedisReactiveTemplate 或生菜的反应式 API。事实上,当你构建你的反应链时,它的某些部分会被延迟评估。

getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()

例如,在此示例中,只有当 getAsyncValue 返回值时才会触发 doSomething。

现在,如果我们获取您的 RedisCallback 样本并假设我们在连接对象中有一个 flushAll 方法。你在哪里/什么时候叫它?

tpl.execute(connection -> 
                    Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
                            connection.hashCommands()
                                    .hGetAll(ByteBuffer.wrap(key.getBytes())));
                    connection.fluxAll();
                    return results;
                )

这样不会将命令刷新到服务器,因为不会触发任何 hashCommands。

现在让我们看看我们拥有的所有信号回调:

doOnNext doOnError doOnCancel 先做 doOnSubscribe doOnRequest doOnTerminate doAfterTerminate 最后做 doOnComplete

doOnError 或 doOnCancel 不会帮助我们。但我们可以考虑使用 doFinally、doOnTerminate、doAfterTerminate:

tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
                                        .hGetAll(ByteBuffer.wrap(key.getBytes())))
                                        .doFinally(s -> connection.flushAll()))

但是 htGetAll 在命令刷新到服务器之前不会完成,因此不会调用 doFinnaly,因此我们将无法刷新....

我能想到的唯一解决方法是直接使用生菜的异步 api。有一个sample in the documentation 告诉你怎么做。

您的代码可能看起来像(未经测试):

// client is a RedisClient from lettuce

StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
                .collectList()
                .doOnNext(f -> command.flushCommands())
                .flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))

【讨论】:

以上是关于使用 Reactive Lettuce 流水线 Redis 命令的主要内容,如果未能解决你的问题,请参考以下文章

redis 的Lettuce配置

Spring Boot 整合 Lettuce Redis

R Shiny Reactive 值,dplyr 过滤器错误?

如果声明R闪亮

springboot2.x版本整合redis(单机/集群)(使用lettuce)

lettuce之springboot整合redis