Rxjs 可观察等待,直到满足某些条件

Posted

技术标签:

【中文标题】Rxjs 可观察等待,直到满足某些条件【英文标题】:Rxjs observable wait until some condition is met 【发布时间】:2018-01-27 00:42:20 【问题描述】:

我有以下重试逻辑来重试操作。它适用于单个请求。对于多个正在进行的请求,我想在重试之前等待现有的重试逻辑完成。

handleError(errors: Observable<any>) 

    const retryCountStart: number = 1;

    // wait if there is any existing operation retrying
    // once it is complete, continue here

    return errors
        .mergeScan<any, any>(
        (retryCount: any, err: any) => 

            if (retryCount <= 5) 
                return Observable.of(retryCount + 1);
             

        ,retryCountStart)
        .delay(1000);

如何在上述方法中添加延迟直到满足某些条件?

【问题讨论】:

【参考方案1】:

您可以使用 async / await 来实现这个目的,并通过 Promise 解析:

async handleError(errors: Observable<any>) 

    const retryCountStart: number = 1;

    // wait if there is any existing operation retrying
    // ----------------------------------------------------------
    await new Promise(resolve => 
        // declare some global variable to check in while loop
        while(this.retrying)
            setTimeout(()=> 
                // Just adding some delay 
                // (you can remove this setTimeout block if you want)
            ,50);
        

        // when while-loop breaks, resolve the promise to continue
        resolve();
    );
    // ----------------------------------------------------------

    // once it is complete, continue here

    return errors
        .mergeScan<any, any>(
        (retryCount: any, err: any) => 

            if (retryCount <= 5) 
                return Observable.of(retryCount + 1);
             

        ,retryCountStart)
        .delay(1000);

【讨论】:

感谢您的快速回复。如果我使用 async/await,那么我认为我将不得不使用异步更新我的所有调用者函数。没有 async/await 是否可以实现? 不,您必须在此之前使用async,我在答案中以及handleError 方法之前添加了这一点。 如果有任何其他解决方案,那么我不知道。我在我的项目中使用这个没有任何错误:)【参考方案2】:

据我了解,您只想在上一个流完成后才开始下一个流(即将流添加到队列)

import  Observable, of, BehaviorSubject, from  from 'rxjs';
import  tap, finalize, filter, take, switchMap, delay  from 'rxjs/operators';

class StreamQueue 
  lastStreamCompleted$: Observable<boolean> = new BehaviorSubject(true);

  private runAfter<T>(lastStreamCompleted$: Observable<boolean>, stream$: Observable<T>): [Observable<boolean>, Observable<T>] 
    const newLastStreamCompleted$ = new BehaviorSubject(false);
    const newStream$ = lastStreamCompleted$
      .pipe(
        filter(lastStreamCompleted => lastStreamCompleted),
        take(1),
        switchMap(() => stream$),
        finalize(() => newLastStreamCompleted$.next(true)),
    );
    return [newLastStreamCompleted$, newStream$];
  

  add(stream$: Observable<any>) 
    const [newLastStreamCompleted$, newStream$] = this.runAfter(this.lastStreamCompleted$, stream$);
    this.lastStreamCompleted$ = newLastStreamCompleted$;
    return newStream$;
  


const streamQueue = new StreamQueue();

streamQueue.add(from([1, 2]).pipe(delay(100))).subscribe(console.log);
setTimeout(()=>streamQueue.add(from([21, 22]).pipe(delay(100))).subscribe(console.log), 100);
streamQueue.add(from([11, 12]).pipe(delay(100))).subscribe(console.log);

// Output:
// 1
// 2
// 11
// 12
// 21
// 22

【讨论】:

以上是关于Rxjs 可观察等待,直到满足某些条件的主要内容,如果未能解决你的问题,请参考以下文章

我如何只调用这么多时间的可观察服务并且直到满足某个条件的时间间隔

如何仅提取 json 的某些属性,使用 rxjs 作为可观察对象返回

一个一个可观察的 RxJS

当页面不活动时,RxJS 6 暂停或缓冲可观察

Angular RxJS入门笔记 (Observable可观察对象Subscribe订阅Observer观察者Subscription对象)

rxjs中常用的操作符