RxJava1 学习笔记

Posted 若兰明月

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava1 学习笔记相关的知识,希望对你有一定的参考价值。

RxJava1 学习笔记

标签(空格分隔): 开源项目 android开发


观察者模式UML类图

观察者场景

  • 1、一个方面的操作依赖另一个方面的状态 变化
  • 2、如果在更改一个对象的时候,需要同时连带改变其他的对象
  • 3、当一个对象必须通知其他的对象,但是你又希望这个对象和其他被通知的对象是松散耦合的

RxJava四要素

  • 1、被观察者
  • 2、观察者
  • 3、订阅(被观察者和观察者之间的契约)
  • 4、事件

事件

  • 响应式编程
  • 生活中的对应
 //第一步:创建被观察者:create
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() 
        @Override
        public void call(Subscriber<? super String> subscriber) 
            subscriber.onNext("Hello");
            subscriber.onNext("Imooc");
            subscriber.onCompleted();
        
    );

    //通过just方法来创建被观察者
    Observable observableJust = Observable.just("Hello", "Imooc");

    //通过from方法来创建被观察者
    String[] parameters = "hello", "Imooc";
    Observable observableForm = Observable.from(parameters);


    //第二步:创建观察者
    Observer<Object> observer = new Observer<Object>() 
        @Override
        public void onCompleted() 

        

        @Override
        public void onError(Throwable e) 

        

        @Override
        public void onNext(Object o) 

        
    ;

    public void doRxJava() 
        //第三步:订阅
        observable.subscribe(observer);
    


    //也可以下方的方式创建
     Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 

            
        ).subscribe(new Observer<String>() 
            @Override
            public void onCompleted() 

            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onNext(String s) 

            
        );

三个关键对象和一个核心的方法

  • 1、Observable(被观察者)
  • 2、OnSubscribe
    • Observable(被观察者)内部的一个对象,可以看成被观察者通知观察者的nofity()方法。
  • 3、Subscriber(观察者)
  • 4、subscribe()
    • 订阅方法 完成被观察者和观察者的订阅

第一步,创建Observable,通过Observable.create()方法,当然create()里面的参数就是OnSubscribe参数
这个参数通过RxJavahooks.onCreate() //这个RxJavahooks是1.3版本之后加入的(类似一个抽象的代理类)  最终还是转换成Onservable这个类型 
做完这方法之后,就可以对Onservable立面的onSubscribe这个参数进行赋值了
 protected Observable(OnSubscribe<T> f) 
        this.onSubscribe = f;
    

第二步就是创建观察者

  //第二步:创建观察者
    Observer<Object> observer = new Observer<Object>() 
        @Override
        public void onCompleted() 

        

        @Override
        public void onError(Throwable e) 

        

        @Override
        public void onNext(Object o) 

        
    ;


    Subscriber<String> subscriber = new Subscriber<String>() 
        @Override
        public void onCompleted() 

        

        @Override
        public void onError(Throwable e) 

        

        @Override
        public void onNext(String s) 

        
    ;

可以看到有两种方式创建观察者,一种是直接创建Observer,另一种就是创建Subscriber,其中Subscriber是实现了Observer这个接口,不过即使使用Observer创建观察者,最终内部还是把他转换成Subscriber这个。(下方这个就是Observer被订阅的方法)

 public final Subscription subscribe(final Observer<? super T> observer) 
 //可以看到这个地方  如果是匹配成Subscriber  那么进行subscribe()这个方法的传递
        if (observer instanceof Subscriber) 
            return subscribe((Subscriber<? super T>)observer);
        
        if (observer == null) 
            throw new NullPointerException("observer is null");
        
        //如果  不满足以上的  那么就直接返回
        return subscribe(new ObserverSubscriber<T>(observer));
    

Subscriber

//实现了Observer和Subscription这两个接口
abstract class Subscriber<T> implements Observer<T>, Subscription

//该类里面有
private final SubscriptionList subscriptions; 也就是各种订阅事件集合,当我们取消订阅的时候会调用Subscription里面的unsubscribe()方法去进行订阅取消。

 if (!unsubscribed) 
            List<Subscription> list;
            synchronized (this) 
                if (unsubscribed) 
                    return;
                
                unsubscribed = true;
                list = subscriptions;
                subscriptions = null; //置空
            
            // we will only get here once
            unsubscribeFromAll(list);
        

进行订阅subscribe(Subscriber

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) 
     // validate and proceed 进行验证
        if (subscriber == null) 
            throw new IllegalArgumentException("subscriber can not be null");
        
        if (observable.onSubscribe == null) 
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        

        // new Subscriber so onStart it  这个是一个空方法 我们可以在进行订阅之前进行相关操作 例如loading()
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped   对subscriber进行非SafeSubscriber的处理   SafeSubscriber就是对Subscriber进行了一系列的处理(包装) 更加安全
        if (!(subscriber instanceof SafeSubscriber)) 
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try 
            // allow the hook to intercept and/or decorate
            //调用call()方法 进行订阅
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            //然后进行Subscription的返回  返回类型是Subscription  我们知道这个Subscription是每一个观察者必须实现的一个接口,也就是说我们完成一次订阅  就会在Subsriptionlist里面进行增加一个观察者,当我们需要取消某个订阅的时候  就可以采用unscribered()方法进行取消需要取消的那个订阅  
            return RxJavaHooks.onObservableReturn(subscriber);
         catch (Throwable e) 
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) 
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
             else 
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try 
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                 catch (Throwable e2) 
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                
            
            return Subscriptions.unsubscribed();
        
    

RxJava的相关操作符

变换操作符

  • 就是将事件序列中的对象或者整个序列进行加工处理

Map操作符

  • 就是把一个事件转换成另一个事件
  • map()函数接收一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。
Observable
// just()方法return ScalarSynchronousObservable.create(value);  而ScalarSynchronousObservable是Observable的一个子类 所以也是一个Observable  可以进行链式调用的基础
.just("map/iamge/image.png")
                //这里的map里面的Func1的这个函数接收两个参数
                //第一个参数就是Observable传递的String类型的参数
                //第二个参数是我们需要转换成的参数  这里是Bitmap
                .map(new Func1<String, Bitmap>() 
                    @Override
                    public Bitmap call(String filePath) 
                        return getBitmap(filePath);
                    
                )
                //subscribe这个是完成订阅的核心之一
                .subscribe(new Action1<Bitmap>() 
            @Override
            public void call(Bitmap bitmap) 
                showBitmap(bitmap);
            
        );

flatMap操作符

  • 1、将传入的事件对象转换成一个Observable对象
  • 2、这是不会直接发送这个Observable,而是将这个Observable激活让它自己开始发送事件
  • 3、每一个创建出来的Observable发送的事件,都会汇入同一个Observable

    public Subscription flatMap() 
        return Observable.just("dddd", "ddddd", "dadafadf")
                //传入的参数类型
                //第一个参数是String类型  也就是上面just里面的参数类型的item类型
                //第二个类型则是Observable类型 
                .flatMap(new Func1<String, Observable<String>>() 
                    @Override
                    public Observable<String> call(String s) 
                        return createIvservable(s);
                    
                ).subscribe(new Action1<String>() 
                    @Override
                    public void call(String s) 
    
                    
                );
    

    RxJava线程控制

    Android多线程编程原则

    • 第一、不要阻塞UI线程
    • 第二、不要在UI线程之外访问UI组件

    Android多线程

  • 解决
    • handler / asynctask
    • rxjava:线程控制

RxJava遵循线程不变原则

线程控制符

  • Schedulers.immediate()
    • 默认不切换操作
  • Schedulers.newThread()
    • 总是去切换线程,默认在新线程里面进行操作
  • Schedulers.io()
    • 读取文件,io操作 类似第二个newThread()这个方法,但是io()里面设定有一个无数量上限的线程池,可以复用更多,在大多数情况下,这个io()方法比我们调用newThread()方法效率更高
  • Schedulers.computation()

    • 主要用于计算使用
  • AndroidSchedulers.mainThread()

    • 切换到主线程

RxJava如何进行线程控制

  • 1、subscribeOn()
    • 指定我们的subscribe ,也就是我们订阅观察者的时候所发生的线程,也就是我们的Observable里面的onSubscribe被激活的时候所处的线程。
  • 2、observeOn()
    • 指定我们的subscribe所运行所在的线程,或者称为事件消费所在的线程。

observeOn()指定的是他之后的操作所在的线程,observeOn()是支持多次调用的。subscribeOn()则是只能调用一次

SubscribeOn方法
  • 1、会生成一个新的Observable
  • 2、onSubscribe会在目标Subscriber订阅的时候使用传入的Scheduler的worker作为线程调度执行者
  • 3、在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber
  • 4、这个Subscriber又会通知到目标Subscriber,从而完成我们的subscribeOn的过程
ObserveOn和subscribeOn方法
  • subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的
  • observeOn是通过operator操作符的形式去完成线程切换的,所以他的作用域和其他操作符一样,是调用observeOn之后的链路
  • observeOn()指定的是它之后的操作所在的线程,通过observeOn()的多次调用,程序实现了线程的多次切换
  • subscribeOn()的位置放在哪里都可以,但是他只能调用一次,原因就是subscribeOn是通过新建的Observable的方式

以上是关于RxJava1 学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

TouTiao开源项目 分析笔记5

RxJava 2.x 理解-1

2016年4月3日_JAVA学习笔记

RxJava入门之路

将js进行到底:node学习笔记2

java学习笔记之初识多线程