RxAndroid 教程

Posted 颐和园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxAndroid 教程相关的知识,希望对你有一定的参考价值。

原文:RxAndroid Tutorial
作者: Artem Kholodnyi
译者:kmyhy

有人说你应该以积极的心态面对生活,而不是消极应对。但是,在 android 开发中恰恰相反。

响应式编程不仅仅是一个 API。它是一种全新的设计模式,非常有用。RxJava 是一个 Android 中的响应式实现。Android 是一个让你开始响应式编程的好地方。RxAndroid 使这一切更加简单,它将异步 UI 事件封装得更像 RxJava。

别担心——我打赌你一定会知道这些基本的响应式编程概念,哪怕你从前根本没听说过它们。

注:本教程需要扎实的 Android 和 Java 基础。要快速上手,请先看我们的 Android 开发教程,然后再来看本教程,就不会有任何问题了。

在本教程中,你将学习下列内容:

  • 什么是响应式编程
  • 什么是被观察者 observable
  • 将异步事件比如按钮的点击和文本框内容改变转换成被观察者
  • 传递被观察者
  • 过滤被观察者
  • 指定代码将被执行的线程
  • 将多个被观察者组合成一个

但愿你喜欢奶酪——因为我们将通过制作一个“查找奶酪”的app 来学习这些内容!

开始

此处下载开始项目,并用 Android Studio 打开它。

你的大部分工作只会在 CheeseAcdtivity.java 中进行。CheeseActivity 类继承了 BaseSearchActivity 类。花点时间看看 BaseSearchActivity,你会用到这些方法和属性:

  • showProgressBar(): 显示一个进度条。
  • hideProgressBar(): 隐藏进度条。
  • showResult(List result): 显示奶酪列表。
  • mCheeseSearchEngine: 一个 CheeseSearchEngine 对象。它有一个 search 方法用于搜索奶酪。它接收一个搜索关键字,返回一个找到的奶酪列表。

在设备或模拟器中运行 app。你应该看到一个空白的搜索界面:

什么是响应式编程

创建第一个被观察者之前,先来学点理论。

在命令式编程中,表达式被计算一次并对变量进行赋值:

int a = 2;
int b = 3;
int c = a * b; // c is 6

a = 10;
// c is still 6

而在响应式编程中,值被改变后会对所有东西产生影响。
你可能也做过一些响应式编程的事情——虽然不是有意识的。

在电子表格中为单元格赋值就好比在命令式编程中定义变量。
在电子表格为单元格定义表达式就好比在响应式编程中定义观察者。

我们可以用电子表格来举例:

b1 被赋值为 2,b2 被赋值为 3,b3 用一个表达式:b2xb3。当表达式中所引用的单元格中的值发生改变时,这个改变被观察到,b3 中的表达式会重新计算:

RxJava 的被观察者协议

RxJava 使用了观察者模式。

注:关于观察者模式,你可以翻阅Android 中的常见设计模式

在观察者模式中,必须实现两个关键的 RxJava 接口:观察者 Observer 和被观察者 Observable。当被观察者的状态改变,所有订阅了的观察者对象都会被通知。

Observable 接口有一个 subscribe() 方法,每个观察者调用这个方法来订阅通知。
在 Observer 接口中有 3 个方法会被 Observable 调用:

  • onNext(T value) 方法:提供一个类型为 T 的新对象给观察者
  • onComplete() 方法:当被观察者发送完这个对象之后通知给观察者。
  • onError(Throwable e) 方法:通知观察者被观察者发生了一个错误。

原则上,一种好的做法是当被观察者提交了 0 个或多个对象只有应当紧随着一个完成或错误事件。

听起来挺复杂,用例子来说明会更简单些。

一个网络请求的被观察者通常会传播单个对象并立即完成:

圆圈表示被观察者传播的对象,黑色方块表示完成或错误。

一个鼠标移动的被观察者会广播鼠标坐标,但永远不会有完成事件。

这里,你会看到多个对象被传播但不会有方块表示鼠标的完成事件或错误事件。

一旦被观察者发出完成事件之后,就停止传播任何对象。这是一个错误的被观察者范例,它违反了被观察者协议:

这是错误的,因为它在发出完成事件之后还发射了一个对象,违反了被观察者协议。

如何创建观察者

有许多库能够帮助你从几乎全部类型的事件中创建出观察者。但是,有时候你必须手动创建自己的观察者。而且,这也是我们进行学习的一个良好途径。

你可以用 Obervable.create() 方法创建一个被观察者。方法签名如下:

Observable<T> create(ObservableOnSubscribe<T> source)

简单漂亮,但什么意思?source 是什么东东?要理解这个签名,你需要知道什么是 ObservableOnSubscribe。它是一个接口,定义了一个协议:

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> e) throws Exception;
}

就像 J.J. Abrams 主演的 “Lost” 或 “Westworld” 的一个剧集,在回答一个问题的同时不可避免地提出另外一个问题。创建 Observable 时要用到 “source”,而 source 需要暴露一个 subscribe() 方法,而 subsribe() 方法需要用一个 emitter 作为参数。但是,emitter 又是什么?

RxJava 的 Emitter 接口和 Observer 接口很像:

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

一个 ObserverableEmitter,还会提供一个取消订阅的方法。

要形象地描述整个过程,想象一只能够调整水流的水龙头。水管就是一个被观察者,在你想用水的时候会淌出水流。你用水龙头来开关水流,那么它就是一个 ObservableEmitter,要将它连接到水管上,你必须调用 Observable.create()。这就是一只漂亮的水龙头了。

下面的例子会更加具体清晰。
让我们来创建第一个被观察者!

观察按钮点击

在 CheeseActivity 类中添加代码:

// 1
private Observable<String> createButtonClickObservable() {

  // 2
  return Observable.create(new ObservableOnSubscribe<String>() {

    // 3
    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
      // 4
      mSearchButton.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
          // 5
          emitter.onNext(mQueryEditText.getText().toString());
        }
      });

      // 6
      emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
          // 7
          mSearchButton.setOnClickListener(null);
        }
      });
    }
  });
}

代码解释如下:

  1. 定义一个返回被观察者的方法,它会发送字符串。
  2. 用 Observable.create() 方法创建一个被观察者,传入一个新的 ObservableOnSubscribe 实例做参数。
  3. 重写 ObservableOnSubsribe 的 subscribe() 方法。
  4. 为 mSearchButton 指定一个 OnClickListener。
  5. 当点击事件发生,调用emitter 的 onNext 方法,传入 mQueryEditText 的字符串值。
  6. 在 Java 中保持这些引用会导致内存泄漏。因此一旦监听器不再使用时就移除监听器是一种好习惯。但我们在创建自己的被观察者时怎么移除它?为此,ObservableEmitter 提供了一个 setCancellabe() 方法。通过覆盖其中的 cancel() 方法,你可以在被观察者被销毁时执行你指定的动作,比如当被观察者发生完成事件或者所有观察者都被取消时。

对于 OnClickListener 来说,移除监听器的方法是 setOnClickListener(null)。

我们已经定义好自己的被观察者,接下来需要订阅它。在此之前,我们必须用到另一个接口 Consumer。它是一种用于接收 emitter 发送的值的简单方式。

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

当你想简单订阅一个被观察者时,这个接口很好用。
Observable 接口有几个不同的 subscribe() 版本,参数各有不同。例如,你可以传入一个完整的观察者,但需要实现所有 required 方法。

如果要订阅的观察者只想接收 onNext() 方法广播的值,我们可以用另外一个 subscribe() 方法版本,它只需要一个 Consumer 参数(这个参数甚至被简单地命名为 onNext,更加凸显了这种关系)。

我们只需要在 activity 的 onStart() 方法中进行订阅。在 CheeseActivity.java 中添加代码:

@Override
protected void onStart() {
  super.onStart();
  // 1
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 2
      .subscribe(new Consumer<String>() {
        //3
        @Override
        public void accept(String query) throws Exception {
          // 4
          showResult(mCheeseSearchEngine.search(query));
        }
      });
}

导入 Consumer 时会出现歧义,请根据提示选择 import io.reactivex.functions.Consumer;

代码解释如下:

  1. 首先,调用刚刚编写的方法创建一个被观察者对象。
  2. 用 subscribe() 订阅这个对象,使用一个简单的 Consumer 参数。
  3. 重写 accept(),这个方法在被观察者对象发送一个对象时被调用。
  4. 进行搜索并显示结果。

运行 App。输入几个字符,按 Search 按钮。你会看到一个匹配的奶酪列表:

看得流口水了 ?:]

RxJava 线程模型

我们已经体验过响应式编程的为例了。现在有一个问题:当搜索按钮被按下后,UI 会有几秒钟的“冻结”。

你可以在 Android 监视器中看到:

08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.

之所以这样,是因为我们在主线程中执行了搜索。如果在搜索是一个网络请求,Android 会抛出一个 NetworkOnMainThreadException 异常。我们需要解决这个问题。

RxJava 的一个神奇的地方是,它默认支持多线程,就像 AsyncTask。但是,如果我们不特别加以指定,RxJava 在它被调用的同一线程中进行所有工作。

你可以用 subScribeOn 和 observeOn 运算来改变这种行为。
subscribeOn 只应当在计算链上调用一次。否则,只有第一次有效。subscribeOn 用于指定被观察者应当在哪个线程中被订阅(即被创建)。如果被观察者被用于发送 Android View 中的事件,你需要确保订阅是在 Android UI 线程中进行的。

另外,你可以在运算链中多次调用 observeOn。observerOn 用于指定计算链中下一个操作应当在哪个线程中执行。例如:

myObservable // observable will be subscribed on i/o thread
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .map(/* this will be called on main thread... */)
  .doOnNext(/* ...and everything below until next observeOn */)
  .observeOn(Schedulers.io())
  .subscribe(/* this will be called on i/o thread */);

最常见的 Schedulers 包括:

  1. Schedulers.io(): 适合于 I/O 任务比如网络请求或磁盘读写。
  2. Schedulers.computation(): 适用于计算任务,比如时间循环和执行回调。
  3. AndroidSchedulers.mainThread() 在 UI 线程中执行下一个操作。

Map 操作

map 操作会对被观察者发送的每个对象应用一个函数,并返回另外一个发送函数返回结果的被观察者。我们可以用这个方法解决线程问题。

如果有一个被观察者叫做 numbers,用于发送一系列数字:

同时我们对它使用 Map 操作:

numbers.map(new Function<Integer, Integer>() {
  @Override
  public Integer apply(Integer number) throws Exception {
    return number * number;
  }
}

结果将变成:

Map操作只需要少量代码就能遍历多个对象。让我们来使用它吧!

在 CheeseActivity 类修改 onStart() 方法为:

@Override
protected void onStart() {
  super.onStart();
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 1
      .observeOn(Schedulers.io())
      // 2
      .map(new Function<String, List<String>>() {
        @Override
        public List<String> apply(String query) {
          return mCheeseSearchEngine.search(query);
        }
      })
      // 3
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> result) {
          showResult(result);
        }
      });
}

出现导入冲突时,导入这个 Function:

import io.reactivex.functions.Function;

代码解释如下:

  1. 首先,指明下一个操作将在 I/O 线程中执行。
  2. 每一个搜索查询,返回一个结果列表。
  3. 最后指定操作链中的下一个操作在主线程中执行,而不是 I/O 线程。在 Android 中,所有和视图有关的代码都应当在主线程中执行

运行 app。哪怕在搜索过程中,UI 仍然可以进行响应。

在 doOnNext 中显示进度条

让我们来显示进度条。
这需要用到 doOnNext 操作。doOnNext 需要一个消费者参数,允许你每当被观察者发出一个对象时执行一次动作。
同样,在 CheeseActivity 中修改 onStart() 方法:

@Override
protected void onStart() {
  super.onStart();
  Observable<String> searchTextObservable = createButtonClickObservable();

  searchTextObservable
      // 1
      .observeOn(AndroidSchedulers.mainThread())
      // 2
      .doOnNext(new Consumer<String>() {
        @Override
        public void accept(String s) {
          showProgressBar();
        }
      })
      .observeOn(Schedulers.io())
      .map(new Function<String, List<String>>() {
        @Override
        public List<String> apply(String query) {
          return mCheeseSearchEngine.search(query);
        }
      })
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> result) {
          // 3
          hideProgressBar();
          showResult(result);
        }
      });
}

代码解释如下:

  1. 确保下一个操作在主线程中进行。
  2. 添加一个 doOnNext 操作,这样 showProgressBar() 会每发送射一个新对象就被调用一次。
  3. 别忘了在显示结果列表之前调用 hideProgressBar()。

运行 app。当搜索开始后,你会看到进度条显示。

观察文本改变

如果我们想在用户输入字符的同时进行搜索,就像 Google 一样,怎样?
首先,需要订阅 TextView 的文本改变事件。在 CheeseActivity 添加方法:

//1
private Observable<String> createTextChangeObservable() {
  //2
  Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
      //3
      final TextWatcher watcher = new TextWatcher() {
        @Override
        public void beforeTextChanged(CharSequence s, int start, int count, int after) {}

        @Override
        public void afterTextChanged(Editable s) {}

        //4
        @Override
        public void onTextChanged(CharSequence s, int start, int before, int count) {
          emitter.onNext(s.toString());
        }
      };

      //5
      mQueryEditText.addTextChangedListener(watcher);

      //6
      emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
          mQueryEditText.removeTextChangedListener(watcher);
        }
      });
    }
  });

  // 7
  return textChangeObservable;
}

代码解释如下:

  1. 定义一个方法,用于返回一个文本改变事件的被观察者。
  2. 用 create() 方法创建一个 textChangeObservable,它需要一个 ObservableOnSubscribe 参数。
  3. 当某个观察者开始订阅时,第一件事情是创建一个 TextWatcher。
  4. 我们不关心 beforeTextChanged() 和 afterTextChanged() 方法。当用户进行输入时,onTextChanged() 方法触发,我们将新文本传递给观察者。
  5. 通过调用 addTextChangedListener(),将 watcher 添加到 TextView。
  6. 调用 emitter.setCancellable() 方法并在覆盖的 cancel() 方法中调用 removeTextChangedListener(),这将移除 watcher。
  7. 返回我们创建的被观察者。

要应用这个被观察者,在 CheeseActivity 的 onStart() 方法中将searchTextObservable 的定义修改为:

Observable<String> searchTextObservable = createTextChangeObservable();

运行app,当你在 textView 中输入的同时,搜索开始了:

限制查询长度

如果关键字太短,比如只有一个字符,搜索是完全没有意义的。要解决这个问题,我们需要用到 filter 操作。

filter 会在满足指定条件的情况下才传递对象。filter 使用一个谓词(Predicate)参数,谓词定义了一个对指定数据类型进行的测试,并返回一个布尔值。这里,我们将谓词类型指定为 String,并在字符长度大于 2 的时候才返回 true。

在 createTextChangeObservable() 中将 return textChangeObservable 一句修改为:

return textChangeObservable
    .filter(new Predicate<String>() {
      @Override
      public boolean test(String query) throws Exception {
        return query.length() >= 2;
      }
    });

出现导入冲突时,将 import 语句重定义为:

import io.reactivex.functions.Predicate;

代码和之前一样,但只有当输入超过 2 个字符才会进行查询。

运行 app,当你输入第二个字符以后,app 才会进行查询。

去抖动操作

我们也不想在关键字每发生一个字符的变化就查询服务器。

debounce (去抖动)是一种操作,它真正体现了响应式设计的优点。它和 filter 操作类似,他们都能够过滤由被观察者发出的对象。但对象是否应当被过滤并不取决于对象的类型,而是取决于对象是什么时候被发送的。
debounce 会在每个对象发送后等待一段时间,看是否有下一个对象发出。如果在等待时间内没有下一个对象发出,它会发出最后的一个对象:

在 createTextChangeObservable(),在 filter 操作后面添加一个 debounce 操作:

return textChangeObservable
    .filter(new Predicate<String>() {
      @Override
      public boolean test(String query) throws Exception {
        return query.length() >= 2;
      }
    }).debounce(1000, TimeUnit.MILLISECONDS);  // add this line

运行 app。你会看到只有输入后稍停一段时间,查询才会开始:

debounce 操作在最后一个查询关键字发出之前等待 1000 毫秒。

融合操作

我们创建了一个和按钮点击相关的被观察者,接着又创建了一个和文本框文字变化有关的观察者。我们为什么不让二者兼顾呢?

有多个操作可以进行被观察者的合并。最简单的一个是 merge。

merge 从多个观察者身上读取对象放到一个单独的观察者身上:

修改 onStart() 的开头部分:

Observable<String> buttonClickStream = createButtonClickObservable();
Observable<String> textChangeStream = createTextChangeObservable();

Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);

运行 app。试一下文本框和查找按钮。当你输入 2个以上字符或者点击 Search 按钮时,都能触发搜索动作。

RxJava 和 Activity/Fragment 生命周期

还记得我们创建的 setCancellable 吗?它们只会在被观察者取消注册时触发。

Observable.subscribe() 会返回一个 Disposable。 Disposable接口有两个方法:

public interface Disposable {
  void dispose();  // 停止订阅
  boolean isDisposed(); // 如果资源被销毁(取消订阅)返回 true
}

在 CheeseActivity 添加变量:

private Disposable mDisposable;

在 onStart() 中,将 subscribe() 的返回值设置为 mDisposable(只改了第一行代码):

mDisposable = searchTextObservable // 修改这行
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer() {
@Override
public void accept(String s) {
showProgressBar();
}
})
.observeOn(Schedulers.io())
.map(new Function


因为我们在 onStart() 中订阅了这个被观察者,所以应该在 onStop() 中取消订阅。
在 CheeseActivity.java 添加方法:

```java
@Override
protected void onStop() {
  super.onStop();
  if (!mDisposable.isDisposed()) {
    mDisposable.dispose();
  }
}

就这么简单。

结束

请在这里下载结束项目。
在本教程中,我们学到了许多。但仍然只是 RxJava 的皮毛。例如,还有 RxBinding 库,包含了大量 Android View API,你只需要调用 RxView.clicks(viewVariable) 即可创建点击事件的被观察者。

更多 RxJava 请参考 ReactiveX 文档

有任何问题和建议,请在下面留言。

以上是关于RxAndroid 教程的主要内容,如果未能解决你的问题,请参考以下文章

RxJava 和 RxAndroid 四(RxBinding的使用)

如何在 android 中将 asynctask 代码更改为 rxandroid 代码?

谁来讲讲Rxjava,rxandroid中的操作符的作用

谁来讲讲Rxjava,rxandroid中的操作符的作用

谁来讲讲Rxjava,rxandroid中的操作符的作用

RxAndroid/java小记