RxJava初探

Posted hrbust_wgq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava初探相关的知识,希望对你有一定的参考价值。

RxJava基础

RxJava是什么

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

RxJava就是一个实现异步操作的库。

前言

异步操作在调度过程比较复杂的情况下,经常会既难写也难被读懂。

假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。

//直观写法
new Thread() 
    @Override
    public void run() 
        super.run();
        for (File folder : folders) 
            File[] files = folder.listFiles();
            for (File file : files) 
                if (file.getName().endsWith(".png")) 
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() 
                        @Override
                        public void run() 
                            imageCollectorView.addImage(bitmap);
                        
                    );
                
            
        
    
.start();
//RxJava写法
Observable.fromArray(folders)
        .flatMap(new Function<File, ObservableSource<File>>() 
            @Override public ObservableSource<File> apply(File file) throws Exception 
                return Observable.fromArray(file.listFiles());
            
        )
        .filter(new Predicate<File>() 
            @Override public boolean test(File file) throws Exception 
                return file.getName().endsWith(".png");
            
        )
        .map(new Function<File, Bitmap>() 
            @Override public Bitmap apply(File file) throws Exception 
                return getBitmapFromFile(file);
            
        )
        .subscribeOn(Schedulers.io())
        .observeOn(androidSchedulers.mainThread())
        .subscribe(new Observer<Bitmap>() 
            @Override public void onSubscribe(Disposable d) 

            

            @Override public void onNext(Bitmap bitmap) 
                imageCollectorView.addImage(bitmap);
            

            @Override public void onError(Throwable e) 

            

            @Override public void onComplete() 

            
        );


观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,避免了地狱式的回调,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?)。

在使用 RxJava 的情况下,很复杂的逻辑依然只是一条链式调用就完成了。它很长,但很清晰。所以, RxJava 好在哪?就好在简洁,好在那把什么复杂逻辑都能穿成一条线的简洁。

基本概念

  • Observable:被观察者
  • Observer:观察者
  • subscribe:订阅
  • 事件

RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() )之外,还定义了两个特殊的事件:onComplete()onError()

  • onComplete(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onComplete() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onComplete()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onComplete()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

基本Observable操作

1.创建

create : 通过函数从头开始创建 Observable

2. 转换

**Buffer **: 定期将 Observable 发出的项目收集到包中并发出这些包,而不是一次发出一个项目。

flatMap : 将 Observable 发射的项目转换为 Observable,然后将这些项目的发射扁平化为单个 Observable。

注意:这里可能会交错,顺序请使用concat

groupBy :将 Observable 划分为一组 Observable,每个 Observable 从原始 Observable 发出不同的项目子集。

Scan : 将函数应用于 Observable 发出的每个项目,顺序地,并发出每个连续的值。

2. 打包

Zip : 当两个Observables都有新数据时,将他们组合输出。

CombineLatest :当两个Observables有一个有新数据时,则将两个Observables最后一个数据组合输出。

Merge : 通过合并它们的发射将多个 Observable 合并为一个。

实战例子

案例1: 从多个数据源加载数据

load feed tabs

  • 优先使用本地缓存渲染UI
  • 同时从网络拉取新的tabs config
  • 网络数据回来后更新本地缓存以及UI, 网络加载可能失败

常规写法:

RxJava化:

利用contact属性优化:

实例2:从多个数据源取数据

share vedios

  • 将封面下载到本地
  • 将视频地址上传到服务端变为短链
  • fallback
    • 封面可以为空,链接不能为空
    • 短链请求失败时,使用长链
    • 重试…

常规写法:

RxJava化:

案例3:多字段验证

结算信息填写

  • 所有字段均合法时提交按钮置为可点击

  • 扩展:选填和必填

懒得写常规写法了

使用RxJava:

下期:RxJava源码分析

敬请期待!

参考资料:

https://juejin.cn/post/6844903456365346823

https://reactivex.io/documentation/operators.html#conditional

https://drive.google.com/file/d/1nMKVqz8pZ8tjfO2qKfAI7jaTvtn_0QXd/view

https://github.com/ReactiveX/RxJava

以上是关于RxJava初探的主要内容,如果未能解决你的问题,请参考以下文章

RxJava源码初探

Rxjava observeOn()和subscribeOn()初探

响应式系统reactive system初探

当Kotlin遇见RxJava多数据源

Android RxJava/RxAndroid结合Retrofit使用

当Kotlin遇见RxJava多数据源