RxJava使用过程中遇到线程相关的坑
Posted 许英俊
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava使用过程中遇到线程相关的坑相关的知识,希望对你有一定的参考价值。
RxJava线程暴增的坑
1、问题
在使用RxJava的时候,对于开发者频繁使用subscribeOn(Schedulers.computation())
或者.subscribeOn(Schedulers.io())
,导致App线程暴涨,在业务繁多的App中,容易导致超过句柄数限制,导致App崩溃
2、原因
RxJava对线程的使用也有缓存策略,其缓存策略是在60s内会复用已有的线程,但如果在60s过多的调用subscribeOn(Schedulers.computation())
或者.subscribeOn(Schedulers.io())
,每调用一次都会创建一个单条线程的线程池去跑当前任务,最终导致线程数暴增
3、案例分析
RxJava提供内置线程池类型
- (常用)Schedulers.io():IO相关的任务
- (常用)Schedulers.computation():CPU相关的任务
- (基本不用)Schedulers.newThread():为每个任务创建一个新线程
- (基本不用)Schedulers.single():只有一条线程的线程池
- (基本不用)Schedulers.trampoline():当其它排队的任务完成后,在当前线程排队开始执行
开发中常用的两种类型,其的弊端在于源码中每次创建一个任务都会创建一个单条线程的线程池去跑当前任务
Scheduler#scheduleDirect()
每次RxJava被订阅后,最终任务的执行,会被调到scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
final Worker w = createWorker(); //1、每次都会创建新的对象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
ioscheduler#createWorker()
每次任务的执行都会对应的Shceduler
去执行createWorker()
public Worker createWorker()
return new EventLoopWorker(pool.get()); //1、包装新的worker
static final class EventLoopWorker extends Scheduler.Worker
EventLoopWorker(CachedWorkerPool pool)
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get(); //2、从pool取出一条工作worker
static final class CachedWorkerPool implements Runnable
ThreadWorker get()
if (allWorkers.isDisposed())
return SHUTDOWN_THREAD_WORKER;
//RxJava的线程缓存策略
//expiringWorkerQueue根据keepAliveTime(60秒)设置的过期时间,优先从expiringWorkerQueue中取ThreadWorker
while (!expiringWorkerQueue.isEmpty())
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null)
return threadWorker;
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory); //3、创建新的工作线程
allWorkers.add(w);
return w;
static final class ThreadWorker extends NewThreadWorker
ThreadWorker(ThreadFactory threadFactory)
super(threadFactory); //4、继承父类的创建
this.expirationTime = 0L;
public class NewThreadWorker extends Scheduler.Worker implements Disposable
public NewThreadWorker(ThreadFactory threadFactory)
executor = SchedulerPoolFactory.create(threadFactory); //5、父类的创建线程池
public static ScheduledExecutorService create(ThreadFactory factory)
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); //6、创建单条可用线程池
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
结论
可见每次订阅之后的Rxjava都会跑这套流程,其根本原因就是在自身的缓存策略的60s内没有复用线程,导致线程数一直增加
4、解决方案
- 创建新的
IoSchedulerEx
,并在内部实现多条线程的线程池,并单例化 - 替换掉RxJava的
IoScheduler
通过RxJava提供的自定义插件方法,将其IoIoScheduler
换成我们自己的IoSchedulerEx
,那么将会在IoScheduler#createWorker()
开始,将被替换成IoSchedulerEx#createWorker()
//IO线程池
val optIoSchedulerEx = IoSchedulerEx()
RxJavaPlugins.setIoSchedulerHandler optIoSchedulerEx
RxJavaPlugins.setInitIoSchedulerHandler optIoSchedulerEx
//CPU线程池
val optComputationSchedulerEx = ComputationSchedulerEx()
RxJavaPlugins.setComputationSchedulerHandler optComputationSchedulerEx
RxJavaPlugins.setInitComputationSchedulerHandler optComputationSchedulerEx
IoSchedulerEx#createWorker()
@NonNull
@Override
public Worker createWorker()
return new EventLoopWorker(pool.get()); //1、包装新的worker
static final class EventLoopWorker extends Scheduler.Worker
EventLoopWorker(CachedWorkerPool pool)
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get(); //2、从pool取出一条工作worker
ThreadWorker get()
if (allWorkers.isDisposed())
return SHUTDOWN_THREAD_WORKER;
while (!expiringWorkerQueue.isEmpty())
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null)
return threadWorker;
// No cached worker found, so create a new one.
//3、创建新的工作线程,但会将我们已经创建好的线程池传递进去
ThreadWorker w = new ThreadWorker(RxSchedulerThreadCore.getIOScheduleExecutor());
allWorkers.add(w);
return w;
static final class ThreadWorker extends NewThreadWorkerEx
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) //不走原来的创建逻辑,否则又会创建新的单条线程的线程池
super(threadFactory);
this.expirationTime = 0L;
ThreadWorker(ScheduledExecutorService excutorService)
super(excutorService); //4、继承父类的创建,但这里已经悄悄的将线程池作为参数传递进来了
this.expirationTime = 0L;
public class NewThreadWorkerEx extends Scheduler.Worker implements Disposable
public NewThreadWorkerEx(ScheduledExecutorService executorService)
isAssignThreadPool = true;
executor = executorService; //5、将线程池直接拿来用,此时就已经不会再新增线程池了
结论
- 可见每次订阅之后的Rxjava同样也会跑这套流程,但已经开始复用我们创建的线程池,线程数得到线程池的控制
RxSchedulerThreadCore.getIOScheduleExecutor()
此处是整个替换的关键,对于Io
和Computation
都可以取不同的策略
RxJava线程阻塞的坑
1、问题
这个问题是我们最常见的坑,在使用RxJava一些计时的操作符的时候,如timer()
和interval()
计时等,导致事件没有准时回调,比如在页面中设置有3s的心跳轮询,然而,你会发现,有时候心跳隔了7s后才发心跳,导致后台误认为心跳已被停止
2、原因
由于计时是默认在Schedulers.computation()
调用的,会导致线程在使用率太高的情况下,子线程不容易被调度到,导致延误计时结果
3、案例分析
我们通过启用定时器并指定Schedulers.io()
来处理,在map
操作符通过日志去输出线程的名字来验证
Observable.interval(1, 1, TimeUnit.SECONDS)
.map
MLog.info("Hensen", "map=" + Thread.currentThread().name)
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread())
.subscribe(
MLog.info("Hensen", "subscribe=" + Thread.currentThread().name)
)
结果发现日志,map是在CPU类型的线程上执行,也就是Schedulers.io()
指定后没有生效,而默认在Schedulers.computation()
I/Hensen: map=RxCPUEx-46
I/Hensen: subscribe=main
通过源码发现,如果不指定是哪个Schedulers
,那么默认在Schedulers.computation()
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
return interval(initialDelay, period, unit, Schedulers.computation());
timer
操作符也是同样的源码
public static Observable<Long> timer(long delay, TimeUnit unit)
return timer(delay, unit, Schedulers.computation());
4、解决办法
- 使用带有
Schedulers
参数的定时器,可以是Schedulers.io()
或者自定义Schedulers
- 将计时器改为Handler计时
以上是关于RxJava使用过程中遇到线程相关的坑的主要内容,如果未能解决你的问题,请参考以下文章