Rx.Observable subscribe 和 forEach 有啥区别

Posted

技术标签:

【中文标题】Rx.Observable subscribe 和 forEach 有啥区别【英文标题】:What is the difference between Rx.Observable subscribe and forEachRx.Observable subscribe 和 forEach 有什么区别 【发布时间】:2016-04-04 15:28:33 【问题描述】:

像这样创建 Observable 之后

var source = Rx.Observable.create(function(observer) ...);

订阅

有什么区别
source.subscribe(function(x) );

forEach

source.forEach(function(x) );

【问题讨论】:

【参考方案1】:

在RxJS 5.0 后面的ES7 spec 中(但RxJS 4.0 没有),两者并不相同。

订阅

public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): Subscription

Observable.subscribe 是您进行大部分真正的 Observable 处理的地方。它返回一个订阅令牌,您可以使用它来取消您的订阅。当您不知道已订阅的事件/序列的持续时间,或者您可能需要在已知持续时间之前停止收听时,这一点很重要。

forEach

public forEach(next: Function, PromiseCtor?: PromiseConstructor): Promise

Observable.forEach 返回一个承诺,当 Observable 完成或出错时,该承诺将解决或拒绝。它旨在阐明您以更“同步”的方式处理有界/有限持续时间的可观察序列的情况,例如整理所有传入值,然后通过处理承诺呈现一次。

实际上,您可以对每个值以及错误和完成事件采取任何一种方式。因此,最显着的功能差异是无法取消承诺。

【讨论】:

“在 ES7 规范中,RxJS 5.0 遵循(但 RxJS 4.0 没有),两者并不相同。”注意到规格随时间而变化是非常重要的。这很烦人,所以现在它是正确的,也许它不是过去或将来不会。因此,正确的答案必须始终指定哪个版本的内容(以及这个版本)。 使用 forEach 我们需要取消订阅还是它自己处理? @SamiullahKhan :您的问题需要比是/否更复杂的答案。 forEach 内部订阅,然后返回一个promise;有了这个承诺,您将无法退订。但是,您也无法取消,因此您可能会假设它会在没有您干预的情况下完成或失败。可观察库的传统设计期望是Unsubscribe 自动发生在CompleteError 上。因此,如果forEach 在概念上适合您,那么不,您不需要取消订阅。 forEach 不是 UnsubscribeCompleteError 会。 所以,据我了解,forEach 对于await obs.forEach(doSomething) 之类的东西很有用,您的方法将在 observable 完成后完成。另一方面,当观察者的生命周期可能比可观察者的生命周期短时,您将使用.subscribe() 和相应的.unsubscribe()【参考方案2】:

我只是回顾了最新的可用代码,从技术上讲,foreach 的代码实际上是在 RxScala、RxJS 和 RxJava 中调用 subscribe。好像差别不大。他们现在有一个返回类型,允许用户有一种方法来停止订阅或类似的。

当我在 RxJava 早期版本上工作时,订阅有订阅返回,而 forEach 只是一个 void。由于这些变化,您可能会看到一些不同的答案。

/**
 * Subscribes to the [[Observable]] and receives notifications for each element.
 *
 * Alias to `subscribe(T => Unit)`.
 *
 * $noDefaultScheduler
 *  
 * @param onNext function to execute for each item.
 * @throws java.lang.IllegalArgumentException if `onNext` is null
 * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
 * @since 0.19
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
def foreach(onNext: T => Unit): Unit = 
    asJavaObservable.subscribe(onNext)
 

def subscribe(onNext: T => Unit): Subscription = 
    asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))


/**
 *  Subscribes an o to the observable sequence.
 *  @param Mixed [oOrOnNext] The object that is to receive notifications or an action to invoke for each element in the observable sequence.
 *  @param Function [onError] Action to invoke upon exceptional termination of the observable sequence.
 *  @param Function [onCompleted] Action to invoke upon graceful termination of the observable sequence.
 *  @returns Disposable A disposable handling the subscriptions and unsubscriptions.
 */
observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) 
  return this._subscribe(typeof oOrOnNext === 'object' ?
    oOrOnNext :
    observerCreate(oOrOnNext, onError, onCompleted));
;

/**
 * Subscribes to the @link Observable and receives notifications for each element.
 * <p>
 * Alias to @link #subscribe(Action1)
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>@code forEach does not operate by default on a particular @link Scheduler.</dd>
 * </dl>
 * 
 * @param onNext
 *            @link Action1 to execute for each item.
 * @throws IllegalArgumentException
 *             if @code onNext is null
 * @throws OnErrorNotImplementedException
 *             if the Observable calls @code onError
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
public final void forEach(final Action1<? super T> onNext) 
    subscribe(onNext);


public final Disposable forEach(Consumer<? super T> onNext) 
    return subscribe(onNext);

【讨论】:

由于forEach没有返回subscribe的结果,所以不同的是返回值。这意味着您不能取消通过调用 forEach 启动的“订阅”。 感谢您的指出。这是他们在最近的代码上更新的部分,我只是相应地更新了我的答案。

以上是关于Rx.Observable subscribe 和 forEach 有啥区别的主要内容,如果未能解决你的问题,请参考以下文章

你必须知道的6个Rxjs的操作符

Rxjs 如何知道 observable 有多少订阅者?

.unsubscribe 和 .take(1) 的区别

从其他 Observable 获取值的 Rx Observable

如何声明 Rx.Observable.fromPromise() 的 TypeScript 返回类型

Spring Boot 2 - 将 Mono 转换为 rx.Observable?