Rxjava入门:简单介绍与使用

Posted xzj_2013

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava入门:简单介绍与使用相关的知识,希望对你有一定的参考价值。

概述

根据RxJava 在 GitHub 的介绍:

    RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
	// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

RxJava 是一个 基于事件流、实现异步操作的库,是一种基于观察者模式的响应式编程框架
RxJava的特点就是可以非常简便的实现异步调用,类似于android中的 AsyncTask 、Handler作用;
同时由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:

  • 逻辑简洁
  • 实现优雅
  • 使用简单
    更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅

Rxjava原理介绍

Rxjava 基于 一种扩展的观察者模式
在RxJava中主要有4个角色:

  • 被观察者(Observable)------产生事件
  • 订阅(Subscribe)----连接 被观察者 & 观察者
  • 观察者(Observer) -----接收事件,并给出响应动作
  • 事件(Event) ------被观察者 & 观察者 沟通的载体

RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作

原理分析:
一般我们使用RxJava都是编写类似如下的代码

Observable.create(new ObservableOnSubscribe()) //创建一个事件流,参数是我们创建的一个事件源
                .map(...)//有时我们会需要使用操作符进行变换
                .subscribeOn(Schedulers.io())//指定事件源代码执行的线程
                .observeOn(AndroidSchedulers.mainThread())//指定订阅者代码执行的线程
                .subscribe(new Observer())//参数是我们创建的一个订阅者,在这里与事件流建立订阅关系

Observable 是RxJava描述的事件流,在链式调用中非常清晰,事件从创建到加工处理再到被订阅者接收到,就是一个接一个的Observable形成的一个事件流。在上面代码中,每一步方法的调用,都会返回一个新的Observable给下一步,这个是RxJava源码的基础。同样是链式调用,但它与常见的Builder模式不太一样,每个操作符,每次线程切换,每步都会新建一个Observable而非直接加工上一步的Observable返回给下一步。(在源码中不同的加工会创建不同的Observable,比如map()会创建一个ObservableMap,subscribeOn()会创建一个ObservableSubscribeOn,但它们实际上都是Observable的子类)。

ObservableOnSubscribe 是这个事件流的源头,称之为事件源,一般由自己创建并传入。创建时,需要重写其subscribe()方法,为了和Observable中的subscribe()方法区别,在下面贴出的代码中将其改名为call()。在调用链中有时会用到各种操作符进行一些变换,事实上每个操作符都会重写这么一个call()方法,相对于创建事件源时在这里写入的源业务代码,这些操作符在这里要做的事是由RxJava钦定的,一般是连接事件流的上下游。在这里将准备好被订阅的数据,并调用subscribe()参数中ObservableEmitter的onNext(),onCompleted()或onError()通知订阅者数据准备情况。

Observer 是整个事件流的订阅者,也就是说,它将会订阅前面事件创建,加工以后的最终结果。它也是由我们创建的,并且要重写它的onNext(),onCompleted(),onError()和onSubscribe(),在接下来的分析中简化一些,只关注onNext()。创建出了Observer以后,将会使用经过上面所有步骤的最后一步生成的Observable,调用它的subscribe(),与事件源产生联系。

使用步骤

  1. 创建被观察者 (Observable )

  2. 创建观察者 (Observer )并 定义响应事件的行为
    Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:

    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
    • onError(Exception ex)
      当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
    • onComplete
      正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

    根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。

  3. 订阅(Subscribe)连接观察者和被观察者

Rxjava的基本实现

  • 使用前先在Android Studio中配置gradle:

dependencies 
    ......
    implementation "io.reactivex.rxjava2:rxjava:2.2.8"
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

其中Rxandroid是Rxjava在Android平台的扩展,它包括了一些能够简化Android开发的工具;

  • 创建一个被观察者Observable:
        //1. 创建一个Observable  可被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() 

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception 
                //在订阅触发时发送了发送了两条数据
                if(!emitter.isDisposed())
                    emitter.onNext("hello rxjava2");
                    emitter.onNext("小伙伴们,你们好");
                
                emitter.onComplete();
            
        );
  • 创建一个观察者Observer:
        //2.创建一个观察者
        Observer<String> Observer = new Observer<String>() 
            @Override
            public void onSubscribe(Disposable d) 
                Log.i("Eric","onSubscribe: " + d);
            

            @Override
            public void onNext(String d) 
                Log.i("Eric","onNext: " + d);
            

            @Override
            public void onError(Throwable e) 
                Log.i("Eric","onError: " + e.getMessage() );
            

            @Override
            public void onComplete() 
                Log.i("Eric","onComplete: ");
            
        ;
  • observer(观察者) 订阅 observable(被观察者)
        //observer(观察者) 订阅 observable(被观察者)
        observable.subscribe(Observer);
  • 查看输出
com.eric.rxjavademo I/Eric: onSubscribe: CreateEmitternull
com.eric.rxjavademo I/Eric: onNext: 发射一条消息:
com.eric.rxjavademo I/Eric: onNext: 你好!Rxjava
com.eric.rxjavademo I/Eric: onComplete:

以上是关于Rxjava入门:简单介绍与使用的主要内容,如果未能解决你的问题,请参考以下文章

RxJava使用入门

精通高级RxJava 2响应式编程思想

这可能是最好的RxJava 2.x 入门教程

Carson带你学Android:手把手带你入门神秘的Rxjava

知识整理这可能是最好的RxJava 2.x 入门教程

这是一篇清晰 & 易懂的 Rxjava 入门教程