在RxJS中,Observer是否被注入到Observable执行中?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在RxJS中,Observer是否被注入到Observable执行中?相关的知识,希望对你有一定的参考价值。
我已经多次阅读了ReactiveX文档,但仍然不能完全理解当Observer订阅Observable时会发生什么。
我们来看一个简单的例子:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});
const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
observable.subscribe(observer);
我的问题:
传递给Observable的subscriber
对象来自哪里?
在
observable.subscribe
的subscribe
和new Observable(function subscribe(subscriber) {...})
具有相同的名称并非巧合。在图书馆中,它们是不同的,但出于实际目的,您可以认为它们在概念上是平等的。
所以,显然传递给Observable构造函数(subscriber
)中的subscribe回调的对象实际上并不是observer
对象。至少不是如果你按照上面的引用来了解库的实际工作方式。
如果它不是传入的observer
对象,那么subscriber.next(1)
和subscribe.complete()
调用究竟是什么?这怎么连接到next
的observer
财产?
澄清编辑:
我知道如何利用RxJS,事实上,人们可以从概念上想象注入观察者(正如引言所说)。但是,我在这里想要了解它是如何工作的。
Observable
创建过程如下:
Observable
由作者定义(为了解释的目的,这里用new
手动):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
传递给上面的subscribe
的Observable
回调由Observable constructor在本地保存:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
所以,我们有完整的subscribe
函数,由我们或任何其他预先制作的Observable
定义,保存下来以便以后执行。
观察者可以用几种形式之一传递给subscribe
回调。或者,作为一到三个函数直接(下一个,错误,完成),或作为具有相同三个方法中的一个或多个的对象。出于解释的目的,我们将实现最后一个更详细的选项:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
现在,有趣的部分开始了。我们将observer
传递给Observable.subscribe(..)
方法:
myObserver.subscribe(observer);
subscribe method看起来像这样:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
简要介绍一下,subscribe
方法:
- 以前面讨论的形式之一接收
observer
toSubscriber
将观察者转换为Subscriber
对象,无论其传递形式(Subscriber
实例保存在sink
变量中)- 注意:除非您订阅运营商,否则
operator
变量是undefined
。因此,只需忽略围绕if
的operator
语句 Subscriber
扩展(原型链接到)Subscription
对象,它的原型有两个重要方法:unsubscribe()
,add()
add(..)
用于向Observable
添加“拆除逻辑”(函数),这将在Observable
完成或取消订阅时运行。它将传递给它的任何函数,将其包装在Subscription
对象中,并将函数放入Subscription
的_unsubscribe
变量中。这个Subscription
保存在我们上面创建的Subscriber
中,在一个名为_subscriptions
的变量中。如上所述,我们做了所有这些,以便当Subscriber
取消订阅或完成时,所有add()
'ed拆卸逻辑执行- 作为旁注,
Observable.subscribe()
返回Subscriber
实例。因此,您可以随时调用mySubscriber.add( // some tear down logic)
来添加将在Observable
完成或取消订阅时执行的函数 - 现在有一个重要的部分包括:
this._trySubscribe(sink)
运行(在add()
内部,作为参数)。_trySubscribe(..)
是实际运行subscribe
构造函数之前保存的Observable
回调的函数。重要的是,它传递sink
(我们新的Subscriber
实例)作为Observable
回调的回调。换句话说,当subscriber.next(1)
内部的Observable
执行时,我们实际上在next(1)
(sink
)实例中执行Subscriber
(next()
在Subscriber
的原型上)。
所以,这让我走到了尽头。除了其他方面,toSubscribe
内部和取消订阅过程中还有更多细节,但这些不在本问答范围内。
简而言之,为了回答标题中的问题,观察者确实被传递到Observable
,只是在被转换为统一的Subscriber
对象之后。
希望这将有助于将来的其他人。
不,观察者不会注入观察者。
AFAICT,混淆源于这样一个事实:new Observable(...)
语法更像是一个低级别的工厂,而不是一个有用的模式。
它或多或少是由of(value1, value2, ..., valueN)
,from(enumeration)
和fromEvent(...)
等更直接的实现所使用的机制。
这种方法是您应该关注的实际用例。
在所有这些方法中,所有这些方法都将某种同步或异步值或交互桥接到可观察流的精彩世界中。为此,他们以某种方式表现得像一个正确的观察者:他们生成物品并将它们放入流中。为此,他们使用一个名为next
的函数。就像Observer
实现中的方法一样,实际上也会以相同的方式调用bacause。
特别是,您可以在此处查看subscribe方法的实现:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
如果您想了解订阅期间实际发生的事情,我建议您实际查看代码。但是,IMO,你应该在熟悉了各种Observable创建函数之后才能尝试。
希望能帮助到你。
以上是关于在RxJS中,Observer是否被注入到Observable执行中?的主要内容,如果未能解决你的问题,请参考以下文章
Rxjs笔记三:Observer的简写形式以及退订Observable
你如何使用 RxJS v5 返回 new Observable(function(observer) ...?