如何将始终从特定线程调用其回调的侦听器包装到符合 subscribeOn 定义的调度程序的 Observable 中?

Posted

技术标签:

【中文标题】如何将始终从特定线程调用其回调的侦听器包装到符合 subscribeOn 定义的调度程序的 Observable 中?【英文标题】:How to wrap listeners that always calls its callbacks from a specific thread into an Observable that conforms to the Scheduler defined by subscribeOn? 【发布时间】:2017-04-12 17:38:21 【问题描述】:

简要介绍给不熟悉 Android 和/或 Firebase 开发的人:

android 开发中,您应该始终从主线程(也称为 UI 线程)操作应用程序的视图,但如果您的应用程序需要进行一些繁重的处理,则应该使用后台线程 otherwise the app would seem unresponsive。

Firebase 是一项服务,它提供了一种在云中存储和与 NoSQL 数据库同步数据的方法。它还提供了一个 Android SDK 来管理这个数据库。每次使用此 SDK 进行操作(如查询)时,Firebase 都会在其自己的内部后台线程上进行所有繁重的处理并始终调用其回调 on the main thread,从而避免了这些线程陷阱。

例子:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

ValueEventListener postListener = new ValueEventListener() 
  @Override
  public void onDataChange(DataSnapshot dataSnapshot) 
    // This is always called on the main thread
    // Get Post object and use the values to update the UI
    Post post = dataSnapshot.getValue(Post.class);
    // ...
  

  @Override
  public void onCancelled(DatabaseError databaseError) 
    // Getting Post failed, log a message
    printError(databaseError.toException());
    // ...
  
;

postsQuery.addValueEventListener(postListener);

我面临的实际问题:

我正在尝试使用这样的方法将 Firebase 的查询侦听器与 RxJava 包装起来:

private static Observable<DataSnapshot> queryObservable(final Query query) 
  return Observable.fromEmitter(emitter -> 
    // This is called on the Scheduler's thread defined with .subscribeOn()
    printThread("emitter");
    final ValueEventListener listener = new ValueEventListener() 
      @Override public void onDataChange(final DataSnapshot dataSnapshot) 
        // This is always called on the main thread
        printThread("onDataChange");
        emitter.onNext(dataSnapshot);
      

      @Override public void onCancelled(final DatabaseError databaseError) 
        // This is called on the main thread too
        emitter.onError(databaseError.toException());
      
    ;

    query.addValueEventListener(listener);

    emitter.setCancellation(() -> query.removeEventListener(listener));
  , Emitter.BackpressureMode.BUFFER);

但是因为 Observable 是从 Firebase 的回调(在主线程上调用)内部发出项目,所以任何进一步的 .subscribeOn() 运算符都将被忽略。

比如像这样调用上面的方法:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

queryObservable(postsQuery).doOnSubscribe(() -> printThread("onSubscribe"))
    .subscribeOn(Schedulers.io())
    .subscribe(dataSnapshot -> printThread("onNext"));

将打印以下内容:

onSubscribe Thread: Rxioscheduler-2
emitter Thread: RxIoScheduler-2
onDataChange Thread: main
onNext Thread: main

据我了解,当 Firebase 的 SDK 调用 onDataChange() 回调并从其自己的内部后台线程切换到主线程时,它也会使 Observable 在主线程上发出新项目,从而使任何 .subscribeOn() 运算符无用顺流而下。

实际问题:

我该怎么做才能正确地将这样的侦听器包装到 Observable 中,而且还要使它们符合 .subscribeOn() 定义的调度程序?

谢谢!

更新:

我知道.observeOn() 让我能够在另一个线程上处理 Firebase 返回的数据。这就是我已经在做的事情,但这不是这个问题的重点。关键是:当我通过.subscribeOn() 传递一个调度程序时,我希望上游符合该调度程序的线程,但是当 Observable 有一个内部侦听器被另一个线程上的回调触发时,这不会发生。发生这种情况时,我将失去.subscribeOn() 保证。

这个问题的严重性起初可能看起来并不明显,但如果这个 Observable 是库的一部分呢?那里的最佳做法是什么?库是否应该强制其客户在调用该方法后始终调用.observeOn()?库是否应该调用.observeOn() 本身并将其称为“默认调度程序”?在任何这些情况下,.subscribeOn() 都是无用的,这对我来说似乎不正确。

【问题讨论】:

似乎唯一的选择是在查询后调用observeOn强制更改下游线程。 @maxost 这实际上是我正在使用的当前解决方法。问题是它仍然忽略了由.subscribeOn() 定义的调度程序。这将使queryObservable() 方法上面的API 不好。我希望有一个适当的解决方案。 @RenanFerrari 你找到解决方案了吗?谢谢 @GabbarSingh 不是。我一直在回调中做很少的工作,然后尽快致电observeOn 你找到解决办法了吗? 【参考方案1】:

只需在 IO 中使用 observeOn,在主线程中使用 subscribeOn,这样您就可以在 MainThread 中管理您收到的内容,并将 firebase 工作移至不同的 Thread

记得将 rxAndroid 导入你的 gradle(Rxjava 或 RxJava 2):

 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

还建议您检查下一个库作为参考(或仅使用它):

RxJava : https://github.com/nmoskalenko/RxFirebase

RxJava 2.0:https://github.com/FrangSierra/Rx2Firebase

其中一个适用于 RxJava,另一个适用于 RxJava 2.0 的新 RC。有兴趣的可以看看here两者的区别。

【讨论】:

【参考方案2】:

我遇到了同样的问题,最后我结合 Coroutines 在后台运行监听器。 为此,只需在 onDataChange 中添加协程后台工作

亲切的问候

【讨论】:

以上是关于如何将始终从特定线程调用其回调的侦听器包装到符合 subscribeOn 定义的调度程序的 Observable 中?的主要内容,如果未能解决你的问题,请参考以下文章

如何通知多线程应用程序中的特定线程

当 `this` 不可用时,从拖动回调中检索 DOM 目标

始终调用 p:ajax 事件的 oncomplete,即使侦听器没有返回任何内容[重复]

包装器和绑定器std::bind和std::function的回调技术

包装器和绑定器std::bind和std::function的回调技术

RxJava Flowable.create(),如何尊重 subscribeOn() 线程