结合多个条件 Observable 来准备数据

Posted

技术标签:

【中文标题】结合多个条件 Observable 来准备数据【英文标题】:Combining multiple conditional Observables for preparing the data 【发布时间】:2021-02-06 13:09:18 【问题描述】:

我有两个stores 需要准备合并数据

    获取所有部门(前 50 个部门) 获取所有用户(例如:1000 用户)

我想合并数据并准备最终数据。 注意:部门会要求加载所有用户(这可能需要一些时间),并且不知不觉地,users 不能自己做任何事情,因为他们需要 department 数据来准备最终结果。

这里是department

      this.store.select(getUserInfo).subscribe((info) => 
        this.store.select(getAllDepts).subscribe((depts) => 
          if (depts.length < 1) 
            this.store.dispatch(loadDeptStore( accountId: info.acntId));
           else 
            console.log(depts);  // get Dept data
          
        );
      )

对于Users 我拥有的数据:

      this.store
        .select(getUserInfo)
        .pipe(
          flatMap((info) => 
            this.acntName = info.acntId;
            return this.store.select(getAllUsers,  accountId: info.acntId );
          )
        )
        .subscribe((res) => 
          if (res.length < 1 ) 
            this.store.dispatch(loadUsersInAccountStore( billingCrmAccountId: this.acntName ));
           else 
            console.log(res); // get users data
          
        )

我应该使用combineLates 或其他东西吗,如果是,我应该如何使用 RxJS 功能以反应的方式做到这一点。

【问题讨论】:

【参考方案1】:

首先,不要在订阅中进行订阅,这是一种反模式。始终使用更高的可观察量。我不知道代码是如何工作的,因为我不使用 ngrx,但你可以像这样构建你的 observable:

    const userInfo$ = this.store.select(getUserInfo);
    userInfo$.pipe(
        mergeMap(info => 
            this.acntName = info.acntId;
            const departments$ = this.store.select(getAllDepts)
                .pipe(
                    map(depts => 
                        if (depts.length < 1) 
                            return this.store.dispatch(loadDeptStore( accountId: info.acntId));
                         else 
                            return depts;  // get Dept data
                        
                    )
                );
            const users$ = this.store.select(getAllUsers,  accountId: info.acntId )
                .pipe(
                    map(res => 
                        if (res.length < 1 )                           
                            return this.store.dispatch(loadUsersInAccountStore(billingCrmAccountId: this.acntName ));
                         else 
                            return res; // get users data
                        
                    )
                );
            return combineLatest([departments$, users$]); 
        ),
        map(([depts, users]) => 
            // do stuffs and return something
        )
    ).subscribe(res => console.log('results: ', res));
        

当你的两个 observables 都发出一次时,CombineLatest 就会发出。每次有一个可观察对象发射时,它都会再次发射。 它可能不适用于您当前的实现,但您会了解如何组合来自一个来源的两个可观察对象。

【讨论】:

我的直觉是使用forkJoin()。因为你知道源 observables 已经完成,所以它的语义更严格——这意味着如果你做错了其他事情,你会提前失败,而不是造成内存泄漏,以后很难找到。 当你期望它发射时它会以另一种方式发生,但它不会因为你的一个 observable 没有完成但你不明白为什么;)。无论如何,您应该始终取消订阅,并且可以使用 take(1) 来完成您的流。只要您知道自己在做什么,两者都会导致相同的最终结果。 如果您的某个来源没有完成并且您不明白原因,那么您的应用程序将会失败。在解决问题之前,您不会得到任何结果。 :) 使用combineLatest,您可能有一个运行良好但实际上正在泄漏内存的程序。你会得到越来越多的活跃的 observables,这些 observables 没有完成,只是占用了 RAM。您可能有 1000 行代码,并且可能很难找出导致内存泄漏的原因。但是,是的,take(1) 也可以! 你可能不希望你的 observable 立即完成。如果您立即关闭流,如何获得更新的值?此时只需使用承诺。这取决于您作为开发人员需要什么。无论如何,留下一个可观察的开放是愚蠢的。总是退订。 如果你组合只发射一次的源,那么 forkJoin() 不再是你最好的选择。这是关于使用最好的工具来完成这项工作。 Promise 并不懒惰,也没有任何一流的方法来取消它们。另外,在编写不同的源代码时,在 Obsverables 和 Promises 之间切换会使代码复杂化。此外,组合也是您可能无法提前知道何时/如何取消订阅流的原因。一旦你知道(通过takeUntilforkJoin 等正确的运算符)包括该信息,就会为你的应用程序添加语义。

以上是关于结合多个条件 Observable 来准备数据的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift 结合 observable 和条件

Observable类API

RxJava系列5(组合操作符)

从 txt 文件中读取数据并将其添加到 Observable 集合中

将 lapply 与 for 和 if..else 语句结合使用,将条件列添加到多个数据帧

Android RxJava使用介绍 RxJava的操作符