带有延迟功能的 Rxjs 重试
Posted
技术标签:
【中文标题】带有延迟功能的 Rxjs 重试【英文标题】:Rxjs Retry with Delay function 【发布时间】:2017-12-12 05:52:56 【问题描述】:我正在尝试将 retry
与 delay
函数一起使用,我希望函数会在 1000 毫秒延迟后调用,但它没有,这可能是什么错误?
查看控制台输出,时间是 16:22:48。
我预计那里是 16:22:48、16:22:59 ...
canCreate: boolean;
getSomeFunction(): Observable<boolean>
return new Observable<boolean>(
observer =>
const canCreate = null; // this is just null for now, will some value later
if (canCreate == null)
observer.error('error');
else
observer.next(true);
observer.complete();
)
this.getSomeFunction()
.do((value) =>
this.cCreate = value;
, (error) =>
console.log(error + new Date().toTimeString());
)
.delay(1000)
.retry(10)
.subscribe(
value => this.cCreate = value,
error =>
this.cCreate = false;
,
() =>
);
控制台结果是:
【问题讨论】:
【参考方案1】:delay()
用于在 observable 发出的事件之间引入延迟。但是 observable 永远不会发出任何事件。它只是立即出错。
您要查找的是retryWhen()
,它允许决定在多长时间后重试:
RxJS 5:
.retryWhen(errors => errors.delay(1000).take(10))
RxJS 6:
import retryWhen, delay, take from 'rxjs/operators'
someFunction().pipe(
// ...
retryWhen(errors => errors.pipe(delay(1000), take(10)))
)
这将在 10 次尝试后完成整个 observable。如果你想在 10 次尝试后使整个 observable 出错,retryWhen 回调返回的 observable 必须抛出:
RxJS 5:
.retryWhen(errors => errors.delay(1000).take(10).concat(Observable.throw()))
RxJS 6:
import retryWhen, delay, take, concatMap, throwError from 'rxjs/operators'
someFunction().pipe(
// ...
retryWhen(errors => errors.pipe(delay(1000), take(10), concatMap(throwError)))
)
【讨论】:
errors
的类型是observable<any>
,它没有complete
方法
我误解了文档。现在应该可以了。见plnkr.co/edit/HTpcxkH9MqKqAsVep2na?p=preview
这里是较新 Angular/RxJS 版本的完整代码:pipe( retryWhen(error => error.pipe(delay(1000), take(3), concat(Observable.throw(error)))), catchError(myErrorHandler.bind(this)) )
401错误要停止重试怎么办?
重投不正常。【参考方案2】:
补充@JB Nizet 的答案。如果你是在 rxjs 5+ 中使用 lettable 操作符编写的,那么将其结构化为
retryWhen(errors => errors.pipe(delay(1000), take(5)))
【讨论】:
知道.concat(Observable.throw()))
的等价物是什么吗?
@EdBordin 如果我猜我会说....pipe(delay(1000), take(5), concat(Observable.throw()))
谢谢!非常接近:pipe(delay(1000), take(5), concat(Observable.throw('error message seems to be required')))
@EdBordin @DavidTheProgrammer 任何想法如何使用 RxJS 6 和已弃用的“实例 concat()”来正确处理?我得到了retryWhen(errors => concat(errors.pipe(delay(750), take(2)), throwError(errors))))
,但它不会解开原始错误。
无论如何我也可以获得错误索引吗?我想增加后续错误的延迟。【参考方案3】:
所有这些都是 RxJS 6+
TL;DR
您可以使用此包中经过全面测试的运算符,或向下滚动查看源代码 :)
npm i rxjs-boost
import retryWithDelay from 'rxjs-boost/operators';
obs$.pipe(
// will retry 4 times with a 1s delay before each try:
retryWithDelay(1000, 4)
);
标准
由于大多数(或者可能没有)其他答案不符合我的所有标准,我将在下面列出我的解决方案。目标:
如果没有抛出错误,则定期发出并完成。 ✅ 如果抛出错误,重试x
次。 ✅
在每次重试之前有y
的延迟。 ✅
返回最后发出的错误。(很多其他答案都在努力解决这个问题。)✅
使用strict: true
正确输入 - 但这很难搞砸。 ✅
解决方案
与其他答案一样,我们将使用retryWhen 运算符来捕获错误。要跟踪重复次数可以使用scan 运算符。为了限制重复次数,我们只需在 map 运算符中抛出一个错误。
原始来源使用throwIf,但在这种情况下,我们可以简单地使用来自rxjs-boost 的retryWithDelay。
最后我们将使用delay 运算符来添加不同执行之间的延迟:
import MonoTypeOperatorFunction from 'rxjs';
import delay as delayOperator, map, retryWhen, scan from 'rxjs/operators';
export function retryWithDelay<T>(
delay: number,
count = 1
): MonoTypeOperatorFunction<T>
return (input) =>
input.pipe(
retryWhen((errors) =>
errors.pipe(
scan((acc, error) => ( count: acc.count + 1, error ),
count: 0,
error: undefined as any,
),
map((current) =>
if (current.count > count)
throw current.error;
return current;
),
delayOperator(delay)
)
)
);
来源
rxjs-boost Original Source Spec File with Tests - 可能会派上用场【讨论】:
从'rxjs-boost/operators'
导入它不起作用。我必须从'rxjs-boost/lib/operators'
导入它。【参考方案4】:
我得出这个结论,是为了用http管道中的其他操作重试
import delay as _delay, map, retryWhen from 'rxjs/operators';
export const delayedRetry = (delay, retries = 1) => retryWhen(result =>
let _retries = 0;
return result.pipe(
_delay(delay),
map(error =>
if (_retries++ === retries)
throw error;
return error;
),
);
,
);
用法
http.pipe(
delayedRetry(1500, 2),
catchError((err) =>
this.toasterService.error($localize`:@@not-saved:Could not save`);
return of(false);
),
finalize(() => this.sending = false),
).subscribe((success: boolean) =>
if (success === true)
this.toasterService.success($localize`:@@saved:Saved`);
);
【讨论】:
我有一些类型错误,但是像这样调整它,修复它:export function delayedRetry<T>(delay: number, retries = 1): MonoTypeOperatorFunction<T> return (input) => input.pipe( retryWhen((errors) => let retriesInner = 0; return errors.pipe( delayOperator(delay), map((error) => if (retriesInner++ === retries) throw error; return error; ), ); ), );
【参考方案5】:
对于ngrx5+,我们可以创建运算符:
function retryRequest(constructor: () => Observable, count: number, delayTime: number)
let index = 0;
return of(1) // we need to repeat not the result of constructor(), but the call of constructor() itself
.pipe(
switchMap(constructor),
retryWhen(errors => errors.pipe(
delay(delayTime),
mergeMap(error =>
if (++index > count)
return throwError(error);
return of(error);
)
))
);
【讨论】:
【参考方案6】:我最近遇到了这个问题,发现可以改进接受的解决方案。
Observable.pipe(
retryWhen(errors => errors.pipe(
delay(1000),
take(10))),
first(v => true),
timeout(10000))
它本质上所做的是如上所述重试,但这会立即完成,而不会使用“第一个”运算符添加任何(错误)值。
如果在超时时间内找不到值,则会引发错误。
【讨论】:
【参考方案7】:适用于 rxjs 版本 6.3.3
https://stackblitz.com/edit/http-basics-8swzpy
打开控制台并查看重试次数
示例代码
import map, catchError, retryWhen, take, delay, concat from 'rxjs/operators';
import throwError from 'rxjs';
export class ApiEXT
static get apiURL(): string return 'http://localhost:57886/api'; ;
static httpCLIENT: HttpClient;
static POST(postOBJ: any, retryCOUNT: number = 0, retryINTERVAL: number = 1000)
return this.httpCLIENT
.post(this.apiURL, JSON.stringify(postOBJ))
.pipe(
map(this.handleSUCCESS),
retryWhen(errors => errors.pipe(delay(retryINTERVAL), take(retryCOUNT), concat(throwError("Giving up Retry.!")))),
catchError(this.handleERROR));
private static handleSUCCESS(json_response: string): any
//TODO: cast_and_return
return JSON.parse(json_response);
private static handleERROR(error: Response)
let errorMSG: string;
switch (error.status)
case -1: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Server Not Reachable.!"; break;
default: errorMSG = "(" + error.status + "/" + error.statusText + ")" + " Unknown Error while connecting with server.!"; break;
console.error(errorMSG);
return throwError(errorMSG);
【讨论】:
【参考方案8】:这可能对你有帮助
let values$ = Rx.Observable.interval(1000).take(5);
let errorFixed = false;
values$
.map((val) =>
if(errorFixed) return val;
else if( val > 0 && val % 2 === 0)
errorFixed = true;
throw error : 'error' ;
else
return val;
)
.retryWhen((err) =>
console.log('retrying again');
return err.delay(1000).take(3); // 3 times
)
.subscribe((val) => console.log('value',val) );
【讨论】:
这是无限次调用,我只需要重试10次。【参考方案9】:RxJS 提供了retry
操作符,当出现错误时,它会为给定的计数重新订阅 Observable。在抛出错误之前 Observable 被retry
运算符重新订阅给定的计数,如果仍然存在错误,则抛出错误。 retry
对于多次点击 URL 很有用。有可能由于网络带宽的原因,URL 一次没有返回成功的数据,当它重新连接时,可能会成功返回数据。如果重新绑定后 Observable 中仍然存在错误,则可以使用 catchError
返回带有用户定义的默认数据的 Observable。
getBook(id: number): Observable<Book>
return this.http.get<Book>(this.bookUrl + "/" + id).pipe(
retry(3),
catchError(err =>
console.log(err);
return of(null);
)
);
【讨论】:
如何使用延迟重试,在我的情况下只有重试有效【参考方案10】:我使用retryWhen
和Observable.Interval
提出了以下解决方案,但在此解决方案中,订阅的error
函数从不调用,
this.branchService.getCanCreate()
.do((value) =>
this.cCreate = value;
, (error) =>
console.log('do', error + new Date().toTimeString());
)
.retryWhen(errors =>
return Observable.interval(1000).take(3).concat(Observable.throw('error')));
)
.subscribe(
value =>
this.cCreate = !!value
console.log('success', new Date().toTimeString());
,
error =>
console.log('subscribe', error + new Date().toTimeString());
this.cCreate = false;
,
() =>
console.log('finally', new Date().toTimeString());
);
【讨论】:
以上是关于带有延迟功能的 Rxjs 重试的主要内容,如果未能解决你的问题,请参考以下文章
Observable 错误,执行不同的 observable,重试原始 observable (Angular, RxJs6)