RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换)
Posted 0 and 1
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换)相关的知识,希望对你有一定的参考价值。
前言
已经使用rxjava两个月了,觉得rxjava特别好用,爱不释手。本文目的是通过几百行的代码,帮助大家理解rxjava中的链式调用,操作符,线程切换是如何实现的。如果有写的不对的地方欢迎在评论区留言,如果觉得写的可以,请点赞,关注,谢谢。
代码链接: github
目录:
RxJava编程思想1-(实现简易版Rxjava,如何基本功能和链式调用?)
RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)
RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换?)
如何做线程切换?
如何实现?其实就是对被观察者的数据处理过程进行装饰,设计思想跟操作符一样,使用装饰者模式,对被被观察者的数据处理进行线程装饰。
操作起来:
先想一下,线程切换的几种场景:指定新线程,指定io线程,指定android主线程等。
定义:抽象,约定线程的公共行为
public abstract class Scheduler
public abstract void scheduleDirect(Runnable runnable);
新线程实现:NewThreadScheduler
public class NewThreadScheduler extends Scheduler
private static final String TAG = "NewThreadScheduler";
public static NewThreadScheduler getInstance()
return NewThreadSchedulerHolder.INSTANCE;
private static class NewThreadSchedulerHolder
private static NewThreadScheduler INSTANCE = new NewThreadScheduler();
@Override
public void scheduleDirect(Runnable runnable)
executorService().execute(runnable);
private static ExecutorService executorService;
private static synchronized ExecutorService executorService()
if (executorService == null)
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory
(false));
return executorService;
private static ThreadFactory threadFactory(final boolean daemon)
final AtomicInteger mCount = new AtomicInteger(1);
return new ThreadFactory()
@Override
public Thread newThread(Runnable runnable)
Thread result = new Thread(runnable, "RxJava New Thread #" + mCount.getAndIncrement());
result.setDaemon(daemon);
return result;
;
Android主线程实现AndroidSchedulers
public class AndroidSchedulers extends Scheduler
private static final String TAG = "AndroidSchedulers";
private final Handler handler = new Handler(Looper.getMainLooper());
public static AndroidSchedulers getInstance()
return AndroidSchedulersHolder.INSTANCE;
private static class AndroidSchedulersHolder
private static AndroidSchedulers INSTANCE = new AndroidSchedulers();
@Override
public void scheduleDirect(Runnable runnable)
Message message = Message.obtain(handler, runnable);
message.obj = this;
handler.sendMessage(message);
提供方便用户使用的静态实例对象
public final class Schedulers
public static final Scheduler IO = ioscheduler.getInstance();
public static final Scheduler NEW_THREAD = NewThreadScheduler.getInstance();
public static final Scheduler ANDROID_MAIN_THREAD = AndroidSchedulers.getInstance();
装饰指定线程工具做好了。下面直接使用就行了。
装饰者模式走起。。。。。。。。。。。。
实际的装饰者对象ObservableSubscribeOn :指定被观察者发射数据的线程
public class ObservableSubscribeOn<T> extends Observable<T>
final Scheduler scheduler;
final ObservableSource<T> source;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler)
this.source = source;
this.scheduler = scheduler;
public final ObservableSource<T> source()
return source;
@Override
public void subscribeActual(final Observer<? super T> s)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
scheduler.scheduleDirect(new Runnable()
@Override
public void run()
RLog.printInfo("我在这里切换");
source.subscribe(parent);
);
static final class SubscribeOnObserver<T> implements Observer<T>
final Observer<? super T> actual;
SubscribeOnObserver(Observer<? super T> actual)
this.actual = actual;
@Override
public void onSubscribe()
actual.onSubscribe();
@Override
public void onNext(T t)
CheckUtils.checkNotNull(t, "onNext called parameter can not be null");
actual.onNext(t);
@Override
public void onError(Throwable error)
actual.onError(error);
@Override
public void onComplete()
actual.onComplete();
实际的装饰者对象ObservableObserveOn :指定观察者接收数据的线程
public class ObservableObserveOn<T> extends Observable<T>
final Scheduler scheduler;
final ObservableSource<T> source;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler)
this.source = source;
this.scheduler = scheduler;
public final ObservableSource<T> source()
return source;
@Override
public void subscribeActual(final Observer<? super T> s)
final ObserveOnObserver<T> parent = new ObserveOnObserver<T>(s, scheduler);
source.subscribe(parent);
static final class ObserveOnObserver<T> implements Observer<T>
final Observer<? super T> actual;
final Scheduler scheduler;
public ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler)
this.actual = actual;
this.scheduler = scheduler;
@Override
public void onSubscribe()
scheduler.scheduleDirect(new Runnable()
@Override
public void run()
actual.onSubscribe();
);
@Override
public void onNext(final T t)
scheduler.scheduleDirect(new Runnable()
@Override
public void run()
CheckUtils.checkNotNull(t, "onNext called parameter can not be null");
actual.onNext(t);
);
@Override
public void onError(final Throwable error)
scheduler.scheduleDirect(new Runnable()
@Override
public void run()
actual.onError(error);
);
@Override
public void onComplete()
scheduler.scheduleDirect(new Runnable()
@Override
public void run()
actual.onComplete();
);
同样为了方便用户使用 使用和链式调用,在装饰者组件Observable中 加入工具方法
public final Observable<T> subscribeOn(Scheduler scheduler)
return new ObservableSubscribeOn<T>(this, scheduler);
public final Observable<T> observeOn(Scheduler scheduler)
return new ObservableObserveOn<T>(this, scheduler);
现在就可以使用了:
Observable.create(new ObservableOnSubscribe<Integer>()
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception
RLog.printInfo("emitter发送第一个onNext,value = 1");
emitter.onNext(1);
RLog.printInfo("emitter发送onComplete");
emitter.onComplete();
).subscribeOn(Schedulers.NEW_THREAD).subscribeOn(Schedulers.IO).subscribeOn(Schedulers.NEW_THREAD).subscribeOn(Schedulers.IO).observeOn
(Schedulers.IO).map(new Function<Integer, String>()
@Override
public String apply(Integer integer) throws Exception
RLog.printInfo("切换线程");
return "切换线程" + integer;
).observeOn(Schedulers.ANDROID_MAIN_THREAD).subscribe(new Observer<String>()
@Override
public void onSubscribe()
RLog.printInfo("Observer被订阅");
@Override
public void onNext(String value)
RLog.printInfo("Observer接收到onNext,被转换之后的value = " + value);
@Override
public void onError(Throwable e)
RLog.printInfo("Observer接收到onError,errorMsg = " + e.getMessage());
@Override
public void onComplete()
RLog.printInfo("Observer接收到onComplete");
);
这里只是非常简单的带领大家理解rxjava主体的设计思想。为了方便理解,代码也尽可能简化到了极点。真正的rxjava源码做了很多优化,比如,数据流缓冲(背压)等。到此,大家应该对Rxjava的设计思想有了很好的理解了。其它操作符的架构设计思想是一样的,只是实现细节不同,相信看到这里在去看看rxjava源码应该很轻松就可以理解了。
以上是关于RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换)的主要内容,如果未能解决你的问题,请参考以下文章
RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)
RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)