RxJava学习入门1.基本概念和常用的创建操作符
Posted 编程圈子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava学习入门1.基本概念和常用的创建操作符相关的知识,希望对你有一定的参考价值。
RxJava学习入门1.基本概念和常用的创建操作符
一、简介
RxJava是基于Jvm的响应式编程框架, 本文基于RxJava2.0。
1. RxJava的一些优势:
- 响应式编程
- 为异步而生
- 支持链式调用
- 简化异常处理
适应于网络请求、数据库读写、文件读写、定时任务等。
2. 几个重要的概念
(1) 观察者: Observer
观察事件变化并处理的主要角色。
消费者: Consumer,理解为一种特殊的观察者。
(2) 被观察者: 触发事件并决定什么时候发送事件的主要角色。
异常和完成也是一种事件。
- Observable,Flowable,Single,Completable,Maybe五种被观察者。
- Flowable支持背压
- Signle,Completable,Maybe是简化版的Observable。
(3) 订阅:观察者和被观察者建立关联
观察者订阅被观察者。
3. 六大类操作符
- 创建操作符
- 转换操作符
- 组合操作符
- 功能操作符
- 过滤操作符
操作符,通俗理解就是用来创建Observable的一些重载方法。
二、 创建操作符
1. 创建demo项目
引入rxjava2.0的包:
implementation "io.reactivex.rxjava2:rxjava:2.2.21"
implementation "io.reactivex.rxjava2:rxandroid:2.0.2"
创建一个测试用例。
2. 创建操作符示例
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivityTest
@Test
public void test01()
Observable.create(new ObservableOnSubscribe<Object>()
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception
// 事件产生的地方
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
).subscribe(new Observer<Object>()
@Override
public void onSubscribe(@NotNull Disposable d)
System.out.println("建立订阅时调用: onSubscribe");
@Override
public void onNext(@NotNull Object o)
System.out.println("调用 onNext");
@Override
public void onError(@NotNull Throwable e)
System.out.println("调用 onError");
@Override
public void onComplete()
System.out.println("订阅执行完成 onComplete");
);
执行效果:
建立订阅时调用: onSubscribe
调用 onNext
调用 onNext
调用 onNext
订阅执行完成 onComplete
说明:Observer 去观察 Observable,可以在接口中写观察后的逻辑代码。为了方便书写和复用,后面将Observer定义单独拿出来。
3. 消费者
消费者与订阅者相比会简化处理逻辑。
public class MainActivityTest
@Test
public void test01()
Observer observer = new Observer<Object>()
@Override
public void onSubscribe(@NotNull Disposable d)
System.out.println("建立订阅时调用: onSubscribe");
@Override
public void onNext(@NotNull Object o)
System.out.println("调用 onNext");
@Override
public void onError(@NotNull Throwable e)
System.out.println("调用 onError");
@Override
public void onComplete()
System.out.println("订阅执行完成 onComplete");
;
Observable.create(new ObservableOnSubscribe<Object>()
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception
// 事件产生的地方
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
// emitter.onError(new Throwable("异常"));
emitter.onComplete();
).subscribe(new Consumer<Object>()
@Override
public void accept(Object o) throws Exception
System.out.println("accept 对象" + o);
, new Consumer<Throwable>()
@Override
public void accept(Throwable throwable) throws Exception
System.out.println("异常发生" + throwable);
);
运行效果:
accept 对象1
accept 对象2
accept 对象3
4. just 操作符
just操作符用来大幅简化Observable的创建,但最多只能传10个事件。
// 这个Observer 后面代码将会复用。
Observer observer = new Observer<Object>()
@Override
public void onSubscribe(@NotNull Disposable d)
System.out.println("建立订阅时调用: onSubscribe");
@Override
public void onNext(@NotNull Object o)
System.out.println("调用 onNext:" + o);
@Override
public void onError(@NotNull Throwable e)
System.out.println("调用 onError");
@Override
public void onComplete()
System.out.println("订阅执行完成 onComplete");
;
@Test
public void testJust()
Observable.just("1","ABC","2")
.subscribe(observer);
执行效果:
建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:ABC
调用 onNext:2
订阅执行完成 onComplete
5. fromArray操作符,类似于just,但数组不受10个限制
这里沿用上面的observer。
@Test
public void testArray()
Observable.fromArray("1","ABC","2")
.subscribe(observer);
6. fromIterable 迭代器操作符
@Test
public void testTerable()
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
Observable.fromIterable(list)
.subscribe(observer);
7. fromFuture操作符
@Test
public void testFuture()
Observable.fromFuture(new Future<Object>()
@Override
public boolean cancel(boolean b)
return false;
@Override
public boolean isCancelled()
return false;
@Override
public boolean isDone()
return false;
@Override
public Object get() throws ExecutionException, InterruptedException
return "abc";
@Override
public Object get(long l, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException
return null;
)
.subscribe(observer);
8. fromCallable
@Test
public void testCallable()
Observable.fromCallable(new Callable<Object>()
@Override
public Object call() throws Exception
return "ABC";
)
.subscribe(observer);
以上是关于RxJava学习入门1.基本概念和常用的创建操作符的主要内容,如果未能解决你的问题,请参考以下文章