RxJava学习笔记---简单使用

Posted 逆水当行舟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava学习笔记---简单使用相关的知识,希望对你有一定的参考价值。

如果觉得一篇文章写得好,不要放到收藏夹里面,马上把它看完,如果两天内还没开始看,那就可以删掉了
如果觉得一样技术很好,那就马上去学,不要拖延,不要找借口。如果你一周内还没开始行动,还不如坦荡点放弃
恰如克林克兹所说:
与其感叹路难行,不如马上出发
去年就在看RxJava,每次看一点点,遇到障碍,感叹要理解的东西太麻烦就放弃了。今年决心要搞定它,学习的过程注定孤独单调,恰好现在开始写Blog,就顺便记录下学习笔记,以此自勉。在后面的2-4周计划:

  1. 明白RxJava的基本用法,API中常用类、函数、变换
  2. 学习使用RxJava写的开源App,提升熟练度
  3. 研究RxJava内部实现,研究他为什么要如此设计API,如此好用
  4. 用RxJava写一个App,开源

RxJava是什么

Rx是什么

Rx全称Reactive Extensions,译为响应式拓展。

微软最先提出这个概念,借用MSDN上的定义:Reactive Extensions(Rx)是一个类库,它集成了异步、基于可观察(observable)序列的事件驱动编程和LINQ-style的查询操作,使用Rx,开发人员

  • 可以用observable对象描述异步数据流
  • 使用LINQ操作符异步查询数据
  • 使用Schedulers控制异步过程中的并发
    简而言之
Rx = Observables + LINQ + Schedulers

Rx已经渗透到各个语言中,于是有了:
RxJava、RxJS、Rx.NET、UniRx、RxScala、RxCpp、 RxSwift、Rxphp、Ruby: Rx.rb、Python: RxPY等等
基于frameworks上还有
RxNetty、Rxandroid、RxCocoa

RxJava的定义

RxJava是Rx在Java语言上的扩展。
RxJava
– Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

译为:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

RxJava怎么用

build.gradle—Module文件中导入

    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.1.6'

RxJava观察者模式

RxJava观察者有4个基本概念组成:

  1. Observer 观察者
  2. Observable 被观察者
  3. subscribe() 订阅
  4. 事件

创建Observer

Observer译为观察者,直接创建实现

        Observer<String> observer = new Observer<String>() 
            @Override
            public void onCompleted() 
                Log.d(TAG, "onCompleted: ");
            

            @Override
            public void onError(Throwable e) 
                Log.d(TAG, "onError: " + e.toString());
            

            @Override
            public void onNext(String s) 
                Log.d(TAG, "onNext: " + s);
            
        ;

对比常用的给控件注册一个监听事件

        class MyListener extends View.OnClickListener 
            @Override
            public void onClick(View v) 
                Log.d(TAG, "onClick: ");
            
        

这里的RxJava的onNext()方法就对应MyListener的onClick()方法,不过比起后者Observer还多了onCompleted()onError()

  1. onNext()事件响应
  2. onCompleted()事件队列结束。不再有新的onNext()发出时,需要触发其作为结束。
  3. onError() 事件队列异常。事件队列处理中若出现异常,onError触发,队列终止。
    一个事件队列中,ErrorCompleted有且只有一个,并且是事件队列的最后一个。

创建Subscriber

 Subscriber<String> subscriber = new Subscriber<String>() 
            @Override
            public void onCompleted() 
                Log.d(TAG, "onCompleted: ");
            

            @Override
            public void onError(Throwable e) 
                Log.d(TAG, "onError: " + e.toString());
            

            @Override
            public void onNext(String s) 
                Log.d(TAG, "onNext: " + s);
            
        ;

基本使用的话,SubscriberObserver使用完全一致,实际上在RxJava的subscribe过程中,Observer也常常转换为Subscriber再使用。但是SubscriberOberver的功能升级
查看其实现代码

public abstract class Subscriber<T> implements Observer<T>, Subscription 
    ....
  1. 我们知道,Subscriber是一个抽象类,所以我们刚才实现的时候需要实现onCompletedonErroronNext三个方法
  2. 实现Observer<T>接口
  3. 实现Subscription接口
    Observer<T>接口的实现表明Observer能实现的东西他都可以实现
public interface Observer<T> 
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);

那么Subscription接口实现呢

public interface Subscription 
    void unsubscribe();
    boolean isUnsubscribed();

顾名思义,unsubscribe()表示反注册;isUnsubscribed()表示是否已经注册,反注册和是否已经注册出现了,哪注册呢,在这里

    private final SubscriptionList subscriptions;

    public final void add(Subscription s) 
        subscriptions.add(s);
    

    @Override
    public final void unsubscribe() 
        subscriptions.unsubscribe();
    

        @Override
    public final boolean isUnsubscribed() 
        return subscriptions.isUnsubscribed();
    

add()就是注册。通过SubscriptionList subscriptions方法转发实现,我们看看SubscriptionList是什么

public final class SubscriptionList implements Subscription 

    private LinkedList<Subscription> subscriptions;
    private volatile boolean unsubscribed;

    public SubscriptionList(Subscription s) 
        this.subscriptions = new LinkedList<Subscription>();
        this.subscriptions.add(s);
    

    @Override
    public boolean isUnsubscribed() 
        return unsubscribed;
    

    public void add(final Subscription s) 
        if (s.isUnsubscribed()) 
            return;
        
        if (!unsubscribed) 
            synchronized (this) 
                if (!unsubscribed) 
                    LinkedList<Subscription> subs = subscriptions;
                    if (subs == null) 
                        subs = new LinkedList<Subscription>();
                        subscriptions = subs;
                    
                    subs.add(s);
                    return;
                
            
        
        s.unsubscribe();
    


    @Override
    public void unsubscribe() 
        if (!unsubscribed) 
            List<Subscription> list;
            synchronized (this) 
                if (unsubscribed) 
                    return;
                
                unsubscribed = true;
                list = subscriptions;
                subscriptions = null;
            
            unsubscribeFromAll(list);
        
    

    private static void unsubscribeFromAll(Collection<Subscription> subscriptions) 
        if (subscriptions == null) 
            return;
        
        List<Throwable> es = null;
        for (Subscription s : subscriptions) 
            try 
                s.unsubscribe();
             catch (Throwable e) 
                if (es == null) 
                    es = new ArrayList<Throwable>();
                
                es.add(e);
            
        
        Exceptions.throwIfAny(es);
    

看到开始定义的

    private LinkedList<Subscription> subscriptions;
    private volatile boolean unsubscribed;

大概就能猜出来,就是通过链表来操作的,具体不再深究。回到主题比起Observer来说Subsriber大概有以下几个优点

  1. 可以使用unsubscribe()取消订阅。有什么好处呢?其实和我们使用广播的时候,注册和反注册是一样的道理,防止内存泄漏。
  2. 可以通过isUnsubscribed()获取当前事件是否注册,封装好的方法可以更容易去判断当前的事件注册与否的状态。
  3. onStart()可以在Subscriber刚开始的时候做一些准备工作。我们看到他是一个空实现
    public void onStart() 
        // do nothing by default
    

就如同在使用Volley或者okhttp的时候也有这种同名方法,做一些初始化操作在请求网络最开始的部分。

创建Observable

Observable.create

  Observable observable = Observable.create(new Observable.OnSubscribe() 
        @Override
        public void call(Object o) 
            subscriber.onNext("1");
            subscriber.onNext("2");
            subscriber.onNext("3");
            subscriber.onCompleted();
        
    );

储存一个OnSubscribe对象作为计划表,一旦Observable被订阅,call()方法就会被调用,然后按照对onNext()onCompleted()设定的顺序依次执行。

Observable.just

上面的例子可以等价于

    Observable observableJust = Observable.just("1", "2", "3");

just—将参数依次发出去

Observable.from

也等价于这种写法

    String[] tests = "1", "2", "3";
    Observable observableFrom = Observable.from(tests);

from—顾名思义从什么地方来

运行

使用

        observable.subscribe(subscriber);
        observable.subscribe(observer);

事件链就可以跑起来了。
这里subscribe()方法通过下面的方法转发

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) 
        //...省略部分代码
        //执行onStart初始方法
        subscriber.onStart();

        //执行Subscriber预设的call()方法
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

        //返回传入的subscriber
        return hook.onSubscribeReturn(subscriber);
    

打印数组

把数组内的值依次打印出来

    private static final Integer[] INT_ARRAYS = 1, 2, 3, 4, 5;

    Observable.from(INT_ARRAYS).subscribe(new Action1<Integer>() 
            @Override
            public void call(Integer integer) 
                Log.d(TAG, "call: " + integer);
            
        );

显示图片

把mipmap文件里的图片显示到界面

  private ImageView ivShow;

    @Override
    protected void onCreate(Bundle savedInstanceState) 
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        ivShow = (ImageView) findViewById(R.id.ivShow);
        Observable.create(new Observable.OnSubscribe<Drawable>() 
            @Override
            public void call(Subscriber<? super Drawable> subscriber) 
                Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            
        ).subscribe(new Observer<Drawable>() 
            @Override
            public void onCompleted() 

            

            @Override
            public void onError(Throwable e) 
                Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
            

            @Override
            public void onNext(Drawable drawable) 
                ivShow.setImageDrawable(drawable);
            
        );
    

但是上面的实现没有任何意义,因为在同一个线程里面使用还不如直接调用。RxJava最大的特性是前台回调,后台处理,是伴随着多线程应运而生的。

Scheduler

        //直接运行在当前线程
        Schedulers.immediate();

        //开启一个新线程执行任务

        //内部有一个没有上限的线程池,可以重用空闲线程
        //相当于    ExecutorService executorService = Executors.newCachedThreadPool();
        Schedulers.io();

        //有一个固定线程池,大小为CPU核心数+1
        //等价于  Executors.newFixedThreadPool(nCore+1)
        Schedulers.computation();

刚才的例子:
被创建的数组值在IO线程发出,在主线程处理打印

        Observable.from(INT_ARRAYS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() 
                    @Override
                    public void call(Integer integer) 
                        Log.d(TAG, "call: " + integer);
                    
                );

在IO线程加载图片,在主线程显示图片

        Observable
                .create(new Observable.OnSubscribe<Drawable>() 
                    @Override
                    public void call(Subscriber<? super Drawable> subscriber) 
                        Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
                        subscriber.onNext(drawable);
                        subscriber.onCompleted();
                    
                )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Drawable>() 
                    @Override
                    public void onCompleted() 

                    

                    @Override
                    public void onError(Throwable e) 
                        Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
                    

                    @Override
                    public void onNext(Drawable drawable) 
                        ivShow.setImageDrawable(drawable);
                    
                );

以上是关于RxJava学习笔记---简单使用的主要内容,如果未能解决你的问题,请参考以下文章

RxJava学习笔记---简单使用

RxJava 学习笔记

Android :RxJava学习笔记之创建操作符

RxJava 学习笔记

RxJava1 学习笔记

RxJava1 学习笔记