RxJS之组合操作符 ( Angular环境 )

Posted 沙滩海风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJS之组合操作符 ( Angular环境 )相关的知识,希望对你有一定的参考价值。

一 merge操作符

把多个 Observables 的值混合到一个 Observable 中

import { Component, OnInit } from \'@angular/core\';
import { of } from \'rxjs/observable/of\';
import { range } from \'rxjs/observable/range\';
import { merge } from \'rxjs/observable/merge\';
import { Observable } from \'rxjs/Observable\';

@Component({
  selector: \'app-combine\',
  templateUrl: \'./combine.component.html\',
  styleUrls: [\'./combine.component.css\']
})
export class CombineComponent implements OnInit {

  constructor() { }

  ngOnInit() {
    const low: Observable<number> = range(100, 3);
    const middle: Observable<number> = range(200, 3);
    const high: Observable<number> = range(300, 3);
    merge(low, middle, high).subscribe((val: number) => {
      console.log(val);
    });
  }

}

合并是没有顺序的:先到达的值在合并后的Observable中先输出。

import { Component, OnInit } from \'@angular/core\';
import { merge } from \'rxjs/observable/merge\';
import { interval } from \'rxjs/observable/interval\';
import { map } from \'rxjs/operators/map\';
import { delay } from \'rxjs/operators/delay\';

@Component({
  selector: \'app-combine\',
  templateUrl: \'./combine.component.html\',
  styleUrls: [\'./combine.component.css\']
})
export class CombineComponent implements OnInit {

  constructor() { }

  ngOnInit() {
    let count = 0;
    const subscription = merge(
      interval(30).pipe(map(val => val + 300)),
      interval(40).pipe(map(val => val + 400)),
      interval(50).pipe(map(val => val + 500))
    ).pipe(delay(3000)) // 合并后的Observable,延迟3秒再开始的输出
      .subscribe(
        val => {
          count++;
          console.log(val);
          if (count >= 10) { // 只输出10个数
            subscription.unsubscribe();
          }
        }
      );
  }

}

 二 forkJoin操作符

forkJoin will wait for all passed Observables to complete and then it will emit an array with last values from corresponding Observables.

import { Component, OnInit } from \'@angular/core\';
import { Observable } from \'rxjs/Observable\';
import { forkJoin } from \'rxjs/observable/forkJoin\';
import { of } from \'rxjs/observable/of\';

@Component({
  selector: \'app-combine\',
  templateUrl: \'./combine.component.html\',
  styleUrls: [\'./combine.component.css\']
})
export class CombineComponent implements OnInit {

  constructor() { }

  ngOnInit() {
    forkJoin(
      of(1, 3, 5, 7),
      of(2, 4, 6, 8)
    ).subscribe(
      (arr: number[]) => {
        console.log(`next: ${arr[0]}, ${arr[1]}`);
      },
      null,
      () => {
        console.log(\'complete.\');
      }
    );
  }

}

处理并行的多个ajax请求 ( safari停止跨域限制 )

import { Component, OnInit } from \'@angular/core\';
import { Observable } from \'rxjs/Observable\';
import { forkJoin } from \'rxjs/observable/forkJoin\';
import { of } from \'rxjs/observable/of\';
import { HttpClient } from \'@angular/common/http\';

@Component({
  selector: \'app-combine\',
  templateUrl: \'./combine.component.html\',
  styleUrls: [\'./combine.component.css\']
})
export class CombineComponent implements OnInit {

  constructor(public http: HttpClient) { }

  ngOnInit() {
    forkJoin(
      this.http.get(\'https://www.baidu.com\', { responseType: \'text\' }),
      this.http.get(\'https://www.sogou.com\', { responseType: \'text\' })
    ).subscribe(
      (arr: any[]) => {
        const baidu = arr[0].substring(arr[0].indexOf(\'<title>\') + 7, arr[0].indexOf(\'</title>\'));
        const sogou = arr[1].substring(arr[1].indexOf(\'<title>\') + 7, arr[1].indexOf(\'</title>\'));
        console.log(baidu);
        console.log(sogou);
      }
    );
  }
}

三 startWith操作符

返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。

import { Component, OnInit } from \'@angular/core\';
import { HttpClient } from \'@angular/common/http\';
import { of } from \'rxjs/observable/of\';
import { startWith } from \'rxjs/operators/startWith\';

@Component({
  selector: \'app-combine\',
  templateUrl: \'./combine.component.html\',
  styleUrls: [\'./combine.component.css\']
})
export class CombineComponent implements OnInit {

  constructor(private http: HttpClient) { }

  ngOnInit() {
    of(\'Mikey\', \'Don\').pipe(
      startWith(\'Leo\', \'Raph\')
    ).subscribe(
      (val: string) => {
        console.log(val);
      }
    );
  }
}

 

以上是关于RxJS之组合操作符 ( Angular环境 )的主要内容,如果未能解决你的问题,请参考以下文章

RxJS之转化操作符 ( Angular环境 )

RxJS之过滤操作符 ( Angular环境 )

Angular之Rxjs基础操作

Angular 7 RxJs - HTTP 调用未在 ForkJoin 中完成

Angular-RxJS- 在长 API 调用中间隐藏微调器并继续 API 调用

Angular使用总结 --- 搜索场景中使用rxjs的操作符