使用 Angular observable 定期查询 web api 并订阅
Posted
技术标签:
【中文标题】使用 Angular observable 定期查询 web api 并订阅【英文标题】:Query a web api periodically with Angular observable and subscribe 【发布时间】:2021-11-08 10:29:00 【问题描述】:我有一个 Launch 方法,它接受一些参数并启动一个引擎。 当引擎启动时,它会返回实例名称。 使用此实例名称,我想定期查询另一个服务,例如每 2 秒以了解状态是否更改为“成功”或“失败” 我在第一个订阅中做了一个 do while 循环,但它没有按预期工作。
instanceStatus: string = "Initialized";
instanceName:string = "InstanceName";
Launch(sessionId: string, projectName: string, f: string[])
this.service.Launch(sessionId, projectName, this.f)
.pipe(first())
.subscribe(
instanceName =>
localStorage.setItem('instanceName', instanceName);
this.instanceName = instanceName;
setTimeout(() =>
do
this.service.getEngineStatus(this.instanceName)
.pipe(first())
.subscribe(
status =>
this.instanceStatus = status;
console.log(status);
console.log(this.instanceStatus);
this.loadingService.showSpinner( text: 'Modeling is running...' );
if (this.instanceStatus === "Succeeded")
this.messageService.add( severity: Severity.Success, summary: 'Fault modeling completed', detail: 'Via MessageService' );
this.messageService.clear();
);
while (this.instanceStatus !== "Succeeded")
, 2000);
);
getEngineStatus(instanceName:string): Observable<string>
this.serviceUrl = URL + `?instance=` + instanceName;
return this._http.get<string>(this.serviceUrl);
【问题讨论】:
这能回答你的问题吗? How To Do Polling with Angular 2 Observables 谢谢利亚姆,但它已经过时了。 'interval' 属性不再存在于 Observable 上。 它确实存在,只是创建方式不同,因为更高版本的 rxjs learnrxjs.io/learn-rxjs/operators/creation/interval 【参考方案1】:你看过interval
creation operator吗?我相信这可能是您需要使用的。我用这个运算符试了一下,代码如下:
Launch(sessionId: string, projectName: string, f: string[])
this.service
.Launch(sessionId, projectName, this.f)
.pipe(
first(),
tap((instanceName) =>
localStorage.setItem('instanceName', instanceName);
this.instanceName = instanceName;
),
switchMap(() => interval(2000)),
takeWhile(() => !['Succeeded', 'Failed'].contains(this.instanceStatus)),
tap(() =>
this.loadingService.showSpinner(
text: 'Modeling is running...',
);
),
switchMap(() => this.service.getEngineStatus(this.instanceName))
)
.subscribe((status) =>
this.instanceStatus = status;
this.loadingService.hideSpinner();
console.log(status);
console.log(this.instanceStatus);
if (this.instanceStatus === 'Succeeded')
this.messageService.add(
severity: Severity.Success,
summary: 'Fault modeling completed',
detail: 'Via MessageService',
);
this.messageService.clear();
);
【讨论】:
您好,谢谢,我试过这个,但它甚至没有查询第二个服务。我也做了那个改变!['Succeeded', 'Failed'].includes(this.instanceStatus)),【参考方案2】:避免嵌套订阅。而是尝试使用更高阶的映射运算符,例如 switchMap
。
不要使用for
或while
之类的JS 语句,而是使用interval
或timer
之类的RxJS 函数进行轮询。
使用tap
操作符来做一些副作用,比如将数据推送到本地存储。
import Observable, timer from 'rxjs';
import tap, finalize, switchMap, takeWhile from 'rxjs/operators';
POLL_INTERVAL = 2000;
instanceStatus: string = "Initialized";
instanceName:string = "InstanceName";
this.Launch(sample, sample, sample).pipe(
switchMap((instanceName: any) =>
timer(0, POLL_INTERVAL).pipe( // <-- start immediately and poll every 'n' secs
switchMap(() => this.service.getEngineStatus(instanceName)),
takeWhile((instanceStatus: any) => instanceStatus === 'Succeeded'), // <-- stop poll when status !== 'Succeeded'
finalize(() => // <-- run when polling stops
this.messageService.add(
severity: Severity.Success,
summary: 'Fault modeling completed',
detail: 'Via MessageService'
);
this.messageService.clear();
)
)
).subscribe(
next: (instanceStatus: any) => this.instanceStatus = instanceStatus
);
)
Launch(sessionId: string, projectName: string, f: string[]): Observable<any>
return this.service.Launch(sessionId, projectName, this.f).pipe(
first(),
tap((instanceName: any) =>
localStorage.setItem('instanceName', instanceName);
this.instanceName = instanceName;
)
);
更新(感谢@Liam):
do
现在是 tap
运算符。
.subscribe()
必须位于pipe()
函数之后的最后一级。它在里面是错误的。请重试该代码。
【讨论】:
没有重载匹配这个调用。 Overload 1 of 3, '(project: (value: string, index: number) => ObservableInput): OperatorFunctiondo
/tap
运算符。 subscription
是在发布之前未检查答案的结果。我已经更新了帖子。【参考方案3】:
据我所知,我怀疑这样的事情会成为为您找到答案的一部分。
这里的关键是计时器在status !== "Succeeded"
时每2 秒发出一个值。一旦status === "Succeeded"
true
用于takeWhile
中的第二个参数,将在它关闭计时器之前允许最终发射(不再每 2 秒检查一次)。
剩下的就是这样。
launch(sessionId: string, projectName: string, f: string[]): void
this.service.launch(sessionId, projectName, f).pipe(
first(),
tap(instanceName =>
localStorage.setItem('instanceName', instanceName);
this.instanceName = instanceName;
),
switchMap((instanceName:string) => timer(0,2000).pipe(
exhaustMap(_ => this.service.getEngineStatus(instanceName).pipe(
first()
)),
takeWhile((status:string) => status !== "Succeeded", true),
map(status => (instanceName, status))
))
).subscribe((instanceName, status) =>
this.loadingService.showSpinner( text: 'Modeling is running...' );
if (status === "Succeeded")
this.messageService.add(
severity: Severity.Success,
summary: 'Fault modeling completed',
detail: 'Via MessageService'
);
this.messageService.clear();
);
更新
这里有一些代码可以满足您的需求,但完全是独立的。这不是基于您的代码(因为我无法运行/测试我无权访问的代码)。
服务中的计时器模拟请求和接收响应之间的一些延迟。
type Status = "Succeeded" | "Other"
interface Service
// We'll pretend we can't query privateStatus directly, for example
// it could be data on a server that you can only access via getStatus
privateStatus: Status,
launch: (sessionId: string) => Observable<string>,
getStatus: (instanceName: string) => Observable<Status>
class ArbiratryClass
service: Service;
readonly PING_INTERVAL = 2000;
constructor()
this.service =
privateStatus: "Other",
launch: (sessionId: string) => timer(8000).pipe(
mapTo(""),
tap(status => this.service.privateStatus = "Succeeded"),
filter(_ => false),
startWith(`Hello $sessionId`)
),
getStatus: _ => timer(500).pipe(mapTo(this.service.privateStatus))
arbiratryInit(sessionId: string)
// Launch session, then query the status every ping_Interval
// and print the result to the console.
this.service.launch(sessionId).pipe(
switchMap(instanceName => timer(0,this.PING_INTERVAL).pipe(
exhaustMap(_ => this.service.getStatus(instanceName)),
takeWhile(status => status !== "Succeeded", true),
map(status => (instanceName, status))
))
).subscribe(console.log);
new ArbiratryClass().arbiratryInit("ABCD_1234");
运行此命令时控制台的输出:
instanceName: 'Hello ABCD_1234', status: 'Other'
instanceName: 'Hello ABCD_1234', status: 'Other'
instanceName: 'Hello ABCD_1234', status: 'Other'
instanceName: 'Hello ABCD_1234', status: 'Other'
instanceName: 'Hello ABCD_1234', status: 'Succeeded'
【讨论】:
getEngineStatus 永远不会被查询。调试器不进入switchMap @TropicalViking 出于显而易见的原因,我无法真正调试您的代码。我没有访问权限,所以我无法知道为什么您尝试的方法不起作用。也许我打错了?也许您的代码在其他地方有错误?不知道。我所做的是创建一个工作示例。您可以在我的答案的更新中看到这一点。以上是关于使用 Angular observable 定期查询 web api 并订阅的主要内容,如果未能解决你的问题,请参考以下文章
在 Angular 中使用 Observer 作为 Observable
Angular2 - 在另一个 Observable 中使用 Observable 返回方法的值
Angular 2 - 使用 Observable 时未加载模型类? [复制]