当页面不活动时,RxJS 6 暂停或缓冲可观察

Posted

技术标签:

【中文标题】当页面不活动时,RxJS 6 暂停或缓冲可观察【英文标题】:RxJS 6 Pause or buffer observable when the page is not active 【发布时间】:2019-11-23 00:23:26 【问题描述】:

所以我有一个流,假设是字母,我需要所有字母按正确的顺序将它们组合成一个单词。一切正常,直到用户更改选项卡、最小化浏览器或切换应用程序 - 行为几乎与我使用 setTimeout() 相同 - 弄乱了订单、丢失物品等。我试图通过使用 bufferWhen() 来实现我的目标、bufferToggle()takeUntil()publish()connect() 但没有成功。我也考虑过使用delayWhen,但它已被弃用并且可能不适合,因为它会立即停止流。我应该使用哪些功能以及如何使用?这是我的代码:

export class MyComponent implements AfterViewInit 
  private visibilityChange$ = fromEvent(document, 'visibilitychange').pipe(startWith('visible'), shareReplay( refCount: true, bufferSize: 1 ));
  private show$ = this.visibilityChange$.pipe(filter(() => document.visibilityState === 'visible'));
  private hide$ = this.visibilityChange$.pipe(filter(() => document.visibilityState === 'hidden'));

  public ngAfterViewInit() 
    const lettersStream$ = zip( // add delay for each letter
            from(['w', 'o', 'r', 'd']),
            interval(1000))
           // pause when hide$ fires, resume when show$
          .pipe(map(([letter, delayTime]) => letter))
          .subscribe(console.log);
  

我在stackblitz 上做了一个演示 - 我只想看看(当标签处于非活动状态时停止书写)该短语是如何写在屏幕上的。

【问题讨论】:

这样的? stackblitz.com/edit/angular-m3fvhr 嗯,没有。这些字母是可观察对象发出的连续值,您将它们设为可观察的of 而不是from。我得到“w”,然后 1 秒后下一个字母,除了更改浏览器选项卡造成的中断外,还需要保持顺序。 我看不到你是如何在letterStream$ 中使用show$hide$ observables 的。我只能看到提到他们的评论。我错过了什么吗? 这里的实际来源是什么?它是一个在标签改变时断开连接的WebSocket吗?或者它是一个文档事件还是什么?我很困惑是什么导致了这里的问题,或者为什么你需要在可见性变化时暂停这个流 源就像例子中一样,Observable 发射值,它没有断开连接,只是浏览器优先考虑可见选项卡,例如“d”项可以在“r”之前处理。 【参考方案1】:

由于我在我的 RxJS Snake Game 中做过类似的暂停/取消暂停操作,因此我会帮助您举例说明。

想法是将interval(1000) 作为事实来源,这意味着一切都将基于它。因此,我们的目标是让这个间隔暂停,基于我们需要停止在可见性隐藏上发出事件并继续在可见性显示上的事实。最后,为了让事情变得更容易,我们可以停止收听可见性隐藏的源间隔,并在可见性显示到达时再次开始收听。现在让我们看看具体的实现:

您也可以在RxJS Pause Observable 上使用修改后的 StackBlitz 演示代码。

import  of, interval, fromEvent, timer, from, zip, never  from 'rxjs';
import  delayWhen, tap, withLatestFrom, concatMap, take, startWith, distinctUntilChanged, switchMap, shareReplay, filter, map, finalize  from 'rxjs/operators';

console.log('-------------------------------------- STARTING ----------------------------')

class MyComponent 
  private visibilityChange$ = fromEvent(document, 'visibilitychange')
    .pipe(
      map(x => document.visibilityState),
      startWith('visible'),
      shareReplay(1)
    );

  private isVisible$ = this.visibilityChange$.pipe(
    map(x => x === 'visible'),
    distinctUntilChanged(),
  );

  constructor() 
    const intervalTime = 1000;
    const source$ = from('word or two'.split(''));
    /** should remove these .pipe(
        concatMap(ch => interval(intervalTime).pipe(map(_ => ch), take(1)))
      );*/

    const pausableInterval$ = this.isVisible$.pipe(
      switchMap(visible => visible ? interval(intervalTime) : never()),
    )

    const lettersStream$ = zip(pausableInterval$, source$).pipe(
      map(([tick, letter]) => letter),
    ).subscribe(letter => 
      this.writeLetter(letter);
    );
  

  private writeLetter(letter: string) 
    if (letter === ' ') letter = '\u00A0'; // fix for spaces
    document.body.innerText += letter;
  


const component = new MyComponent();

这是来自 StackBlitz 的确切代码,我复制到这里以便更好地为您解释。

现在让我们为您分解有趣的部分:

    看看visibilityChange$isVisible$。它们稍作修改,以便第一个基于document.visibilityState 发出字符串值'visible''hidden'。当document.visibilityState 等于'visible' 时,第二个发出真值。

    看看source$。它会发出一个字母,然后在 concatMapintervaltake(1) 的帮助下等待 1 秒,并执行此过程,直到文本中没有字符为止。

    看看pausableInterval$。基于this.isVisible$ 将根据document.visibilityState 改变,我们的pausableInterval$ 将每秒发射项目或由于never() 而根本不会发射任何东西。

    终于来看看lettersStream$。在zip() 的帮助下,我们将压缩pausableInterval$source$,因此我们将获得一封来自源的字母和一个来自可暂停间隔的刻度。如果pausableInterval$ 由于可见性改变而停止发射,zip 也会等待,因为它需要两个 Observable 一起发射才能向订阅发送事件。

【讨论】:

【参考方案2】:

对用例有点困惑,但这可能会解决它:

首先改为这样做:

private isVisible$ = this.visibilityChange$.pipe(
                       filter(() => document.visibilityState === 'visible'), 
                       distinctUntilChanged()); // just a safety operator

然后这样做:

const lettersStream$ = this.isVisible$.pipe(
        switchMap((isVisible) => (isVisible)
          ? zip( // add delay for each letter
              from(['w', 'o', 'r', 'd']),
              interval(1000))
            .pipe(map(([letter, delayTime]) => letter))
          : NEVER
        )
      ).subscribe(console.log);

每次可见性变化时只切换地图,如果可见则订阅源,如果不可见则什么都不做。

对于这个人为的例子,行为会有点不稳定,因为 from() 将始终发出相同的序列,但使用真正的非静态源,它应该按预期工作。

【讨论】:

我正在创建一个演示并在此处使用您的代码:stackblitz.com/edit/typescript-nsyvne?file=index.ts word or twowwwwww... 已打印并且正在继续... 这是由于from 作为来源的性质。每次订阅发生时,它都会重新发送它的整个流。但暂停按预期运行。它会取消订阅并在每次可见性更改时重新订阅。 好的,那么如何使用from呢? 你不能。您需要编写某种可以暂停的源,但是无法使用 from 作为源来停止然后在流中恢复排放。 from 通常对于 osbervables 在野外的行为方式不太现实,并且主要用于演示并且在实践中几乎没有晦涩的用例。目前还不清楚你到底想要什么。

以上是关于当页面不活动时,RxJS 6 暂停或缓冲可观察的主要内容,如果未能解决你的问题,请参考以下文章

在 Angular 6 中使用 rxjs 可观察对象的目的是啥?与 async/await 相比,rxjs 的优势是啥? [复制]

如何在可观察的地图中抛出错误(rxjs 6,ng6)

当用户在页面 javascript 或 php 中不活动 3 分钟时自动注销

处于暂停状态时如何强制 AVPlayer 暂停缓冲?

缺少可观察的方法 RxJS 5.0.0-beta.0

rxjs中常用的操作符