连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?

Posted

技术标签:

【中文标题】连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?【英文标题】:Concatenating two observable sequences that both have subscribeOn. How do I ensure my observable runs on a thread? 【发布时间】:2017-02-07 09:43:26 【问题描述】:

当涉及到强制某段 Observable.create 代码在特定线程(即后台线程)中运行时,我担心使用 subscribeOn 运算符可能不起作用,因为有时我可能会链接这个 observable序列到另一个在主线程上运行的可观察序列(使用observeOn)。

示例

情况是我在主线程上运行了一个 Observable 序列(即一个警告框,询问用户是否执行网络调用)。

通过以下方式确保Observable.create 代码在后台线程中运行会更好:

Observable<String>.empty()
   .observeOn(ConcurrentMainScheduler(queue: background.queue))
   .concat(myObservableNetworkCall)

为什么不直接使用subscribeOn

问题是,如果我使用了subscribeOn(第二个)并且之前的 observable(警报控制器)被设置为使用subscribeOn(第一个)在后台线程上运行,那么第二个subscribeOn 运算符就不会工作,因为第一次调用更接近源 observable:

如果您指定多个 subscribeOn() 运算符,则靠近源(最左侧)的那个将是使用的那个。

Thomas Nield on RxJava's subscribeOn and observeOn operators(2016 年 2 月)

这可能是 RxJava 的行为,但我不确定 Swift。 Reactivex.io 只是说我们不应该多次调用subscribeOn

我倾向于将操作包装到Observable&lt;Void&gt;s 中,它们需要在不同的线程上运行......这就是为什么我要询问如何确保 Observable 代码在我指定的线程中运行。 subscribeOn 不起作用,因为我可以连接 observable。

我希望它应该运行的线程被封装在我的 Observable 定义中,而不是链中的更高层。

最佳做法是:

    使用我希望使用的数据类型从 Observable.empty 开始。 使用observeOn 强制我希望它运行的线程。 将它与我想要使用的实际 Observable 连接起来。

编辑

    我已阅读有关 reactivex.io 的 subscribeOnobserveOn 文档。

    我熟悉如何使用subscribeOnobserveOn 在线程之间切换。

    我特别担心的是在连接或组合可观察序列时使用subscribeOn 的复杂性。

    问题是,observables 需要专门在一个线程上运行,而且它们不知道它们将在哪里以及与谁连接。因为我确切知道它们应该在哪个线程上运行,所以我更愿意将调度程序定义封装在可观察对象的定义中,而不是在链接序列时。

【问题讨论】:

您看过驱动程序吗?它们在同一个线程上执行。 github.com/ReactiveX/RxSwift/blob/master/Documentation/Units.md 您会使用这种方式(假设您的 viewModel 中有一个变量错误); viewModel .error .filter $0 != nil .asDriver() .drive(onNext: [unowned self] error in self.handleError(error!) ) .addDisposableTo(disposeBag) 是的,我已经阅读过驱动程序,但这解决了另一个问题。如果它更清楚,请为我的初始示例想象两个单独的后台线程(不是主线程)。我的困惑是关于 subscribeOn 运算符的性质,以及确保连接的 Observable&lt;Void&gt;.create() 在特定线程中运行的可能推荐最佳实践。 【参考方案1】:

在函数声明中最好不要指定函数在哪个线程上被调用。 例如:

func myObservableNetworkCall() -> Observable<String> 
    return Observable<String>.create  observer in
        // your network code here

        return Disposables.create 
            // Your dispose
        
    


func otherObservableNetworkCall(s: String) -> Observable<String> 
    return Observable<String>.create  observer in
        // your network code here

        return Disposables.create 
            // Your dispose
        
    

然后在Scheduler之间切换:

myObservableNetworkCall()
    .observeOn(ConcurrentMainScheduler(queue: background.queue)) // .background thread, network request, mapping, etc...
    .flatMap  string in
        otherObservableNetworkCall(s: string)
    
    .observeOn(MainScheduler.instance) // switch to MainScheduler, UI updates
    .subscribe(onNext: string in
        // do something 
    )

【讨论】:

谢谢,但这不是我的问题。我熟悉如何使用subscribeOnobserveOn。如果可以请重新阅读问题以了解连接序列时有关subscribeOn 的复杂性。 我的意思是,如我的问题所示,这就是我现在的做法。一个空的 observable 跟随我使用的数据类型,并连接我使用的 observable。这是您的建议,作为确保某些东西在后台线程中运行的最佳实践吗?我不确定最佳做法是什么。 from this question subscribeOn() 告诉整个链开始处理哪个线程。每个链只能调用一次。如果您在流的下游再次调用它,它将无效。 observeOn() 导致在它下面发生的所有操作都在指定的调度程序上执行。您可以在每个流中多次调用它以在不同线程之间移动。 还有一个 good read 解释 RxSwift 多线程 我很清楚如何使用subscribeOnobserveOn。我需要澄清两个称为subscribeOn 的可观察对象的串联,以确保它们都在我指定的线程上运行。最佳实践是从一个空的 observable 开始,指定调度程序,然后将其连接起来吗?

以上是关于连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?的主要内容,如果未能解决你的问题,请参考以下文章

在具有 Java 8 的嵌入式 Tomcat 8.5 上启用 TLS 握手的可观察性(日志记录/指标)

如何将 TabControl 的项目绑定到 wpf 中的可观察集合?

可观察数组的可观察对象?

访问:具有 2 个主键字段的可更新连接查询,这两个主键字段也是外键

如何创建一个表示其他两个可观察对象完成的可观察对象?

RxJava 观察调用/订阅线程