带有延迟功能的 Rxjs 重试

Posted

技术标签:

【中文标题】带有延迟功能的 Rxjs 重试【英文标题】:Rxjs Retry with Delay function 【发布时间】:2017-12-12 05:52:56 【问题描述】:

我正在尝试将 retrydelay 函数一起使用,我希望函数会在 1000 毫秒延迟后调用,但它没有,这可能是什么错误? 查看控制台输出,时间是 16:22:48。

我预计那里是 16:22:48、16:22:59 ...

canCreate: boolean;
getSomeFunction(): Observable<boolean> 
        return new Observable<boolean>(
            observer => 
                const canCreate = null; // this is just null for now, will some value later
                if (canCreate == null) 
                    observer.error('error');
                 else 
                    observer.next(true);
                
                observer.complete();
            
        )
    


this.getSomeFunction()
      .do((value) => 
        this.cCreate = value;
      , (error) => 
         console.log(error + new Date().toTimeString());
      )
      .delay(1000)
      .retry(10)
      .subscribe(
        value => this.cCreate = value,
        error => 
          this.cCreate = false;
        ,
        () => 
      );
  

控制台结果是:

【问题讨论】:

【参考方案1】:

delay() 用于在 observable 发出的事件之间引入延迟。但是 observable 永远不会发出任何事件。它只是立即出错。

您要查找的是retryWhen(),它允许决定在多长时间后重试:

RxJS 5:

  .retryWhen(errors => errors.delay(1000).take(10))

RxJS 6:

import  retryWhen, delay, take  from 'rxjs/operators'
someFunction().pipe(
  // ...
  retryWhen(errors => errors.pipe(delay(1000), take(10)))
)

这将在 10 次尝试后完成整个 observable。如果你想在 10 次尝试后使整个 observable 出错,retryWhen 回调返回的 observable 必须抛出:

RxJS 5:

  .retryWhen(errors => errors.delay(1000).take(10).concat(Observable.throw()))

RxJS 6:

import  retryWhen, delay, take, concatMap, throwError  from 'rxjs/operators'
someFunction().pipe(
  // ...
  retryWhen(errors => errors.pipe(delay(1000), take(10), concatMap(throwError)))
)

【讨论】:

errors的类型是observable&lt;any&gt;,它没有complete方法 我误解了文档。现在应该可以了。见plnkr.co/edit/HTpcxkH9MqKqAsVep2na?p=preview 这里是较新 Angular/RxJS 版本的完整代码:pipe( retryWhen(error =&gt; error.pipe(delay(1000), take(3), concat(Observable.throw(error)))), catchError(myErrorHandler.bind(this)) ) 401错误要停止重试怎么办? 重投不正常。【参考方案2】:

补充@JB Nizet 的答案。如果你是在 rxjs 5+ 中使用 lettable 操作符编写的,那么将其结构化为

retryWhen(errors =&gt; errors.pipe(delay(1000), take(5)))

【讨论】:

知道.concat(Observable.throw())) 的等价物是什么吗? @EdBordin 如果我猜我会说....pipe(delay(1000), take(5), concat(Observable.throw())) 谢谢!非常接近:pipe(delay(1000), take(5), concat(Observable.throw('error message seems to be required'))) @EdBordin @DavidTheProgrammer 任何想法如何使用 RxJS 6 和已弃用的“实例 concat()”来正确处理?我得到了retryWhen(errors =&gt; concat(errors.pipe(delay(750), take(2)), throwError(errors)))),但它不会解开原始错误。 无论如何我也可以获得错误索引吗?我想增加后续错误的延迟。【参考方案3】:

所有这些都是 RxJS 6+


TL;DR

您可以使用此包中经过全面测试的运算符,或向下滚动查看源代码 :)

npm i rxjs-boost
import  retryWithDelay  from 'rxjs-boost/operators';

obs$.pipe(
  // will retry 4 times with a 1s delay before each try:
  retryWithDelay(1000, 4)
);

标准

由于大多数(或者可能没有)其他答案不符合我的所有标准,我将在下面列出我的解决方案。目标:

如果没有抛出错误,则定期发出并完成。 ✅ 如果抛出错误,重试x 次。 ✅ 在每次重试之前有y 的延迟。 ✅ 返回最后发出的错误。(很多其他答案都在努力解决这个问题。)✅ 使用strict: true 正确输入 - 但这很难搞砸。 ✅

解决方案

与其他答案一样,我们将使用retryWhen 运算符来捕获错误。要跟踪重复次数可以使用scan 运算符。为了限制重复次数,我们只需在 map 运算符中抛出一个错误。

原始来源使用throwIf,但在这种情况下,我们可以简单地使用来自rxjs-boost 的retryWithDelay。

最后我们将使用delay 运算符来添加不同执行之间的延迟:

import  MonoTypeOperatorFunction  from 'rxjs';
import  delay as delayOperator, map, retryWhen, scan  from 'rxjs/operators';

export function retryWithDelay<T>(
  delay: number,
  count = 1
): MonoTypeOperatorFunction<T> 
  return (input) =>
    input.pipe(
      retryWhen((errors) =>
        errors.pipe(
          scan((acc, error) => ( count: acc.count + 1, error ), 
            count: 0,
            error: undefined as any,
          ),
          map((current) => 
            if (current.count > count) 
              throw current.error;
            
            return current;
          ),
          delayOperator(delay)
        )
      )
    );


来源

rxjs-boost Original Source Spec File with Tests - 可能会派上用场

【讨论】:

'rxjs-boost/operators' 导入它不起作用。我必须从'rxjs-boost/lib/operators' 导入它。【参考方案4】:

我得出这个结论,是为了用http管道中的其他操作重试

import delay as _delay, map, retryWhen from 'rxjs/operators';

export const delayedRetry = (delay, retries = 1) => retryWhen(result => 
    let _retries = 0;
    return result.pipe(
      _delay(delay),
      map(error => 
        if (_retries++ === retries) 
          throw error;
        
        return error;
      ),
    );
  ,
);

用法

    http.pipe(
      delayedRetry(1500, 2),
      catchError((err) => 
        this.toasterService.error($localize`:@@not-saved:Could not save`);
        return of(false);
      ),
      finalize(() => this.sending = false),
    ).subscribe((success: boolean) => 
        if (success === true) 
           this.toasterService.success($localize`:@@saved:Saved`);
        
      
    );

【讨论】:

我有一些类型错误,但是像这样调整它,修复它:export function delayedRetry&lt;T&gt;(delay: number, retries = 1): MonoTypeOperatorFunction&lt;T&gt; return (input) =&gt; input.pipe( retryWhen((errors) =&gt; let retriesInner = 0; return errors.pipe( delayOperator(delay), map((error) =&gt; if (retriesInner++ === retries) throw error; return error; ), ); ), ); 【参考方案5】:

对于ngrx5+,我们可以创建运算符:


function retryRequest(constructor: () => Observable, count: number, delayTime: number) 
  let index = 0;
  return of(1) // we need to repeat not the result of constructor(), but the call of constructor() itself
    .pipe(
      switchMap(constructor),
      retryWhen(errors => errors.pipe(
        delay(delayTime),
        mergeMap(error => 
          if (++index > count) 
            return throwError(error);
          
          return of(error);
        )
      ))
    );

【讨论】:

【参考方案6】:

我最近遇到了这个问题,发现可以改进接受的解决方案。

Observable.pipe(
     retryWhen(errors => errors.pipe(
      delay(1000),
      take(10))),
    first(v => true),
    timeout(10000))

它本质上所做的是如上所述重试,但这会立即完成,而不会使用“第一个”运算符添加任何(错误)值。

如果在超时时间内找不到值,则会引发错误。

【讨论】:

【参考方案7】:

适用于 rxjs 版本 6.3.3

https://stackblitz.com/edit/http-basics-8swzpy

打开控制台并查看重试次数

示例代码

import  map, catchError, retryWhen, take, delay, concat  from 'rxjs/operators';
import  throwError  from 'rxjs';


export class ApiEXT 

    static get apiURL(): string  return 'http://localhost:57886/api'; ;
    static httpCLIENT: HttpClient;

 static POST(postOBJ: any, retryCOUNT: number = 0, retryINTERVAL: number = 1000) 
        return this.httpCLIENT
            .post(this.apiURL, JSON.stringify(postOBJ))
            .pipe(
                map(this.handleSUCCESS),
                retryWhen(errors => errors.pipe(delay(retryINTERVAL), take(retryCOUNT), concat(throwError("Giving up Retry.!")))),
                catchError(this.handleERROR));
    


  private static handleSUCCESS(json_response: string): any 
        //TODO: cast_and_return    
        return JSON.parse(json_response);

    

 private static handleERROR(error: Response) 
        let errorMSG: string;
        switch (error.status) 
            case -1: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Server Not Reachable.!"; break;
            default: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Unknown Error while connecting with server.!"; break;
        
        console.error(errorMSG);
        return throwError(errorMSG);
    


【讨论】:

【参考方案8】:

这可能对你有帮助

let values$ = Rx.Observable.interval(1000).take(5);
let errorFixed = false;

values$
.map((val) => 
   if(errorFixed)  return val; 
   else if( val > 0 && val % 2 === 0) 
      errorFixed = true;
      throw  error : 'error' ;

    else 
      return val;
   
)
.retryWhen((err) => 
    console.log('retrying again');
    return err.delay(1000).take(3); // 3 times
)
.subscribe((val) =>  console.log('value',val) );

【讨论】:

这是无限次调用,我只需要重试10次。【参考方案9】:

RxJS 提供了retry 操作符,当出现错误时,它会为给定的计数重新订阅 Observable。在抛出错误之前 Observable 被retry 运算符重新订阅给定的计数,如果仍然存在错误,则抛出错误。 retry 对于多次点击 URL 很有用。有可能由于网络带宽的原因,URL 一次没有返回成功的数据,当它重新连接时,可能会成功返回数据。如果重新绑定后 Observable 中仍然存在错误,则可以使用 catchError 返回带有用户定义的默认数据的 Observable。

getBook(id: number): Observable<Book> 
  return this.http.get<Book>(this.bookUrl + "/" + id).pipe(
     retry(3),
     catchError(err => 
      console.log(err);
      return of(null);
     )
  );

【讨论】:

如何使用延迟重试,在我的情况下只有重试有效【参考方案10】:

我使用retryWhenObservable.Interval 提出了以下解决方案,但在此解决方案中,订阅的error 函数从不调用,

this.branchService.getCanCreate()
  .do((value) => 
    this.cCreate = value;
  , (error) => 
    console.log('do', error + new Date().toTimeString());
  )
  .retryWhen(errors => 
    return Observable.interval(1000).take(3).concat(Observable.throw('error')));
  )
  .subscribe(
    value => 
      this.cCreate = !!value
      console.log('success', new Date().toTimeString());
    ,
    error => 
      console.log('subscribe', error + new Date().toTimeString());
      this.cCreate = false;
    ,
    () => 
      console.log('finally', new Date().toTimeString());
    
  );

【讨论】:

以上是关于带有延迟功能的 Rxjs 重试的主要内容,如果未能解决你的问题,请参考以下文章

Observable 错误,执行不同的 observable,重试原始 observable (Angular, RxJs6)

Rxjs 可观察等待,直到满足某些条件

为啥我们在重试请求之间应用延迟

重试延迟 - RxSwift

URLSession dataTaskPublisher 在特定错误延迟后重试

重试与延迟调度系统-场景与设计