Android 基础面内容——Rxjava与协程(含参考答案)
Posted 初一十五啊
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android 基础面内容——Rxjava与协程(含参考答案)相关的知识,希望对你有一定的参考价值。
Rxjava
Rxjava常用操作符
map()
操作符:用于将流中的每个元素通过一个函数转换为另一个元素。flatMap()
操作符:用于将流中的每个元素通过一个函数转换为多个元素,并将这些元素组合成一个新的流。filter()
操作符:用于过滤流中的元素,只保留符合条件的元素。take()
操作符:用于从流中取前 n 个元素。reduce()
操作符:用于将流中的元素通过一个函数进行累加,得到一个最终结果。scan()
操作符:用于将流中的元素通过一个函数进行累加,得到每一步的中间结果。concat()
操作符:用于将多个流组合成一个新的流。merge()
操作符:用于将多个流合并成一个新的流。zip()
操作符:用于将多个流中的元素按顺序一一组合成一个新的元素,并形成一个新的流。
-debounce() 操作符:用于过滤流中发射过快的元素,只保留一个元素。
map和flatMap有什么区别
map
和flatMap
都可以用来对数据流中的数据进行变换,但它们的实现方式有所不同。map 只进行一次变换,并将变换后的结果发射出去,而 flatMap 则进行多次变换,并将得到的 Observable 合并成一个新的 Observable 发射出去
在源码层面,map 操作符的实现非常简单,它实际上就是在原有的 Observable 上添加了一个新的 MapObservable 观察者,并将变换函数作为参数传递给 MapObservable。在 MapObservable 的 onNext 方法中,会将接收到的元素传递给变换函数进行变换,并将变换后的结果作为新的元素发射出去。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
在源码层面,flatMap 操作符的实现相对比较复杂。它实际上是在原有的 Observable 上添加了一个新的 FlatMapObservable 观察者,并将变换函数作为参数传递给 FlatMapObservable。在 FlatMapObservable 的 onNext 方法中,会将接收到的元素传递给变换函数进行变换,并得到一个新的 Observable。然后,它会将这个新的 Observable 注册到一个 FlatMapSubscriber 中,等待下一次数据的到来。当所有数据都处理完成后,FlatMapObservable 会调用 FlatMapSubscriber 的 onComplete 方法,将所有得到的 Observable 合并成一个新的 Observable,并将它发送给下游的观察者。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
return flatMap(mapper, false, bufferSize(), bufferSize());
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency)
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize)
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable)
ScalarCallable<T> scalarCallable = (ScalarCallable<T>)this;
R r = scalarCallable.call();
if (r == null)
return empty();
return ObservableScalarXMap.scalarXMap(r, mapper);
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
Rxjava1.0和Rxjava2.0有什么区别?
- 改进的异常处理:RxJava 2.0 改进了异常处理机制,使得开发者可以更好地处理异常,避免应用程序崩溃。
- 新的操作符:RxJava 2.0 引入了一些新的操作符,如 Flowable,Single 和 Completable,来取代旧版本的 Observable。这些新的操作符可以更好地处理背压(backpressure)和错误处理。
- 改进的背压支持:RxJava 2.0 引入了更好的背压支持,可以更好地处理在数据源发送大量数据时的情况。
- 改进的线程调度:RxJava 2.0 改进了线程调度机制,使得开发者可以更好地控制并发性。
更好的性能:RxJava 2.0 在性能上也有所提升,可以更好地处理大量数据流。
总的来说,RxJava 2.0 在异常处理、背压支持、线程调度和性能等方面都有所改进和提升
什么是背压?怎么改进的?
背压(Backpressure)是指当数据产生速度大于消费速度,程序处理不过来是消息就会出现堆积。从而导致内存溢出、程序崩溃等问题。这种情况被称为背压问题
逻辑上的改进办法:
- 生产者数量=消费者数量
- 节流,丢弃一部分请求
- 打包,把所有事件封装在一个集合中发送
Rxjava1.x的时候没有对背压的支持,只提供了onBackpressureBuffer(time)、onBackpressureDrop() 等)来缓解背压问题,但这些解决方案都只是对数据流进行了缓存或者丢弃处理
RxJava 2.0后 引入了新的数据类型 Flowable
,它支持背压,并提供了更多的背压控制策略。
Flowable 类型是一个支持背压的数据源,可以通过 onBackpressureBuffer
,onBackpressureDrop
,onBackpressureLatest
等方式来处理背压问题。其中
onBackpressureBuffer
策略会在内存中缓存数据,直到消费者可以消费这些数据;onBackpressureDrop
策略会在数据流中丢弃一部分数据,直到消费者可以消费;onBackpressureLatest
策略会只保留最新的数据,丢弃旧数据。
另外 Flowable
的方式和 Observable
类似,只是Flowable
在使用的时候需要注意要制定背压策略。
subscribeOn
与observeOn
多次执行会怎么样?
结论:subscribeOn
只跟第一次指定的线程有关,执行多次跟最后一次有关。
subscribeOn
只有第一次会生效,所以只跟第一次指定的线程有关。
当我们在一个 Observable
中使用多个 subscribeOn
操作符时,它们的执行顺序只会影响到代码中的顺序,但实际上只有第一个 subscribeOn
会生效。原因是在 ObservableSubscribeOn
类的实现中,只会在第一个 subscribeOn
操作符中调用 scheduler.scheduleDirect
方法,后面的 subscribeOn
操作符调用该方法也会被拦截,也就不会改变 Observable
的执行线程。这就是为什么在同一个 Observable
中使用多个 subscribeOn
操作符时,只有第一个 subscribeOn
会生效的原因。
//Observable.java
@Override
public final void subscribe(Observer<? super T> observer)
...
subscribeActual(observer);
...
//-----------------
// ObservableSubscribeOn.java
@Override
public void subscribeActual(Observer<? super T> observer)
if (once)
source.subscribe(observer);
return;
once = true;
Scheduler scheduler = this.scheduler;
SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent, source)));
observeOn
执行多次跟最后一次有关。
@Override
protected void subscribeActual(Observer<? super T> observer)
Scheduler.Worker worker = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, worker, delayError, bufferSize));
其实比较好理解,subscribeOn理解为一个管道的入口,observeOn 理解为一个管道的出口。数据进去之后就没办法指定了,但是数据出来之前都可以在切换出口
Rxjava是怎么切回到主线程的
使用observeOn(androidSchedulers.mainThread())
,内部的实现其实是new Handler(Looper.getMainLooper())
public class MainThreadScheduler extends Scheduler
private static MainThreadScheduler INSTANCE;
private MainThreadScheduler()
public static MainThreadScheduler instance()
if (INSTANCE == null)
INSTANCE = new MainThreadScheduler();
return INSTANCE;
@NonNull
@Override
public Worker createWorker()
return new MainThreadWorker(new Handler(Looper.getMainLooper()));
private static class MainThreadWorker extends Worker
private final Handler mHandler;
MainThreadWorker(Handler handler)
mHandler = handler;
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable)
mHandler.post(runnable);
return Disposables.empty();
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable, long delay, @NonNull TimeUnit unit)
mHandler.postDelayed(runnable, unit.toMillis(delay));
return Disposables.empty();
@Override
public void dispose()
@Override
public boolean isDisposed()
return false;
协程
进程、线程、协程的区别
- 进程(Process)是指操作系统中的一个执行单位,它有自己独立的内存空间和资源,可以执行独立的程序,是程序运行的基本单位。一个进程可以包含多个线程。
- 线程(Thread)是进程中的一个执行单元,它共享进程的内存空间和资源,但具有独立的执行序列和运行堆栈。一个进程可以包含多个线程,线程之间可以并发执行,实现多任务处理。
- 协程(Coroutine)是一种用户态的轻量级线程,由程序员自己控制调度,而不是由操作系统控制。协程可以在同一线程中实现并发执行,利用时间片轮转算法切换任务,避免了线程上下文切换带来的开销,可以提高程序的执行效率。
Kotlin 的协程是基于 Kotlin 标准库中的协程框架实现的。该框架基于一种称为“挂起函数”的特殊函数类型实现,这些函数可以暂停执行并在稍后的某个时候恢复执行,从而实现了协程的效果。不依赖于操作系统和编译器。
什么回调地狱以及协程在这方面的处理
回调地狱指的是在异步编程中,如果多次嵌套使用回调函数来处理异步操作,会造成代码的可读性和可维护性变差,代码逻辑难以理解和调试的情况。举个例子
getUserInfo(userId) user ->
getUserOrders(user.id) orders ->
for (order in orders)
getItems(order.id) items ->
for (item in items)
processItem(item) result ->
saveResult(result)
// ...
协程中的挂起函数写法是
suspend fun processOrders(userId: String) = withContext(Dispatchers.IO)
val user = getUserInfo(userId)
val orders = getUserOrders(user.id)
for (order in orders)
val items = getItems(order.id)
for (item in items)
val result = processItem(item)
saveResult(result)
使用
withContext
可以指定协程执行的上下文,这里使用了 IO 线程池,避免了主线程的阻塞。
开发中怎么选择合适的调度器
其实无非就是三个,一个主线程,一个io密集型,一个cpu密集型
rxjava中的调度器
Schedulers.io()
:用于 I/O 密集型任务,比如网络请求等。Schedulers.computation()
:用于 CPU 密集型任务,比如类似视频编解码这种大量的计算和数据处理等。Schedulers.newThread()
:每次都创建一个新线程,不推荐使用。AndroidSchedulers.mainThread()
:用于 Android 平台的 UI 线程。
对应kotlin协程里面的是
1.Dispatchers.Default
:适合执行 CPU 密集型任务的调度器,它会自动根据可用的 CPU 数量进行调度。
2. Dispatchers.IO
:适合执行 I/O 密集型任务的调度器,比如网络请求和磁盘 I/O 等。
3. Dispatchers.Main
:适合在 Android 应用程序中执行 UI 操作的调度器。在 Android 应用程序中,Main 调度器会将协程的执行切换到主线程上。
4. Dispatchers.Unconfined
:一个不受限制的调度器,它允许协程在调用挂起函数的线程中继续执行。使用这个调度器时需要特别小心,因为它可能会导致一些奇怪的行为。
Android 面试题锦(初中高级篇):https://qr18.cn/CKV8OZ
以上是关于Android 基础面内容——Rxjava与协程(含参考答案)的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )
字节内部超全Kotlin学习笔记,快速上手 Kotlin 开发,基础 + 实战 + 源码,手把手带你吃透 Kotlin 语法与协程。