取消订阅 RxJS Observables

Posted

技术标签:

【中文标题】取消订阅 RxJS Observables【英文标题】:Unsubscribe from RxJS Observables 【发布时间】:2017-03-26 09:05:18 【问题描述】:

我有这两个对象,我想停止监听它们的事件。我对 observables 和 RxJS 完全陌生,只是尝试使用 Inquirer 库。

这里是 RxJS API 供参考: http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html

如何取消订阅这些类型的 observable?

ConnectableObservable:

   ConnectableObservable 
     source: EventPatternObservable  _add: [Function], _del: [Function], _fn: undefined ,
     _connection: ConnectDisposable  _p: [Circular], _s: [Object] ,
     _source: AnonymousObservable  source: [Object], __subscribe: [Function: subscribe] ,
     _subject: 
      Subject 
        isDisposed: false,
        isStopped: false,
        observers: [Object],
        hasError: false  ,
  _count: 1,
  _connectableSubscription: 
   ConnectDisposable 
     _p: 
      ConnectableObservable 
        source: [Object],
        _connection: [Circular],
        _source: [Object],
        _subject: [Object] ,
     _s: AutoDetachObserver  isStopped: false, observer: [Object], m: [Object]   

FilterObservable:

FilterObservable 
  source: 
   RefCountObservable 
     source: 
      ConnectableObservable 
        source: [Object],
        _connection: [Object],
        _source: [Object],
        _subject: [Object] ,
     _count: 1,
     _connectableSubscription: ConnectDisposable  _p: [Object], _s: [Object]  ,
  predicate: [Function] 

我需要取消订阅这些对象:

'use strict';
var rx = require('rx');

function normalizeKeypressEvents(value, key) 
  return value: value, key: key || ;


module.exports = function (rl) 

  var keypress = rx.Observable.fromEvent(rl.input, 'keypress', normalizeKeypressEvents)
    .filter(function (e) 
      // Ignore `enter` key. On the readline, we only care about the `line` event.
      return e.key.name !== 'enter' && e.key.name !== 'return';
    );

  return 
    line: rx.Observable.fromEvent(rl, 'line'),

    keypress: keypress,

    normalizedLeftKey: keypress.filter(function (e) 
      return e.key.name === 'left';
    ).share(),

    normalizedRightKey: keypress.filter(function (e) 
      return e.key.name === 'right';
    ).share(),

    normalizedUpKey: keypress.filter(function (e) 
      return e.key.name === 'up' || e.key.name === 'k' || (e.key.name === 'p' && e.key.ctrl);
    ).share(),

    normalizedDownKey: keypress.filter(function (e) 
      return e.key.name === 'down' || e.key.name === 'j' || (e.key.name === 'n' && e.key.ctrl);
    ).share(),

    numberKey: keypress.filter(function (e) 
      return e.value && '123456789'.indexOf(e.value) >= 0;
    ).map(function (e) 
      return Number(e.value);
    ).share(),

    spaceKey: keypress.filter(function (e) 
      return e.key && e.key.name === 'space';
    ).share(),

    aKey: keypress.filter(function (e) 
      return e.key && e.key.name === 'a';
    ).share(),

    iKey: keypress.filter(function (e) 
      return e.key && e.key.name === 'i';
    ).share()
  ;
;

我目前最好的猜测是,没有像这样显式调用订阅:

var source = Rx.Observable.fromEvent(input, 'click');

var subscription = source.subscribe(
  function (x) 
    console.log('Next: Clicked!');
  ,
  function (err) 
    console.log('Error: %s', err);
  ,
  function () 
    console.log('Completed');
  );

但是,有这些调用:

events.normalizedUpKey.takeUntil(validation.success).forEach(this.onUpKey.bind(this));
events.normalizedDownKey.takeUntil(validation.success).forEach(this.onDownKey.bind(this));

所以我最好的猜测是我需要一种方法来取消/取消 takeUntil 调用。

【问题讨论】:

【参考方案1】:

没有必要也没有协议可以取消订阅 observable。实际上,我在您的问题中看到了代码,尤其是返回对象的一部分,其中包含了一堆由 share 组成的 observables。但是,这些 observables 仍然是 observables,而不是订阅,这意味着对于这些元素没有称为 unsubscribing 的概念。

因此,如果您在模块外部有一些新的类似订阅的代码,并且与事件 observables 完全一致,您显然可以取消订阅特定的订阅实例。

目前对于问题中的代码,在源observable上使用的方法都是操作符,比如.filter().share().takeUntil(),而不是订阅执行,实际上是方法返回新的observable。

如Rxjs官方文档所述,虽然.share()创建multicasted observables, 如果订阅者数量从1 减少到0 时,执行仍然可能会停止,而使用一些方便的运算符,其中代码中的.share() 也完全包含在内。

总之,无需担心在您的问题中取消订阅您的代码的事件。可能确实还有一个问题,听起来像您的问题中描述的问题:如果您使用.connect() 而不是.share()。就是ConnectableObservable的情况,需要手动取消事件绑定。

【讨论】:

【参考方案2】:

我同意第一个评论者所说的话。 但是感觉代码中的某处需要有这样的调用:

let subscription = normalizedUpKey.subscribe( data => console.log('data') );

你能做到的

subscription.unsubscribe()

开。当某事发生时,您怎么知道,或者那是 3rd 方库的一部分?

阅读更多关于 Rxjs 的信息。我建议你看看这本免费的书,https://www.gitbook.com/book/chrisnoring/rxjs-5-ultimate/details

【讨论】:

【参考方案3】:

如果您想取消订阅,您需要拥有Subscription 对象。这是每个Observable.subscribe() 调用返回的对象。例如:

let subscriber = Observable.subscribe(...);
...
subscriber.unsubscribe();

欲了解更多信息,请参阅:https://github.com/ReactiveX/rxjs/blob/master/doc/subscription.md

【讨论】:

是的,您能判断该调用是否在上述任何代码内部发生吗?很明显,我正在使用的代码有效地订阅了可观察的事件,我只是没有在代码中看到任何明确的 subscribe() 调用...... 例如,看起来 rx.Observable.fromEvent() 在这里工作,但我不知道如何停止它! 这有点帮助 => github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/… 我认为要获得可接受的答案,您必须帮助我撤消 takeUntil 调用 :)

以上是关于取消订阅 RxJS Observables的主要内容,如果未能解决你的问题,请参考以下文章

任何需要先取消订阅 RxJS()

取消 rxjs 中的 http 订阅

RxJS 迁移 5 到 6 - 使用 TakeUntil 取消订阅

取消订阅 Rxjs Observables

RxJS迁移5到6 - 取消订阅TakeUntil

Angular + RxJS:使用 takeUntil 与简单取消订阅?