如何根据多个 RxJava Completable 结果的结果执行操作
Posted
技术标签:
【中文标题】如何根据多个 RxJava Completable 结果的结果执行操作【英文标题】:How to perform operation based on result of multiple RxJava Completable results 【发布时间】:2020-12-09 11:04:18 【问题描述】:我已经为此苦恼了一段时间,我在管理一个必须在 Kotlin 中使用 Rx 的需求时迷失了方向。
让我解释一下。
有一组 ids 其等效项需要从服务器中删除,并根据服务器成功最终在本地。
基本上
-
进行网络调用以删除单个
id
(支持的网络调用返回Completable
)
如果收到complete
(成功)回调,则将id
存储在list
(内存)中
对所有 id
执行第一步和第二步删除
每次网络调用完成后,传递列表以从本地数据库中删除
所以这些功能是可用的,不能修改。
fun deleteId(id: String): Completable networkCall.deleteId(id)
fun deleteIds(ids: List<String>): Unit localDb.deleteId(ids)
这是我尝试过的,但显然不完整并且卡住了......
val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter it.isChecked
?.map Pair(it.id, dataManager.deleteId(it.id))
?.forEach (Id, deleteOp) ->
deleteOp.subscribeOn(Schedulers.io())
.subscribe(object: CompletableObserver
override fun onComplete() deleted.add(Id)
override fun onSubscribe(d: Disposable) disposableManager += d
override fun onError(e: Throwable) error.add(Id)
)
所以现在这里有多个问题,其中一个是我无法找到一个地方知道所有请求都已完成以启动localDb删除的要求。
有没有一种方法可以让我使用Flowable.fromIterable()
或zip
或merge
以某种方式遵循上述命令链来实现上述场景?
【问题讨论】:
【参考方案1】:如果我正确理解了您的用例,那么应该这样做:
// ids of items to delete, for illustration lets have some temp set
val ids = setOf<String>("1", "2", "3", "4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach id ->
deleteIdSingles.add(
api.deleteId(id)
// when request successfully completes, return its id wrapped in a Single, instead of Completable
.toSingle<String> id
// return a flag when this request fails, so that the stream is not closed and other requests would still be executed
.onErrorReturn "FAILED"
)
Single.merge(deleteIdSingles)
// collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
.collect(
mutableListOf() ,
deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id)
)
.observeOn(Schedulers.io())
.subscribe(
deletedIds ->
db.deleteIds(deletedIds)
, error ->
// todo: onError
)
【讨论】:
以上是关于如何根据多个 RxJava Completable 结果的结果执行操作的主要内容,如果未能解决你的问题,请参考以下文章
使用 Completable.timer 对 RxJava 进行单元测试