手写Rxjava了解核心实现原理
Posted guangdeshishe
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写Rxjava了解核心实现原理相关的知识,希望对你有一定的参考价值。
RxJava简介
RxJava全称是Rective Extensions Java(基于Java实现的响应式扩展);Rx是一个响应式编程模型,目标是提供一致的编程接口,帮助开发者方便的处理异步数据流;其最早是微软.Net中LINQ的一个响应式扩展,2012年开始将.NET Rx迁移到JVM上面,并于13年二月份正式向外展示了RxJava。
响应式编程
响应式编程是一种基于异步数据流概念的编程模式;数据/事件就像一条河流,从源头一直往下流,在流动过程中,可以被观测、被过滤、被操作,或者与另一条流合并成一条新的流,最终流向大海被消费掉;
与响应式编程相对应的有同步式编程、异步式编程:
-
同步式编程:比如我们在主线程上请求一个网络接口,一直等到返回结果才能继续执行下一步,这就是同步式的
-
异步式编程:开启一个子线程去请求网络接口,主线程继续执行,然后定时去查询接口返回的结果
-
响应式编程:开启一个子线程去请求网络接口,注册监听后主线程继续执行,网络接口返回数据后,主动回调注册的监听方法,从而达到响应的目的
RxJava可以简单理解为就是观察者模式+异步处理+链式调用(流的概念)
为什么选择RxJava
在android平台上已经为开发者提供了AsyncTask,Handler等用于异步操作的类,为什么还要用RxJava呢?
因为RxJava是基于响应式编程模型,它提供了一系列的操作符,比如切换线程(subscribeOn/observerOn)、过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable等等,通过这些操作符和链式调用可以让代码更加的简洁,线程切换更加容易,处理数据就像工厂里的商品在流水线上不断的被加工处理,最后被消费掉。
例如有个需求是要通过请求网络接口获取酒店房源信息,并过滤出价格大于200的房源并展示出来,在没有使用Rxjava之前可能需要这样:
new Thread()
@Override
public void run()
super.run();
List<HotelInfo> hotelList = getHotelInfoList();//从网络获取酒店列表
for (HotelInfo hotelInfo : hotelList)
List<House> houses = hotelInfo.houses;//从酒店信息中取出房子列表
for (House house : houses)
if (house.price >= 200) //判断房子价格并过滤
runOnUiThread(new Runnable()
@Override
public void run()
//将符合条件房子的信息展示出来
displayAvailableHouse(house);
);
.start();
使用RxJava后:
Observable.fromIterable(getHotelInfoList())//获取数据
.flatMap(new Function<Hotel, Observable<Room>()
@Override
public Observable<Room> apply(Hotel hotel)//取出房源
return Observable.fromIterable(hotel.rooms);
)
.filter(new Predicate<Room>()
@Override
public boolean test(@NonNull Room room) throws Exception
return room.price >= 200;//过滤房源
)
.subscribeOn(Schedulers.io())//切换子线程
.observeOn(AndroidSchedulers.MainThread())//切换主线程
.subscribe(new Consumer<Room>()
@Override
public void accept(Room room)//展示数据
displayAvailableRoom(room);
);
如果结合lambda表达式则更加简洁:
Observable.fromIterable(getHotelInfoList())
.flatMap(hotel -> Observable.fromIterable(hotel.rooms))
.filter(room -> room.price >= 200)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::displayAvailableRoom(it));
通过对比可以看到,使用RxJava后,仅仅通过几个操作符就完成了一系列的操作:
-
没有了for循环,以及循环嵌套
-
不需要我们new线程,异步执行和切换主线程只需要subscribeOn和observeOn两个操作符指定所在线程即可
-
通过链式调用,数据的流向清晰可见,从网络获取酒店列表->从每个酒店取出所有房间信息->过滤出房价大于200的房间->展示出符合条件的房间
-
整体代码更加简洁美观
手写RxJava,了解其核心原理
目标
-
实现Rxjava最基本的观察者(Observer)和被观察者(Observable)
-
实现observerOn和subscribeOn操作符的线程切换功能
-
实现just和fromIterable操作符对集合数据遍历功能
-
实现map操作符对数据类型的转换功能
-
实现flatMap操作符将数据结果转换成并合并多个Observable功能
-
实现filter操作符对数据过滤功能
实现Rxjava最基本的观察者(Observer)和被观察者(Observable)
RxJava原始代码示例:
Observable.create(object : ObservableOnSubscribe<String> //创建被观察者
override fun subscribe(subscriber: ObservableEmitter<String>)
//subscriber是指下游的观察者,当调用subscribe方法订阅时会回调这个方法
printLog("数据源头>>subscribe")
try
subscriber.onNext("1")
subscriber.onComplete()
catch (e: Exception)
subscriber.onError(e)
).subscribe(object : Observer<String> //对被观察者订阅消息
override fun onSubscribe(d: Disposable?)
printLog("订阅成功时回调>>onSubscribe")
override fun onNext(value: String?)
printLog("接收上游数据时回调>>onNext")
override fun onError(e: Throwable?)
printLog("出现错误时回调>>onError")
override fun onComplete()
printLog("结束时回调>>onComplete")
)
运行结果:
订阅成功时回调>>onSubscribe
数据源头>>subscribe
接收上游数据时回调>>onNext
结束时回调>>onComplete
手写RxJava实现:
涉及到的类有:Observable、ObservableOnSubscribe、Observer;为了与原始RxJava的类做区分,我们把类名前面统一加上T
- TObservable.kt : 包含工厂方法create、订阅方法subscribe,为了能够链式调用,每个方法返回的对象必须是TObservable(除了最后调用的subscribe方法)
/**
* 被观察者,Rxjava核心实现类
*/
class TObservable<T> private constructor(val onSubscribe: TObservableOnSubscribe<T>)
companion object
/**
* 工厂方法,创建被观察者Observable
* @param onSubscribe 被观察时的回调接口
*/
fun <T> create(onSubscribe: TObservableOnSubscribe<T>): TObservable<T>
return TObservable<T>(onSubscribe)
/**
* 对当前被观察者进行订阅的方法
* @param observer 观察者
*/
fun subscribe(observer: TObserver<T>)
observer.onSubscribe()
onSubscribe.subscribe(observer)
- TObservableOnSubscribe.kt : 这个是当有观察者对被观察者订阅时的回调接口,包含subscribe方法,该方法传入的ObservableEmitter其实是对Observer进行了一次封装,手写Rxjava时为了简化直接替换成Observer
/**
* 当有观察者Observer对被观察者Observable订阅时的回调接口
*/
interface TObservableOnSubscribe<T>
/**
* 订阅时回调,数据的源头
* @param subscriber 订阅者
*/
fun subscribe(subscriber: TObserver<T>)
- TObserver.kt : 观察者,主要有onSubscribe、onNext、onError、onComplete这几个方法
/**
* 观察者
*/
interface TObserver<T>
/**
* 订阅成功时回调
*/
fun onSubscribe()
/**
* 接收上游数据时回调
* @param obj 上游传递过来的数据
*/
fun onNext(obj: T)
/**
* 出现异常时回调
* @param e 异常信息
*/
fun onError(e: Throwable)
/**
* 结束时回调
*/
fun onComplete()
测试手写RxJava代码:
class TestRxJava
companion object
fun printLog(message: String)
println(message)
fun test()
printLog(">>>>>>>>原始RxJava实现>>>>>>>>")
sourceRxJava()
printLog(">>>>>>>>手写RxJava实现>>>>>>>>")
myRxjava()
fun sourceRxJava()
Observable.create(object : ObservableOnSubscribe<String>
override fun subscribe(subscriber: ObservableEmitter<String>)
printLog("数据源头>>subscribe")
try
subscriber.onNext("1")
subscriber.onComplete()
catch (e: Exception)
subscriber.onError(e)
).subscribe(object : Observer<String>
override fun onSubscribe(d: Disposable?)
printLog("订阅成功时回调>>onSubscribe")
override fun onNext(value: String?)
printLog("接收上游数据时回调>>onNext")
override fun onError(e: Throwable?)
printLog("出现错误时回调>>onError")
override fun onComplete()
printLog("结束时回调>>onComplete")
)
fun myRxjava()
TObservable.create(object : TObservableOnSubscribe<String>
override fun subscribe(subscriber: TObserver<String>)
printLog("数据源头>>subscribe")
try
subscriber.onNext("1")
subscriber.onComplete()
catch (e: Exception)
subscriber.onError(e)
).subscribe(object : TObserver<String>
override fun onSubscribe()
printLog("订阅成功时回调>>onSubscribe")
override fun onNext(value: String)
printLog("接收上游数据时回调>>onNext")
override fun onError(e: Throwable)
printLog("出现错误时回调>>onError")
override fun onComplete()
printLog("结束时回调>>onComplete")
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
订阅成功时回调>>onSubscribe
数据源头>>subscribe
接收上游数据时回调>>onNext
结束时回调>>onComplete
>>>>>>>>手写RxJava实现>>>>>>>>
订阅成功时回调>>onSubscribe
数据源头>>subscribe
接收上游数据时回调>>onNext
结束时回调>>onComplete
小结:
可以看到我们手写RxJava跟原生的RxJava运行效果是一样,它的实现也是很简单的,就是当最后调用被观察者Observable的subscribe方法进行订阅时,会把观察者Observer传给被观察者,被观察者根据业务情况回调观察者Observer的onSubscribe、onNext、onError、onComplete方法,并把数据传递给观察者;
而所谓的链式调用,其实就类似于建造者模式中的Builder,每次调用被观察者Observable的方法后都返回它本身(除了最后执行的subscribe方法)
实现observerOn和subscribeOn操作符的线程切换功能
-
observerOn用于指定观察者Observer运行所在线程
-
subscribeOn用于指定被观察者Observable运行所在线程
RxJava原始代码示例:
Observable.create(object : ObservableOnSubscribe<String>
override fun subscribe(subscriber: ObservableEmitter<String>) //子线程运行
printLog("数据源头>>subscribe>>thread=$Thread.currentThread().name")
try
subscriber.onNext("1")
subscriber.onComplete()
catch (e: Exception)
subscriber.onError(e)
).subscribeOn(Schedulers.io())//指定Observable在子线程运行
.observeOn(AndroidSchedulers.mainThread())//指定Observer在主线程运行
.subscribe(object : Observer<String>
override fun onSubscribe(d: Disposable?) //主线程运行
printLog("订阅成功时回调>>onSubscribe>>thread=$Thread.currentThread().name")
override fun onNext(value: String?) //主线程运行
printLog("接收上游数据时回调>>onNext>>thread=$Thread.currentThread().name")
override fun onError(e: Throwable?) //主线程运行
printLog("出现错误时回调>>onError>>thread=$Thread.currentThread().name")
override fun onComplete() //主线程运行
printLog("结束时回调>>onComplete>>thread=$Thread.currentThread().name")
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
订阅成功时回调>>onSubscribe>>thread=main
数据源头>>subscribe>>thread=RxCachedThreadScheduler-1
接收上游数据时回调>>onNext>>thread=main
结束时回调>>onComplete>>thread=main
手写RxJava实现:
线程切换主要涉及到的类有:Observable.subscribeOn、Observable.observeOn、Schedulers;
Schedulers我们可以推测出来里面肯定用到了线程池,切换主线程肯定用到了Handler;所以我们首先需要创建一个TSchedulers类,用于实现在不同线程中执行任务
-
Schedulers.kt 实现任务在子线程或者主线程运行
import android.os.Handler import android.os.Looper import java.util.concurrent.Executor import java.util.concurrent.Executors /** * 用于切换线程的调度器 */ class TSchedulers companion object private val IO = Scheduler(Executors.newFixedThreadPool(2))//定义2个核心线程的IO线程池 private val MAIN = Scheduler(MainThreadExecutor()) /** * 获取IO类型线程执行器 */ fun io(): Scheduler return IO /** * 获取主线程的执行器 */ fun mainThread(): Scheduler return MAIN /** * 定义在主线程执行的执行器,通过Handler实现 */ class MainThreadExecutor : Executor private val handler = Handler(Looper.getMainLooper()) override fun execute(command: Runnable) handler.post(command) /** * 调度器类 */ class Scheduler constructor(private val executor: Executor) fun schedule(task: Runnable) executor.execute(task)
-
Schedulers.kt 实现任务在子线程或者主线程运行
测试手写RxJava代码:
TObservable.create(object : TObservableOnSubscribe<String>
override fun subscribe(subscriber: TObserver<String>)
printLog("数据源头>>subscribe>>thread=$Thread.currentThread().name")
try
subscriber.onNext("1")
subscriber.onComplete()
catch (e: Exception)
subscriber.onError(e)
).subscribeOn(TSchedulers.io())
.observeOn(TSchedulers.mainThread()).subscribe(object : TObserver<String>
override fun onSubscribe()
printLog("订阅成功时回调>>onSubscribe>>thread=$Thread.currentThread().name")
override fun onNext(value: String)
printLog("接收上游数据时回调>>onNext>>thread=$Thread.currentThread().name")
override fun onError(e: Throwable)
printLog("出现错误时回调>>onError>>thread=$Thread.currentThread().name")
override fun onComplete()
printLog("结束时回调>>onComplete>>thread=$Thread.currentThread().name")
)
运行结果:
>>>>>>>>手写RxJava实现>>>>>>>>
订阅成功时回调>>onSubscribe>>thread=main
数据源头>>subscribe>>thread=pool-3-thread-1
订阅成功时回调>>onSubscribe>>thread=main
接收上游数据时回调>>onNext>>thread=main
结束时回调>>onComplete>>thread=main
小结:
- subscribeOn方法中会生成一个新的被观察者,当下游观察者订阅事件时,生成的这个中间被观察者会将下游的订阅者直接交给上游的被观察者,并在执行线程中调用subscribe方法,从而实现被观察者执行线程的切换
- subscribeOn不管调用多少次,都只有第一次调用时是有效的,这是因为顶层的被观察者的subscribe方法只会执行一次,而且不可能同时运行在多个线程上,它只会运行在离它最近的一次调用的subscribeOn的线程上
- observeOn内部也是会创建一个新的被观察者,然后通过新的被观察者去订阅观察上游的被观察者事件,在接收到事件后在指定线程中将事件传递给下游的观察者;
- 虽然可以observeOn可以调用多次,但对于中间的操作来说,只有在它上游离它最近的一次调用observerOn时指定的线程有效
实现just和fromIterable操作符对集合数据遍历功能
- just操作符支持将传入的可变长度变量进行遍历
- fromIterable支持将实现了Iterable接口的集合进行遍历
RxJava原始代码示例:
Observable.just("just1", "just2", "just3")
.subscribe(object : Observer<String>
override fun onSubscribe(d: Disposable?)
override fun onNext(value: String?)
printLog("接收上游数据时回调>>onNext>>$value")
override fun onError(e: Throwable?)
override fun onComplete()
)
Observable.fromIterable(listOf("fromIterable1", "fromIterable2", "fromIterable3"))
.subscribe(object : Observer<String>
override fun onSubscribe(d: Disposable?)
override fun onNext(value: String?)
printLog("接收上游数据时回调>>onNext>>$value")
override fun onError(e: Throwable?)
override fun onComplete()
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>just1
接收上游数据时回调>>onNext>>just2
接收上游数据时回调>>onNext>>just3
接收上游数据时回调>>onNext>>fromIterable1
接收上游数据时回调>>onNext>>fromIterable2
接收上游数据时回调>>onNext>>fromIterable3
手写RxJava实现:
TObservable: 实现思路就是在被观察者的subscribe方法中遍历传入的集合
/**
* 被观察者,Rxjava核心实现类
*/
class TObservable<T> private constructor(val onSubscribe: TObservableOnSubscribe<T>)
companion object
/**
* 遍历可变参数
*/
fun <T> just(vararg items: T): TObservable<T>
return create(object : TObservableOnSubscribe<T>
override fun subscribe(subscriber: TObserver<T>)
for (item in items)
subscriber.onNext(item)
)
/**
* 遍历实现Iterable接口的集合
*/
fun <T> fromIterable(list: Iterable<T>): TObservable<T>
return create(object : TObservableOnSubscribe<T>
override fun subscribe(subscriber: TObserver<T>)
for (item in list)
subscriber.onNext(item)
)
测试手写RxJava代码:
TObservable.just("just1", "just2", "just3")
.subscribe(object : TObserver<String>
override fun onSubscribe()
override fun onNext(obj: String)
printLog("接收上游数据时回调>>onNext>>$obj")
override fun onError(e: Throwable)
override fun onComplete()
)
TObservable.fromIterable(listOf("fromIterable1", "fromIterable2", "fromIterable3"))
.subscribe(object : TObserver<String>
override fun onSubscribe()
override fun onNext(obj: String)
printLog("接收上游数据时回调>>onNext>>$obj")
override fun onError(e: Throwable)
override fun onComplete()
)
运行结果:
>>>>>>>>手写RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>just1
接收上游数据时回调>>onNext>>just2
接收上游数据时回调>>onNext>>just3
接收上游数据时回调>>onNext>>fromIterable1
接收上游数据时回调>>onNext>>fromIterable2
接收上游数据时回调>>onNext>>fromIterable3
小结:
- just和fromIterable操作符是根据传入的集合数据创建一个被观察者,然后遍历每一个集合中的元素
实现map操作符对数据类型的转换功能
map操作符可以将上游传递过来的数据类型转换成另一种数据类型,也可以直接对数据进行加工处理
RxJava原始代码示例:
Observable.just("just1111", "just211", "just3")
.map(object:Function<String,Int>
override fun apply(t: String): Int
return t.length//将数据处理后String类型转换成Int类型
)
.subscribe(object : Observer<Int>
override fun onSubscribe(d: Disposable?)
override fun onNext(value: Int?) //接收转换后的Int类型
printLog("接收上游数据时回调>>onNext>>长度为:$value")
override fun onError(e: Throwable?)
override fun onComplete()
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>长度为:8
接收上游数据时回调>>onNext>>长度为:7
接收上游数据时回调>>onNext>>长度为:5
手写RxJava实现:
TFunction.kt : 定义转换接口
/**
* 将一个数据类型转换成另一个数据类型并返回
*/
interface TFunction<T, R>
fun apply(obj: T): R;
TObservable.kt : 实现map方法
fun <R> map(function: TFunction<T, R>): TObservable<R>
return create(object : TObservableOnSubscribe<R>
override fun subscribe(subscriber: TObserver<R>)
this@TObservable.subscribe(object : TObserver<T> //创建观察者对上游事件监听
override fun onSubscribe()
subscriber.onSubscribe()
override fun onNext(obj: T)
//调用传入的转换方法function,将T类型转化成R类型并传递给下游观察者
subscriber.onNext(function.apply(obj))
override fun onError(e: Throwable)
subscriber.onError(e)
override fun onComplete()
subscriber.onComplete()
)
)
测试手写RxJava代码:
TObservable.just("just1111", "just211", "just3")
.map(object : TFunction<String, Int>
override fun apply(t: String): Int
return t.length
)
.subscribe(object : TObserver<Int>
override fun onSubscribe()
override fun onNext(obj: Int)
printLog("接收上游数据时回调>>onNext>>$obj")
override fun onError(e: Throwable)
override fun onComplete()
)
运行结果:
>>>>>>>>手写RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>8
接收上游数据时回调>>onNext>>7
接收上游数据时回调>>onNext>>5
小结:
通过创建新的被观察者,当下游观察者订阅时,对上游被观察者进行监听,当事件发生时,调用传入的转换方法,并将结果传递给下游的观察者
实现flatMap操作符将数据结果转换成多个Observable并合并成一个Observable统一处理
与map相比,flatMap是将数据处理后转换成一个新的Observable,并且将产生的多个Observable合并成一个Observable供下游订阅,可用于二维数组的遍历;
也可用于被观察者需要依赖另一个被观察者的场景,比如注册成功后跳转登录界面
- concatMap相比flatMap,concatMap接收上游数据是有序的,而faltMap是无序的
RxJava原始代码示例:
Observable.just(Arrays.asList("张三个人信息", "男", "1岁"), Arrays.asList("李四个人信息", "男", "2岁"))
.flatMap(object : Function<List<String>, Observable<String>>
override fun apply(t: List<String>): Observable<String>
printLog("flatMap接收到数据>>>>>>$t")
return Observable.fromIterable(t)
)
.subscribe(object : Observer<String>
override fun onSubscribe(d: Disposable)
override fun onNext(value: String)
printLog("接收上游数据时回调>>onNext>>$value")
override fun onError(e: Throwable?)
override fun onComplete()
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
flatMap接收到数据>>>>>>[张三个人信息, 男, 1岁]
接收上游数据时回调>>onNext>>张三个人信息
接收上游数据时回调>>onNext>>男
接收上游数据时回调>>onNext>>1岁
flatMap接收到数据>>>>>>[李四个人信息, 男, 2岁]
接收上游数据时回调>>onNext>>李四个人信息
接收上游数据时回调>>onNext>>男
接收上游数据时回调>>onNext>>2岁
手写RxJava实现:
实现思路:
-
创建一个中间层的Observable用于接收下游的订阅者Observer
-
在新创建的Observable.subscribe方法中订阅上游的被观察者Observable
-
当接收到上游被观察者Observable发送过来的数据时,调用flatMap传入的方法,将数据转成Observable
-
将下游的订阅者Observer重新订阅使用flatMap方法生成的Observable
fun <R> flatMap(function: Function<T, TObservable<R>>): TObservable<R> return create(object : TObservableOnSubscribe<R> override fun subscribe(subscriber: TObserver<R>) this@TObservable.subscribe(object : TObserver<T> //创建观察者对上游事件监听 override fun onSubscribe() override fun onNext(obj: T) //调用传入的转换方法function,将类型处理后转成Observable,然后使用下游观察者对其订阅 val observable = function.apply(obj) observable.subscribe(subscriber) override fun onError(e: Throwable) override fun onComplete() ) )
测试手写RxJava代码:
TObservable.just(Arrays.asList("张三个人信息", "男", "1岁"), Arrays.asList("李四个人信息", "男", "2岁")) .flatMap(object : Function<List<String>, TObservable<String>> override fun apply(t: List<String>): TObservable<String> printLog("flatMap接收到数据>>>>>>$t") return TObservable.fromIterable(t)//将数组转成Observable供下游观察者订阅 ) .subscribe(object : TObserver<String> override fun onSubscribe() override fun onNext(obj: String) printLog("接收上游数据时回调>>onNext>>$obj") override fun onError(e: Throwable) override fun onComplete() )
运行结果:
>>>>>>>>手写RxJava实现>>>>>>>> flatMap接收到数据>>>>>>[张三个人信息, 男, 1岁] 接收上游数据时回调>>onNext>>张三个人信息 接收上游数据时回调>>onNext>>男 接收上游数据时回调>>onNext>>1岁 flatMap接收到数据>>>>>>[李四个人信息, 男, 2岁] 接收上游数据时回调>>onNext>>李四个人信息 接收上游数据时回调>>onNext>>男 接收上游数据时回调>>onNext>>2岁
小结:
通过运行结果可以看到flatmap可以实现二维数组遍历的功能,将元素一个个取出后传递给最下游的观察者
实现filter操作符对数据过滤功能
filter操作符可以通过传入一个判断是否过滤的方法,当该方法返回true时表示符合条件,会继续传递给下游订阅者
RxJava原始代码示例:
Observable.just(
Arrays.asList("张三个人信息", "男", "1岁"),
Arrays.asList("李四个人信息", "男", "2岁"),
Arrays.asList("王五个人信息", "男", "1岁")
)
.filter(object : Predicate<List<String>>
override fun test(t: List<String>): Boolean
val age = t[2]
if (age.equals("1岁"))
return true
printLog("filter>>不符合条件的跳过>>$t")
return false
)
.subscribe(object : Observer<List<String>>
override fun onSubscribe(d: Disposable)
override fun onNext(value: List<String>)
printLog("接收上游数据时回调>>onNext>>$value")
override fun onError(e: Throwable?)
override fun onComplete()
)
运行结果:
>>>>>>>>原始RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>[张三个人信息, 男, 1岁]
不符合条件的跳过>>[李四个人信息, 男, 2岁]
接收上游数据时回调>>onNext>>[王五个人信息, 男, 1岁]
手写RxJava实现:
实现思路:
-
创建一个新的Observable并返回
-
在下游订阅该Observable时,去订阅上游的Observable
-
接收到上游数据后,在onNext方法中,调用传入的过滤方法,如果返回结果是true,则传递给下游观察者的onNext方法,反之则忽略该条数据
fun filter(filter:TPredicate<T>):TObservable<T> return create(object :TObservableOnSubscribe<T> override fun subscribe(subscriber: TObserver<T>) this@TObservable.subscribe(object :TObserver<T> override fun onSubscribe() override fun onNext(obj: T) if (filter.test(obj))//调用传入的过滤方法,返回true表示符合条件,继续传递给下游观察者 subscriber.onNext(obj) override fun onError(e: Throwable) override fun onComplete() ) )
测试手写RxJava代码:
TObservable.just( Arrays.asList("张三个人信息", "男", "1岁"), Arrays.asList("李四个人信息", "男", "2岁"), Arrays.asList("王五个人信息", "男", "1岁") ) .filter(object : TPredicate<List<String>> override fun test(t: List<String>): Boolean val age = t[2] if (age.equals("1岁")) return true printLog("filter>>不符合条件的跳过>>$t") return false ) .subscribe(object : TObserver<List<String>> override fun onSubscribe() override fun onNext(obj: List<String>) printLog("接收上游数据时回调>>onNext>>$obj") override fun onError(e: Throwable) override fun onComplete() )
运行结果:
手写RxJava实现>>>>>>>>
接收上游数据时回调>>onNext>>[张三个人信息, 男, 1岁]
filter>>不符合条件的跳过>>[李四个人信息, 男, 2岁]
接收上游数据时回调>>onNext>>[王五个人信息, 男, 1岁]
小结:
filter操作符跟map和flatMap都比较类似,他们都是传入一个方法处理数据,内部都是创建一个中间被观察者连接上游被观察者和下游的观察者,调用传入方法后并传给下游观察者
总结
- RxJava将观察者模式运用到了极致,使得通过一些简单的操作符就可以实现诸如线程切换、过滤、转换等复杂的业务流程,并通过链式调用保持了代码的可读性和简洁性
- 链式调用是通过在每一个操作符中都返回一个新的被观察者Observable实现
- 每一个操作符的基本实现逻辑,都是先创建新的中间被观察者,利用这个中间被观察者从上游的被观察者获取数据,经过加工处理后,再传递给下游的观察者
- 线程的切换本质上还是通过线程池和Handler实现
- 其他常见操作符:
- map 转换事件,返回普通事件
- flatMap 转换事件,返回Observable
- conactMap concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序
- subscribeOn 规定被观察者所在的线程
- observeOn 规定下面要执行的消费者所在的线程
- take 接受一个 long 型参数 count ,代表至多接收 count 个数据
- debounce 去除发送频率过快的项,常用在重复点击解决上,配合 RxBinging 使用效果很好
- timer 定时任务,多少时间以后发送事件
- interval 每隔一定时间执行一些任务
- skip 跳过前多少个事件
- distinct 去重
- takeUntil 直到到一定条件的是停下,也可以接受另外一个被观察者,当这个被观察者结束之后则停止第一个被观察者
- Zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。不影响Observable的发射,Observable 被观察者会一直发射,不会停,只是Observer 接收不到
- merge 多个 Observable 发射的数据随机发射,不保证先后顺序
- Concat 多个 Observable 组合以后按照顺序发射,保证了先后顺序,不过最多能组合4个 Observable ,多的可以使用 contactArray
- onErrorReturn 遇到错误是发射指定的数据到 onNext,并正常终止
- onErrorResumeReturn 遇到错误时,发射设置好的一个 Observable ,用来发送数据到 onNext,并正常终止
- onExceptionResumeReturn 和onErrorResumeReturn 类似,不同之处在于会判断是否是 Exception。如果是和 onErrorResumeReturn 一样,不是则会调用 onError。不会调用onNext
常见问题:
-
RxJava 如何实现线程切换?
答:会创建一个中间被观察者对象,如果是对被观察者线程切换则直接调用上游的被观察者的subscribe方法,如果是对观察者切换线程,则订阅上游的被观察者事件,接收到事件后在指定的线程上调用下游订阅者的接收方法 -
如果不指定observer的线程,也就是指设置subscribeOn,而不设置observeOn,那observer的线程是什么样的?
答:观察者就会在被观察者所在线程中运行 -
操作符 map 和 flatmap 、concatMap的区别?
答:map是将接收到的数据类型,经过处理后转成另一个数据类型;而flatmap是将接收到的数据处理后转成新的被观察者,然后让下游去订阅;concatMap是在flatMap基础上保持了顺序性 -
RxJava 如何解决内存泄漏?
答:订阅的时候拿到 Disposable ,退出时主动调用 dispose;或者使用RxLifecycle和AutoDispose -
RxJava中有哪些被观察者对象?使用时如何选择?
答:主要有Observable、Flowable、Single、Maybe、Completable- Observable:发射0到N个数据,支持onNext、onError、onComplete的回调
- Flowable:在Observable基础上,支持背压
- Single:只能发送一次数据,只支持onSuccess、onError的回调
- Completable:不会发送任何事件,相当于Runnable直接执行,只支持onComplete、onError的回调
- Maybe:可以发送0或者1个事件,支持onSuccess、onError、onComplete的回调,但是当没有发送数据时,会调用onComplete不会调用onSuccess或者onError;如何发送了数据则不会回调onComplete
-
为什么 subscribeOn() 只有第一次切换有效?
答:因为被观察者是从下往上进行订阅的,在多次调用subscribeOn()之后,被观察者是运行在最顶层指定的所在线程的,也就是第一次设置的所在线程 -
背压是什么,以及Flowable怎么能控制背压?
答:背压是指被观察者,也就是上游发送的数据频率太快,而下游的观察者处理不过来,比如设置按钮点击事件监听,短时间内连续点击多次,下游就会重复处理多次点击事件;
Flowable里面有个缓存队列(大小是128),当上游发送事件时,会先发送到缓存队列中,然后下游的观察者调用request方法请求固定数量的数据,上游被观察者根据下游请求的数据个数来控制发送数据的量,从而避免因数据发送过快而导致的背压问题;
当队列满了时,可以指定5种处理方案:MISSING(抛异常)、ERROR(抛异常)、BUFFER(设置缓存无限大,需要注意OOM)、DROP(超过128个丢弃)、LATEST(保留前128个和最后发送的那个)
以上是关于手写Rxjava了解核心实现原理的主要内容,如果未能解决你的问题,请参考以下文章