RxJS-Observable设计思想中运用的设计模式

Posted Brian Huang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJS-Observable设计思想中运用的设计模式相关的知识,希望对你有一定的参考价值。

RxJS 是一个库,它通过使用Observable序列来编写异步和基于事件的程序。其中Observable的设计主要运用到的设计模式有观察者模式(Observer pattern )和迭代器模式(Iterator pattern)。

1.观察者模式(Observer pattern)

1.1 什么是观察者模式?

观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Observer)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。 

1.2 观察者模式主要是为了解决什么问题?

1)定义对象之间的一对多依赖关系而不使对象紧密耦合。
2)确保当一个对象改变状态时,自动更新开放数量的从属对象。
3)一个对象应该可以通知开放式数量的其他对象。

1.3 RxJS的Observable中观察模式实现源码 ("rxjs": "~6.4.0" (TypeScript))

Observable 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。

1.3.1 Observable源码

// 基于源代码有删减
/**
 *表示任意时间内的任意一组值。这是RxJS最基本的构件
 *
 * @class Observable<T>
 */
export declare class Observable<T> implements Subscribable<T> {
    
    /**
     * @constructor
     * @param {Function} 订阅当被初始订阅时被调用的函数。这个函数有一个订阅者,它有新的值
     *可以\'next\',也可以调用\'error\'方法来引发错误,或者\'complete\'可以被调用来通知成功完成。
     */
    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);
    /**
     * 通过调用Observable构造函数创建一个新的cold Observable
     * @static true
     * @owner Observable
     * @method create
     * @param {Function} subscribe? 要传递给Observable构造函数的订阅函数
     * @return {Observable} a new cold observable
     * @nocollapse
     * @deprecated use new Observable() instead
     */
    static create: Function;
    /**
     * 创建一个新的可观察对象,将这个可观察对象作为源,并将传递的操作符定义为新可观察对象的操作符。
     * @method lift
     * @param {Operator} operator the operator defining the operation to take on the observable
     * @return {Observable} a new observable with the Operator applied
     */
    lift<R>(operator: Operator<T, R>): Observable<R>;
    
    /** @deprecated Use an observer instead of a complete callback */
    subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
    
    /** @deprecated This is an internal implementation detail, do not use. */
    _trySubscribe(sink: Subscriber<T>): TeardownLogic;
    /**
     * @method forEach
     * @param {Function} next a handler for each value emitted by the observable
     * @param {PromiseConstructor} [promiseCtor] a constructor function used to instantiate the Promise
     * @return {Promise} a promise that either resolves on observable completion or
     *  rejects with the handled error
     */
    forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void>;
    /** @internal This is an internal implementation detail, do not use. */
    _subscribe(subscriber: Subscriber<any>): TeardownLogic;
    //省略重载的方法
    pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
    //省略重载的方法
    toPromise<T>(this: Observable<T>): Promise<T>;
   
}
// 基于源代码有删减
export interface Subscribable<T> {
    
    /** @deprecated Use an observer instead of a complete callback */
    subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable;
    //省略重载的方法
}

1.3.2  Subject

// 基于源代码有删减
/*
* * Subject 是一个特殊类型的可观察对象,允许值被多播给许多观察者。主题就像事件发射器。 * * 每一个Subject都是可观察的(Observable)和观察者(Observer)。您可以订阅一个主题,还可以调用next来获取提要值以及error和complete。 * * @class Subject<T> */ export declare class Subject<T> extends Observable<T> implements SubscriptionLike { observers: Observer<T>[]; closed: boolean; isStopped: boolean; hasError: boolean; thrownError: any; constructor(); /**@nocollapse * @deprecated use new Subject() instead */ static create: Function; lift<R>(operator: Operator<T, R>): Observable<R>; next(value?: T): void; error(err: any): void; complete(): void; unsubscribe(): void; /** @deprecated This is an internal implementation detail, do not use. */ _trySubscribe(subscriber: Subscriber<T>): TeardownLogic; /** @deprecated This is an internal implementation detail, do not use. */ _subscribe(subscriber: Subscriber<T>): Subscription; /** * 创建一个以这个主题为源的新观察对象。您可以这样做来创建主题的自定义observer逻辑,并对使用Observable的代码隐藏它。code that uses the Observable. * @return {Observable} Observable that the Subject casts to */ asObservable(): Observable<T>; } /** SUBSCRIPTION INTERFACES */ export interface Unsubscribable { unsubscribe(): void; } export interface SubscriptionLike extends Unsubscribable { unsubscribe(): void; readonly closed: boolean; }

1.3.3 Observer

export interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

Observable 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。 

查看Subject源代码,我们发现Subject既可以作为Observable又可以作为Observer, 在angular项目里面,组件之间的异步通讯可以使用这一特性去实现。

1.4 观察者模式一些优秀的框架的应用场景:Zookeeper事件通知节点、Spring事件驱动、消息订阅通知、分布式配置中心等等。

2. 迭代器模式(Iterator pattern)

2.1 什么是迭代器模式?

 迭代器(Iterator)模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。

2.2 迭代器模式解决了什么问题?

 迭代器模式可以把迭代的过程从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。

 2.3 RxJS的Observable中迭代器模式实现源码

在 RxJS 中,Observer 除了有 next 方法来接收 Observable 的事件外,还可以提供了另外的两个方法:error() 和 complete(),与迭代器模式一一对应。

export interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

// 结合迭代器Iterator进行理解:
next() => Observer 提供一个 next 方法来接收 Observable 流,是一种 push 形式;而 Iterator 是通过调用 iterator.next() 来拿到值,是一种 pull 的形式。
complete() => 当不再有新的值发出时,将触发 Observer 的 complete 方法;而在 Iterator 中,则需要在 next 的返回结果中,当返回元素 done 为 true 时,则表示 complete。
error() => 当在处理事件中出现异常报错时,Observer 提供 error 方法来接收错误进行统一处理;Iterator 则需要进行 try catch 包裹来处理可能出现的错误。

 2.4 基于ES6实现Iterator 

"use strict";

class Iterator {
    index = 0;
    list = [];
    constructor(container){
      this.list = container.list;
      this.index = 0;
    }
  
    next() {
      if(this.hasNext()){
        return {
          value: this.list[this.index++],
          done: false
        }
      }
      return {value: null,done: true}
    }
  
    hasNext() {
      if(this.index >= this.list.length){
        return false;
      }
      return true;
    }
  }
  
  
  class Container {
    list = [];
    constructor(list) {
      this.list = list;
    }
  
    getIterator() {
      return new Iterator(this);
    }
  }

  let container = new Container([1,2,3,4,5]);
  let iter = container.getIterator();

  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());
  console.log(iter.next());

执行结果:

 通过上边的示例代码我们可以得知,我们不了解对象的内部构造,但是可以通过调用迭代器提供的 next() 方法就能顺序访问其每个元素。

3.那么基于观察者模式+迭代器模式的组合是什么?

Observable 与 Observer实现观察者 + 迭代器模式,数据的逐渐传递,传递与影响其实就是流的表现。RxJS 提供 of 的方法来自定义创建一个 Observable,可以使用 next 来发出流。

import { of } from \'rxjs\';

of(10, 20, 30)
.subscribe(
  next => console.log(\'next:\', next),
  err => console.log(\'error:\', err),
  () => console.log(\'the end\'),
);
// result:
// \'next: 10\'
// \'next: 20\'
// \'next: 30\'

以上全部就是对RxJS中Observable运用的设计的模式的分析,参考来源:

1. RxJS -API List https://rxjs.dev/api

2. 从观察者模式到迭代器模式系统讲解 RxJS Observable(一) https://www.ucloud.cn/yun/104556.html

3. Rx.js实现原理浅析  https://www.cnblogs.com/tangzhirong/p/7424777.html

以上是关于RxJS-Observable设计思想中运用的设计模式的主要内容,如果未能解决你的问题,请参考以下文章

为啥我的图像出现在 Android Studio 设计视图中,但在手机中运行时却没有?

在代码中运行使用设计器制作的 QTabWidget

粗浅看 Tomcat中设计模式分析

佰新网络科技广州分公司如何在设计中运用好重复的手法

高效延时消息设计与实现的场景

一种光学指纹识别系统的设计方案