RxJava学习入门1.基本概念和常用的创建操作符

Posted 编程圈子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava学习入门1.基本概念和常用的创建操作符相关的知识,希望对你有一定的参考价值。

RxJava学习入门1.基本概念和常用的创建操作符

一、简介

RxJava是基于Jvm的响应式编程框架, 本文基于RxJava2.0。

1. RxJava的一些优势:

  • 响应式编程
  • 为异步而生
  • 支持链式调用
  • 简化异常处理
    适应于网络请求、数据库读写、文件读写、定时任务等。

2. 几个重要的概念

(1) 观察者: Observer

观察事件变化并处理的主要角色。

消费者: Consumer,理解为一种特殊的观察者。

(2) 被观察者: 触发事件并决定什么时候发送事件的主要角色。

异常和完成也是一种事件。

  • Observable,Flowable,Single,Completable,Maybe五种被观察者。
  • Flowable支持背压
  • Signle,Completable,Maybe是简化版的Observable。

(3) 订阅:观察者和被观察者建立关联

观察者订阅被观察者。

3. 六大类操作符

  • 创建操作符
  • 转换操作符
  • 组合操作符
  • 功能操作符
  • 过滤操作符
    操作符,通俗理解就是用来创建Observable的一些重载方法。

二、 创建操作符

1. 创建demo项目

引入rxjava2.0的包:

    implementation "io.reactivex.rxjava2:rxjava:2.2.21"
    implementation "io.reactivex.rxjava2:rxandroid:2.0.2"

创建一个测试用例。

2. 创建操作符示例

import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MainActivityTest 

    @Test
    public void test01()
        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                // 事件产生的地方
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            
        ).subscribe(new Observer<Object>() 
            @Override
            public void onSubscribe(@NotNull Disposable d) 
                System.out.println("建立订阅时调用: onSubscribe");
            

            @Override
            public void onNext(@NotNull Object o) 
                System.out.println("调用 onNext");
            

            @Override
            public void onError(@NotNull Throwable e) 
                System.out.println("调用 onError");
            

            @Override
            public void onComplete() 
                System.out.println("订阅执行完成 onComplete");
            
        );
    

执行效果:

建立订阅时调用: onSubscribe
调用 onNext
调用 onNext
调用 onNext
订阅执行完成 onComplete

说明:Observer 去观察 Observable,可以在接口中写观察后的逻辑代码。为了方便书写和复用,后面将Observer定义单独拿出来。

3. 消费者

消费者与订阅者相比会简化处理逻辑。


public class MainActivityTest 

    @Test
    public void test01()
        Observer observer = new Observer<Object>() 
            @Override
            public void onSubscribe(@NotNull Disposable d) 
                System.out.println("建立订阅时调用: onSubscribe");
            

            @Override
            public void onNext(@NotNull Object o) 
                System.out.println("调用 onNext");
            

            @Override
            public void onError(@NotNull Throwable e) 
                System.out.println("调用 onError");
            

            @Override
            public void onComplete() 
                System.out.println("订阅执行完成 onComplete");
            
        ;
        Observable.create(new ObservableOnSubscribe<Object>() 
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                // 事件产生的地方
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");

//                emitter.onError(new Throwable("异常"));
                emitter.onComplete();
            
        ).subscribe(new Consumer<Object>() 
            @Override
            public void accept(Object o) throws Exception 
                System.out.println("accept 对象" + o);
            
        , new Consumer<Throwable>() 
            @Override
            public void accept(Throwable throwable) throws Exception 
                System.out.println("异常发生" + throwable);
            
        );
    

运行效果:

accept 对象1
accept 对象2
accept 对象3

4. just 操作符

just操作符用来大幅简化Observable的创建,但最多只能传10个事件。

    // 这个Observer 后面代码将会复用。
    Observer observer = new Observer<Object>() 
        @Override
        public void onSubscribe(@NotNull Disposable d) 
            System.out.println("建立订阅时调用: onSubscribe");
        

        @Override
        public void onNext(@NotNull Object o) 
            System.out.println("调用 onNext:" + o);
        

        @Override
        public void onError(@NotNull Throwable e) 
            System.out.println("调用 onError");
        

        @Override
        public void onComplete() 
            System.out.println("订阅执行完成 onComplete");
        
    ;

    @Test
    public void testJust()
        Observable.just("1","ABC","2")
                .subscribe(observer);
    

执行效果:

建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:ABC
调用 onNext:2
订阅执行完成 onComplete

5. fromArray操作符,类似于just,但数组不受10个限制

这里沿用上面的observer。

    @Test
    public void testArray()
        Observable.fromArray("1","ABC","2")
            .subscribe(observer);
    

6. fromIterable 迭代器操作符

    @Test
    public void testTerable()
        ArrayList<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        Observable.fromIterable(list)
            .subscribe(observer);
    

7. fromFuture操作符

    @Test
    public void testFuture()

        Observable.fromFuture(new Future<Object>() 
            @Override
            public boolean cancel(boolean b) 
                return false;
            

            @Override
            public boolean isCancelled() 
                return false;
            

            @Override
            public boolean isDone() 
                return false;
            

            @Override
            public Object get() throws ExecutionException, InterruptedException 
                return "abc";
            

            @Override
            public Object get(long l, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException 
                return null;
            
        )
            .subscribe(observer);
    

8. fromCallable

    @Test
    public void testCallable()

        Observable.fromCallable(new Callable<Object>() 
            @Override
            public Object call() throws Exception 
                return "ABC";
            
        )
            .subscribe(observer);
    

以上是关于RxJava学习入门1.基本概念和常用的创建操作符的主要内容,如果未能解决你的问题,请参考以下文章

RxJava 从入门到出轨

Android函数响应式编程最新RxJava-操作符入门

RxJava学习总结

RxJava学习入门2.转换组合功能操作符

RxJava学习入门2.转换组合功能操作符

Rxjava - 操作符,线程操作的简单使用