How do I convert AsyncTask / RxJava code to RxJava2?
I tried to convert the AsyncTask code below to RxJava2, but apparently RxJava2 does not accept null values, so my app crashes. Here is my AsyncTask code:
new AsyncTask<Void, Void, Void>() {
    @Override
    protected Void doInBackground(Void... params) {
        Set<Map.Entry<String, Participant>> entries = pool.entrySet();
        for (Map.Entry<String, Participant> entry : entries) {
            Participant participant = entry.getValue();
            participant.release();
        }
        return null;
    }

    @Override
    protected void onPostExecute(Void aVoid) {
        cb.event(new Spin.Event<Void>());
    }
}.execute();
Here is the code converted to RxJava (not RxJava2):
Observable.defer(new Func0<Observable<Void>>() {
    @Override
    public Observable<Void> call() {
        Set<Map.Entry<String, Participant>> entries = pool.entrySet();
        for (Map.Entry<String, Participant> entry : entries) {
            Participant participant = entry.getValue();
            participant.release();
        }
        return Observable.just(null);
    }
}).doOnCompleted(new Action0() {
    @Override
    public void call() {
        cb.event(new Spin.Event<Void>());
    }
})
.subscribeOn(Schedulers.computation())
.subscribe();
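What is the best way to convert this to RxJava2 so that it does not crash when null is returned? Also, what plays the role of .execute() in RxJava2? I am not sure whether that concept applies to RxJava at all.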
Here is the crash log:
FATAL EXCEPTION: RxComputationThreadPool-3
io.reactivex.exceptions.OnErrorNotImplementedException: null ObservableSource supplied
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Caused by: java.lang.NullPointerException: null ObservableSource supplied
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
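The log above can be read in two steps: the `defer` supplier returned null, which RxJava2 turns into a NullPointerException delivered through onError, and because `subscribe()` was called without an error consumer, that error is rethrown as OnErrorNotImplementedException. The following is a minimal sketch of the null handling only, assuming RxJava 2 (`io.reactivex`) is on the classpath; the class name `NullRejectionDemo` and its helper methods are hypothetical and not part of the original code.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class NullRejectionDemo {

    // Returns true if Observable.just(null) is rejected with a NullPointerException.
    static boolean justNullThrows() {
        try {
            Observable.just((Object) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Subscribes to a defer() whose supplier returns null and
    // returns the error that the observer receives.
    static Throwable deferNullError() {
        TestObserver<Object> observer = Observable.<Object>defer(() -> null).test();
        return observer.errors().get(0);
    }

    public static void main(String[] args) {
        System.out.println("just(null) throws: " + justNullThrows());
        System.out.println("defer(null) error: " + deferNullError());
    }
}
```

Passing an error consumer to `subscribe(...)` would keep such an error from reaching the thread's uncaught-exception handler, but the underlying fix is to avoid null altogether.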
Answer
Observable.defer(new Callable<ObservableSource<?>>() {
    @Override
    public ObservableSource<?> call() throws Exception {
        Set<Map.Entry<String, Participant>> entries = pool.entrySet();
        for (Map.Entry<String, Participant> entry : entries) {
            Participant participant = entry.getValue();
            participant.release();
        }
        // Complete without emitting any item instead of emitting null
        return Completable.complete().toObservable();
    }
}).doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.d("Complete", "Complete");
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
This code works as well. Calling subscribe() is what starts the job, so it takes the place of execute().
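As a small illustration of subscribe() taking the place of execute(), here is a sketch that assumes RxJava 2 (`io.reactivex`) is on the classpath. The class `LazyStartDemo` and its counter are hypothetical stand-ins for the real background work; `blockingAwait()` is used only so that the example can run on a plain JVM.

```java
import io.reactivex.Completable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyStartDemo {

    // Hypothetical counter standing in for the real background work.
    static final AtomicInteger runs = new AtomicInteger();

    // Building the chain does not execute the action.
    static Completable buildJob() {
        return Completable.fromAction(() -> runs.incrementAndGet());
    }

    public static void main(String[] args) {
        Completable job = buildJob();
        System.out.println("before subscribe: " + runs.get()); // prints 0

        // blockingAwait() subscribes and waits for completion.
        job.blockingAwait();
        System.out.println("after subscribe: " + runs.get()); // prints 1
    }
}
```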
Another answer
Since you have no value to post back to the main thread, you can use Completable:
Completable.fromAction(() -> {
    Set<Map.Entry<String, Participant>> entries = pool.entrySet();
    for (Map.Entry<String, Participant> entry : entries) {
        Participant participant = entry.getValue();
        participant.release();
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    () -> {
        cb.event(new Spin.Event<Void>());
    },
    error -> { /* show error toast */ }
);
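To try the Completable approach outside Android, the sketch below runs the same idea on a plain JVM. It assumes RxJava 2 (`io.reactivex`) is on the classpath; `ReleasePoolDemo`, its nested `Participant` class, and the `releaseAll` helper are hypothetical stand-ins for the types in the question, and `blockingAwait()` replaces `observeOn(AndroidSchedulers.mainThread())` because there is no Android main thread here.

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleasePoolDemo {

    // Hypothetical stand-in for the question's Participant type.
    static class Participant {
        final AtomicBoolean released = new AtomicBoolean();

        void release() {
            released.set(true);
        }
    }

    // Releases every participant on the computation scheduler.
    // Nothing runs until the returned Completable is subscribed to.
    static Completable releaseAll(Map<String, Participant> pool) {
        return Completable.fromAction(() -> {
            for (Participant participant : pool.values()) {
                participant.release();
            }
        }).subscribeOn(Schedulers.computation());
    }

    public static void main(String[] args) {
        Map<String, Participant> pool = new ConcurrentHashMap<>();
        pool.put("a", new Participant());
        pool.put("b", new Participant());

        // Subscribe and block until the background work has finished.
        releaseAll(pool).blockingAwait();

        for (Map.Entry<String, Participant> entry : pool.entrySet()) {
            System.out.println(entry.getKey() + " released: " + entry.getValue().released.get());
        }
    }
}
```

Returning the Completable from a method, rather than subscribing inside it, leaves the caller free to choose the observing thread and the completion and error callbacks.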
Another answer
Observable.defer(new Callable<ObservableSource<?>>() {
    // This method replaces doInBackground
    @Override
    public ObservableSource<?> call() throws Exception {
        Set<Map.Entry<String, Participant>> entries = pool.entrySet();
        for (Map.Entry<String, Participant> entry : entries) {
            Participant participant = entry.getValue();
            participant.release();
        }
        return Completable.complete().toObservable();
    }
}).doOnComplete(new Action() {
    // This replaces onPostExecute
    @Override
    public void run() throws Exception {
        Log.d("Complete", "Complete");
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();