从正确的线程调用 RxJava2 可取消/可处置

Posted

技术标签:

【中文标题】从正确的线程调用 RxJava2 可取消/可处置【英文标题】:Invoke RxJava2 cancellable/disposable from correct thread 【发布时间】:2018-03-30 21:55:54 【问题描述】:

我正在实现一个从Resource 发出行的可观察对象。

问题是这个资源真的不喜欢从创建它的不同线程关闭(它会杀死一只小狗并在发生这种情况时抛出异常)。

当我处理订阅时,资源 Cancellable/Disposable 是从 main 线程调用的,而 observable 是在 Schedulers.io() 上订阅的。

这是 Kotlin 代码:

fun lines(): Observable<String> =
        Observable.create  emitter ->
            val resource = NetworkResource()
            emitter.setCancellable 
                resource.close() // <-- main thread :(
            
            try 
                while (!emitter.isDisposed)
                    emitter.onNext(resource.readLine()) // <-- blocked here!
             catch (ioe: IOException) 
                emitter.tryOnError(ioe) // <-- this also triggers the cancellable
            
        

val disposable = lines()
        .subscribeOn(Schedulers.io())
        .observeOn(androidSchedulers.mainThread())
        .subscribe  Log.i(TAG, "Line: $it" 

disposable.dispose() // <-- main thread :)

问题:考虑到订阅线程在resource.readLine() 上被阻塞,是否可以从正确的线程调用Cancellable

*正确的线程意味着来自subscribeOn(Schedures.io())的线程。

编辑:恐怕这个问题没有正确答案,除非resource.close() 被设为线程安全或对resource.dataReady 进行某种轮询以使线程不是被屏蔽了。

【问题讨论】:

我认为你应该尝试unsubscribeOn 方法来定义cancellable 的执行位置。 @masp 感谢您的评论,但它不起作用。请参阅我对以下答案的评论。还有其他想法吗? 【参考方案1】:

Schedulers.io() 管理一个线程池,因此它可能会也可能不会使用同一个线程来处理您的资源。您必须使用自定义调度程序和 unsubscribeOn() 运算符来确保您的 Observable 在同一个线程上订阅和取消订阅。比如:

Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

val disposable = lines()
        .unsubscribeOn(customScheduler)
        .subscribeOn(customScheduler)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe  Log.i(TAG, "Line: $it" 

【讨论】:

感谢您的回答,但它不起作用。来自执行程序的线程在resource.readLine() 处被阻塞,因此使用unsubscribeOn(...) 调度cancellable 永远不会调用它,因为线程永远不会被释放。想法?【参考方案2】:

如果您不介意稍微延迟拨打NetworkResource#close 的电话,为什么不直接

    fun lines(): Observable<String> =
            Observable.create  emitter ->
                val resource = NetworkResource()
                try 
                    while (!emitter.isDisposed) 
                        emitter.onNext(resource.readLine())
                    
                    resource.close()
                 catch (ioe: IOException) 
                    emitter.tryOnError(ioe)
                
            

但是这仍然存在一个问题:如果是IOException,没有人会打电话给NetworkResource#close(我认为在您的问题示例中也是如此)。

尝试解决这个问题:

    fun lines(): Observable<String> =
            Observable.create  emitter ->
                val resource = NetworkResource()
                try 
                    while (!emitter.isDisposed) 
                        emitter.onNext(resource.readLine()) 
                    
                 catch (ioe: IOException) 
                    emitter.tryOnError(ioe)
                 finally 
                    resource.close() // try-catch here, too?
                
            

或使用“Kotlin-Try-With-Resources”函数use

    fun lines(): Observable<String> =
            Observable.create  emitter ->
                NetworkResource().use  resource ->
                    try 
                        while (!emitter.isDisposed) 
                            emitter.onNext(resource.readLine())
                        
                     catch (ioe: IOException) 
                        emitter.tryOnError(ioe)
                    
                
            

我希望,这会有所帮助。祝你周末愉快。

【讨论】:

Hey Peti,感谢您的回答,但这并不能解决问题,因为线程仍将卡在 resource.readLine(),因此 emitter.isDisposed 标志将不会被检查,直到线路已收到,这可能需要一段时间。恐怕没有答案,除非从另一个线程调用resource.close() 是安全的。周末愉快! 是的,专用的 IO 线程将调用 close 延迟...这是此答案的“如果您不介意延迟通话”介绍 :-) 但这真的是一个问题?因为NetworkResource 似乎是阻塞的,所以在正在进行的lines() 订阅的生命周期中,专用 IO 线程无论如何都会以某种方式“浪费”...... 你说得对,线程被“浪费”是个小问题,我更担心的是无法关闭底层网络连接。【参考方案3】:

换一条路怎么样?

a) 使NetworkResource 线程安全(如果您可以控制源代码)

b) 通过用“代理”包装NetworkResource? “代理”是指内部使用专用线程的代理,该线程与NetworkResource 进行所有交互(构造、readLine、关闭...)

【讨论】:

再次感谢佩蒂。关于 a) 我无法控制它,关于 b) 已经尝试过了。

以上是关于从正确的线程调用 RxJava2 可取消/可处置的主要内容,如果未能解决你的问题,请参考以下文章

如何正确使用 Java 中 Apollo Client 的 RxJava2 库进行同步调用?

如何实现取消并正确处置 CancellationTokenSource

RxJava2:如何在处置订阅者后避免InterruptibleException?

在 CustomControl 中清理资源/处置

SylixOS中pthread_cancel函数浅析

登录窗体为主线程窗体时实现重登录的一个困难