一起来看 rxjs

Posted 阿里云云栖社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一起来看 rxjs相关的知识,希望对你有一定的参考价值。

更新日志

  • 2018-05-26 校正
  • 2016-12-03 第一版翻译

过去你错过的 Reactive Programming 的简介

你好奇于这名为Reactive Programming(反应式编程)的新事物, 更确切地说,你想了解它各种不同的实现(比如 [Rx*], [Bacon.js], RAC 以及其它各种各样的框架或库)

学习它比较困难, 因为比较缺好的学习材料(译者注: 原文写就时, RxJs 还在 v4 版本, 彼时社区对 RxJs 的探索还不够完善). 我在开始学习的时候, 试图找过教程, 不过能找到的实践指南屈指可数, 而且这些教程只不过隔靴搔痒, 并不能帮助你做真正了解 RxJs 的基本概念. 如果你想要理解其中一些函数, 往往代码库自带的文档帮不到你. 说白了, 你能一下看懂下面这种文档么:

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

按照将元素的索引合并的方法, 把一个 "observable 队列 " 中的作为一个新的队列加入到 "observable 队列的队列" 中, 然后把 "observable 队列的队列" 中的一个 "observable 队列" 转换成一个 "仅从最近的 ‘observable 队列‘ 产生的值构成的一个新队列."

这是都是什么鬼?

我读了两本书, 一本只是画了个大致的蓝图, 另一本则是一节一节教你 "如何使用 Reactive Libarary" . 最后我以一种艰难的方式来学习 Reactive Programming: 一遍写, 一遍理解. 在我就职于 Futurice 的时候, 我第一次在一个真实的项目中使用它, 我在遇到问题时, 得到了来自同事的支持.

学习中最困难的地方是 以 Reactive(反应式) 的方式思考. 这意思就是, 放下你以往熟悉的编程中的命令式和状态化思维习惯, 鼓励自己以一种不同的范式去思考. 至今我还没在网上找到任何这方面的指南, 而我认为世界上应该有一个说明如何以 Reactive(反应式) 的方式思考的教程, 这样你才知道要如何开始使用它. 在阅读完本文后之后. 请继续阅读代码库自带的文档来指引你之后的学习. 我希望, 这篇文档对你有所帮助.

"什么是 Reactive Programming(反应式编程)?"

在网上可以找到大量对此糟糕的解释和定义. Wikipedia 的 意料之中地泛泛而谈和过于理论化. Stackoverflow 的 圣经般的答案也绝对不适合初学者. Reactive Manifesto 听起来就像是要给你公司的项目经理或者是老板看的东西. 微软的 Rx 术语 "Rx = Observables + LINQ + Schedulers" 也读起来太繁重, 太微软了, 以至于你看完后仍然一脸懵逼. 类似于 "reactive" 和 "propagation" 的术语传达出的含义给人感觉无异于你以前用过的 MV* 框架和趁手的语言已经做到的事情. 我们现有的框架视图当然是会对数据模型做出反应, 任何的变化当然也是要冒泡的. 要不然, 什么东西都不会被渲染出来嘛.

所以, 让我们撇开那些无用的说辞, 尝试去了解本质.

Reactive programming(反应式编程) 是在以异步数据流来编程

当然, 这也不是什么新东西. 事件总线或者是典型的点击事件确实就是异步事件流, 你可以对其进行 observe(观察) 或者做些别的事情. 不过, Reactive 是比之更优秀的思维模型. 你能够创建任何事物的数据流, 而不只是从点击和悬浮事件中. "流" 是普遍存在的, 一切都可能是流: 变量, 用户输入, 属性, 缓存, 数据结构等等. 比如, 想象你的 Twitter 时间线会成为点击事件同样形式的数据流.

熟练掌握该思维模型之后, 你还会接触到一个令人惊喜的函数集, 其中包含对任何的数据流进行合并、创建或者从中筛选数据的工具. 它充分展现了 "函数式" 的魅力所在. 一个流可以作为另一个流的输入. 甚至多个流可以作为另一个流的输入. 你可以合并两个流. 你可以筛选出一个仅包含你需要的数据的另一个流. 你可以从一个流映射数据值到另一个流.

让我们基于 "流是 Reactive 的中心" 这个设想, 来细致地做看一下整个思维模型, 就从我们熟知的 "点击一个按钮" 事件流开始.

技术分享图片

每个流是一个按时序不间断的事件序列. 它可能派发出三个东西: (某种类型的)一个数值, 一个错误, 或者一个 "完成" 信号. 说到 "完成" , 举个例子, 当包含了这个按钮的当前窗口/视图关闭时, 也就是 "完成" 信号发生时.

我们仅能异步地捕捉到这些事件: 通过定义三种函数, 分别用来捕捉派发出的数值、错误以及 "完成" 信号. 有时候后两者可以被忽略, 你只需定义用来捕捉数值的函数. 我们把对流的 "侦听" 称为订阅(subscribing), 我们定义的这三种函数合起来就是观察者, 流则是被观察的主体(或者叫"被观察者"). 这正是设计模式中的观察者模式.

描述这种方式的另一种方式用 ASCII 字符来画个导图, 在本教程的后续的部分也能看到这种图形.

--a---b-c---d---X---|->

a, b, c, d 代表被派发出的值
X 代表错误
| 代表"完成"信号
---> 则是时间线

这些都是是老生常谈了, 为了不让你感到无聊, 现在来点新鲜的东西: 我们将原生的点击事件流进行变换, 来创建新的点击事件流.

首先, 我们做一个计数流, 来指明一个按钮被点击了多少次. 在一般的 Reactive 库中, 每个流都附带了许多诸如mapfilterscan 等的方法. 当你调用这些方法之一(比如比如clickStream.map(f))时, 它返回一个基于 clickStream 的新的流. 它没有对原生的点击事件做任何修改. 这种(不对原有流作任何修改的)特性叫做immutability(不可变性), 而它和 Reactive(反应式) 这个概念的契合度之高好比班戟和糖浆(译者注: 班戟就是薄煎饼, 该称呼多见于中国广东地区. 此句意为 immutability 与 Reactive 两个概念高度契合). 这样的流允许我们进行链式调用, 比如clickStream.map(f).scan(g):

  clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

map(f) 方法根据你提供的函数f替换每个被派发的元素形成一个新的流. 在上例中, 我们将每次点击都映射为计数 1. scan(g) 方法则在内部运行x = g(accumulated, current), 以某种方式连续聚合处理该流之前所有的值, 在该例子中, g 就是简单的加法. 然后, 一次点击发生时, counterStream 就派发一个点击数的总值.

为了展示 Reactive 真正的能力, 我们假设你想要做一个 "双击事件" 的流. 或者更厉害的, 我们假设我们想要得到一个 "三击事件流" , 甚至推广到更普遍的情况, "多击流". 现在, 深呼吸, 想象一下按照传统的命令式和状态化思维习惯要如何完成这项工作? 我敢说那会烦死你了, 它必须包含各种各样用来保持状态的变量, 以及一些对周期性工作的处理.

然而, 以 Reactive 的方式, 它会非常简单. 事实上, 这个逻辑只不过是四行代码. 不过让我们现在忘掉代码.无论你是个初学者还是专家, 借助导图来思考, 才是理解和构建流最好的方法.

技术分享图片

图中的灰色方框是将一个流转换成另一个流的方法. 首先, 每经过 "250毫秒" 的 "事件静默" (简单地说, 这是在 buffer(stream.throttle(250ms)) 完成的. (现在先)不必担心对这点的细节的理解, 我们主要是演示下 Reactive 的能力.), 我们就得到了一个 "点击动作" 的列表, 即, 转换的结果是一个列表的流, 而从这个流中我们应用 map() 将每个列表映射成对应该队列的长度的整数值. 最后, 我们使用 filter(x >= 2) 方法忽略掉所有的 1. 如上: 这 3 步操作将产生我们期望的流. 我们之后可以订阅("侦听")它, 并按我们希望的处理方式处理流中的数据.

我希望你感受到了这种方式的美妙. 这个例子只是一次不过揭示了冰山一角: 你可以将相同的操作应用到不同种类的流上, 比如 API 返回的流中. 除此以外, 还有许多有效的函数.

"为什么我应该采用反应式编程?"

Reactive Programming (反应式编程) 提升了你代码的抽象层次, 你可以更多地关注用于定义业务逻辑的事件之间的互相依赖, 而不必写大量的细节代码来处理事件. RP(反应式编程)的代码会更简洁明了.

在现代网页应用和移动应用中, 这种好处是显而易见的, 这些场景下, 与数据事件关联的大量 UI 事件需要被高频地交互. 10 年前, 和 web 页面的交互只是很基础地提交一个长长的表单给后端, 然后执行一次简单的重新渲染. 在这 10 年间, App 逐渐变得更有实时性: 修改表单中的单个字段能够自动触发一次到后端的保存动作, 对某个内容的 "点赞" 需要实时反馈到其他相关的用户......

现今的 App 有大量的实时事件, 它们共同作用, 以带给用户良好的体验. 我们要能简洁处理这些事件的工具, 而 Reactive Programming 方式我们想要的.

举例说明如何以反应式编程的方式思考

现在我们进入到实战. 一个真实的手把手教你如何以 RP(反应式编程) 的方式来思考的例子. 注意这里不是随处抄来的例子, 不是半吊子解释的概念. 到这篇教程结束为止, 我们会在写出真正的功能性代码的同时, 理解我们做的每个动作.

我选择了 javascript 和 RxJS 作为工具, 原因是, JavaScript 是当下最为人熟知的语言, 而 [Rx*] 支持多数语言和平台 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等). , 无论你的工具是什么, 你可以从这篇教程中收益.

实现一个"建议关注"盒子

在 Twitter 上, 有一个 UI 元素是建议你可以关注的其它账户.

技术分享图片

我们将着重讲解如何模仿出它的核心特性:

  • 在页面启动时, 从 API 中加载账户数据, 并展示三个推荐关注者
  • 在点击"刷新"时, 加载另外的三个推荐关注的账户, 形成新三行
  • 在点击一个账户的 "x" 按钮时, 清除该账户并展示一个新的
  • 每一行显示账户的头像和到他们主页的链接

我们可以忽略其它的特性和按钮, 它们都是次要的. 另外, Twitter 最近关闭了非认证请求接口, 作为替代, 我们使用 [Github 的 API] 来构建这个关注别人 UI.(注: 到本稿的最新的校正为止, github 的该接口对非认证用户启用了一段时间内访问频次限制)

如果你想尽早看一下完整的代码, 请点击[样例代码].

请求和回复

你如何用 Rx 处理这个问题?

首先, (几乎) 万物皆可为流 .这是 "Rx 口诀". 让我们从最容易的特性开始: "在页面启动时, 从 API 中加载账户数据". 这没什么难得, 只需要(1) 发一个请求, (2) 读取回复, (3) 渲染回复的中的数据. 所以我们直接把我们我们的请求当做流. 一开始就用流也许颇有"杀鸡焉用牛刀"的意味, 但为了理解, 我们需要从基本的例子开始.

在应用启动的时候, 我们只需要一个请求, 因此如果我们将它作为一个数据流, 它将会只有一个派发的值. 我们知道之后我们将有更多的请求, 但刚开始时只有一个.

--a------|->

其中 a 是字符串 ‘https://api.github.com/users‘

这是一个将请求的 URL 的流. 无论请求何时发生, 它会告诉我们两件事: 请求发生的时刻和内容. 请求执行之时就是事件派发之时, 请求的内容就是被派发的值: 一个 URL 字符串.

创建这样一个单值流对 [Rx*] 来说非常简单, 官方对于流的术语, 是 "Observable"(可被观察者), 顾名思义它是可被观察的, 但我觉得这名字有点傻, 所以我称呼它为 _流_.

var requestStream = Rx.Observable.just(‘https://api.github.com/users‘);

但现在, 这只是一个字符串流, 不包含其他操作, 所以我们需要要在值被派发的时候做一些事情. 这依靠对流的订阅.

requestStream.subscribe(function(requestUrl) {
  // 执行该请求
  jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

注意我们使用了 jQuery Ajax 回调(我们假定你应已对此有了解)来处理请求操作的异步性. 但稍等, Rx 就是处理 异步 数据流的. 难道这个请求的回复不就是一个在未来某一刻会带回返回数据的流么? 从概念上讲, 它看起来就是的, 我们来尝试写一下.

requestStream.subscribe(function(requestUrl) {
  // 执行该请求
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });

  responseStream.subscribe(function(response) {
    // 对回复做一些处理
  });
}

Rx.Observable.create() 所做的是自定义一个流, 这个流会通知其每个观察者(或者说其"订阅者" )有数据产生 (onNext()) 或发生了错误 (onError()). 我们需要做的仅仅是包装 jQuery Ajax Promise. 稍等, 这难道是说 Promise 也是一个 Observable?

 

是的. Observable 就是一个 Promise++ 对象. 在 Rx 中, 通过运行 var stream = Rx.Observable.fromPromise(promise) 你就可以把一个 Promise 转换成一个 Observable. 仅有的区别在于 Observables 不符合 Promises/A+ 标准, 但他们在概念上是不冲突的. 一个 Promise 就是一个仅派发一个值的 Observable. Rx 流就是允许多次返回值的 Promise.

这个例子很可以的, 它展示了 Observable 是如何至少有 Promise 的能力. 因此如果你喜欢 Promise, 请注意 Rx Observable 也可以做到同样的事.

现在回到我们的例子上, 也许你已经注意到了, 我们在一个中 subscribe() 调用了另一个 subscribe(), 这有点像回调地狱. 另外, responseStream 的创建也依赖于 requestStream. 但正如前文所述, 在 Rx 中有简单的机制来最流作变换并支持从其他流创建一个新的流, 接下来我们来做这件事.

到目前为止, 你应该知道的对流进行变换的一个基础方法是 map(f), 将 "流 A" 中的每一个元素作 f() 处理, 然后在 "流 B" 中生成一一对应的值. 如果我们这样处理我们的请求和回复流, 我们可以把请求 URL 映射到回复的 Promise (被当做是流) 中.

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

这下我们创建了一个叫做 元流 (流的流) 的奇怪的东西. 不必对此感到疑惑, 元流, 就是其中派发值是流的流. 你可以把它想象成 指针): 每个被派发的值都是对其它另一个流的 指针 . 在我们的例子中, 每个请求的 URL 都被映射为一个指针, 指向一个个包含 URL 对应的返回数据的 promise 流.

技术分享图片

这个元流看上去有点让人迷惑, 而且对我们根本没什么用. 我们只是想要一个简单的回复流, 其中每个派发的值都应是一个 JSON 对象, 而不是一个包含 JSON 对象的 Promise. 现在来认识 Flatmap: 它类似于 map(), 但它是把 "分支" 流中派发出的的每一项值在 "主干" 流中派发出来, 如此, 它就可以对元流进行扁平化处理.(译者注: 这里, "分支" 流指的是元流中每个被派发的值, "主干" 流是指这些值有序构成的流, 由于元流中的每个值都是流, 作者不得不用 "主干" 和 "分支" 这样的比喻来描述元流与其值的关系). 在此, Flatmap 并不是起到了"修正"的作用, 元流也并不是一个 bug, 相反, 它们正是 Rx 中处理异步回复流的工具.

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

技术分享图片

漂亮. 因为回复流是依据请求流定义的, 设想之后
有更多的发生在请求流中的事件, 不难想象, 就会有对应的发生在回复流中的的回复事件:

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(小写的是一个请求, 大写的是一个回复)

现在我们终于得到了回复流, 我们就可以渲染接收到的数据

responseStream.subscribe(function(response) {
  // 按你设想的方式渲染 `response` 为 DOM
});

整理一下到目前为止的代码, 如下:

var requestStream = Rx.Observable.just(‘https://api.github.com/users‘);

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // 按你设想的方式渲染 `response` 为 DOM
});

刷新按钮

现在我们注意到, 回复中的 JSON 是一个包含 100 个用户的列表. [Github 的 API] 只允许我们指定一页的偏移量, 而不能指定读取的一页中的项目数量, 所以我们只用到 3 个数据对象, 剩下的 97 个只能浪费掉. 我们暂时忽略这个问题, 之后我们会看到通过缓存回复来处理它.

每次刷新按钮被点击的时候, 请求流应该派发一个新的 URL, 因此我们会得到一个新的回复. 我们需要两样东西: 一个刷新按钮的点击事件流(口诀: 万物皆可成流), 并且我们需要改变请求流以依赖刷新点击流. 好在, RxJs 拥有从事件监听器产生 Observable 的工具.

var refreshButton = document.querySelector(‘.refresh‘);
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, ‘click‘);

既然刷新点击事件自身不带任何 API URL, 我们需要映射每次点击为一个实际的 URL. 现在我们将请求流改成刷新点击流, 这个流被映射为每次带有随机的偏移参数的、到 API 的请求.

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

如果我直接这样写, 也不做自动化测试, 那这段代码其实有个特性没实现. 即请求不会在页面加载完时发生, 只有当刷新按钮被点击的时候才会. 但其实, 两种行为我们都需要: 刷新按钮被点击的时候的请求, 或者是页面刚打开时的请求.

两种场景下需要不同的流:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

var startupRequestStream = Rx.Observable.just(‘https://api.github.com/users‘);

但我们如何才能"合并"这两者为同一个呢? 有一个 merge() 方法. 用导图来解释的话, 它看起来像是这样的.

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

那我们要做的事就变得很容易了:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

var startupRequestStream = Rx.Observable.just(‘https://api.github.com/users‘);

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

也有另外一种更干净的、不需要中间流的写法:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  })
  .merge(Rx.Observable.just(‘https://api.github.com/users‘));

甚至再短、再有可读性一点:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  })
  .startWith(‘https://api.github.com/users‘);

startWith() 会照你猜的那样去工作: 给流一个起点. 无论你的输入流是怎样的, 带 startWith(x) 的输出流总会以 x 作为起点. 但我这样做还不够 [DRY], 我把 API 字符串写了两次. 一种修正的做法是把 startWith() 用在 refreshClickStream 上, 这样可以从"模拟"在页面加载时一次刷新点击事件.

var requestStream = refreshClickStream.startWith(‘startup click‘)
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

漂亮. 如果你现在回头去看我说 "有个特性没实现" 的那一段, 你应该能看出那里的代码和这里的代码的区别仅仅是多了一个 startWith().

使用流来建立"3个推荐关注者"的模型

到现在为止, 我们只是写完了一个发生在回复流的 subscribe() 中的 推荐关注者 的 UI. 对于刷新按钮, 我们要解决一个问题: 一旦你点击了"刷新", 现在的三个推荐关注者仍然没有被清理. 新的推荐关注者只在请求内回复后才能拿到, 不过为了让 UI 看上去令人舒适, 我们需要在刷新按钮被点击的时候就清理当前的推荐关注者.

refreshClickStream.subscribe(function() {
  // 清理 3 个推荐关注者的 DOM 元素
});

稍等一下. 这样做不太好, 因为这样我们就有两个会影响到推荐关注者的 DOM 元素的 subscriber (另一个是 responseStream.subscribe()), 这听起来不符合 Separation of concerns. 还记得 Reactive 口诀吗?

技术分享图片

在 "万物皆可为流" 的指导下, 我们把推荐关注者构建为一个流, 其中每个派发出来的值都是一个包含了推荐关注人数据的 JSON 对象. 我们会对三个推荐关注者的数据分别做这件事. 像这样来写:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 从列表中随机获取一个用户
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

至于获取另外两个用户的流, 即 suggestion2Stream 和 suggestion3Stream, 只需要把 suggestion1Stream 复制一遍就行了. 这不够 [DRY], 不过对我们的教程而言, 这样能让我们的示例简单些, 同时我认为, 思考如何在这个场景下避免重复编写 suggestion[N]Stream 也是个好的思维练习, 就留给读者去考虑吧.

我们让渲染的过程发生在回复流的 subscribe() 中, 而是这样做:

suggestion1Stream.subscribe(function(suggestion) {
  // 渲染第 1 个推荐关注者
});

回想之前我们说的 "刷新的时候, 清理推荐关注者", 我们可以简单地将刷新单击事件映射为 "null" 数据(它代表当前的推荐关注者为空), 并且在 suggestion1Stream 做这项工作, 如下:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 从列表中随机获取一个用户
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

在渲染的时候, 我们把 null 解释为 "没有数据", 隐藏它的 UI 元素.

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // 隐藏第 1 个推荐关注者元素
  }
  else {
    // 显示第 1 个推荐关注者元素并渲染数据
  }
});

整个情景是这样的:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

其中 N 表示 null

(译者注: 注意, 当 refreshClickStream 产生新值, 即用户进行点击时, null 的产生总是立刻发生在 refreshClickStream 之后; 而 refreshClickStream => requestStream => responseStream, responseStream 中的值, 是发给 API 接口的异步请求的结果, 这个结果的产生往往会需要花一点时间, 必然在 null 之后, 因此可以达到 "为了让 UI 看上去令人舒适, 我们需要在刷新按钮被点击的时候就清理当前的推荐关注者" 的效果).

稍微完善一下, 我们会在页面启动的时候也会渲染 "空" 推荐关注人. 为此可以 startWith(null) 放在推荐关注人的流里:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 从列表中随机获取一个用户
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

最后我们得到的流:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

关闭推荐关注人, 并利用已缓存的回复数据

目前还有一个特性没有实现. 每个推荐关注人格子应该有它自己的 ‘x‘ 按钮来关闭它, 然后加载另一个数据来代替. 也许你的第一反应是, 用一种简单方法: 在点击关闭按钮的时候, 发起一个请求, 然后更新这个推荐人:

var close1Button = document.querySelector(‘.close1‘);
var close1ClickStream = Rx.Observable.fromEvent(close1Button, ‘click‘);
// close2Button 和 close3Button 重复此过程

var requestStream = refreshClickStream.startWith(‘startup click‘)
  .merge(close1ClickStream) // 把关闭按钮加在这里
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

然而这没不对. (由于 refreshClickStream 影响了所有的推荐人流, 所以)该过程会关闭并且重新加载_所有的_推荐关注人, 而不是仅更新我们想关掉的那一个. 这里有很多方式来解决这个问题, 为了玩点炫酷的, 我们会重用之前的回复数据中别的推荐人. API 返回的数据每页包含 100 个用户, 但我们每次只用到其中的 3 个, 所以我们有很多有效的刷新数据可以用, 没必要再请求新的.

再一次的, 让我们用流的思维来思考. 当一个 ‘close1‘点击事件发生的时候, 我们使用 responseStream中 最近被派发的 回复来从回复的用户列表中随机获取一个用户. 如下:

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

在 [Rx*] 中, 有一个合成器方法叫做 combineLatest, 似乎可以完成我们想做的事情. 它把两个流 A 和 B 作为其输入, 而当其中任何一个派发值的时候, combineLatest 会把两者最近派发的值 a 和 b 按照 c = f(x,y) 的方法合并处理再输出, 其中 f 是你可以定义的方法. 用图来解释也许更清楚:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

在该例中, f 是一个转换为全大写的函数

我们可以把 combineLatest() 用在 close1ClickStream 和 responseStream 上, 因此一旦 "关闭按钮1" 被点击(导致 close1ClickStream 产生新值), 我们都能得到最新的返回数据, 并在 suggestion1Stream中产生一个新的值. 由于 combineLatest() 的对称性的, 任何时候, 只要 responseStream 派发了一个新的回复, 它也将合并最新的一次 ‘关闭按钮1被点击‘ 事件来产生一个新的推荐关注人. 这个特性非常有趣, 因为它允许我们简化我们之前的 suggestion1Stream , 如下:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

在上述思考中, 还有一点东西被遗漏. combineLatest() 使用了两个数据源中最近的数据, 但是如果这些源中的某个从未派发过任何东西, combineLatest() 就不能产生一个数据事件到输出流. 如果你再细看上面的 ASCII 图, 你会发现当第一个流派发 a 的时候, 不会有任何输出. 只有当第二个流派发 b 的时候才能产生一个输出值.

有几种方式来解决该问题, 我们仍然采取最简单的一种, 就是在页面启动的时候模拟一次对 ‘关闭按钮1‘ 按钮的点击:

var suggestion1Stream = close1ClickStream.startWith(‘startup click‘) // 把对"关闭按钮1"的点击的模拟加在这里
  .combineLatest(responseStream,
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

总结整理

现在我们的工作完成了. 完整的代码如下所示:

var refreshButton = document.querySelector(‘.refresh‘);
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, ‘click‘);

var closeButton1 = document.querySelector(‘.close1‘);
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, ‘click‘);
// close2 和 close3 是同样的逻辑

var requestStream = refreshClickStream.startWith(‘startup click‘)
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return ‘https://api.github.com/users?since=‘ + randomOffset;
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

var suggestion1Stream = close1ClickStream.startWith(‘startup click‘)
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
// suggestion2Stream 和 suggestion3Stream 是同样的逻辑

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // 隐藏第 1 个推荐关注者元素
  }
  else {
    // 显示第 1 个推荐关注者元素并渲染数据
  }
});

你可以在这里查看完整的[样例代码]

很惭愧, 这只是一个微小的代码示例, 但它的信息量很大: 它着重表现了, 如何对关注点进行适当的隔离, 从而对不同流进行管理, 甚至充分利用了返回数据的流. 这样的函数式风格使得代码像声明式多于像命令式: 我们并不用给出一个要执行的的结构化序列, 我们只是通过定义流之间的关系来表达系统中每件事物是什么. 举例来说, 通过 Rx, 我们告诉计算机 _suggestion1Stream 就是 点击关闭按钮1 的流, 与最近一个API返回的(用户中随机选择的一个)的用户的流, 刷新时产生 null 的流, 和应用启动时产生 null 的流的合并流_.

回想一下那些你熟稔的流程控制的语句(比如 ifforwhile), 以及 Javascript 应用中随处可见的基于回调的控制流. (只要你愿意, )你甚至可以在上文的 subscribe() 中不写 if 和 else, 而是(在 observable 上)使用 filter()(这一块我就不写实现细节了, 留给你作为练习). 在 Rx 中, 有很多流处理方法, 比如 mapfilterscanmergecombineLateststartWith, 以及非常多用于控制一个事件驱动的程序的流的方法. 这个工具集让你用更少的代码而写出更强大的效果.

接下来还有什么?

如果你愿意用 [Rx*] 来做反应式编程, 请花一些时间来熟悉这个 函数列表, 其中涉及如何变换, 合并和创建 Observables (被观察者). 如果你想以图形的方式理解这些方法, 可以看一下 弹珠图解 RxJava. 一旦你对理解某物有困难的时候, 试着画一画图, 基于图来思考, 看一下函数列表, 再继续思考. 以我的经验, 这样的学习流程非常有用.

一旦你熟悉了如何使用 [Rx] 进行变成, 理解冷热酸甜, 想吃就吃...哦不, 冷热 Observables 就很有必要了. 反正就算你跳过了这一节, 你也会回来重新看的, 勿谓言之不预也. 建议通过学习真正的函数式编程来磨练你的技巧, 并且熟悉影响各种议题, 比如"影响 [Rx] 的副作用"什么的.

不过, 实现了反应式编程的库并非并非只有 [Rx]. [Bacon.js] 的运行机制就很直观, 理解它不像理解 [Rx] 那么难; [Elm Language] 在特定的应用场景有很强的生命里: 它是一种会编译到 Javascript + html + CSS 的反应式编程语言, 它的特色在于 [time travelling debugger]. 这些都很不错.

Rx 在严重依赖事件的前端应用中表现优秀. 但它不只是只为客户端应用服务的, 在接近数据库的后端场景中也大有可为. 实际上, [RxJava 正是激活 Netflex 服务端并发能力的关键]. Rx 不是一个严格限于某种特定类型应用的框架或者是语言. 它其实是一种范式, 你可以在任何事件驱动的软件中实践它.

 


原文链接
本文为云栖社区原创内容,未经允许不得转载。




以上是关于一起来看 rxjs的主要内容,如果未能解决你的问题,请参考以下文章

[RxJS] Implement RxJS `switchMap` by Canceling Inner Subscriptions as Values are Passed Through(代码片段

超级有用的9个PHP代码片段

RXJS 操作员不使用 firebase

使用 Rxjs map 和 filter 一起从 json 中选择国家和州

选择片段 A 时如何在 ViewPager 上不一起启动所有片段

在RxJS中,Observer是否被注入到Observable执行中?