等到第二个 Observable 发出

Posted

技术标签:

【中文标题】等到第二个 Observable 发出【英文标题】:wait Until a second Observable emits 【发布时间】:2020-07-02 03:16:39 【问题描述】:

在 Rxjs 中,有管道 takeUntil,但没有管道 wait Until,这使得当前的 observable 等待seconde Observable 发射。

我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 只发出一个值,才能继续执行。所以我的 Observable init$ 必须执行一次,在此之前我的其他 observable 必须等到 inits 发出任何不同于 null 的值。

在这个简单的例子中,我想添加一个管道到 pipedSourcewait Until init$ ,所以源必须等到init$ 发出来发出它的值。

import  interval, timer, Subject, combineLatest  from 'rxjs';
import  takeUntil, skipWhile, skipUntil, concatMap, map, take  from 'rxjs/operators';
import  BehaviorSubject  from 'rxjs';

const init$ = new BehaviorSubject(null);

const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
    skipWhile((res)=> res === null)
    //wait until init$ emits a non null value
)

//first subscription to source
pipedSource.subscribe(val => console.log(val));

source.next(profile:"me");

//init emits once
setTimeout(()=>
  init$.next(1);
,2000);

// a second subscription to source
setTimeout(()=>
  pipedSource.subscribe(val => console.log(val));
,3000);

想要的结果:

//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile" 

【问题讨论】:

所以你想跳过直到? 第一个obs,被订阅了。当订阅中发生正确的事情时,会发出另一个事件。订阅该事件的任何人只会在第一件事完成后收到通知。 【参考方案1】:

当第一个 observable 发出非空值时,您希望运行第二个 observable。为此,请在skipWhile 之后使用concatMapswitchMap

ngOnInit() 
  const init$ = new BehaviorSubject(null);

  const source = new BehaviorSubject(profile:"me");
  const pipedSource = init$
  .pipe(
      skipWhile((res)=> res === null),
      concatMap(() => source)
  );

  pipedSource.subscribe(val => console.log('first', val));

  //init emits once
  setTimeout(()=>
    init$.next(1);
  ,2000);

  // a second subscription to source
  setTimeout(()=>
    pipedSource.subscribe(val => console.log('second', val));
  , 3000);

这里我先订阅init$ observable,等待它发出非空值,然后切换到source observable。

演示:https://stackblitz.com/edit/angular-p7kftd

【讨论】:

在这里,在 await$ 发出后,我得到的是 wait$ 的值而不是 source 值,我更新了我的问题以更准确,请您更新您的答案。【参考方案2】:

如果我理解你的问题,我看到了 2 个潜在案例。

第一个是你的source Observable 开始独立于wait$ 发出它的值。当wait$ 发出时,您才开始使用source 发出的值。这种行为可以使用combineLatest 函数来实现,像这样

//emits value every 500ms
const source$ = interval(500);

combineLatest(source$, wait$)
    .pipe(
        map(([s, v]) => s), // to filter out the value emitted by wait$
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => 
    wait$.next(1);
, 2000);

在这种情况下,您在控制台上看到的打印内容是从 2 开始的序列,因为 wait$ 在 2 秒后发出。

第二种情况是当您希望 source 仅在 wait$ 发出后才开始发出其值。在这种情况下,您可以使用switchMap 运算符,例如此处

const wait_2$ = new Subject();

//emits value every 500ms
const source_2$ = interval(500);

wait_2$
    .pipe(
        switchMap(() => source_2$),
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => 
    wait_2$.next(1);
, 4000);

在这种情况下,4 秒后,以0 开头的序列会打印在控制台上。

【讨论】:

我更新我的问题更准确,你能更新你的答案吗

以上是关于等到第二个 Observable 发出的主要内容,如果未能解决你的问题,请参考以下文章

如何让一个 Observable 序列在发射前等待另一个完成?

Rx 操作符五

RxSwift:如何让一个 observable 触发另一个?

RxJS之转化操作符 ( Angular环境 )

Observable 里的常见操作符理解

性能测试中容易混淆的概念