如何让一个 Observable 序列在发射前等待另一个完成?

Posted

技术标签:

【中文标题】如何让一个 Observable 序列在发射前等待另一个完成?【英文标题】:How to make one Observable sequence wait for another to complete before emitting? 【发布时间】:2015-08-11 17:36:37 【问题描述】:

假设我有一个Observable,就像这样:

var one = someObservable.take(1);

one.subscribe(function() /* do something */ );

然后,我有第二个Observable

var two = someOtherObservable.take(1);

现在,我想将subscribe() 发送到two,但我想确保onetwo 订阅者被解雇之前完成。

我可以在two上使用什么样的缓冲方法让第二个等待第一个完成?

我想我希望暂停 two 直到 one 完成。

【问题讨论】:

我相信这个问题的答案是 .exhaustMap() 方法但是我不会假装知道如何实现它 - 完整描述在这里:blog.angular-university.io/rxjs-higher-order-mapping 【参考方案1】:

我能想到的几种方法

import take, publish from 'rxjs/operators'
import concat from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() /*do something */);

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function()/*do something */);
one.subscribe(function()/*do something */, null, two.connect.bind(two));

【讨论】:

我最终使用了pauseresume 而不是publishconnect,但示例二基本上是我采取的路线。 此方法是否总是让第一个可观察的 (one) 在 subscribe() 函数中的第二个 (two) 之前解析? 为什么不使用Observable.forkJoin()?看到这个链接learnrxjs.io/operators/combination/forkjoin.html @mspasiuk 根据 OP 的要求,他们只希望第二个订阅 第一个完成之后。 forkJoin 同时订阅。 @Spray'n'Pray 否,因为这会在收到来自one 的第一个值后完成订阅,因此它甚至不会订阅two【参考方案2】:

skipUntil() 和 last()

skipUntil : 忽略发射的项目,直到另一个 observable 发射

last:从序列中发出最后一个值(即等到它完成后再发出)

请注意,传递给skipUntil 的可观察对象发出的任何内容都会取消跳过,这就是为什么我们需要添加last() - 以等待流完成。

main$.skipUntil(sequence2$.pipe(last()))

官方:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


可能的问题:请注意 last() 本身 will error 如果没有发出任何内容。 last() 运算符确实有一个 default 参数,但仅当与谓词一起使用时。我认为如果这种情况对您来说是个问题(如果 sequence2$ 可以在不发射的情况下完成),那么其中一个应该可以工作(目前未经测试):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

请注意,undefined 是要发出的有效项目,但实际上可以是任何值。另请注意,这是连接到sequence2$ 的管道,而不是main$ 管道。

【讨论】:

非常笨拙的演示:angular-vgznak.stackblitz.io你需要点击打开控制台托盘 您的语法错误。 skipUntil 不能直接附加到 observable,否则您将收到以下错误:“Property 'skipUntil' does not exist on type 'Observable'。”您需要先通过 .pipe() 运行它 是的,这是需要管道之前的旧答案。谢谢你提到它。我现在会更新它,但我在手机上。随意编辑答案。【参考方案3】:

如果要确保保留执行顺序,可以使用flatMap,如下例

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

结果是:

"1"
"11"
"111"
"finished"

【讨论】:

【参考方案4】:

这是利用 switchMap 的结果选择器的另一种可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => 
    /* do something */ 
  );

由于 switchMap 的结果选择器已经被贬值,这里是一个更新的版本

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => 
  /* do something */ 
);

【讨论】:

【参考方案5】:

这是一种可重复使用的方法(它是打字稿,但您可以将其改编为 js):

function waitFor<T>(signal: Observable<any>) 
    return (source: Observable<T>) => signal.pipe(
        first(),
        switchMap(_ => source),
    );

您可以像使用任何运算符一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));

它基本上是一个操作符,它推迟对源 observable 的订阅,直到信号 observable 发出第一个事件。

【讨论】:

这个可重用函数有 rxswift 版本吗【参考方案6】:

如果第二个 observable 是 hot,则有 another way 来执行 pause/resume

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function ()  
  /* resume paused source2 */ 
  pauser.onNext(true);
).subscribe(function()
  // do something
);

source2.subscribe(function()
  // start to recieve data 
);

您也可以使用缓冲版本 pausableBuffered 在暂停期间保留数据。

【讨论】:

【参考方案7】:

这是一个用 TypeScript 编写的自定义运算符,它在发出结果之前等待信号:

export function waitFor<T>(
    signal$: Observable<any>
) 
    return (source$: Observable<T>) =>
        new Observable<T>(observer => 
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        );

你可以这样使用它:

two.pipe(waitFor(one))
   .subscribe(value => ...);

【讨论】:

漂亮的图案!你甚至可以做three.pipe(waitFor(one), waitFor(two), take(1)) 您在非正常 IMO 的运营商内部订阅 @MehdiBenmoha 为什么会这样?这是使用first(), 运算符的订阅。我认为它在性能方面是安全的。【参考方案8】:

这是另一个,但我觉得更直接和直观(或者如果你习惯了 Promises,至少是自然的),方法。基本上,您使用 Observable.create() 创建一个 Observable,将 onetwo 包装为单个 Observable。这与Promise.all() 的工作方式非常相似。

var first = someObservable.take(1);
var second = Observable.create((observer) => 
  return first.subscribe(
    function onNext(value) 
      /* do something with value like: */
      // observer.next(value);
    ,
    function onError(error) 
      observer.error(error);
    ,
    function onComplete() 
      someOtherObservable.take(1).subscribe(
        function onNext(value) 
          observer.next(value);
        ,
        function onError(error) 
          observer.error(error);
        ,
        function onComplete() 
          observer.complete();
        
      );
    
  );
);

那么,这里发生了什么?首先,我们创建一个新的 Observable。传递给Observable.create() 的函数,恰如其分地命名为onSubscription,被传递给观察者(根据您传递给subscribe() 的参数构建),类似于resolvereject 在创建一个对象时组合成一个对象新的承诺。这就是我们让魔法发挥作用的方式。

onSubscription 中,我们订阅了第一个 Observable(在上面的示例中,它被称为one)。我们如何处理nexterror 取决于您,但我的示例中提供的默认值一般来说应该是合适的。但是,当我们收到complete 事件时,这意味着one 现在已经完成,我们可以订阅下一个 Observable;从而在第一个 Observable 完成后触发第二个 Observable。

为第二个 Observable 提供的示例观察者相当简单。基本上,second 现在的行为就像您期望 two 在 OP 中的行为一样。更具体地说,second 将发出第一个也是唯一一个由someOtherObservable 发出的值(因为take(1))然后完成,假设没有错误。

示例

这是一个完整的工作示例,如果您想在现实生活中看到我的示例,您可以复制/粘贴:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => 
  return first.subscribe(
    function onNext(value) 
      /* do something with value like: */
      observer.next(value);
    ,
    function onError(error) 
      observer.error(error);
    ,
    function onComplete() 
      someOtherObservable.take(1).subscribe(
        function onNext(value) 
          observer.next(value);
        ,
        function onError(error) 
          observer.error(error);
        ,
        function onComplete() 
          observer.complete();
        
      );
    
  );
).subscribe(
  function onNext(value) 
    console.log(value);
  ,
  function onError(error) 
    console.error(error);
  ,
  function onComplete() 
    console.log("Done!");
  
);

如果你看控制台,上面的例子会打印出来:

1

6

完成!

【讨论】:

这是我创建自己的自定义 'cluster(T, X, D)' 运算符所需的突破,该运算符仅处理时间跨度 T 内从源发出的第一个 X,并发出间隔为 D 的结果延迟。谢谢! 我很高兴它有帮助,当我意识到这一点时也很有启发性。【参考方案9】:

好吧,我知道这已经很老了,但我认为您可能需要的是:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => 
   // one is completed and we get two's subscription.
)

【讨论】:

【参考方案10】:

感谢 mergeMap(或他的别名 flatMap)运算符,您可以使用以前的 Observable 发出的结果,如下所示:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))

【讨论】:

从这里:learnrxjs.io/learn-rxjs/operators/transformation/mergemap - “如果内部 observables 的发射和订阅顺序很重要,试试 concatMap!”

以上是关于如何让一个 Observable 序列在发射前等待另一个完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何在订阅前等待 Observable 完成

RxJava之错误处理

iOS RxSwift - 如何“断开”一个 observable?

RxJava2.0中flatMap操作符用法和源码分析

RxJava2.0中flatMap操作符用法和源码分析

Android RxJava使用介绍 RxJava的操作符