如何创建从多个数据更新的Flowable?
Posted
技术标签:
【中文标题】如何创建从多个数据更新的Flowable?【英文标题】:How to create a Flowable that updates from multiple data? 【发布时间】:2021-11-01 17:20:12 【问题描述】:我正在构建一个聊天应用程序,我遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。
A -> 是一个联系人 B -> 是最后一条未读消息 C -> 是消息数
我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息中的任何数据更改,或未读消息或消息计数。我如何使用 RxJava 只更新必要的不阻塞 UI 来做到这一点? (有人告诉我我可以使用 Flowable)。
到目前为止我已经尝试过:
fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap contacts ->
Flowable.fromIterable(contacts)
.flatMapSingle contact ->
getLastUnreadMessage(contact.id)
.materialize()
.zipWith(
getUnreadCount(contact.id)
) msg: Notification<Messages>, unreadCount: Int ->
Chat(contact, msg.value, unreadCount)
.toList().toFlowable()
.subscribeOn(schedulers.io).observeOn(schedulers.main)
在视图模型中
var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())
但它似乎只更新一次,然后不再更新任何数据。
【问题讨论】:
Flowable 与 RoomDB 一起可以在您数据库中的数据更改时“更新”您。您仍然需要观察每个表(联系人、消息、消息计数)。你能分享一些你到目前为止的代码吗? @gioravered 我已经添加了到目前为止我已经完成的一段代码。我花了大约 5 个小时才这样做,但我认为它没有按预期工作 【参考方案1】:您的代码中的问题是,您只能观察联系人列表的变化,并且每个联系人只能获得一次最后的消息和未读计数。
这应该可行:
fun queryAllChats() = dao.queryContactsFlowable()
.switchMap contacts ->
val chats = contacts.map contact ->
Flowable.combineLatest(
getUnreadCount(contact.id), // Flowable
getLastUnreadMessage(contact.id), // Flowable
unreadCount, unreadMessages ->
Chat(contact, unreadMessages, unreadCount)
)
return@switchMap Flowable.combineLatest(chats)
it.map it as Chat
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread())
【讨论】:
真的很管用!谢谢 有没有办法将排放量过滤为每个 1 并在上行时按属性过滤? 您可以使用zipWith
和filter
运算符来实现它。 .zipWith(Flowable.interval(1, TimeUnit.SECONDS)) item, _ -> item .filter /*your code*/
再次感谢,我想我明白了。现在,每次更新和按时间戳排序时,所有项目都会出现!以上是关于如何创建从多个数据更新的Flowable?的主要内容,如果未能解决你的问题,请参考以下文章
(前后端都开源)Activiti Flowable Camunda json转bpmn 仿钉钉流程设计器 vue2vue3 Ant Design Wflow-web smart-flow-design
如何使用 RxJava 2 改进从 Firebase db 读取的 Flowable<Object> 数据?