如何有选择地跳过 Flux 上的几个处理步骤
Posted
技术标签:
【中文标题】如何有选择地跳过 Flux 上的几个处理步骤【英文标题】:How to optionally skip several processing steps on Flux 【发布时间】:2019-12-20 09:24:03 【问题描述】:我有从套接字接收到的动态热数据流。 我需要检查条件,如果值匹配,则使用新消息跳转到第 3 步。
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<Msg> processed = msgs
.map(this::checkCondition) //step1
.map(remote::doLongRunning) //optional step2
.map(this::processFurther) //step3
...
public Msg checkCondition(Msg msg)
if(doCheck(msg))
//is there a way to jump to step3 here ?
return new OtherMsg(msg, "someAdditionalData"))
else
return msg
我能想到的唯一选择 - 拆分 Flux 并将其组装回来,有没有更清洁的方法?
final Flux<Msg> msgs = Flux.generate(receiver);
final Flux<OtherMsg> checked = msgs
.filter(this::doCheck) //step1
.map(msg -> new OtherMsg(msg, "someAdditionalData"));
final Flux<OtherMsg> unchecked = msgs
.filter(msg -> !doCheck(msg)) //step1
.map(remote::doLongRunning); //optional step2
Flux.merge(checked, unchecked)
.map(this::processFurther) //step3
【问题讨论】:
【参考方案1】:您不能跳过一个步骤,但您可以将flatMap()
与三元运算符一起使用以作为条件分支的一种形式:
final Flux<Msg> processed = msgs
.flatMap(msg -> doCheck(msg)
? Mono.just(new OtherMsg(msg, "someAdditionalData")).map(remote::doLongRunning)
: Mono.just(msg))
.map(this::processFurther);
这样你可以调用任何其他方法来操作三元表达式第一部分的值,如果doCheck()
返回false,第二部分将确保它被绕过。 processFurther()
将在 flatMap()
调用之后执行,因此无论如何都会执行。
【讨论】:
以上是关于如何有选择地跳过 Flux 上的几个处理步骤的主要内容,如果未能解决你的问题,请参考以下文章