RxJava2 Observable中的异步任务状态&&取消等效项?

Posted

技术标签:

【中文标题】RxJava2 Observable中的异步任务状态&&取消等效项?【英文标题】:Asynctask status && cancel equivalent in RxJava2 Observable? 【发布时间】:2017-10-10 18:02:01 【问题描述】:

我正在尝试学习 RxJava2,并将我的 AsyncTasks 转换为 Observables。

我正在尝试转换以下代码。

if(asyncTask.getStatus() == AsyncTask.Status.RUNNING)
    asyncTask.cancel();

asyncTask = new CustomTask();  
asyncTask.execute(input);

我尝试使用 Disposables 重新创建以下内容。

Disposable currentTask;
PublishSubject input = PublishSubject.create();

对于每个输入

if(currentTask != null) currentTask.dispose();

currentTask = input
    .map(// Network calls
         // returns CustomObject)
    .subscribeOn(Schedulers.io())
    .observeOn(androidSchedulers.mainThread())
    .subscribe(result -> 
               // do work with result
               , throwable -> Log.e(TAG, throwable.toString()));

但是,currentTask 始终为 null。为什么?这是错误的做法吗??

【问题讨论】:

并非所有subscribe(...) 方法都返回Disposable。你能至少暗示你传递给subscribe(...)的参数吗? 您是否试图随时取消currentTask?另外,您可以添加使用PublishSubject input 的任何其他地方吗?我怀疑您可能滥用了该主题,这导致了您的问题。 【参考方案1】:

您正确使用了Disposable,但我只能假设您在某个主题上搞砸了。 rx 中的主题既可以是发布者也可以是订阅者……主题不一定要等到subscribe(...) 才开始发送项目。出于这个原因,我不建议用任何类型的Subject 替换您的AsyncTasks。

您可以获得类似的、更具确定性的行为,但这样做:

Observable<CustomObject> networkObservable =
            Observable.create(emitter ->
                    
                        try 
                            CustomObject object = doNetworking();
                            emitter.onNext(object);
                            emitter.onComplete();
                         catch (Exception e) 
                            emitter.onError(e);
                        
                    
            );

if(currentTask != null) currentTask.dispose();

currentTask = networkObservable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // this next subscribe is similar to AsyncTask.execute() as it starts the stream
        .subscribe(result -> 
            // do work with result
            , throwable -> Log.e(TAG, throwable.toString()));

另外,考虑查看SerialDisposable,您不必进行那些 null/dispose 检查

SerialDisposable serialDisposable = new SerialDisposable();

以原子方式:在此容器上设置下一个可处置对象并处置前一个(如果有)或处置下一个(如果已处置容器)。

Disposable disposable = networkObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

serialDisposable.set(disposable); // this will call dispose on the previous task if it exists

【讨论】:

以上是关于RxJava2 Observable中的异步任务状态&&取消等效项?的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

如何将异步任务/ rx java代码转换为rxjava2?

Rxjava2 Observable的创建详解及实例

Rxjava2 Observable的辅助操作详解及实例

Rxjava2 Observable的辅助操作详解及实例

Rxjava2 Observable的数据过滤详解及实例