RxJava使用入门

Posted 优才网

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava使用入门相关的知识,希望对你有一定的参考价值。

RxJava在当下android开发者中非常流行的,今天先来简单介绍一下并结合自己的代码给出一些见解。

一、关系对象

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer(观察者)、subscribe(订阅)、事件。ObservableObserver通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted()onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava
    规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

  • onError(): 事件队列异常。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。

  • 在一个正确运行的事件序列中,onCompleted()onError()
    有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError()
    二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer的抽象类:SubscriberSubscriberObserver接口进行了一些扩展,但他们的基本使用方式是完全一样的。
不仅基本使用方式一样,实质上,在 RxJava 的subscribe过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 ObserverSubscriber是完全一样的。它们的区别对于使用者来说主要有两点:

  • onStart(): 这是 Subscriber增加的方法。它会在subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在  Subscriber 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

  • unsubscribe(): 这是Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()先判断一下状态。unsubscribe()这个方法很重要,因为在 subscribe()之后, Observable会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如onPause() onStop()等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

二、创建 Observable

Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create()方法来创建一个 Observable ,并为它定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

create()方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:

  • just(T...): 将传入的参数依次发送出来。

Observable observable = Observable.just("Hello", "Hi", "Aloha");// 将会依次调用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);// 将会依次调用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();

This page shows methods that create Observables.

just( )— convert an object or several objects into an Observable that emits that object or those objects

from( ) — convert an Iterable, a Future, or an Array into an Observable

create( ) — advanced use only! create an Observable from scratch by means of a function, consider fromEmitter instead
fromEmitter()— create safe, backpressure-enabled, unsubscription-supporting Observable via a function and push events.
defer( )— do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
range( )— create an Observable that emits a range of sequential integers
interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
timer( )— create an Observable that emits a single item after a given delay
empty( )— create an Observable that emits nothing and then completes
error( )— create an Observable that emits nothing and then signals an error
never( )— create an Observable that emits nothing at all

三、 Subscribe (订阅)

创建了 ObservableObserver 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

有人可能会注意到, subscribe()这个方法有点怪:它看起来是『observalbe 订阅了 observer/ subscriber』而不是『observer/ subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable),虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

Observable.subscribe(Subscriber)的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);    return subscriber;
}

可以看到,subscriber()做了3件事:

调用Subscriber.onStart()。这个方法在前面已经介绍过,是一个可选的准备方法。
调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
将传入的 Subscriber 作为Subscription 返回。这是为了方便 unsubscribe()
整个过程中对象间的关系如下图:

除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {    // onError()
    @Override
    public void call(Throwable throwable) {        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1Action0Action0是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于onCompleted()方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted()方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。Action1也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj)onError(Throwable error)也是单参数无返回值的,因此Action1可以将onNext(obj)onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0Action1在 API 中使用最广泛,但 RxJava 是提供了多个ActionX形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

四、场景示例

a. 打印字符串数组

将字符串数组 names中的所有字符串依次打印出来:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });
  • MyCode:

public class MainActivity extends AppCompatActivity {    private static final String TAG = MainActivity.class.getSimpleName();
    TextView mTextView;

    String[] strArr = new String[]{"a", "b", "c"};    @Override
    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mTextView = (TextView) findViewById(R.id.show);

    }    public void trigger(View view) {
        Observable.from(strArr).subscribe(new Action1<String>() {            @Override
            public void call(String s) {
                Log.e(TAG, "call:" + s);
                mTextView.setText(s);
            }
        });
    }

}
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }    @Override
    public void onCompleted() {
    }    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

正如上面两个例子这样,创建出 ObservableSubscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

推荐阅读:





以上是关于RxJava使用入门的主要内容,如果未能解决你的问题,请参考以下文章

RxJava的一些入门学习分享

知识整理这可能是最好的RxJava 2.x 入门教程

史上最浅显易懂的RxJava入门教程

这可能是最好的RxJava 2.x 入门教程

知识整理这可能是最好的RxJava 2.x 入门教程

知识整理这可能是最好的RxJava 2.x 入门教程