如何根据多个 RxJava Completable 结果的结果执行操作

Posted

技术标签:

【中文标题】如何根据多个 RxJava Completable 结果的结果执行操作【英文标题】:How to perform operation based on result of multiple RxJava Completable results 【发布时间】:2020-12-09 11:04:18 【问题描述】:

我已经为此苦恼了一段时间,我在管理一个必须在 Kotlin 中使用 Rx 的需求时迷失了方向。

让我解释一下。

有一组 ids 其等效项需要从服务器中删除,并根据服务器成功最终在本地。

基本上

    进行网络调用以删除单个id(支持的网络调用返回Completable) 如果收到complete(成功)回调,则将id 存储在list(内存)中 对所有 id 执行第一步和第二步删除 每次网络调用完成后,传递列表以从本地数据库中删除

所以这些功能是可用的,不能修改。

    fun deleteId(id: String): Completable networkCall.deleteId(id) fun deleteIds(ids: List<String>): Unit localDb.deleteId(ids)

这是我尝试过的,但显然不完整并且卡住了......

val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter  it.isChecked 
    ?.map  Pair(it.id, dataManager.deleteId(it.id)) 
    ?.forEach  (Id, deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver 
                    override fun onComplete()  deleted.add(Id) 

                    override fun onSubscribe(d: Disposable)  disposableManager += d 

                    override fun onError(e: Throwable)  error.add(Id) 

                )
    

所以现在这里有多个问题,其中一个是我无法找到一个地方知道所有请求都已完成以启动localDb删除的要求。

有没有一种方法可以让我使用Flowable.fromIterable()zipmerge 以某种方式遵循上述命令链来实现上述场景?

【问题讨论】:

【参考方案1】:

如果我正确理解了您的用例,那么应该这样做:

// ids of items to delete, for illustration lets have some temp set
val ids = setOf<String>("1", "2", "3", "4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach  id ->
    deleteIdSingles.add(
        api.deleteId(id)
            // when request successfully completes, return its id wrapped in a Single, instead of Completable
            .toSingle<String>  id 
            // return a flag when this request fails, so that the stream is not closed and other requests would still be executed
            .onErrorReturn  "FAILED" 
    )


Single.merge(deleteIdSingles)
    // collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
    .collect(
         mutableListOf() ,
         deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id) 
    )
    .observeOn(Schedulers.io())
    .subscribe(
         deletedIds ->
                db.deleteIds(deletedIds)
        ,  error ->
            // todo: onError
        )

【讨论】:

以上是关于如何根据多个 RxJava Completable 结果的结果执行操作的主要内容,如果未能解决你的问题,请参考以下文章

RXJAVA-Completable

RXJAVA-Completable

使用 Completable.timer 对 RxJava 进行单元测试

Android :RxJava学习笔记之SingleCompletable以及Maybe

RXJAVA-Maybe

RXJAVA-Maybe