在RxJS中,Observer是否被注入到Observable执行中?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在RxJS中,Observer是否被注入到Observable执行中?相关的知识,希望对你有一定的参考价值。

我已经多次阅读了ReactiveX文档,但仍然不能完全理解当Observer订阅Observable时会发生什么。

我们来看一个简单的例子:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.complete();
});

const observer = {
  next: (x) => console.log('got value ' + x),
  error: (err) => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done')
};

observable.subscribe(observer);

StackBlitz code


我的问题:

传递给Observable的subscriber对象来自哪里?

来自RxJS documentation

observable.subscribesubscribenew Observable(function subscribe(subscriber) {...})具有相同的名称并非巧合。在图书馆中,它们是不同的,但出于实际目的,您可以认为它们在概念上是平等的。

所以,显然传递给Observable构造函数(subscriber)中的subscribe回调的对象实际上并不是observer对象。至少不是如果你按照上面的引用来了解库的实际工作方式。

如果它不是传入的observer对象,那么subscriber.next(1)subscribe.complete()调用究竟是什么?这怎么连接到nextobserver财产?


澄清编辑:

我知道如何利用RxJS,事实上,人们可以从概念上想象注入观察者(正如引言所说)。但是,我在这里想要了解它是如何工作的。

答案

Observable创建过程如下:

Observable由作者定义(为了解释的目的,这里用new手动):

const myObservable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  return function tearDownLogic() {
    console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
  }
});

传递给上面的subscribeObservable回调由Observable constructor在本地保存:

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

所以,我们有完整的subscribe函数,由我们或任何其他预先制作的Observable定义,保存下来以便以后执行。

观察者可以用几种形式之一传递给subscribe回调。或者,作为一到三个函数直接(下一个,错误,完成),或作为具有相同三个方法中的一个或多个的对象。出于解释的目的,我们将实现最后一个更详细的选项:

const observer = {
  next(v) {
    console.log(v);
  }
  error(err) {
    console.log(err);
  }
  complete() {
    console.log('Observable has now completed and can no longer emit values to observer');
  }
}

现在,有趣的部分开始了。我们将observer传递给Observable.subscribe(..)方法:

myObserver.subscribe(observer);

subscribe method看起来像这样:

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {


    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);


    if (operator) {
      sink.add(operator.call(sink, this.source));
    } else {
      sink.add(
        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
        this._subscribe(sink) :
        this._trySubscribe(sink)
      );
    }


    if (config.useDeprecatedSynchronousErrorHandling) {
      if (sink.syncErrorThrowable) {
        sink.syncErrorThrowable = false;
        if (sink.syncErrorThrown) {
          throw sink.syncErrorValue;
        }
      }
    }


    return sink;
  }

简要介绍一下,subscribe方法:

  1. 以前面讨论的形式之一接收observer
  2. toSubscriber将观察者转换为Subscriber对象,无论其传递形式(Subscriber实例保存在sink变量中)
  3. 注意:除非您订阅运营商,否则operator变量是undefined。因此,只需忽略围绕ifoperator语句
  4. Subscriber扩展(原型链接到)Subscription对象,它的原型有两个重要方法:unsubscribe()add()
  5. add(..)用于向Observable添加“拆除逻辑”(函数),这将在Observable完成或取消订阅时运行。它将传递给它的任何函数,将其包装在Subscription对象中,并将函数放入Subscription_unsubscribe变量中。这个Subscription保存在我们上面创建的Subscriber中,在一个名为_subscriptions的变量中。如上所述,我们做了所有这些,以便当Subscriber取消订阅或完成时,所有add()'ed拆卸逻辑执行
  6. 作为旁注,Observable.subscribe()返回Subscriber实例。因此,您可以随时调用mySubscriber.add( // some tear down logic)来添加将在Observable完成或取消订阅时执行的函数
  7. 现在有一个重要的部分包括:this._trySubscribe(sink)运行(在add()内部,作为参数)。 _trySubscribe(..)是实际运行subscribe构造函数之前保存的Observable回调的函数。重要的是,它传递sink(我们新的Subscriber实例)作为Observable回调的回调。换句话说,当subscriber.next(1)内部的Observable执行时,我们实际上在next(1)sink)实例中执行Subscribernext()Subscriber的原型上)。

所以,这让我走到了尽头。除了其他方面,toSubscribe内部和取消订阅过程中还有更多细节,但这些不在本问答范围内。

简而言之,为了回答标题中的问题,观察者确实被传递到Observable,只是在被转换为统一的Subscriber对象之后。

希望这将有助于将来的其他人。

另一答案

不,观察者不会注入观察者。

AFAICT,混淆源于这样一个事实:new Observable(...)语法更像是一个低级别的工厂,而不是一个有用的模式。

它或多或少是由of(value1, value2, ..., valueN)from(enumeration)fromEvent(...)等更直接的实现所使用的机制。

这种方法是您应该关注的实际用例。

在所有这些方法中,所有这些方法都将某种同步或异步值或交互桥接到可观察流的精彩世界中。为此,他们以某种方式表现得像一个正确的观察者:他们生成物品并将它们放入流中。为此,他们使用一个名为next的函数。就像Observer实现中的方法一样,实际上也会以相同的方式调用bacause。

特别是,您可以在此处查看subscribe方法的实现:

https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

如果您想了解订阅期间实际发生的事情,我建议您实际查看代码。但是,IMO,你应该在熟悉了各种Observable创建函数之后才能尝试。

希望能帮助到你。

以上是关于在RxJS中,Observer是否被注入到Observable执行中?的主要内容,如果未能解决你的问题,请参考以下文章

RxJS - 发生错误时观察到的不会完成

Rxjs笔记三:Observer的简写形式以及退订Observable

你如何使用 RxJS v5 返回 new Observable(function(observer) ...?

如何在RXJS 6.3.3中使用ForkJoin导入Observer?

如何在Angular 2中创建RxJS主题?

[RxJS] Subject: an Observable and Observer hybrid