RxSwift - 执行控制流会导致异步操作被执行两次
Posted
技术标签:
【中文标题】RxSwift - 执行控制流会导致异步操作被执行两次【英文标题】:RxSwift - doing control flow leads to async operation being performed twice 【发布时间】:2021-03-26 14:49:06 【问题描述】:看看这个例子:
func query2() -> Observable<Int>
print("query2() called")
return Observable.create observer in
print("creating query2() thread")
let thread = Thread.init(block:
sleep(1)
let numbers = [
1,2,3,4,5
]
for num in numbers
observer.onNext(num)
observer.onCompleted()
)
thread.start()
return Disposables.create
thread.cancel()
let numbers = query2()
let even = numbers.filter $0 % 2 == 0
let odd = numbers.filter $0 % 2 != 0
let merged = even.concat(odd)
merged.subscribe(onNext: n in
print(n)
)
预期的输出是:
query2() called
creating query() thread
2
4
1
3
5
但是,当需要从odd
中提取值时,线程似乎是第二次创建的。
实际输出:
query2() called
creating query2() thread
2
4
creating query2() thread
1
3
5
我看了这段代码,然后想 - 啊,我错过了 .share()
运算符,因为 even
和 odd
来自同一个流。我最初没有添加它,因为我最终将它们合并到一个流 merged
以供订阅,并认为 Rx 会为我做优化。
所以我使用了 share():let numbers = query2().share()
输出仍然保持不变。
我怎样才能防止这种情况发生?
【问题讨论】:
【参考方案1】:每次订阅结果 Observable 时都会调用传递给 Observable.create
的闭包。
share()
运算符有一个引用计数。它会在收到订阅请求时订阅其源,然后如果它在源运行时收到另一个请求,它也会向第二个源发送事件。
这里的惊喜是你的 observable 在第一次订阅返回之前完成,所以没有什么可分享的。请注意,当您在后台线程中调用onNext
时,它立即 调用该后台线程中传递给订阅的闭包。当您调用onCompleted
时,它会立即完成。它不会等待订阅退出来做这些事情。
这里的解决方案是使用多播运算符使您的numbers
可观察到热。向它传递一个 ReplaySubject,它将存储输出并将其重播给任何后续订阅者。不要忘记调用connect()
来让 Observable 运行它的生成器函数。
类似这样的:
let numbers = Observable.deferred () -> Observable<Int> in
print("called")
return Observable.from([1, 2, 3, 4, 5, 6])
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .default)) // this ensures that the block passed to `deferred` is called on a background thread.
.multicast(ReplaySubject<Int>.createUnbounded())
numbers.connect()
let even = numbers.filter $0 % 2 == 0
let odd = numbers.filter $0 % 2 != 0
let merged = even.concat(odd)
merged.subscribe(onNext: print($0) )
【讨论】:
效果很好。似乎如果我要在 Rx 世界中进行大部分编程,我需要做很多多播(这需要我非常清楚代码图,因为这是明确的)。您是否建议通过在订阅后执行所有同步转换尽快进入非 Rx 世界? 迄今为止,我已经使用 RxSwift 6 年了。我从来不需要在生产或示例应用程序中使用多播。share
运算符是我所需要的(有时需要重播)。这里的特殊情况是您正在吐出流,然后使用 concat 重新组合它,这会及时错开其订阅。这是一件非常不寻常的事情。以上是关于RxSwift - 执行控制流会导致异步操作被执行两次的主要内容,如果未能解决你的问题,请参考以下文章