Kotlin RX .zipWith 函数体每个观察者执行一次还是一次?

Posted

技术标签:

【中文标题】Kotlin RX .zipWith 函数体每个观察者执行一次还是一次?【英文标题】:Kotlin RX .zipWith function body executes once or once per observer? 【发布时间】:2022-01-07 18:39:05 【问题描述】:

当数据传输到我的应用程序时,它遵循以下顺序:

1 个带有 ID 信息的 ReadStart 数据包 1 个或多个 DataPackets 组合形成有效负载 1 个 ReadDone 数据包表示传输完成

我有一个创建 Observable 的 Kotlin RX 函数:

val readStartPublishProcessor: PublishProcessor<ReadStartPacket>
val dataPacketPublishProcessor: PublishProcessor<DataPacket>
val readDonePublishProcessor: PublishProcessor<ReadDonePacket>
...

private fun setupReadListener(): Flowable<ReadEvent> 
    val dataFlowable = dataPacketPublishProcessor.buffer(readDonePublishProcessor)
    
    return readStartPublishProcessor
        .zipWith(other = dataFlowable)  readStart, dataPackets ->
            Log.d(tag, "ReadEvent with $dataPackets.size data packets")
            ReadEvent(event = readStart, payload = combinePackets(dataPackets))
        

通过阅读.zipWith 的文档,我希望.zipWith 函数体对readStartPublishProcessordataFlowable 发出的每对值执行一次,然后将该计算结果传递给每个订阅者:

Zip 方法返回一个 Observable,它应用你的函数 选择由两个(或 更多)其他 Observables,这个函数的结果变成 返回的 Observable 发出的项目。 ...它只会发出 many items 作为源 Observable 发出的项目数 发出最少的项目。

但如果我有超过 1 个观察者,我会看到 .zipWith 函数体执行的次数与观察者的数量相同,每次都使用相同的一对发射值。这是一个问题,因为从 .zipWith 函数体中调用的函数会产生副作用。 (注意:观察者中不使用 .share 和 .replay 运算符。)

为什么似乎为每个观察者运行 .zipWith 函数体而不是只运行一次,有没有办法编写它以便它只执行一次而不管观察者的数量?

【问题讨论】:

【参考方案1】:

几点...

zipWith 中调用的函数不应包含副作用。它应该是一个纯函数。如果您在那里绝对需要副作用,请使用do 运算符之一。

zipWith 返回的 observable 是冷的(默认情况下,Observable 是冷的)这意味着每个观察者都有自己的执行上下文。即,操作员在每次订阅时订阅其源 observables,并为每个订阅调用其拥有的功能块。

如果您希望订阅共享执行上下文,则必须使用 sharerefCount 运算符。 Learn more about Hot and Cold Observables here.

【讨论】:

如果 .zipWith 主体应该是一个纯函数,为什么要为给定的输入对多次执行它?每次执行的结果都是一样的,这是对 CPU 周期的浪费。 Rx Design Guidelines, "Rx 中的组合贯穿每个订阅的每个运算符(共享运算符除外,例如 Publish)。这将使每个订阅的每个副作用都发生。如果这种行为是期望的行为,最好通过将副作用代码放在 Do 运算符中来明确这一点。” 正是因为块被多次调用,你应该确保块是纯的。 购买为什么要浪费cpu周期多次计算相同的结果? 这就是为什么我包含了关于 Hot and Cold Observables 的链接。我建议您阅读 Rx 背后的一些哲学。如果您仍有疑问,您可以随时在software engineering stack exchange 上提问,或者如果它们与代码而不是哲学直接相关,请在此处提问。

以上是关于Kotlin RX .zipWith 函数体每个观察者执行一次还是一次?的主要内容,如果未能解决你的问题,请参考以下文章

erlang实现zipwith函数

有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?

理解Haskells类型签名(例如zip with with(+))

Kotlin-inline:你需要知道的一切(Android)

kcp结构体字段含义

Kotlin 类和对象