How do I convert AsyncTask / RxJava code to RxJava2?


I tried to convert the AsyncTask code below to RxJava2, but RxJava2 evidently does not accept null values, so my app crashes. Here is my AsyncTask code:

new AsyncTask<Void, Void, Void>() {
            @Override
            protected Void doInBackground(Void... params) {
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                    Participant participant = entry.getValue();
                    participant.release();
                }
                return null;
            }

            @Override
            protected void onPostExecute(Void aVoid) {
                cb.event(new Spin.Event<Void>());
            }
        }.execute();

Here is the code converted to RxJava (not RxJava2):

 Observable.defer(new Func0<Observable<Void>>() {
        @Override
        public Observable<Void> call() {
            Set<Map.Entry<String, Participant>> entries = pool.entrySet();
            for (Map.Entry<String, Participant> entry : entries) {
                Participant participant = entry.getValue();
                participant.release();
            }
            return Observable.just(null);
        }
    }).doOnCompleted(new Action0() {
        @Override
        public void call() {
            cb.event(new Spin.Event<Void>());
        }
    })
    .subscribeOn(Schedulers.computation())
    .subscribe();

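What is the best way to convert this to RxJava2 without crashing when null is returned? Also, how does .execute() translate to RxJava2? I am not sure whether it applies to RxJava at all.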

Here is the crash log:

FATAL EXCEPTION: RxComputationThreadPool-3

                                                                           io.reactivex.exceptions.OnErrorNotImplementedException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
                                                                               at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
                                                                               at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:35)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
                                                                               at java.lang.Thread.run(Thread.java:818)
                                                                            Caused by: java.lang.NullPointerException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842) 
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
                                                                               at java.lang.Thread.run(Thread.java:818) 
Answer
Observable.defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                   Participant participant = entry.getValue();
                   participant.release();
                }
                return Completable.complete().toObservable();
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.d("Complete", "Complete");
            }
        })
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread()).subscribe();

This code also works. Calling subscribe() is what starts the job, so it plays the role of .execute().
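The point about subscribe() can be shown without RxJava or Android on the classpath. The following is only a plain-JDK model of the idea, not RxJava itself: `LazyJob` and `LazyJobDemo` are hypothetical names invented for this sketch, and the real `Completable` adds schedulers, error handling and disposal that are left out here. What it shows is that building the job runs nothing, and the work happens only when subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-JDK model of a "cold" task: the work is only described at
 * construction time and runs when subscribe() is called.
 * LazyJob is a hypothetical class for illustration, not an RxJava type.
 */
public class LazyJobDemo {

    /** Hypothetical stand-in for a cold Completable. */
    static final class LazyJob {
        private final Runnable work;

        LazyJob(Runnable work) {
            this.work = work;
        }

        /** Runs the work, then the completion callback, on the calling thread. */
        void subscribe(Runnable onComplete) {
            work.run();
            onComplete.run();
        }
    }

    /** Records the order of events so the laziness is visible. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazyJob job = new LazyJob(() -> log.add("work"));
        log.add("created"); // the job exists, but nothing has run yet
        job.subscribe(() -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [created, work, complete]
    }
}
```

In the RxJava2 answers the same ordering holds, with subscribeOn() choosing the thread that runs the work instead of the calling thread.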

Another answer

Since you have no value to post back to the main thread, you can use Completable:

Completable.fromAction(() -> {
    Set<Map.Entry<String, Participant>> entries = pool.entrySet();
    for (Map.Entry<String, Participant> entry : entries) {
        Participant participant = entry.getValue();
        participant.release();
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    () -> {
         cb.event(new Spin.Event<Void>());
    }, 
    error -> { /* show error toast */ }
);
Another answer
Observable.defer(new Callable<ObservableSource<?>>() {

 //This method is replacing doInBackground
        @Override
        public ObservableSource<?> call() throws Exception {  
            Set<Map.Entry<String, Participant>> entries = pool.entrySet();
            for (Map.Entry<String, Participant> entry : entries) {
               Participant participant = entry.getValue();
               participant.release();
            }
            return Completable.complete().toObservable();
        }
    }).doOnComplete(new Action() {
     //This plays the role of onPostExecute, but it runs on the computation thread because it comes before observeOn
        @Override 
        public void run() throws Exception {
            Log.d("Complete", "Complete");
        }
    })
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread()).subscribe();
