rxjs 生成函数

Posted GoldenaArcher

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rxjs 生成函数相关的知识,希望对你有一定的参考价值。

rxjs 生成函数

生成函数 creation functions,AKA creation operators,这里挑几个比较重要的说一下,其他的都可以触类旁通。

of

of 会接受一串数据并且将其返回为一个 Observable,简单的使用方式如下:

代码:

import  Observable, of  from 'rxjs';

of('Alice', 'Ben', 'Charlie').subscribe(
  next: (val) => console.log(val),
  complete: () => console.log('Completed'),
);

console.log('---------------------------');

const names$ = new Observable()<string>((subscriber) => 
  subscriber.next('Alice');
  subscriber.next('Ben');
  subscriber.next('Charlie');
  subscriber.complete();
);

names$.subscribe(
  next: (val) => console.log(val),
  complete: () => console.log('Completed'),
);

// 手写实现的of
function ourOwnOf(...args: string[]): Observable<string> 
  return new Observable()<string>((subscriber) => 
    for (const arg of args) 
      subscriber.next(arg);
    
    subscriber.complete();
  );


console.log('---------------------------');

ourOwnOf('Alice', 'Ben', 'Charlie').subscribe(
  next: (val) => console.log(val),
  complete: () => console.log('Completed'),
);

from

from 与 of 很相似,不过 of 是接受多个参数,并将所有的参数返回为一个 observable,from 接受的参数可以有 array/promise/generator/iterator,并将其转化为一个 observable。

使用方式如下:

import  from  from 'rxjs';

from(['Alice', 'Ben', 'Charlie']).subscribe(
  next: (val) => console.log(val),
  complete: () => console.log('Completed\\n------------------'),
);

const promise = new Promise((res, rej) => 
  // res('Resolved');
  rej('Rejected');
);

const obsPromise$ = from(promise);
obsPromise$.subscribe(
  next: (val) => console.log(val),
  error: (err) => console.error(err),
  complete: () => console.log('Promise Completed\\n------------------'),
);

fromEvent

fromEvent 从名字上来说和 from 很像,实际上也挺像的,不过这里接受的是一系列的事件,包括
DOM EventTarget, Node.js EventEmitter, JQuery Events。

一个简单的理解就是:

subscribe fromEvent 相当于 addEventListener,unsubscribe 相当于 removeEventListener,不过 rxjs 有更多的实现。

marble 图大概如下:

DOM event:
---A---B-C----D---E-----F-------G---...--->
subscribe           unsubscribe
   A---B-C----D---E-|>

整个事件流是可以一直运行下去,而 subscribe/unsubscribe 只是截取了中间一部分的流进行操作。

import  fromEvent, Observable  from 'rxjs';

const triggerButton = document.querySelector('button#trigger');

const clickEvent$ = fromEvent < MouseEvent > (triggerButton, 'click');

// const subscription = clickEvent$.subscribe(
//   next: (event) => console.log(event.type, event.x, event.y),
// );

const triggerClick$ =
  new Observable() <
  MouseEvent >
  ((subscriber) => 
    triggerButton.addEventListener('click', (event: MouseEvent) => 
      console.log('Event cb executed');
      subscriber.next(event);
    );
  );

const subscription = triggerClick$.subscribe(
  next: (event) => console.log(event.type, event.x, event.y),
);

setTimeout(() => 
  console.log('Unsubscribe');
  subscription.unsubscribe();
, 5000);

这里就会触发一个 memory leak 的问题:

可以看到,尽管 subscription 已经 unsubscribe 了,但是鼠标点击依旧会触发回调函数中的调用。这就是因为事件添加到了按钮上,但是却从来没有被移除,因此需要在 teardown 部分中进行清理:

const triggerClick$ =
  new Observable() <
  MouseEvent >
  ((subscriber) => 
    const clickHandler = (event: MouseEvent) => 
      console.log('Event cb executed');
      subscriber.next(event);
    ;
    triggerButton.addEventListener('click', clickHandler);

    return () => 
      triggerButton.removeEventListener('click', clickHandler);
    ;
  );

⚠️:如果手动实现 Observable,并且会触发 side effect,那就一定需要实现清理。

timer

timer 就像是 setTimeout,不过和上面的事件一样,如果手动实现的话,需要注意清理 side effect。

import  timer  from 'rxjs';

const padEnd = (str) => str.padEnd(20, ' ');

console.log(padEnd('app started:') + new Date());

timer(2000).subscribe(
  next: (_) => console.log(padEnd('executed:') + new Date()),
  complete: () => console.log(padEnd('completed:') + new Date()),
);

interval

和 timer 差不多的实现方式,同样记得需要清理副作用。

interval(1000).subscribe((val) => console.log(val));

forkJoin

forkJoin 可以接受多个 HTTP 请求并返回一个 Observable,marble 图如下:

A
------------------A---|>
B
-----B----------------|>

forkJoin([A,B])
---------------[A,B]--|>

A 和 B 是两个流,当使用 forkJoin 后,那么这个 Observable 就会等所有的流被处理完毕之后再进行操作。

这是不使用 forkJoin 的操作,可以看到返回的结果是不能保证顺序的。

import  ajax  from 'rxjs/ajax';

const randomName$ = ajax('https://random-data-api.com/api/name/random_name');
const randomNation$ = ajax(
  'https://random-data-api.com/api/nation/random_nation'
);
const randomFood$ = ajax('https://random-data-api.com/api/food/random_food');

randomName$.subscribe((res) => console.log(res.response.first_name));
randomNation$.subscribe((res) => console.log(res.response.capital));
randomFood$.subscribe((res) => console.log(res.response.dish));

使用 forkJoin 就可以保证返回的顺序(有点像 Promise.all):

import  forkJoin  from 'rxjs';
import  ajax  from 'rxjs/ajax';

const randomName$ = ajax('https://random-data-api.com/api/name/random_name');
const randomNation$ = ajax(
  'https://random-data-api.com/api/nation/random_nation'
);
const randomFood$ = ajax('https://random-data-api.com/api/food/random_food');

forkJoin([randomName$, randomNation$, randomFood$]).subscribe(
  next: ([nameAjax, nationAjax, foodAjax]) => 
    console.log(
      `$nameAjax.response.first_name is from $nationAjax.response.capital and likes to eat $foodAjax.response.dish`
    );
  ,
);

如同其他的 Observable 一样,一旦会出现报错的情况,那么就会终止 Observable,并且发送的 http 请求也会被取消掉:

combineLatest

combineLatest 会接受多个 input 变化进行操作,marble 图如下:

A
---A-------B---C----------|>
B
------1--------2---|>
combineLatest([A,B])
----[A,1]-[B,1]-[C,2]-----|>

combineLatest 的两个特性为:

  • combineLatest 的触发的需求是每个流需要返回至少一个值才能启发

    如果其中的一个 observable 完成了,combineLatest 依旧会继续执行,直到最后一个 observable 完成。

  • 像上面的图一样,每一个流发出一个值,那么 combineLatest 就会发出一个数据。

下面实现这个温度转换器需要两个值:F --> C 或是 C --> F (华氏度转摄氏度或者摄氏度转华氏度),以及输入的温度,二者当中任何一个值的变化都会引发结果的改变。

import  combineLatest, fromEvent  from 'rxjs';

const temperatureInput = document.getElementById('temperature-input');
const conversionDropdown = document.getElementById('conversion-dropdown');
const resultText = document.getElementById('result-text');

const temperatureInputEvent$ = fromEvent(temperatureInput, 'input');
const conversionInputEvent$ = fromEvent(conversionDropdown, 'input');

combineLatest([temperatureInputEvent$, conversionInputEvent$]).subscribe(
  ([temperatureInputEvent, conversionInputEvent]) =>
    console.log(
      temperatureInputEvent.target['value'],
      conversionInputEvent.target['value']
    )
);

完整的实现如下:

import  combineLatest, fromEvent  from 'rxjs';

const temperatureInput = document.getElementById('temperature-input');
const conversionDropdown = document.getElementById('conversion-dropdown');
const resultText = document.getElementById('result-text');

const temperatureInputEvent$ = fromEvent(temperatureInput, 'input');
const conversionInputEvent$ = fromEvent(conversionDropdown, 'input');

combineLatest([temperatureInputEvent$, conversionInputEvent$]).subscribe(
  ([temperatureInputEvent, conversionInputEvent]) => 
    const temperature = temperatureInputEvent.target['value'];
    const conversion = conversionInputEvent.target['value'];

    let res: number;

    if (conversion === 'f-to-c') 
      res = ((temperature - 32) * 5) / 9;
     else 
      res = (temperature * 9) / 5 + 32;
    

    resultText.innerText = String(res);
  
);

同样的,如果报错了,那么整个执行就会终止。

以上是关于rxjs 生成函数的主要内容,如果未能解决你的问题,请参考以下文章

Angular RxJs:针对异步数据流编程工具

rxjs学习响应式编程理解

rxjs Observable 两大类操作符简介

在rxjs6中以随机间隔生成值流?

使用 combineLatest 和 Rxjs 返回 observable 的结果

RxJS 与 函数式编程 - 函数式编程