RXJS:向 Observable 添加一个函数以在订阅时执行(延迟)

Posted

技术标签:

【中文标题】RXJS:向 Observable 添加一个函数以在订阅时执行(延迟)【英文标题】:RXJS: Adding a function to Observable to execute when subscribed to (defer) 【发布时间】:2018-04-03 12:55:09 【问题描述】:

向 Observable 添加一个函数,以便在订阅时执行(延迟)

我有一个由事件组成的 Observable。在这种情况下,蓝牙通知。

我只想在有人订阅该 Observable 时运行一个函数 (startNotifictions)。

此代码在以前的版本中确实有效。它与 Ionic3 框架一起使用。它添加了一个新的运算符,在订阅时运行。现在转译器遇到了类型问题,抱怨了两次,即 .doOnSubscribe 在 typedef Observable any> 和 上不可用。

有人知道如何正确输入吗?可以延长吗? 试过直接用.defer,没用。

 // add operator doOnSubscribe to the event observable
        Observable.prototype.doOnSubscribe = function(onSubscribe) 
            let source = this;
            return Observable.defer(() => 
                onSubscribe();
                return source;
            );
        ;

        // return the Observable for the notify char, with startNotify on first subscribe
        getUartDataNote( Observable.fromEvent( this.uartChar, 'characteristicvaluechanged' )
            .doOnSubscribe(() => 
                console.log('starting note');
                this.uartChar.startNotifications();
            )
            .map( value => String.fromCharCode.apply( null, new Uint8Array( this.uartChar.value.buffer )))
            .takeUntil( Observable.fromEvent( this.gatt.device, 'gattserverdisconnected' ))
            .finally(() => 
                console.log( 'stream disconnected ');
                // not necessary: return this.uartChar.stopNotifications()
            )
            .share()
        );

【问题讨论】:

【参考方案1】:

如果您使用的是rxjs 版本 5,则可以使用纯函数实现 doOnSubscribe,而不是修补原型。这样你就不需要类型扩充了。

import defer from 'rxjs/observable/defer';
import Observable from 'rxjs/Observable';

/** Example
import from from 'rxjs/observable/from';

from([1, 2, 3])
    .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
    .subscribe(x => console.log(x), null, () => console.log('completed'));
*/

export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) =>  Observable<T> 
    return function inner(source: Observable<T>): Observable<T> 
        return defer(() => 
          onSubscribe();

          return source;
        );
    ;

https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d

【讨论】:

【参考方案2】:

这是你编写类型扩充的方法。

export 

declare module 'rxjs/Observable' 
  interface Observable<T> 
    doOnSubscribe(onSubscribe: () => void): this;
  

TypeScript 手册的Declaration Merging 部分对此进行了说明。

【讨论】:

: this 让我大吃一惊。能否请您指出正在描述它的文件?我以前从未见过这样的东西。 @IgorSoloydenko 它是在 TypeScript 2.0 时间范围内添加的。它声明该方法返回接收者。它在类型级别启用部分高阶多态性。例如,如果您有一个构建器对象,那么使用它是有意义的。我只在这里使用它,因为它根据实现是正确的,并且比必须导入所有类型并重新声明泛型更简洁 @JuergenKienhoefer 感谢您提供链接,我已将其添加到答案中。

以上是关于RXJS:向 Observable 添加一个函数以在订阅时执行(延迟)的主要内容,如果未能解决你的问题,请参考以下文章

RxJs Observable:如果为空/过滤则执行函数

#RXJS# 基础

ionic - RXJS 错误:rxjs_Observable__.Observable.combineLatest 不是函数

Rxjs笔记三:Observer的简写形式以及退订Observable

Angular2 RxJS 得到“Observable_1.Observable.fromEvent 不是函数”错误

RxJs map函数导致上游observable被多次调用