从正确的线程调用 RxJava2 可取消/可处置
Posted
技术标签:
【中文标题】从正确的线程调用 RxJava2 可取消/可处置【英文标题】:Invoke RxJava2 cancellable/disposable from correct thread 【发布时间】:2018-03-30 21:55:54 【问题描述】:我正在实现一个从Resource
发出行的可观察对象。
问题是这个资源真的不喜欢从创建它的不同线程关闭(它会杀死一只小狗并在发生这种情况时抛出异常)。
当我处理订阅时,资源 Cancellable
/Disposable
是从 main
线程调用的,而 observable 是在 Schedulers.io()
上订阅的。
这是 Kotlin 代码:
fun lines(): Observable<String> =
Observable.create emitter ->
val resource = NetworkResource()
emitter.setCancellable
resource.close() // <-- main thread :(
try
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
catch (ioe: IOException)
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread())
.subscribe Log.i(TAG, "Line: $it"
disposable.dispose() // <-- main thread :)
问题:考虑到订阅线程在resource.readLine()
上被阻塞,是否可以从正确的线程调用Cancellable
?
*正确的线程意味着来自subscribeOn(Schedures.io())
的线程。
编辑:恐怕这个问题没有正确答案,除非resource.close()
被设为线程安全或对resource.dataReady
进行某种轮询以使线程不是被屏蔽了。
【问题讨论】:
我认为你应该尝试unsubscribeOn
方法来定义cancellable
的执行位置。
@masp 感谢您的评论,但它不起作用。请参阅我对以下答案的评论。还有其他想法吗?
【参考方案1】:
Schedulers.io()
管理一个线程池,因此它可能会也可能不会使用同一个线程来处理您的资源。您必须使用自定义调度程序和 unsubscribeOn()
运算符来确保您的 Observable
在同一个线程上订阅和取消订阅。比如:
Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
val disposable = lines()
.unsubscribeOn(customScheduler)
.subscribeOn(customScheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe Log.i(TAG, "Line: $it"
【讨论】:
感谢您的回答,但它不起作用。来自执行程序的线程在resource.readLine()
处被阻塞,因此使用unsubscribeOn(...)
调度cancellable 永远不会调用它,因为线程永远不会被释放。想法?【参考方案2】:
如果您不介意稍微延迟拨打NetworkResource#close
的电话,为什么不直接
fun lines(): Observable<String> =
Observable.create emitter ->
val resource = NetworkResource()
try
while (!emitter.isDisposed)
emitter.onNext(resource.readLine())
resource.close()
catch (ioe: IOException)
emitter.tryOnError(ioe)
但是这仍然存在一个问题:如果是IOException
,没有人会打电话给NetworkResource#close
(我认为在您的问题示例中也是如此)。
尝试解决这个问题:
fun lines(): Observable<String> =
Observable.create emitter ->
val resource = NetworkResource()
try
while (!emitter.isDisposed)
emitter.onNext(resource.readLine())
catch (ioe: IOException)
emitter.tryOnError(ioe)
finally
resource.close() // try-catch here, too?
或使用“Kotlin-Try-With-Resources”函数use
fun lines(): Observable<String> =
Observable.create emitter ->
NetworkResource().use resource ->
try
while (!emitter.isDisposed)
emitter.onNext(resource.readLine())
catch (ioe: IOException)
emitter.tryOnError(ioe)
我希望,这会有所帮助。祝你周末愉快。
【讨论】:
Hey Peti,感谢您的回答,但这并不能解决问题,因为线程仍将卡在resource.readLine()
,因此 emitter.isDisposed
标志将不会被检查,直到线路已收到,这可能需要一段时间。恐怕没有答案,除非从另一个线程调用resource.close()
是安全的。周末愉快!
是的,专用的 IO 线程将调用 close
延迟...这是此答案的“如果您不介意延迟通话”介绍 :-) 但这真的是一个问题?因为NetworkResource
似乎是阻塞的,所以在正在进行的lines()
订阅的生命周期中,专用 IO 线程无论如何都会以某种方式“浪费”......
你说得对,线程被“浪费”是个小问题,我更担心的是无法关闭底层网络连接。【参考方案3】:
换一条路怎么样?
a) 使NetworkResource
线程安全(如果您可以控制源代码)
或
b) 通过用“代理”包装NetworkResource
? “代理”是指内部使用专用线程的代理,该线程与NetworkResource
进行所有交互(构造、readLine、关闭...)
【讨论】:
再次感谢佩蒂。关于 a) 我无法控制它,关于 b) 已经尝试过了。以上是关于从正确的线程调用 RxJava2 可取消/可处置的主要内容,如果未能解决你的问题,请参考以下文章
如何正确使用 Java 中 Apollo Client 的 RxJava2 库进行同步调用?
如何实现取消并正确处置 CancellationTokenSource