如何创建从多个数据更新的Flowable?

Posted

技术标签:

【中文标题】如何创建从多个数据更新的Flowable?【英文标题】:How to create a Flowable that updates from multiple data? 【发布时间】:2021-11-01 17:20:12 【问题描述】:

我正在构建一个聊天应用程序,我遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。

A -> 是一个联系人 B -> 是最后一条未读消息 C -> 是消息数

我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息中的任何数据更改,或未读消息或消息计数。我如何使用 RxJava 只更新必要的不阻塞 UI 来做到这一点? (有人告诉我我可以使用 Flowable)。

到目前为止我已经尝试过:

fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap  contacts ->
        Flowable.fromIterable(contacts)
            .flatMapSingle  contact ->
                getLastUnreadMessage(contact.id)
                    .materialize()
                    .zipWith(
                        getUnreadCount(contact.id)
                    )  msg: Notification<Messages>, unreadCount: Int ->
                        Chat(contact, msg.value, unreadCount)
                    
            .toList().toFlowable()
    .subscribeOn(schedulers.io).observeOn(schedulers.main)

在视图模型中

var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())

但它似乎只更新一次,然后不再更新任何数据。

【问题讨论】:

Flowable 与 RoomDB 一起可以在您数据库中的数据更改时“更新”您。您仍然需要观察每个表(联系人、消息、消息计数)。你能分享一些你到目前为止的代码吗? @gioravered 我已经添加了到目前为止我已经完成的一段代码。我花了大约 5 个小时才这样做,但我认为它没有按预期工作 【参考方案1】:

您的代码中的问题是,您只能观察联系人列表的变化,并且每个联系人只能获得一次最后的消息和未读计数。

这应该可行:

  fun queryAllChats() = dao.queryContactsFlowable()
    .switchMap  contacts ->
      val chats = contacts.map  contact ->
        Flowable.combineLatest(
          getUnreadCount(contact.id), // Flowable
          getLastUnreadMessage(contact.id), // Flowable
           unreadCount, unreadMessages ->
            Chat(contact, unreadMessages, unreadCount)
          )
      
      return@switchMap Flowable.combineLatest(chats) 
        it.map  it as Chat 
      
    
    .subscribeOn(Schedulers.io())
    .observeOn(androidSchedulers.mainThread())

【讨论】:

真的很管用!谢谢 有没有办法将排放量过滤为每个 1 并在上行时按属性过滤? 您可以使用zipWithfilter 运算符来实现它。 .zipWith(Flowable.interval(1, TimeUnit.SECONDS)) item, _ -&gt; item .filter /*your code*/ 再次感谢,我想我明白了。现在,每次更新和按时间戳排序时,所有项目都会出现!

以上是关于如何创建从多个数据更新的Flowable?的主要内容,如果未能解决你的问题,请参考以下文章

37Springboot集成Flowable

flowable会签结束,会签变量没有更新。bug

(前后端都开源)Activiti Flowable Camunda json转bpmn 仿钉钉流程设计器 vue2vue3 Ant Design Wflow-web smart-flow-design

如何使用 RxJava 2 改进从 Firebase db 读取的 Flowable<Object> 数据?

Flowable入门系列文章83 - 验证图表并导出为自定义输出格式

Flowable 任务如何认领,回退?