响应式编程和Rxjs库介绍

Posted 前端手艺人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了响应式编程和Rxjs库介绍相关的知识,希望对你有一定的参考价值。

为什么要用响应式编程?

整合了Promise, 回调, 事件回调,Web Workers,Web Sockets

适用于强数据驱动,异步,动画等应用

什么是响应式编程?

谈响应式编程前,我们先纵向拓展一下。我们写一个程序的时候,我们的思维模式是怎么样的。

这里举一个例子: 假设我们到肯德基买东西吃。买了东西,一般给个收据。我们怎么实现这个收据对象。

设计API

收据对象:可能会有print方法,打印出我们的总消费金额。

计算器:还需要一个计算器。我们的收据对象需要知道多少钱,我们可能需要计算器去算。

实现

一种可能的实现

// 收据对象

class Receipt {

   constructor(calculator) {

       this.calc = calculator;

   }

   print(itemA, itemB) {

       const total = this.calc.sum(itemA, itemB);

       console.log(`total receipt £${total}`);

   }

}

// 计算器

class Calculator {

   sum(...items) {

       var result = 0;

       for(var i=0; i<items.length;i++) {

               result += items[i];

       }

       return result;

   }

}

// 使用

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator();

const receipt = new Receipt(calc);

receipt.print(pizza, beer);

Q:  宿主对象,这里的收据对象,依赖了计算器,并且需要知道使用对象计算器的API,也就是这里的sum。  这个其实是传统的依赖注入方式: 本质上都是函数参数的形式引入我们依赖的对象。 在javascript里面我们也经常这么干,依赖一个回调函数。

Q: sum方法简单的for循环。这里我们用i精确地记录了每一个步骤。而且使用的时候我们调用print,这种方式是命令式编程。

优化1

// 收据对象

class Receipt {

   constructor() {

   }

   print(total) {

       console.log(`total receipt £${total}`);

   }

}

// 计算器

class Calculator {

   sum(...items) {

       var result = 0;

       u._$forEach(items, function(i) {

           result += i;

       });

       return result;

   }

}

// 使用

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator();

const receipt = new Receipt();

receipt.print(calc.sum(pizza, beer));

收据对象内部不依赖计算器了,通过外部传递进去; 不关心变量i了,这是一种声明式编程,就是告诉你要做什么,不关系具体怎么做。 题外话:函数式编程是声明式编程的一种,但它不仅仅是声明式,它还有其他概念:比如纯函数(不可变数据结构:path copying; 结构共享),curry化等。代码中应尽量采用函数式的方式编写。

优化2

class Calculator {

   constructor(...items) {

       return Rx.Observable.from(items).reduce((acc, item) => (acc + item));

   }

}

class Receipt {

   constructor(observable$) {

       observable$.subscribe(value => console.log(`total receipt: £${value}`))

   }

}

const pizza = 6.00;

const beer = 5.00;

const calc = new Calculator([pizza, beer]);

const receipt = new Receipt(calc);

以上是采用响应式编程的方式。数据就像流水一样被传递。 任何东西都可以变为流。

原理

迭代器模式+观察者模式的结合,涉及两个主要对象:observable(被观察者:报纸), observer(观察者:订阅者) observable通过调用observer的next,complete,error三个方法通知消息给观察者。

迭代器模式+观察者模式的结合,涉及两个主要对象:

observable(被观察者:报纸), observer(观察者:订阅者)

observable通过调用observer的next,complete,error三个方法通知消息给观察者。

每次你创建一个Observable,相当于创建一份空白报纸。报纸的数据源,就是由传递给Observable的函数提供(如果是通过操作符创建Observable,也可以认为传递进去的参数就是数据源)。

数据源可以被操作符操作,然后数据就像流水一般被传递和处理。

最终流给每一个订阅者。

简单实现:

// Observable

function Observable (observer) {

  observer.next('传递数据给观察者');

}

// Observer

var Observer = {

   next(value) {

       alert(`收到${value}`);

   }

};

// 订阅

Observable(Observer);

实现示例1:参照rxjs的用法

把构造方法里面的抽取一个函数,让外部传进来;观察者通过subscribe方法传入。 然后调用fn,把观察者作为参数。这样消息就传给观察者了。

// Observable

function Observable(fn) {

  this.fn = fn;

}

Observable.prototype.subscribe = function(observer) {

   this.fn(observer);

}

// Observer

var Observer = {

   next(value) {

       alert(`收到${value}`);

   }

};

new Observable(function(observer2) {

   observer.next('123');

}).subsrcibe(Observer);

实现示例2:操作符of用法简单实现

// Observable

function Observable(fn) {

  this.fn = fn;

}

Observable.of = function() {

   var args = [].slice.call(arguments);

   return new Observable(function(observer) {

       u._$forEach(args, function(i) {

           observer.next(i);

       });

   })

}

Observable.prototype.subscribe = function(observer) {

   this.fn(observer);

}

// Observer

var Observer = {

   next(value) {

       alert(`收到${value}`);

   }

};

Observable.of(1,2,3).subscribe(Observer);

操作符本质上就是函数,会返回一个新的Observable对象。rxjs5的时候,一般我们引用一个操作符的时候,会引用一个文件,它会把它挂在的原型上,以便链式调用。但rxjs6新增加了一个pipe函数,它接收一系列操作符,可以避免打包的时候额外依赖了一些无用的文件。

注意点:

Q:实现示例1中的observer2是不是传进去的observer?

事实上的实现是另外一个对象,一个代理或者叫安全对象,它也有next等api。为什么要弄一个一个新的? 主要原因有很多: + 我们的observer对象不一定都有next, error, complete方法,当然observer也可能是一个函数。 + 希望调用complete后或者取消订阅后,再调用next什么事也不要发生 + 异常处理

Q:订阅这个过程是有生命周期的,比如订阅开始,取消订阅等等,需要管理这个生命周期。

真实的实现会用一个subscription对象,用来管理生命周期。subscribe方法的返回是一个subscription对象,可以取消订阅。

observable, proxyobserver(observer2),subscription三者的关系:

Q: complete方法和unsubscribe的区别

complete执行后,所有的订阅者都不会接收到消息;unsubscribe只是说你这个观察者不再接手消息了。

Rxjs中还有一个Subject对象,这个对象既是一种Observable,也是一种Observer。

Q: 链式调用是如何实现的

因为操作符会返回一个新的observable对象(见实现示例2),所以链式调用是通过原型链实现的。

Q:Promise和Obserable的区别

Promise只能resolve一个值,但是Observable的subscribe方法接收到多个值。 也就说subscribe函数有点特殊,一般调用一个函数只会有一个输出,但是它有多个。

MVVM框架中如何应用响应式编程

架构和设计的区别:

架构指的是各个元素(model view) 之间是如何交互的;

设计指的是采用某个库,算法,设计模式。

响应式编程是一种设计。如何在我们的MVVM框架中应用我们的这种设计。

1 将事件回调函数转为事件流

var result = document.getElementById('result');

var source = Rx.Observable.fromEvent(document, 'mousemove');

var subscription = source.subscribe(e => result.innerhtml = e.clientX + ', ' + e.clientY);

2 将http请求转为流

var observable1$ = Rx.Observable.create(function(observer) {

 this.cache.getList({

   onload: function(data) {

     observer.next(data);

     observer.complete();

   },

   onerror: function(error) {

     observer.error(error);

   }

 });

});

var observable2$ = Rx.Observable.create(function(observer) {

 request("xxx")

   .then(function(data) {

     observer.next(data);

     observer.complete();

   })

   .catch(function(error) {

     observer.error(error);

   });

});

var subscription = merge(observable1$, observable2$).subscribe(function(

 result

) {

 console.log(result);

});

可以结合一些操作符,实现多个http请求并行,或者一个请求先执行,然后第二个请求等等。

3 更新视图之前操作数据

4 多个对象之间通过流传递数据

父子组件之间:传统的方式是通过$watch,现在多了一种方式流。 它有一个Backpressure背压概念:比如我们父组件每1s产生一个数据,子组件每1s就要更新一次UI,如果我们不做任何控制的话。但是流可以控制这个东西,再父组件控制下一下,每隔5s返回一下数据。debounce or throttle or sampleTime。另外就是不需要的数据能过滤一下。

5 promise方式改为observable方式 promise方式:

function _$isWechatMiniProgram(cb1, cb2) {

   if (wx && wx.miniProgram && wx.miniProgram.getEnv) {

       new Promise(function(resolve, reject) {

           wx.miniProgram.getEnv(function(res) {

               if (res.miniprogram) {

                   resolve();

               } else {

                   reject();

               }

           })

       }._$bind(this)).then(function() {

           cb1 && cb1();

       }).catch(function(e) {

           cb2 && cb2();

       });

   } else {

       cb2 && cb2();

   }

}

_$isWechatMiniProgram(function(){}, function(){})

observable方式:

function _$isWechatMiniProgram() {

   if (wx && wx.miniProgram && wx.miniProgram.getEnv) {

       return Rx.Observable.create(function(observer) {

           wx.miniProgram.getEnv(function(res) {

               observer.next(res.miniprogram);

           });

       })

   }

}

_$isWechatMiniProgram().subscribe(functon(isMiniProgram) {

   if(isMiniProgram) {

       cb1();

   }else {

       cb2();

   }

});

还有一种方式,通过forEach操作符:

function getData() {

 return Rx.Observable.create(function() {

       fetch()

 });

}

async function execute() {

 await getData().forEach(v => console.log(v));

 console.log('finish');

}

execute();

现有的实现框架

rxjs

实现

请参看我的一个实现XingMXTeam/sinnple-rx

关注知乎账户:毛毛星 获取更多内容

以上是关于响应式编程和Rxjs库介绍的主要内容,如果未能解决你的问题,请参考以下文章

RxJS入门之函数响应式编程

作为前端,你需要知道 RxJS(响应式编程-流)

rxjs学习响应式编程理解

响应式编程实战——新版RxJS实现真正双击事件流

前端响应式编程与实时计算:从 RxJS 到 Flink

响应式编程实战——RxJS 手动停止事件流的正确方式