一个一个可观察的 RxJS

Posted

技术标签:

【中文标题】一个一个可观察的 RxJS【英文标题】:One by one observables RxJS 【发布时间】:2022-01-23 18:45:34 【问题描述】:

我对可观察对象有疑问。我已经准备好stackblitz 来简化我的问题。

我有 2 个可观察对象(obs1$、obs2$)和数字数组。 我想等待 obs1$ 完成,然后遍历数组并返回每个元素的 observable,运行 obs2$。

功能代码如下:

oneByOneObservables(): Observable<unknown> 
  const obs1$ = of(1, 2, 3);
  const arr = [4, 5, 6];
  const obs2$ = of(7, 8, 9);

  return obs1$.pipe(
    concat(() => arr.map((item) => of(item))),
    () => obs2$
  );

我有一个错误:

没有重载匹配这个调用。 最后一个重载给出了以下错误。 '() => Observable[]' 类型的参数不可分配给 'SchedulerLike | 类型的参数可观察输入'。 '() => Observable[]' 类型中缺少属性 '[Symbol.iterator]' 但在 'Iterable' 类型中是必需的。

感谢您的帮助

【问题讨论】:

obs2$ 应该在什么时候运行?在从 obs1$ 返回的每个 observables 完成之后? @LukaszGawrys 没关系。对我来说最重要的是在 obs1$ 完成后从数组中运行 obs2$ 和 observables obs1$ 中,您发出 1,2,3 - 您是否关心这些值中的每一个,还是只需要继续使用 3 作为最后发出的值?或者你根本不关心值,只需要知道 Observable 是否完成? 顺便说一句。由于某些错误的导入,您的 Stackblitz 无法开箱即用。 我不关心这些值。我只需要知道 obs1$ 是否完成。 【参考方案1】:

这里有一些代码:

等待 obs1$ 完成 ✓ 然后循环遍历数组并返回每个元素的 observable ✓ (目前这是一个 noop 浪费了几个 cpu 周期。创建一个 observable 没有任何作用,你打算订阅这些吗?按顺序?一次?由你决定,我猜) 运行 obs2$ ✓
function oneByOneObservables(): Observable<number> 
  const obs1$ = of(1, 2, 3);
  const arr = [4, 5, 6];
  const obs2$ = of(7, 8, 9);

  return obs1$.pipe(
    concatWith(defer(() => 
      arr.map((item) => of(item))
      return obs2$
    ))
  );


oneByOneObservables().subscribe(console.log);

输出:

1
2
3
7
8
9

一个例子,你一个接一个地订阅 observables 数组。

function oneByOneObservables(): Observable<number> 
  const obs1$ = of(1, 2, 3);
  const arr = [4, 5, 6];
  const obs2$ = of(7, 8, 9);

  return obs1$.pipe(
    concatWith(defer(() => 
      concat(...arr.map((item) => of(item)))
    )),
    concatWith(obs2$)
  );


oneByOneObservables().subscribe(console.log)

输出:

1
2
3
4
5
6
7
8
9

【讨论】:

【参考方案2】:

所以这可以解决问题。我添加了一些日志来追踪它。

这里也是Stackblitz。

import  Component  from '@angular/core';
import  ChangeDetectionStrategy  from '@angular/core';
import  Observable, of, forkJoin  from 'rxjs';
import  concatMap, tap, last  from 'rxjs/operators';

@Component(
  selector: 'my-app',
  styleUrls: ['./app.component.scss'],
  templateUrl: './app.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
)
export class AppComponent 
  oneByOneObservables(): Observable<unknown> 
    const obs1$ = of(1, 2, 3).pipe(tap((v) => console.log('Obs1', v)));
    const arr = [4, 5, 6];
    const obs2$ = of(7, 8, 9);

    return obs1$.pipe(
      last(),
      tap((v) => console.log('Obs1 last value', v)),
      concatMap(() => forkJoin(arr.map((item) => of(item)))),
      tap((v) => console.log('Array of observables value', v)),
      concatMap(() => obs2$),
      tap((v) => console.log('Obs2 value', v))
    );
  

【讨论】:

【参考方案3】:

你可以使用switchMap吗?虽然我不确定您正在寻找的 observables 的输出是什么。

oneByOneObservables(): Observable<unknown> 
  const obs1$ = of(1, 2, 3);
  const arr = [4, 5, 6];
  const obs2$ = of(7, 8, 9);

  return obs1$.pipe(
    concat(() => arr.map((item) => of(item))),
    switchMap(() => obs2$)
  );

【讨论】:

以上是关于一个一个可观察的 RxJS的主要内容,如果未能解决你的问题,请参考以下文章

可观察的数组到数组(Rxjs)

rxjs中常用的操作符

RxJs:根据字段值更改创建可观察对象

Pipe RxJS 可观察到现有主题

submitForm 上的 Angular 2 RxJS 可观察取消订阅

在 foreach 中等待几个可观察的 rxjs 直到完成执行另一个