使用 Angular observable 定期查询 web api 并订阅

Posted

技术标签:

【中文标题】使用 Angular observable 定期查询 web api 并订阅【英文标题】:Query a web api periodically with Angular observable and subscribe 【发布时间】:2021-11-08 10:29:00 【问题描述】:

我有一个 Launch 方法,它接受一些参数并启动一个引擎。 当引擎启动时,它会返回实例名称。 使用此实例名称,我想定期查询另一个服务,例如每 2 秒以了解状态是否更改为“成功”或“失败” 我在第一个订阅中做了一个 do while 循环,但它没有按预期工作。

instanceStatus: string = "Initialized";
instanceName:string = "InstanceName";


Launch(sessionId: string, projectName: string, f: string[]) 
    this.service.Launch(sessionId, projectName, this.f)
      .pipe(first())
      .subscribe(
        instanceName => 
          localStorage.setItem('instanceName', instanceName);
          this.instanceName = instanceName;
          setTimeout(() => 
            do 
              this.service.getEngineStatus(this.instanceName)
                .pipe(first())
                .subscribe(
                  status => 
                    this.instanceStatus = status;
                    console.log(status);
                    console.log(this.instanceStatus);
                    this.loadingService.showSpinner( text: 'Modeling is running...' );
                    if (this.instanceStatus === "Succeeded") 
                      this.messageService.add( severity: Severity.Success, summary: 'Fault modeling completed', detail: 'Via MessageService' );
                      this.messageService.clear();
                    
                  
                );
             while (this.instanceStatus !== "Succeeded")
          , 2000);
        
      );
  



 getEngineStatus(instanceName:string): Observable<string> 
    this.serviceUrl = URL + `?instance=` + instanceName;
    return this._http.get<string>(this.serviceUrl);
  

【问题讨论】:

这能回答你的问题吗? How To Do Polling with Angular 2 Observables 谢谢利亚姆,但它已经过时了。 'interval' 属性不再存在于 Observable 上。 它确实存在,只是创建方式不同,因为更高版本的 rxjs learnrxjs.io/learn-rxjs/operators/creation/interval 【参考方案1】:

你看过interval creation operator吗?我相信这可能是您需要使用的。我用这个运算符试了一下,代码如下:

Launch(sessionId: string, projectName: string, f: string[]) 
  this.service
    .Launch(sessionId, projectName, this.f)
    .pipe(
      first(),
      tap((instanceName) => 
        localStorage.setItem('instanceName', instanceName);
        this.instanceName = instanceName;
      ),
      switchMap(() => interval(2000)),
      takeWhile(() => !['Succeeded', 'Failed'].contains(this.instanceStatus)),
      tap(() => 
        this.loadingService.showSpinner(
          text: 'Modeling is running...',
        );
      ),
      switchMap(() => this.service.getEngineStatus(this.instanceName))
    )
    .subscribe((status) => 
      this.instanceStatus = status;
      this.loadingService.hideSpinner();
      console.log(status);
      console.log(this.instanceStatus);

      if (this.instanceStatus === 'Succeeded') 
        this.messageService.add(
          severity: Severity.Success,
          summary: 'Fault modeling completed',
          detail: 'Via MessageService',
        );
        this.messageService.clear();
      
    );

【讨论】:

您好,谢谢,我试过这个,但它甚至没有查询第二个服务。我也做了那个改变!['Succeeded', 'Failed'].includes(this.instanceStatus)),【参考方案2】:

    避免嵌套订阅。而是尝试使用更高阶的映射运算符,例如 switchMap

    不要使用forwhile 之类的JS 语句,而是使用intervaltimer 之类的RxJS 函数进行轮询。

    使用tap 操作符来做一些副作用,比如将数据推送到本地存储。

import  Observable, timer  from 'rxjs';
import  tap, finalize, switchMap, takeWhile  from 'rxjs/operators';

POLL_INTERVAL = 2000;
instanceStatus: string = "Initialized";
instanceName:string = "InstanceName";

this.Launch(sample, sample, sample).pipe(
  switchMap((instanceName: any) => 
    timer(0, POLL_INTERVAL).pipe(                                         // <-- start immediately and poll every 'n' secs
      switchMap(() => this.service.getEngineStatus(instanceName)),
      takeWhile((instanceStatus: any) => instanceStatus === 'Succeeded'), // <-- stop poll when status !== 'Succeeded'
      finalize(() =>                                                     // <-- run when polling stops
        this.messageService.add(
          severity: Severity.Success,
          summary: 'Fault modeling completed',
          detail: 'Via MessageService'
        );
        this.messageService.clear();
      )
    )
  ).subscribe(
    next: (instanceStatus: any) => this.instanceStatus = instanceStatus
  );
)

Launch(sessionId: string, projectName: string, f: string[]): Observable<any> 
  return this.service.Launch(sessionId, projectName, this.f).pipe(
    first(),
    tap((instanceName: any) => 
      localStorage.setItem('instanceName', instanceName);
      this.instanceName = instanceName;
    )
  );

更新(感谢@Liam):

    do 现在是 tap 运算符。 .subscribe() 必须位于pipe() 函数之后的最后一级。它在里面是错误的。请重试该代码。

【讨论】:

没有重载匹配这个调用。 Overload 1 of 3, '(project: (value: string, index: number) => ObservableInput): OperatorFunction',给出以下错误。 '(instanceName: string) => Subscription' 类型的参数不可分配给 '(value: string, index: number) => ObservableInput' 类型的参数。类型“订阅”不可分配给类型“ObservableInput”。 Overload 3 of 3, '(project: (value: string, index: number) => ObservableInput, resultSelector: (outerValue: string, innerValue: any, outerIndex: number, innerIndex: number) => unknown): OperatorFunction<...>',给出以下错误。 '(instanceName: string) => Subscription' 类型的参数不可分配给 '(value: string, index: number) => ObservableInput' 类型的参数。类型“订阅”不可分配给类型“ObservableInput” 您似乎正在尝试将订阅分配给可观察对象。请显示您正在尝试的完整代码。也请在您的原始问题和此评论部分中更新它。 谢谢迈克尔,我试过你的代码,得到了以下错误。 @Liam:谢谢,我在写答案时完全忽略了do/tap 运算符。 subscription 是在发布之前未检查答案的结果。我已经更新了帖子。【参考方案3】:

据我所知,我怀疑这样的事情会成为为您找到答案的一部分。

这里的关键是计时器在status !== "Succeeded" 时每2 秒发出一个值。一旦status === "Succeeded" true 用于takeWhile 中的第二个参数,将在它关闭计时器之前允许最终发射(不再每 2 秒检查一次)。

剩下的就是这样。

launch(sessionId: string, projectName: string, f: string[]): void 
  this.service.launch(sessionId, projectName, f).pipe(

    first(),
    tap(instanceName => 
      localStorage.setItem('instanceName', instanceName);
      this.instanceName = instanceName;
    ),
    switchMap((instanceName:string) => timer(0,2000).pipe(
      exhaustMap(_ => this.service.getEngineStatus(instanceName).pipe(
        first()
      )),
      takeWhile((status:string) => status !== "Succeeded", true),
      map(status => (instanceName, status))
    ))

  ).subscribe((instanceName, status) => 
    this.loadingService.showSpinner( text: 'Modeling is running...' );
    if (status === "Succeeded") 
      this.messageService.add( 
        severity: Severity.Success, 
        summary: 'Fault modeling completed', 
        detail: 'Via MessageService' 
      );
      this.messageService.clear();
    
  );


更新

这里有一些代码可以满足您的需求,但完全是独立的。这不是基于您的代码(因为我无法运行/测试我无权访问的代码)。

服务中的计时器模拟请求和接收响应之间的一些延迟。

type Status = "Succeeded" | "Other"

interface Service 
  // We'll pretend we can't query privateStatus directly, for example
  // it could be data on a server that you can only access via getStatus
  privateStatus: Status,
  launch: (sessionId: string) => Observable<string>,
  getStatus: (instanceName: string) => Observable<Status>


class ArbiratryClass 

  service: Service;
  readonly PING_INTERVAL = 2000; 

  constructor()
    this.service = 
      privateStatus: "Other",
      launch: (sessionId: string) => timer(8000).pipe(
        mapTo(""),
        tap(status => this.service.privateStatus = "Succeeded"),
        filter(_ => false),
        startWith(`Hello $sessionId`)
      ),
      getStatus: _ => timer(500).pipe(mapTo(this.service.privateStatus))
    
  

  arbiratryInit(sessionId: string) 

    // Launch session, then query the status every ping_Interval 
    // and print the result to the console.
    this.service.launch(sessionId).pipe(
      switchMap(instanceName => timer(0,this.PING_INTERVAL).pipe(
        exhaustMap(_ => this.service.getStatus(instanceName)),
        takeWhile(status => status !== "Succeeded", true),
        map(status => (instanceName, status))
      ))
    ).subscribe(console.log);

  
  


new ArbiratryClass().arbiratryInit("ABCD_1234");

运行此命令时控制台的输出:

 instanceName: 'Hello ABCD_1234', status: 'Other' 
 instanceName: 'Hello ABCD_1234', status: 'Other' 
 instanceName: 'Hello ABCD_1234', status: 'Other' 
 instanceName: 'Hello ABCD_1234', status: 'Other' 
 instanceName: 'Hello ABCD_1234', status: 'Succeeded' 

【讨论】:

getEngineStatus 永远不会被查询。调试器不进入switchMap @TropicalViking 出于显而易见的原因,我无法真正调试您的代码。我没有访问权限,所以我无法知道为什么您尝试的方法不起作用。也许我打错了?也许您的代码在其他地方有错误?不知道。我所做的是创建一个工作示例。您可以在我的答案的更新中看到这一点。

以上是关于使用 Angular observable 定期查询 web api 并订阅的主要内容,如果未能解决你的问题,请参考以下文章

在 Angular 中使用 Observer 作为 Observable

Angular2 - 在另一个 Observable 中使用 Observable 返回方法的值

Angular 2 - 使用 Observable 时未加载模型类? [复制]

Angular 2 路由器使用 Observable 解析

Angular *ngFor 使用异步管道绑定到 observable - 发生了啥?

Angular 6:将 Observable 响应传递给另一个 Observable