在 RxJava2(Android) 中订阅 Vs 订阅?

Posted

技术标签:

【中文标题】在 RxJava2(Android) 中订阅 Vs 订阅?【英文标题】:Subscribewith Vs subscribe in RxJava2(Android)? 【发布时间】:2017-11-22 06:38:32 【问题描述】:

何时调用 subscribeWith 方法而不是普通订阅?用例是什么?

compositeDisposable.add(get()
    .observeOn(androidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() 
                    @Override public void onNext(News value) 
                        handleResponse(value);
                    

                    @Override public void onError(Throwable e) 
                        handleError(e);
                    

                    @Override public void onComplete() 
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    
                ));

谢谢


稍后添加

根据文档,两者都返回一次性 SingleObserver 实例:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) 
    subscribe(observer);
    return observer;


@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) 
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;

ConsumerSingleObserver 类实现 SingleObserver 和 Disposable。

【问题讨论】:

感谢@Simbatrons 的回答,总结特定用例(据我了解)是,如果您有相同的观察者要绑定到不同的可观察对象,请使用 subscribeWith。 (因此多个 Observable 可以使用相同的观察者实现)。如果您认为这不是用例的唯一区别,请添加您的评论 我想知道同样的事情——在我看来,你的第一个 sn-p 使用 lambda 等更干净。所以,是的,当你想重用同一个 Observer 时,这似乎是一种罕见的情况唯一一次需要 .subscribeWith()?奇怪的是,文档并没有真正提到返回 Disposable 的 subscribe 的重载变体。相反,他们只是指出你使用新的和笨拙的 subscribeWith() 【参考方案1】:

Observable#subscribe 解释:

在您的第一个代码 sn-p 中:

.subscribe(this::handleResponse, this::handleError));

您实际上正在使用几个重载的Observable#subscribe 方法之一:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

还有一个也接受Action 来执行onComplete:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete) 

另一个选项允许您简单地传入Observer (注意:void 方法) (编辑 2 - 此方法在 ObservableSource 中定义,这是Observable 扩展。)

public final void subscribe(Observer<? super T> observer)

在您问题的第二个代码 sn-p 中,您使用了 subscribeWith 方法,该方法仅返回您传入的 Observer(为了方便/缓存等):

public final <E extends Observer<? super T>> E subscribeWith(E observer)

Observer#onComplete 解释:

Observer#onComplete 在 Observable 发出流中的所有项目后被调用。 来自 java 文档:

/**
 * Notifies the Observer that the @link Observable has finished sending push-based notifications.
 * <p>
 * The @link Observable will not call this method if it calls @link #onError.
 */
void onComplete();

例如,如果您的代码 sn-ps 中的get() 返回一个发出多个News 对象的Observable,则每个对象都将在Observer#onNext 中处理。在这里您可以处理每个项目。

在它们都被处理后(假设没有发生错误),然后onComplete 将被调用。在这里,您可以执行任何需要执行的额外操作(例如更新 UI),因为您已经处理了所有 News 对象。

不要与 Disposable#dispose 混淆,后者在可观察流结束(完成/错误)时被调用,或者由您手动终止观察(这是 CompositeDisposable 出现的地方,因为它可以帮助您处理一次包含的所有Disposables)。

如果在您的场景中,get() 将返回一个仅发出单个项目的 Observable,则不要使用 Observable,而是考虑使用仅处理一个项目的 io.reactivex.Single(在 @987654346 @),并且不需要为 onComplete 指定Action :)

编辑:回复您的评论:

但是我仍然没有使用subscribeWith,你说它通过了 缓存等的观察者,它传递到哪里?完成了吗?和 据我了解 subscribeWith 实际上并没有消耗 可观察(或单一)对吗?

为了进一步阐明subscribeWith 的解释,我的意思是它 使用您传递给subscribeWithObserver 对象(与subscribe 方法完全相同)但是它还会将同一 Observer 直接返回给您。在撰写本文时,subscribeWith 的实现是:

public final <E extends Observer<? super T>> E subscribeWith(E observer) 
    subscribe(observer);
    return observer;

因此,subscribeWith可以subscribe互换使用。

你能举一个 subscribeWith 的用例吗?我猜可能是 会完全回答问题

subscribeWith javadoc 给出了以下用法示例:

Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() 
     // ...
;

composite.add(source.subscribeWith(rs));

请参见此处,subscribeWith 的使用将返回与实例化相同的 ResourceObserver 对象。这为执行订阅和将ResourceObserver 添加到CompositeDisposable 提供了便利(注意ResourceObservable 实现Disposable。)

编辑 2 回复第二条评论。

source.subscribeWith(rs); source.subscribe(rs);两者都返回 SingleObserver 实例,

ObservableSource#subscribe(Observer &lt;? super T&gt; observer) 返回Observer。它是一个 void 方法(参见上面 Observable#subscribe 解释下的注释。)而 Observable#subscribeWith DOES 返回 Observer。 如果我们要改用ObservableSource#subscribe 重写示例用法代码,我们必须像这样在两行中完成:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);

Observable#subscribeWith 方法使我们可以方便地在一行中完成上述操作composite.add(source.subscribeWith(rs));

它可能会与所有看起来有些相似的重载订阅方法混淆,但存在差异(其中一些是微妙的)。查看代码和文档有助于区分它们。


编辑 3 subscribeWith 的另一个示例用例

subscribeWith 方法适用于您可能希望重用的 Observer 的特定实现。例如,在上面的示例代码中,它在订阅中提供了ResourceObserver 的特定实现,从而继承了它的功能,同时仍然允许您处理 onNext onError 和 onComplete。

另一个使用示例:对于您问题中的示例代码,如果您想在多个地方对get() 响应执行相同的订阅怎么办?

您无需在不同的类中复制 onNext 和 onError 的 Consumer 实现,而是为 eg 定义一个新类。

//sample code..
public class GetNewsObserver extends DisposableObserver<News> 
    //implement your onNext, onError, onComplete.
    ....

现在,每当您执行 get() 请求时,您只需执行以下操作即可订阅:

compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));

现在代码很简单,您保持处理响应的责任分离,现在可以在任何地方重用 GetNewsObserver

【讨论】:

感谢您提供的宝贵信息,它帮助我更好地了解了 disposing 和 onComplete。但是我仍然没有使用subscribeWith,你说它通过观察者进行缓存等,它传递到哪里?完成了吗?根据我的理解,subscribeWith 实际上并没有消耗 observable(或 Single)对吗?所以 subscribeWith 不能与 Subscribe 方法互换使用。你能举一个 subscribeWith 的用例吗?我想这将完全回答这个问题。 tnx 添加了对答案的回复:) 参考你最近的编辑回复 source.subscribeWith(rs); source.subscribe(rs);两者都返回 SingleObserver 实例,所以你是对的,两者都可以互换使用,即 composite.add(one | another) 。但是为什么有人选择使用其中一个而不是另一个我仍然不清楚。抱歉,如果我错过了您的观点,但我没有看到特定的用例,我的意思是,在这种情况下,我们应该使用 subscribeWith 并且 subscribe 不提供。助教 source.subscribe(rs);不返回观察者。我在答案的“编辑 2”部分添加了进一步的解释。 这完全取决于用例。如果您要订阅并在一个地方提供 onNext、onError 的唯一实现,那么使用接受 lambda 的 #subscribe 是有意义的。否则,如果您需要在各种不同的类中重用常见的 Observer 代码,例如我在 edit3 中给出的示例或 javadoc 中的 ResourceObservable 示例,那么您可以使用#subscribeWith。使用最能解决问题的方法:)

以上是关于在 RxJava2(Android) 中订阅 Vs 订阅?的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

RxJava2:如何在处置订阅者后避免InterruptibleException?

RxJava的学习与实现

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

Android EventBus3.x 使用详解