如何使用 RxJava 对来自异步源的数据进行分组

Posted

技术标签:

【中文标题】如何使用 RxJava 对来自异步源的数据进行分组【英文标题】:How to group data from async sources using RxJava 【发布时间】:2021-09-02 14:41:33 【问题描述】:

我正在开发一个交易应用程序。当用户选择一些产品时,我需要为每个产品显示市场是否开放及其最新价格。用户可以选择例如 2 个产品,我要做的是仅在我拥有 2 个产品的所有信息时才显示数据。数据可以随时更改(即其中一种产品的市场关闭)。这是我的代码:

data class ProductInfo(
    val productCode: String,
    val isMarketOpen: Boolean,
    val latestPrice: BigDecimal,
)

// This observable could emit at any time due to user interaction with the UI
private fun productsCodeObservable(): Observable<List<String>> = Observable.just(listOf("ProductA", "ProductB"))

// Markets have different working hours
private fun isMarketOpenObservable(productCode: String): Observable<Boolean> 
    return Observable.interval(2, TimeUnit.SECONDS)
            .map 
                // TODO: Use API to determine if the market is open for productCode
                it.toInt() % 2 == 0
            


// The product price fluctuates so this emits every X seconds
private fun latestPriceObservable(productCode: String): Observable<BigDecimal> 
    return Observable.interval(2, TimeUnit.SECONDS)
            .map  productPrice -> productPrice.toBigDecimal() 


@Test
fun `test`() 

    val countDownLatch = CountDownLatch(1)

    productsCodeObservable()
            .switchMap  Observable.fromIterable(it) 
            .flatMap  productCode ->
                Observable.combineLatest(
                        isMarketOpenObservable(productCode),
                        latestPriceObservable(productCode)
                )  isMarketOpen, latestPrice ->
                    ProductInfo(productCode, isMarketOpen, latestPrice)
                
            
            .toList()
            .doOnSuccess  productsInfo ->
                println(productsInfo)
            
            .subscribe()

    countDownLatch.await()

我不知道问题出在哪里,因为测试方法从不打印任何内容。我对 RxJava 了解不多,但我的理解是 toList 不起作用,因为源 observables 永远不会完成。关于如何收集产品代码的数据并在任何数据更改时发出列表的任何想法? :)

【问题讨论】:

你是对的,为什么 toList 在这里不起作用。 【参考方案1】:

如果您想在每次这些产品发生变化时收到新的产品信息列表:

productsCodeObservable()
    .switchMap  list ->
        val productInfoObservables = list.map  productCode ->
            Observable.combineLatest(
                isMarketOpenObservable(productCode),
                latestPriceObservable(productCode)
            )  isMarketOpen, latestPrice ->
                ProductInfo(productCode, isMarketOpen, latestPrice)
            
        
        Observable.combineLatest(productInfoObservables)  it.toList() as List<ProductInfo> 
    
    .doOnNext  productsInfoList ->
        println(productsInfoList)
    
    .subscribe()

【讨论】:

谢谢你,这就像一个魅力????【参考方案2】:

使用 RxJava 的 Emitter 接口并实现其方法:

public interface Emitter<T> 
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();

我认为您需要一个 ObservableEmitter,请查看以下页面: https://www.raywenderlich.com/2071847-reactive-programming-with-rxandroid-in-kotlin-an-introduction

【讨论】:

错了。 Emitter 丝毫没有解决 OP 的问题,也没有理由在库外实现。

以上是关于如何使用 RxJava 对来自异步源的数据进行分组的主要内容,如果未能解决你的问题,请参考以下文章

如何对来自不同表MySQL的两列求和

如何使用 rxjava/retrofit 正确等待/观察来自服务器的传入数据

RxJava异步请求加载状态控制

Mongodb对来自聚合方面的两个相似数据数组进行分组和求和

如何在 StarUML 中对箭头进行分组/概括?

按 JSON 数组对 SQL 数据进行分组