等到第二个 Observable 发出
Posted
技术标签:
【中文标题】等到第二个 Observable 发出【英文标题】:wait Until a second Observable emits 【发布时间】:2020-07-02 03:16:39 【问题描述】:在 Rxjs 中,有管道 takeUntil,但没有管道 wait Until,这使得当前的 observable 等待seconde Observable 发射。
我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 只发出一个值,才能继续执行。所以我的 Observable init$ 必须执行一次,在此之前我的其他 observable 必须等到 inits 发出任何不同于 null 的值。
在这个简单的例子中,我想添加一个管道到 pipedSource 做 wait Until init$ ,所以源必须等到init$ 发出来发出它的值。
import interval, timer, Subject, combineLatest from 'rxjs';
import takeUntil, skipWhile, skipUntil, concatMap, map, take from 'rxjs/operators';
import BehaviorSubject from 'rxjs';
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
skipWhile((res)=> res === null)
//wait until init$ emits a non null value
)
//first subscription to source
pipedSource.subscribe(val => console.log(val));
source.next(profile:"me");
//init emits once
setTimeout(()=>
init$.next(1);
,2000);
// a second subscription to source
setTimeout(()=>
pipedSource.subscribe(val => console.log(val));
,3000);
想要的结果:
//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile"
【问题讨论】:
所以你想跳过直到? 第一个obs,被订阅了。当订阅中发生正确的事情时,会发出另一个事件。订阅该事件的任何人只会在第一件事完成后收到通知。 【参考方案1】:当第一个 observable 发出非空值时,您希望运行第二个 observable。为此,请在skipWhile
之后使用concatMap
或switchMap
。
ngOnInit()
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(profile:"me");
const pipedSource = init$
.pipe(
skipWhile((res)=> res === null),
concatMap(() => source)
);
pipedSource.subscribe(val => console.log('first', val));
//init emits once
setTimeout(()=>
init$.next(1);
,2000);
// a second subscription to source
setTimeout(()=>
pipedSource.subscribe(val => console.log('second', val));
, 3000);
这里我先订阅init$
observable,等待它发出非空值,然后切换到source
observable。
演示:https://stackblitz.com/edit/angular-p7kftd
【讨论】:
在这里,在 await$ 发出后,我得到的是 wait$ 的值而不是 source 值,我更新了我的问题以更准确,请您更新您的答案。【参考方案2】:如果我理解你的问题,我看到了 2 个潜在案例。
第一个是你的source
Observable 开始独立于wait$
发出它的值。当wait$
发出时,您才开始使用source
发出的值。这种行为可以使用combineLatest
函数来实现,像这样
//emits value every 500ms
const source$ = interval(500);
combineLatest(source$, wait$)
.pipe(
map(([s, v]) => s), // to filter out the value emitted by wait$
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() =>
wait$.next(1);
, 2000);
在这种情况下,您在控制台上看到的打印内容是从 2
开始的序列,因为 wait$
在 2 秒后发出。
第二种情况是当您希望 source
仅在 wait$
发出后才开始发出其值。在这种情况下,您可以使用switchMap
运算符,例如此处
const wait_2$ = new Subject();
//emits value every 500ms
const source_2$ = interval(500);
wait_2$
.pipe(
switchMap(() => source_2$),
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() =>
wait_2$.next(1);
, 4000);
在这种情况下,4 秒后,以0
开头的序列会打印在控制台上。
【讨论】:
我更新我的问题更准确,你能更新你的答案吗以上是关于等到第二个 Observable 发出的主要内容,如果未能解决你的问题,请参考以下文章
如何让一个 Observable 序列在发射前等待另一个完成?