尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确

Posted

技术标签:

【中文标题】尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确【英文标题】:attempting to poll a server however rxjs methods may be incorrect as algorithm breaks 【发布时间】:2021-03-08 18:56:07 【问题描述】:

所以我正在尝试轮询我的服务器。我试图每 5 秒轮询一次我的服务器。并且轮询在一分钟后超时。我在调试方法中有一些控制台日志,但即使在等待 5 秒后仍会触发的唯一 console.log 是 'validate stream start'

我遵循 this 教程,没有创建单独的服务,因为我的应用程序中的这一页只需要它。

我愿意打赌这是一个简单的错误或对这些 rxjs 运算符如何工作的误解。

我做错了什么?

startastream()
    this.startastreamboolan = true;
    let count = 12;
    this.http.get(environment.shochat_content_creator_set_valid_stream_start)
      .subscribe((req: any)=>
        console.log('validate stream start');
        timer(5000).pipe(
          switchMap(() =>
            console.log('timer start');
            if(count > 0)
              return this.http.get(environment.check_if_stream_is_active_on_mux)
                .subscribe(
                  (req: any)=>
                    this.streamready = true;
                    return 0;
                  ,
                  error => 
                    count = count -1;
                    console.log(count);

                  );
            
          ));
      );
   

【问题讨论】:

为什么不在第一个 get 请求中使用 switchMap 运算符。使用多个嵌套的 subscribe 方法是一种不好的做法。 【参考方案1】:

计时器

创建一个 Observable,它在 dueTime 之后开始发射,并在此后的每个 period 之后发射不断增加的数字。

timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>

类似于interval,但您可以指定何时开始排放。

在您的代码中,您忘记了timer 运算符的第二个参数,这意味着它只会等待5s,然后只发出一次。此外,您没有订阅timer,这就是它不再继续的原因。如果您想每隔5s 轮询您的服务器,您需要使用:

timer(0, 5000)

这里,计时器不会等待,而是会直接开始每隔5s发出值。


接下来,我看到您正在使用switchMap 创建内部流,这很好。但是,如果您的服务器需要超过 5s 来处理您的请求,您可能需要改用 mergeMap,因为它不会取消之前正在进行的内部流,这取决于您。

这里的错误是switchMap(或mergeMap)需要一个回调函数,必须返回一个Observable,然后运营商将自己订阅它。在这里,您将返回 Subscription

这是你可以做的:

const start$ = this.http.get(startUrl).pipe(
  tap(() => console.log('Stream start'))
);
const poll$ = this.http.get(pollUrl).pipe(
  tap(() => (this.streamReady = true)),
  catchError(error => 
    console.log(error);
    return EMPTY;
  )
);

start$.pipe(
  switchMap(() => timer(0, 5000).pipe(
    tap(() => console.log('Polling every 5s')),
    mergeMap(() => poll$)
  ))
).subscribe();

我在这里创建了两个Observablesstart$,负责启动流,poll$,负责轮询您的服务器。这里的想法是启动流,然后切换到timer 内部流,该流将每个5s 发出,然后再次切换到另一个将轮询服务器的内部流。

我没有在我的示例中包含count,因此流将永远使用此代码运行。为此,您应该查看takeUntil 运算符。

希望对你有帮助,欢迎提问!

【讨论】:

做得很好。谢谢你非常详细的回答。这需要时间,我很感谢你接受它!

以上是关于尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确的主要内容,如果未能解决你的问题,请参考以下文章

USN NFTS 更改通知事件中断

为啥我要使用 RxJS 的 interval() 或 timer() 轮询而不是 window.setInterval()?

每次传入可观察值(RxJS)时调用方法

Angular/RxJS:带有可观察对象的嵌套服务调用

负载均衡算法 — 轮询

CC2530的中断系统及外部中断应用