Rxjava基本使用

Posted dx我是大雄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava基本使用相关的知识,希望对你有一定的参考价值。

之前转载过一篇Rxjava的使用,现在自己写了一个demo分析下简单的使用,估计下次就该分析源码了


一.基本用法

(1)用的是new  Action1

//     observable call 1 currentThreadmain
//     subscrible callxcqw  我是大雄currentThreadmain
//     observable call 2 currentThreadmain
        Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                System.out.println("xcqw observable call 1 currentThread" + Thread.currentThread().getName());
                subscriber.onNext("xcqw  我是大雄");
                System.out.println("xcqw observable call 2 currentThread" + Thread.currentThread().getName());
            
        ).subscribe(new Action1<String>() 
            @Override
            public void call(String o) 
                System.out.println("xcqw subscrible call" + o + "currentThread" + Thread.currentThread().getName());
            
        );

(2)用的是new subscriber

//       observable call 1 currentThreadmain
//       onNext xcqw  我是大雄asdcurrentThreadmain
//       observable call 2 currentThreadmain
        Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                System.out.println("xcqw observable call 1 currentThread" + Thread.currentThread().getName());
                subscriber.onNext("xcqw  我是大雄asd");
                System.out.println("xcqw observable call 2 currentThread" + Thread.currentThread().getName());
            
        ).subscribe(new Subscriber<String>() 
            @Override
            public void onCompleted() 

            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onNext(String s) 
                System.out.println("xcqw onNext " + s + "currentThread" + Thread.currentThread().getName());
            
        );



二.指定Observable call 和 Subscriber指定直线线程

(1)observable 指定工作线程

    /**
     * 普通的调用 create  指定 observable call的执行线程
     * 可以看出指定的observale call 执行在子线程(Schedulers.io())
     * 而且 subscribe call 也执行在子线程(Schedulers.io(),如果想改变要用observeOn)
     */
    public void methodTwo() 
//        observable call 1 currentThreadRxioscheduler-2
//        subscrible callxcqw  我是大雄currentThreadRxIoScheduler-2
//        observable call 2 currentThreadRxIoScheduler-2
        Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                System.out.println("xcqw observable call 1 currentThread" + Thread.currentThread().getName());
                subscriber.onNext("xcqw  我是大雄");
                System.out.println("xcqw observable call 2 currentThread" + Thread.currentThread().getName());
            
        ).subscribeOn(Schedulers.io())
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String o) 
                        System.out.println("xcqw subscrible call" + o + "currentThread" + Thread.currentThread().getName());
                    
                );
    

(2)observale call  和 subscribe 指定线程

 /**
     * 普通的调用 create  指定 observable call的执行线程
     * 可以看出指定的observale call 执行在子线程(Schedulers.io())
     * 而且 subscribe call 也执行在子线程(Schedulers.io(),如果想改变要用observeOn)
     * observeOn(androidSchedulers.mainThread())
     */
    public void methodThree() 
//        observable call 1 currentThreadRxIoScheduler-2
//        observable call 2 currentThreadRxIoScheduler-2
//        subscrible callxcqw  我是大雄currentThreadmain
        Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                System.out.println("xcqw observable call 1 currentThread" + Thread.currentThread().getName());
                subscriber.onNext("xcqw  我是大雄");
                System.out.println("xcqw observable call 2 currentThread" + Thread.currentThread().getName());
            
        ).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String o) 
                        System.out.println("xcqw subscrible call" + o + "currentThread" + Thread.currentThread().getName());
                    
                );
    




三.from的用法

  /**
     * from  将其它种类的对象和数据类型转换为Observable
     * 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
     * from默认不在任何特定的调度器上执行。然而你可以将Scheduler作为可选的第二个参数传递给Observable,
     * 它会在那个调度器上管理这个Future。
     * Javadoc: from(array))
     * Javadoc: from(Iterable))
     * Javadoc: from(Future))
     * Javadoc: from(Future,Scheduler))
     * Javadoc: from(Future,timeout, timeUnit))
     */
    public void methodFour() 
        //没有1/0
//        xcqw items1
//        xcqw items2
//        xcqw items3
//        xcqw items4
//        xcqw items5
//        xcqw items6

        //有1/0
//        xcqw onclick2
//        xcqw items1
//        xcqw throwablejava.lang.ArithmeticException: divide by zero
        Integer[] items = 1, 2, 3, 4, 5, 6;
        Observable myObservable = Observable.from(items);
        myObservable.subscribe(new Action1<Integer>() 
            @Override
            public void call(Integer items) 
                System.out.println("xcqw items" + items);
                int x = 1 / 0;
            

        , new Action1<Throwable>() 
            @Override
            public void call(Throwable throwable) 
                System.out.println("xcqw throwable" + throwable.toString());
            
        , new Action0() 
            @Override
            public void call() 

            
        );

    



四.just的用法

(1).

  /**
     * just
     * 将一个或多个对象转换成发射这个或这些对象的一个Observable
     * Just将单个数据转换为发射那个数据的Observable。
     * Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,
     * 而Just只是简单的原样发射,
     * 将数组或Iterable当做单个数据。
     */
//    注意:如果你传递null给Just,它会返回一个发射null值的Observable。
//    不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),
//    如果需要空Observable你应该使用Empty操作符。
//    RxJava将这个操作符实现为just函数,它接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable。
    public void methodFive() 
//        Observable.just(1, 2, 3)
//        xcqw Next: 1
//        xcqw Next: 2
//        xcqw Next: 3
//        xcqw Sequence complete

//        Observable.just(null)
//          onclick2
//          onNext null
//          onCompleted
        Observable.just(null)
//                .subscribe(new Subscriber<Integer>() 
//                    @Override
//                    public void onNext(Integer item) 
//                        System.out.println("xcqw Next: " + item);
//                    
//
//                    @Override
//                    public void onError(Throwable error) 
//                        System.err.println("xcqw Error: " + error.getMessage());
//                    
//
//                    @Override
//                    public void onCompleted() 
//                        System.out.println("xcqw Sequence complete.");
//                    
//                );
                .subscribe(new Subscriber<Object>() 
                    @Override
                    public void onCompleted() 
                        System.out.println("xcqw onCompleted ");
                    

                    @Override
                    public void onError(Throwable e) 
                        System.out.println("xcqw onError " + e.toString());
                    

                    @Override
                    public void onNext(Object o) 
                        System.out.println("xcqw onNext " + o);
                    
                );
    


(2)just执行线程

/**
     * just执行方法   不指定线程默认就是main
     */
    public void methodFive_one() 
//        xcqw onclick2
//        xcqw justMethod Threadmain
//        xcqw onNextUsername='daxiong', age=25, address='NC'
//        xcqw onComplete
        Observable.just(justMethod())
//                .subscribeOn(Schedulers.io()) //如果设置这个  just  依然是main  但是 onNext 就是在子线程
                .subscribe(new Subscriber<User>() 
                    @Override
                    public void onCompleted() 
                        System.out.println("xcqw onComplete");
                    

                    @Override
                    public void onError(Throwable e) 
                        System.out.println("xcqw onError" + e.toString());
                    

                    @Override
                    public void onNext(User user) 
                        System.out.println("xcqw onNext" + user.toString() + "thread" + Thread.currentThread().getName());
                    
                );
    



五.flatmap用法

 /**
     * flatmap  执行在主线程  无论observableOn什么线程
     * 扁平映射,将Observable发射的数据变换为Observables集合,
     * 然后将这些Observable发射的数据平坦化的放进一个单独的Observable,
     * 可以认为是一个将嵌套的数据结构展开的过程
     * <p/>
     * 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
     */
    public void method_five() 

        Observable.from(userList)
                .flatMap(new Func1<User, Observable<Course>>() 
                    @Override
                    public Observable<Course> call(User user) 
                        System.out.println("xcqw Observale thread" + Thread.currentThread().getName());
                        return Observable.from(user.courseList);
                    
                )

//        xcqw Observale threadmain
//        xcqw call course数学0threadmain
//        xcqw call course语文0threadmain
//        xcqw call course英语0threadmain
//        xcqw Observale threadmain
//        xcqw call course数学1threadmain
//        xcqw call course语文1threadmain
//        xcqw call course英语1threadmain
//        xcqw Observale threadmain
//        xcqw call course数学2threadmain
//        xcqw call course语文2threadmain
//        xcqw call course英语2threadmain
//                .subscribe(new Action1<Course>() 
//            @Override
//            public void call(Course course) 
//                System.out.println("xcqw call course"+course.courseName+"thread"+Thread.currentThread().getName());
//            
//        );


//        xcqw Observale threadmain
//        xcqw onNext course数学0threadmain
//        xcqw onNext course语文0threadmain
//        xcqw onNext course英语0threadmain
//        xcqw Observale threadmain
//        xcqw onNext course数学1threadmain
//        xcqw onNext course语文1threadmain
//        xcqw onNext course英语1threadmain
//        xcqw Observale threadmain
//        xcqw onNext course数学2threadmain
//        xcqw onNext course语文2threadmain
//        xcqw onNext course英语2threadmain
//                .subscribe(new Subscriber<Course>() 
//            @Override
//            public void onCompleted() 
//
//            
//
//            @Override
//            public void onError(Throwable e) 
//
//            
//
//            @Override
//            public void onNext(Course course) 
//                System.out.println("xcqw onNext course"+course.courseName+"thread"+Thread.currentThread().getName());
//            
//        );


//            xcqw Observale threadRxIoScheduler-2
//            xcqw Observale threadRxIoScheduler-2
//            xcqw Observale threadRxIoScheduler-2
//            xcqw onNext course数学0threadmain
//            xcqw onNext course语文0threadmain
//            xcqw onNext course英语0threadmain
//            xcqw onNext course数学1threadmain
//            xcqw onNext course语文1threadmain
//            xcqw onNext course英语1threadmain
//            xcqw onNext course数学2threadmain
//            xcqw onNext course语文2threadmain
//            xcqw onNext course英语2threadmain
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Course>() 
                    @Override
                    public void onCompleted() 

                    

                    @Override
                    public void onError(Throwable e) 

                    

                    @Override
                    public void onNext(Course course) 
                        System.out.println("xcqw onNext course" + course.courseName + "thread" + Thread.currentThread().getName());
                    
                );

    



六.map的用法(把一个对象转换成另外一个)

    public void methond_six_map() 
        Observable.from(userList)
                .map(new Func1<User, String>() 
                    @Override
                    public String call(User user) 
                        System.out.println("xcqw map call thread" + Thread.currentThread().getName());
                        return user.name;
                    
                ).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String s) 
                        System.out.println("xcqw action call " + s + "thread" + Thread.currentThread().getName());
                    
                );


//        xcqw map call threadRxIoScheduler-2
//        xcqw map call threadRxIoScheduler-2
//        xcqw map call threadRxIoScheduler-2
//        xcqw action call 大熊0threadmain
//        xcqw action call 大熊1threadmain
//        xcqw action call 大熊2threadmain
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
//                .subscribe(new Action1<String>() 
//                    @Override
//                    public void call(String s) 
//                        System.out.println("xcqw action call "+s+ "thread"+Thread.currentThread().getName());
//                    
//                );


//        xcqw map call threadRxIoScheduler-2
//        xcqw action call 大熊0threadRxIoScheduler-2
//        xcqw map call threadRxIoScheduler-2
//        xcqw action call 大熊1threadRxIoScheduler-2
//        xcqw map call threadRxIoScheduler-2
//        xcqw action call 大熊2threadRxIoScheduler-2
//            .subscribeOn(Schedulers.io())
//                    .subscribe(new Action1<String>() 
//                @Override
//                public void call(String s) 
//                    System.out.println("xcqw action call "+s+ "thread"+Thread.currentThread().getName());
//                
//            );


//            xcqw map call threadmain
//            xcqw action call 大熊0threadmain
//            xcqw map call threadmain
//            xcqw action call 大熊1threadmain
//            xcqw map call threadmain
//            xcqw action call 大熊2threadmain
//                .subscribe(new Action1<String>() 
//            @Override
//            public void call(String s) 
//                System.out.println("xcqw action call "+s+ "thread"+Thread.currentThread().getName());
//            
//        );
    



七.Filter的用法

 /**
     * 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
     */
    public void methodEightFilter() 

//        xcqw filter callthreadmain
//        xcqw subscribe call age0threadmain
//        xcqw filter callthreadmain
//        xcqw filter callthreadmain
//        Observable.from(userList)
//                .filter(new Func1<User, Boolean>() 
//                    @Override
//                    public Boolean call(User user) 
//                        System.out.println("xcqw filter call"+"thread"+Thread.currentThread().getName());
//                        return user.name.equals("大熊0");
//                    
//                ).subscribe(new Action1<User>() 
//            @Override
//            public void call(User user) 
//                System.out.println("xcqw subscribe call age"+user.age+"thread"+Thread.currentThread().getName());
//            
//        );


//        xcqw filter callthreadRxIoScheduler-2
//        xcqw subscribe call age0threadRxIoScheduler-2
//        xcqw filter callthreadRxIoScheduler-2
//        xcqw filter callthreadRxIoScheduler-2
//        Observable.from(userList)
//                .filter(new Func1<User, Boolean>() 
//                    @Override
//                    public Boolean call(User user) 
//                        System.out.println("xcqw filter call"+"thread"+Thread.currentThread().getName());
//                        return user.name.equals("大熊0");
//                    
//                ).subscribeOn(Schedulers.io())
//                .subscribe(new Action1<User>() 
//            @Override
//            public void call(User user) 
//                System.out.println("xcqw subscribe call age"+user.age+"thread"+Thread.currentThread().getName());
//            
//        );


//        xcqw filter callthreadRxIoScheduler-2
//        xcqw filter callthreadRxIoScheduler-2
//        xcqw filter callthreadRxIoScheduler-2
//        xcqw subscribe call age0threadmain
        Observable.from(userList)
                .filter(new Func1<User, Boolean>() 
                    @Override
                    public Boolean call(User user) 
                        System.out.println("xcqw filter call" + "thread" + Thread.currentThread().getName());
                        return user.name.equals("大熊0");
                    
                ).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<User>() 
                    @Override
                    public void call(User user) 
                        System.out.println("xcqw subscribe call age" + user.age + "thread" + Thread.currentThread().getName());
                    
                );
    




八.compose的用法

/**
     * compose都是在主线程执行 无论subscibeon 或者 observableon设置成子线程或者主线程执行
     */
    public void methodNineCompose() 
        initData();
//        xcqw onclick2
//        xcqw compose onNext threadname   RxIoScheduler-2
//        xcqw compose onNext threadname   RxIoScheduler-2
//        xcqw compose onNext threadname   RxIoScheduler-2
//        xcqw compose onCompleted threadname   RxIoScheduler-2
        Observable.from(userList).compose(new Observable.Transformer<User, User>() 
            @Override
            public Observable<User> call(Observable<User> userObservable) 
                System.out.println("xcqw compose Observable threadname " + Thread.currentThread().getName());
                return userObservable.subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io());
            
        ).subscribe(new Subscriber<User>() 
            @Override
            public void onCompleted() 
                System.out.println("xcqw compose onCompleted threadname   " + Thread.currentThread().getName());
            

            @Override
            public void onError(Throwable e) 
                System.out.println("xcqw compose onError threadname   " + Thread.currentThread().getName());
            

            @Override
            public void onNext(User user) 
                System.out.println("xcqw compose onNext threadname   " + Thread.currentThread().getName());
            
        );



注意:

Compose 和 flatmap的区别(个人理解就是Compose作用的是整个流,而flatmap作用的是转换的单个对象)

下面是网上的一些详细解释

这一段从这篇博客拷过来,此处谢谢大神http://blog.csdn.net/jdsjlzx/article/details/51508678

compose()和flatMap()有啥区别呢。他们都是发射出Observable,是不是就是说他们都可以复用一系列操作符呢?

The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:

中文翻译:

区别在于compose()是高等级的抽象,他操作的是整个流,而不是单一发射出的项目,这里有更多的解释: 
VS 
不同点在于compose()操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。具体如下:

原文:

1 compose() is the only way to get the original Observable from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().

In contrast, if you put subscribeOn()/observeOn() in flatMap(), it would only affect the Observable you create in flatMap() but not the rest of the stream.

2 compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.

3 flatMap() is necessarily less efficient because it has to create a new Observable every time onNext() is called. compose() operates on the stream as it is.

中文翻译1:

  1. compose()是唯一一个能从流中获取原生Observable 的方法,因此,影响整个流的操作符(像subscribeOn()和observeOn())需要使用compose(),相对的,如果你在flatMap()中使用subscribeOn()/observeOn(),它只影响你创建的flatMap()中的Observable,而不是整个流。

  2. 当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,flatMap()则是在onNext()被调用以后才会执行,换句话说,flatMap()转换的是每个项目,而compose()转换的是整个流。

  3. flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的。

以上是关于Rxjava基本使用的主要内容,如果未能解决你的问题,请参考以下文章

RXJAVA-FlatMap

RxJava 的基本使用

Rxjava基本使用

RxJava基本使用

Rxjava源码分析&实践RxJava基本原理分析之订阅流

RxJava系列6(从微观角度解读RxJava源码)