RXJS Observable的冷,热和Subject

Posted starof

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RXJS Observable的冷,热和Subject相关的知识,希望对你有一定的参考价值。

一、Observable的冷和热

 

Observable 热:直播。所有的观察者,无论进来的早还是晚,看到的是同样内容的同样进度,订阅的时候得到的都是最新时刻发送的值。

Observable 冷:点播。 新的订阅者每次从头开始。

冷的Observable例子:

一开始有个订阅者,

两秒后又有个订阅者,这两个序列按照自己的节奏走的,不同步。每个流进行都会从interval的0开始。

console.log(\'RxJS included?\', !!Rx);

const count$ = Rx.Observable.interval(1000).take(5);
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

热的Observable例子

第二个订阅者直接从2开始起,跟第一个订阅者看到的内容是一样的。

const count$ = Rx.Observable.interval(1000).take(5).share();
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

 

二、Subject

Subject即是观察者Observer,也是被观察对象Observable,同时实现了这两个接口。

意味着

  • 一方面它可以作为流的组成的一方,输出的一方。
  • 另一方面,它可以作为流的观察一方,接收一方。

Subject分为ReplaySubject和BehaviorSubject。

ReplaySubject:这种Subject会保留最新的n个值

BehaviorSubject:是ReplaySubject的特殊形式。 保留最新的一个值

【20200529】

拿subject做一个observer观察者,看Observable会丢什么东西出来,由它对外广播出去。

再拿subject去订阅两个观察者。

有n个observable去订阅subject,但是subject只会发出一个订阅的要求订阅原始observable。

1、subscribe的等价写法

subscribe 后面写的一个函数,相当于语法糖,快捷方式,临时创建冷一个observer对象。

默认情况应该是传入一个observer对象

console.log(\'RxJS included?\', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log(\'1: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 1:\'+ err);},
  complete: ()=>{console.log(\'1 is complete\');}
}


const observer2 = {
  next: (val)=>{console.log(\'2: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 2:\'+ err);},
  complete: ()=>{console.log(\'2 is complete\');}
}

//等价写法
counter$.subscribe(val =>{console.log(val);});
counter$.subscribe(observer2); 

2、两个observer ,两次subscribe

console.log(\'RxJS included?\', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log(\'1: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 1:\'+ err);},
  complete: ()=>{console.log(\'1 is complete\');}
}


const observer2 = {
  next: (val)=>{console.log(\'2: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 2:\'+ err);},
  complete: ()=>{console.log(\'2 is complete\');}
}

counter$.subscribe(observer1);

setTimeout(function(){
  counter$.subscribe(observer2);
},2000);
View Code

 

问题:需要在两处执行subscribe,很多情况下是这样的,定义好这些序列应该在什么时候被触发,我执行执行一句subscribe(),两个序列都会这么执行。这种情况下就需要用subject()。

3、subject

subject即使observable,因为它可以subscribe observer。

也是observer,因为它可以被observable subscribe。

 

console.log(\'RxJS included?\', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();


const observer1 = {
  next: (val)=>{console.log(\'1: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 1:\'+ err);},
  complete: ()=>{console.log(\'1 is complete\');}
}


const observer2 = {
  next: (val)=>{console.log(\'2: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 2:\'+ err);},
  complete: ()=>{console.log(\'2 is complete\');}
}

//不再用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定义好两边后,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

一句执行counter$.subscribe(subject),把定义好的序列,包括等待2秒的序列全部完成了。

4,subject是一个hot observable

往流里推送新值

 第二个拿不到新值,因为第二个流订阅的时候,两个新值已经过去了。

5,ReplaySubject

replay把过去发生的事件进行重播。

ReplaySubject(2)把过去的2个事件进行重播。这样observer1 subscribe的时候就可以看到10和11。

6、BehaviorSubject只记住最新的值

总有一个最新值,总记住上一次的最新值

console.log(\'RxJS included?\', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.BehaviorSubject();


subject.next(10);
subject.next(11);
const observer1 = {
  next: (val)=>{console.log(\'1: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 1:\'+ err);},
  complete: ()=>{console.log(\'1 is complete\');}
}


const observer2 = {
  next: (val)=>{console.log(\'2: \' +val);},
  error: (err)=>{console.log(\'ERROR>> 2:\'+ err);},
  complete: ()=>{console.log(\'2 is complete\');}
}


//不再用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定义好两边后,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

取值的时候,会取得到最新的data,尽管在取值的时候也就是subscribre的时候值已经发射完了,尽管时机已经错失了还是能够得到它上一次发射之后的最新的一个值。

 

 

三、Angular中对Rx的支持

大量内置Observable支持:如Http,ReactiveForms,Route等。

Async Pipe是什么?有什么用?

 

Observable需要subscribe 一下,成员数组变量等于Observable得到的值。

使用Async Pipe可以直接使用Observable,还不用去取消订阅。

memberResults$: Observable<User[]>; 

 

本文作者starof,因知识本身在变化,作者也在不断学习成长,文章内容也不定时更新,为避免误导读者,方便追根溯源,请诸位转载注明出处:https://www.cnblogs.com/starof/p/10505617.html 有问题欢迎与我讨论,共同进步。

以上是关于RXJS Observable的冷,热和Subject的主要内容,如果未能解决你的问题,请参考以下文章

有 2 个或更多组件订阅由 Injectable 提供的冷 Observable [重复]

ionic - RXJS 错误:rxjs_Observable__.Observable.combineLatest 不是函数

从角度 4 中的“rxjs/Observable”导入 Observable 时出错

Rxjs:Observable.combineLatest vs Observable.forkJoin

rxjs Observable 导入问题

Angular js 2 'node_modules/rxjs/Observable"' 没有导出的成员 'Observable'。导入 Observable