Rxjava入门:简单介绍与使用
Posted xzj_2013
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava入门:简单介绍与使用相关的知识,希望对你有一定的参考价值。
概述
根据RxJava 在 GitHub 的介绍:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
RxJava 是一个 基于事件流、实现异步操作的库,是一种基于观察者模式的响应式编程框架
RxJava的特点就是可以非常简便的实现异步调用,类似于android中的 AsyncTask 、Handler作用;
同时由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:
- 逻辑简洁
- 实现优雅
- 使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
Rxjava原理介绍
Rxjava 基于 一种扩展的观察者模式
在RxJava中主要有4个角色:
- 被观察者(Observable)------产生事件
- 订阅(Subscribe)----连接 被观察者 & 观察者
- 观察者(Observer) -----接收事件,并给出响应动作
- 事件(Event) ------被观察者 & 观察者 沟通的载体
RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作
原理分析:
一般我们使用RxJava都是编写类似如下的代码
Observable.create(new ObservableOnSubscribe()) //创建一个事件流,参数是我们创建的一个事件源
.map(...)//有时我们会需要使用操作符进行变换
.subscribeOn(Schedulers.io())//指定事件源代码执行的线程
.observeOn(AndroidSchedulers.mainThread())//指定订阅者代码执行的线程
.subscribe(new Observer())//参数是我们创建的一个订阅者,在这里与事件流建立订阅关系
Observable 是RxJava描述的事件流,在链式调用中非常清晰,事件从创建到加工处理再到被订阅者接收到,就是一个接一个的Observable形成的一个事件流。在上面代码中,每一步方法的调用,都会返回一个新的Observable给下一步,这个是RxJava源码的基础。同样是链式调用,但它与常见的Builder模式不太一样,每个操作符,每次线程切换,每步都会新建一个Observable而非直接加工上一步的Observable返回给下一步。(在源码中不同的加工会创建不同的Observable,比如map()会创建一个ObservableMap,subscribeOn()会创建一个ObservableSubscribeOn,但它们实际上都是Observable的子类)。
ObservableOnSubscribe 是这个事件流的源头,称之为事件源,一般由自己创建并传入。创建时,需要重写其subscribe()方法,为了和Observable中的subscribe()方法区别,在下面贴出的代码中将其改名为call()。在调用链中有时会用到各种操作符进行一些变换,事实上每个操作符都会重写这么一个call()方法,相对于创建事件源时在这里写入的源业务代码,这些操作符在这里要做的事是由RxJava钦定的,一般是连接事件流的上下游。在这里将准备好被订阅的数据,并调用subscribe()参数中ObservableEmitter的onNext(),onCompleted()或onError()通知订阅者数据准备情况。
Observer 是整个事件流的订阅者,也就是说,它将会订阅前面事件创建,加工以后的最终结果。它也是由我们创建的,并且要重写它的onNext(),onCompleted(),onError()和onSubscribe(),在接下来的分析中简化一些,只关注onNext()。创建出了Observer以后,将会使用经过上面所有步骤的最后一步生成的Observable,调用它的subscribe(),与事件源产生联系。
使用步骤
-
创建被观察者 (Observable )
-
创建观察者 (Observer )并 定义响应事件的行为
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:- onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 - onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 - onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。
- onNext(T item)
-
订阅(Subscribe)连接观察者和被观察者
Rxjava的基本实现
- 使用前先在Android Studio中配置gradle:
dependencies
......
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
其中Rxandroid是Rxjava在Android平台的扩展,它包括了一些能够简化Android开发的工具;
- 创建一个被观察者Observable:
//1. 创建一个Observable 可被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception
//在订阅触发时发送了发送了两条数据
if(!emitter.isDisposed())
emitter.onNext("hello rxjava2");
emitter.onNext("小伙伴们,你们好");
emitter.onComplete();
);
- 创建一个观察者Observer:
//2.创建一个观察者
Observer<String> Observer = new Observer<String>()
@Override
public void onSubscribe(Disposable d)
Log.i("Eric","onSubscribe: " + d);
@Override
public void onNext(String d)
Log.i("Eric","onNext: " + d);
@Override
public void onError(Throwable e)
Log.i("Eric","onError: " + e.getMessage() );
@Override
public void onComplete()
Log.i("Eric","onComplete: ");
;
- observer(观察者) 订阅 observable(被观察者)
//observer(观察者) 订阅 observable(被观察者)
observable.subscribe(Observer);
- 查看输出
com.eric.rxjavademo I/Eric: onSubscribe: CreateEmitternull
com.eric.rxjavademo I/Eric: onNext: 发射一条消息:
com.eric.rxjavademo I/Eric: onNext: 你好!Rxjava
com.eric.rxjavademo I/Eric: onComplete:
以上是关于Rxjava入门:简单介绍与使用的主要内容,如果未能解决你的问题,请参考以下文章