RxSwift - 执行控制流会导致异步操作被执行两次

Posted

技术标签:

【中文标题】RxSwift - 执行控制流会导致异步操作被执行两次【英文标题】:RxSwift - doing control flow leads to async operation being performed twice 【发布时间】:2021-03-26 14:49:06 【问题描述】:

看看这个例子:

func query2() -> Observable<Int> 
    print("query2() called")
    return Observable.create  observer in
        print("creating query2() thread")
        let thread = Thread.init(block: 
            sleep(1)
            let numbers = [
                1,2,3,4,5
            ]
            for num in numbers 
                observer.onNext(num)
            
            observer.onCompleted()
        )
        thread.start()
        return Disposables.create 
            thread.cancel()
        
    


let numbers = query2()
let even = numbers.filter  $0 % 2 == 0 
let odd = numbers.filter  $0 % 2 != 0 
let merged = even.concat(odd)

merged.subscribe(onNext:  n in
    print(n)
)

预期的输出是:

query2() called
creating query() thread
2
4
1
3
5

但是,当需要从odd 中提取值时,线程似乎是第二次创建的。

实际输出:

query2() called
creating query2() thread
2
4
creating query2() thread
1
3
5

我看了这段代码,然后想 - 啊,我错过了 .share() 运算符,因为 evenodd 来自同一个流。我最初没有添加它,因为我最终将它们合并到一个流 merged 以供订阅,并认为 Rx 会为我做优化。

所以我使用了 share():let numbers = query2().share()

输出仍然保持不变。

我怎样才能防止这种情况发生?

【问题讨论】:

【参考方案1】:

每次订阅结果 Observable 时都会调用传递给 Observable.create 的闭包。

share() 运算符有一个引用计数。它会在收到订阅请求时订阅其源,然后如果它在源运行时收到另一个请求,它也会向第二个源发送事件。

这里的惊喜是你的 observable 在第一次订阅返回之前完成,所以没有什么可分享的。请注意,当您在后台线程中调用onNext 时,它立即 调用该后台线程中传递给订阅的闭包。当您调用onCompleted 时,它会立即完成。它不会等待订阅退出来做这些事情。

这里的解决方案是使用多播运算符使您的numbers 可观察到热。向它传递一个 ReplaySubject,它将存储输出并将其重播给任何后续订阅者。不要忘记调用connect() 来让 Observable 运行它的生成器函数。

类似这样的:

let numbers = Observable.deferred  () -> Observable<Int> in
    print("called")
    return Observable.from([1, 2, 3, 4, 5, 6])

.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .default)) // this ensures that the block passed to `deferred` is called on a background thread.
.multicast(ReplaySubject<Int>.createUnbounded())

numbers.connect()

let even = numbers.filter  $0 % 2 == 0 
let odd = numbers.filter  $0 % 2 != 0 
let merged = even.concat(odd)

merged.subscribe(onNext:  print($0) )

【讨论】:

效果很好。似乎如果我要在 Rx 世界中进行大部分编程,我需要做很多多播(这需要我非常清楚代码图,因为这是明确的)。您是否建议通过在订阅后执行所有同步转换尽快进入非 Rx 世界? 迄今为止,我已经使用 RxSwift 6 年了。我从来不需要在生产或示例应用程序中使用多播。 share 运算符是我所需要的(有时需要重播)。这里的特殊情况是您正在吐出流,然后使用 concat 重新组合它,这会及时错开其订阅。这是一件非常不寻常的事情。

以上是关于RxSwift - 执行控制流会导致异步操作被执行两次的主要内容,如果未能解决你的问题,请参考以下文章

如何用python实现异步io

转载使用Win32API实现Windows下异步串口通讯

架构笔记[一]:同步与异步

未调用 RxSwift 订阅块

RXSwift 简单的异步示例?

如何在退出时执行异步操作