RxJava学习入门2.转换组合功能操作符

Posted 编程圈子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava学习入门2.转换组合功能操作符相关的知识,希望对你有一定的参考价值。

RxJava学习入门2.转换、组合、功能操作符

一、转换操作符

1. map

对Observable的事件进行处理,产生新的事件,再次发射。

    @Test
    public void testMap()

        Observable.just("Hello")   // 这个消息将被map处理
                .map(new Function<String, Object>()

                    @Override
                    public Object apply(@NotNull String s) throws Exception 
                        System.out.println("中间人:" + s);
                        return "map function";   // map生成的事件再给外面观察者使用
                    
                ).subscribe(observer);
    

运行效果:

建立订阅时调用: onSubscribe
中间人:Hello
调用 onNext:map function
订阅执行完成 onComplete

2. flatMap

对事件进行处理,产生新的Observable。

    @Test
    public void testFlatMap()

        Observable.just("Hello","1","2","3")
                .flatMap(new Function<String, ObservableSource<?>>()

                    @Override
                    public ObservableSource apply(@NotNull String s) throws Exception 
                        return Observable.just("新的Observable:" + s);
                    
                ).subscribe(observer);
    
建立订阅时调用: onSubscribe
调用 onNext:新的Observable
订阅执行完成 onComplete

3. concatMap

类似于flatMap,concatMap转出来的事件是有序的,flatMap是无序的。

4. buffer

把事件合并发送。

    @Test
    public void testBuffer()

        Observable.just("Hello","1","2","3","4","5","6","7","8","9")
                .buffer(4)
                .subscribe(observer);
    

运行效果:

建立订阅时调用: onSubscribe
调用 onNext:[Hello, 1, 2, 3]
调用 onNext:[4, 5, 6, 7]
调用 onNext:[8, 9]
订阅执行完成 onComplete

二、组合操作符

1. concat

把Observable的事件组合在一起发射。

    @Test
    public void testConcat()

        Observable.concat(Observable.just("1"),
                Observable.just("2"),
                Observable.just("3"))
        .subscribe(observer);
    

运行效果:

建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:2
调用 onNext:3
订阅执行完成 onComplete

可以看到订阅和完成只执行了一次。

2. concatArray

3. merge

和concat区别是,concat是串行发送,merge并行发送。

三、功能操作符

下面的操作示例演示在执行耗时操作时如何 进行线程切换。

1. 普通的订阅事件,程序在同一个线程运行

    Observer observer = new Observer<Object>() 
        @Override
        public void onSubscribe(@NotNull Disposable d) 
            System.out.println("建立订阅时调用: onSubscribe,线程:" + Thread.currentThread());
        

        @Override
        public void onNext(@NotNull Object o) 
            System.out.println("调用 onNext:" + o + "线程:" + Thread.currentThread());
        

        @Override
        public void onError(@NotNull Throwable e) 
            System.out.println("调用 onError,线程:" + Thread.currentThread());
        

        @Override
        public void onComplete() 
            System.out.println("订阅执行完成 onComplete,线程:" + Thread.currentThread());
        
    ;

    @Test
    public void testTool()

        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception 
                emitter.onNext("1");
                emitter.onNext("2");
            
        )
            .subscribe(observer);
    

运行效果:

建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]

2. subscribeOn、observerOn 进行线程调度

  • subscribeOn : observable 在哪个线程执行,如果有多个以第一个为准。
  • observerOn:Subscriber在哪个线程执行,链式操作时对后续操作起作用。

@RunWith(androidJUnit4.class)
public class ExampleInstrumentedTest 
    private String TAG = ExampleInstrumentedTest.class.getSimpleName();

    @Test
    public void useAppContext() 
        // Context of the app under test.
        Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
        assertEquals("com.cn.whr.iot.app.rxjavaleran", appContext.getPackageName());
    

    Observer observer = new Observer<Object>() 
        @Override
        public void onSubscribe(@NotNull Disposable d) 
            Log.i(TAG, "建立订阅时调用,线程:" + Thread.currentThread().getName());
        

        @Override
        public void onNext(@NotNull Object o) 
            Log.i(TAG, "调用 onNext:" + o + ",线程:" + Thread.currentThread().getName());
        

        @Override
        public void onError(@NotNull Throwable e) 
            Log.i(TAG, "调用 onError,线程:" + Thread.currentThread().getName());
        

        @Override
        public void onComplete() 
            Log.i(TAG, "订阅执行完成 onComplete,线程:" + Thread.currentThread().getName());
        
    ;


    @Test
    public void testTool()

        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception 
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            
        )
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() 
                    @Override
                    public Object apply(@NotNull Object o) throws Exception 
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    
                )
                .subscribe(observer);
        try 
            Thread.sleep(3000);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

运行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
map apply,线程:main
调用 onNext:map ,线程:main
map apply,线程:main
调用 onNext:map ,线程:main

再看多加一个observeOn的效果:

        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception 
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            
        )
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() 
                    @Override
                    public Object apply(@NotNull Object o) throws Exception 
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    
                )
                .observeOn(Schedulers.io())
                .subscribe(observer);

执行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
map apply,线程:main
map apply,线程:main
调用 onNext:map ,线程:RxCachedThreadScheduler-1
调用 onNext:map ,
  • Schedulers.newThread() 调度器创建新线程
  • AndroidSchedulers.mainThread() 安卓主线程
  • Schedulers.io() 调试器创建的可复用的线程池

3. doOnNext

调度onNext线程。
示例:

        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception 
                Thread.sleep(2000);
                emitter.onNext("1");
                emitter.onNext("2");
            
        )
                .doOnNext(new Consumer<Object>() 
                    @Override
                    public void accept(Object o) throws Exception 
                        System.out.println("doOnNext,线程:" + Thread.currentThread().getName());
                    
                )
                // 线程调度 这里让 subscribe 运行在新线程
                .subscribeOn(Schedulers.newThread())
                // 下游放到主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Object, Object>() 
                    @Override
                    public Object apply(@NotNull Object o) throws Exception 
                        Log.i(TAG, "map apply,线程:" + Thread.currentThread().getName());
                        return "map ";
                    
                )
                .observeOn(Schedulers.io())
                .subscribe(observer);

运行效果:

建立订阅时调用,线程:Instr: androidx.test.runner.AndroidJUnitRunner
doOnNext,线程:RxNewThreadScheduler-1
map apply,线程:main
doOnNext,线程:RxNewThreadScheduler-1
map apply,线程:main
调用 onNext:map ,线程:RxCachedThreadScheduler-1
调用 onNext:map ,线程:RxCachedThreadScheduler-1

四、过滤与条件操作符

1. filter

根据一定条件过滤事件。


    @Test
    public void testTool()

        Observable.range(1,10)
                .filter(new Predicate<Integer>() 
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception 
                        return integer<5;
                    
                )
                .subscribe(observer);
    

运行效果:

建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]
调用 onNext:3线程:Thread[main,5,main]
调用 onNext:4线程:Thread[main,5,main]
订阅执行完成 onComplete,线程:Thread[main,5,main]

2. all 判断所有事件是否全满足了条件

全满足就返回true,否则返回false。

    @Test
    public void testTool()

        Observable.range(1,10)
                .all(new Predicate<Integer>() 
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception 
                        return integer<11; // 11 打印true, 5 打印false
                    
                )
                .subscribe(new Consumer<Boolean>() 
                    @Override
                    public void accept(Boolean aBoolean) throws Exception 
                        System.out.println(aBoolean);
                    
                );
    

3. takeWhile 发射数据到某条件时停止

    @Test
    public void testTool()

        Observable.range(1,10)
                .takeWhile(new Predicate<Integer>() 
                    @Override
                    public boolean test(@NotNull Integer integer) throws Exception 
                        return integer<5;
                    
                )
                .subscribe(observer);
    
建立订阅时调用: onSubscribe,线程:Thread[main,5,main]
调用 onNext:1线程:Thread[main,5,main]
调用 onNext:2线程:Thread[main,5,main]
调用 onNext:3线程:Thread[main,5,main]
调用 onNext:4线程:Thread[main,5,main]
订阅执行完成 onComplete,线程:Thread[main,5,main]

4. skipWhile 跳过条件,直到满足条件再发射

以上是关于RxJava学习入门2.转换组合功能操作符的主要内容,如果未能解决你的问题,请参考以下文章

RxJava- 操作符之组合Observable

Android函数响应式编程最新RxJava-操作符入门

RxJava学习入门1.基本概念和常用的创建操作符

rxjava-几类变换2

RxJava进阶四(组合类操作符)

RxJava进阶二(转换类操作符)