连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?
Posted
技术标签:
【中文标题】连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?【英文标题】:Concatenating two observable sequences that both have subscribeOn. How do I ensure my observable runs on a thread? 【发布时间】:2017-02-07 09:43:26 【问题描述】:当涉及到强制某段 Observable.create
代码在特定线程(即后台线程)中运行时,我担心使用 subscribeOn
运算符可能不起作用,因为有时我可能会链接这个 observable序列到另一个在主线程上运行的可观察序列(使用observeOn
)。
示例
情况是我在主线程上运行了一个 Observable 序列(即一个警告框,询问用户是否执行网络调用)。
通过以下方式确保Observable.create
代码在后台线程中运行会更好:
Observable<String>.empty()
.observeOn(ConcurrentMainScheduler(queue: background.queue))
.concat(myObservableNetworkCall)
为什么不直接使用subscribeOn
?
问题是,如果我使用了subscribeOn
(第二个)并且之前的 observable(警报控制器)被设置为使用subscribeOn
(第一个)在后台线程上运行,那么第二个subscribeOn
运算符就不会工作,因为第一次调用更接近源 observable:
如果您指定多个 subscribeOn() 运算符,则靠近源(最左侧)的那个将是使用的那个。
Thomas Nield on RxJava's subscribeOn and observeOn operators(2016 年 2 月)
这可能是 RxJava 的行为,但我不确定 Swift。 Reactivex.io 只是说我们不应该多次调用subscribeOn
。
我倾向于将操作包装到Observable<Void>
s 中,它们需要在不同的线程上运行......这就是为什么我要询问如何确保 Observable 代码在我指定的线程中运行。 subscribeOn
不起作用,因为我可以连接 observable。
我希望它应该运行的线程被封装在我的 Observable 定义中,而不是链中的更高层。
最佳做法是:
-
使用我希望使用的数据类型从 Observable.empty 开始。
使用
observeOn
强制我希望它运行的线程。
将它与我想要使用的实际 Observable 连接起来。
编辑
我已阅读有关 reactivex.io 的 subscribeOn
和 observeOn
文档。
我熟悉如何使用subscribeOn
和observeOn
在线程之间切换。
我特别担心的是在连接或组合可观察序列时使用subscribeOn
的复杂性。
问题是,observables 需要专门在一个线程上运行,而且它们不知道它们将在哪里以及与谁连接。因为我确切知道它们应该在哪个线程上运行,所以我更愿意将调度程序定义封装在可观察对象的定义中,而不是在链接序列时。
【问题讨论】:
您看过驱动程序吗?它们在同一个线程上执行。 github.com/ReactiveX/RxSwift/blob/master/Documentation/Units.md 您会使用这种方式(假设您的 viewModel 中有一个变量错误); viewModel .error .filter $0 != nil .asDriver() .drive(onNext: [unowned self] error in self.handleError(error!) ) .addDisposableTo(disposeBag) 是的,我已经阅读过驱动程序,但这解决了另一个问题。如果它更清楚,请为我的初始示例想象两个单独的后台线程(不是主线程)。我的困惑是关于subscribeOn
运算符的性质,以及确保连接的 Observable<Void>.create()
在特定线程中运行的可能推荐最佳实践。
【参考方案1】:
在函数声明中最好不要指定函数在哪个线程上被调用。 例如:
func myObservableNetworkCall() -> Observable<String>
return Observable<String>.create observer in
// your network code here
return Disposables.create
// Your dispose
func otherObservableNetworkCall(s: String) -> Observable<String>
return Observable<String>.create observer in
// your network code here
return Disposables.create
// Your dispose
然后在Scheduler之间切换:
myObservableNetworkCall()
.observeOn(ConcurrentMainScheduler(queue: background.queue)) // .background thread, network request, mapping, etc...
.flatMap string in
otherObservableNetworkCall(s: string)
.observeOn(MainScheduler.instance) // switch to MainScheduler, UI updates
.subscribe(onNext: string in
// do something
)
【讨论】:
谢谢,但这不是我的问题。我熟悉如何使用subscribeOn
和observeOn
。如果可以请重新阅读问题以了解连接序列时有关subscribeOn
的复杂性。
我的意思是,如我的问题所示,这就是我现在的做法。一个空的 observable 跟随我使用的数据类型,并连接我使用的 observable。这是您的建议,作为确保某些东西在后台线程中运行的最佳实践吗?我不确定最佳做法是什么。
from this question subscribeOn()
告诉整个链开始处理哪个线程。每个链只能调用一次。如果您在流的下游再次调用它,它将无效。 observeOn()
导致在它下面发生的所有操作都在指定的调度程序上执行。您可以在每个流中多次调用它以在不同线程之间移动。
还有一个 good read 解释 RxSwift 多线程
我很清楚如何使用subscribeOn
和observeOn
。我需要澄清两个称为subscribeOn
的可观察对象的串联,以确保它们都在我指定的线程上运行。最佳实践是从一个空的 observable 开始,指定调度程序,然后将其连接起来吗?以上是关于连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?的主要内容,如果未能解决你的问题,请参考以下文章
在具有 Java 8 的嵌入式 Tomcat 8.5 上启用 TLS 握手的可观察性(日志记录/指标)
如何将 TabControl 的项目绑定到 wpf 中的可观察集合?