如何使用过滤压缩多个 Flux 流

Posted

技术标签:

【中文标题】如何使用过滤压缩多个 Flux 流【英文标题】:How to zip multiple Flux streams with filtering 【发布时间】:2021-02-08 02:33:43 【问题描述】:

我有 2 个源 Flux 流,它们返回所有关键字和所有字典的流:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Keyword 具有对 Dictionary 对象的引用,如下所示:

public class Keyword 
    private String id;
    private String dictionaryId;

目标是转换后的Flux&lt;DictionaryTO&gt;,其中包含Dictionary 的所有属性以及属于字典的关键字列表

public class DictionaryTO 
    private String id;
    private Collection<KeywordTO> keywords;

public class KeywordTO 
    private String id;

问题是如何在不阻塞任何源流的情况下以反应方式压缩/合并这 2 个 Flux 流。

请注意keywordFlux 包含所有关键字,因此应根据Keyword.dictionaryId 应用一些过滤。

【问题讨论】:

您必须至少缓存其中一个输入才能加入它们。很明显,如果您想要 all 字典的关键字,您将不得不扫描整个关键字源 - 除非这里有更多的结构,您还没有包含在您的问题陈述中。跨度> 感谢您的回复。但即使我在dictionaryFlux.map 中执行keywordFlux.cache()keywordFlux.filter(),我也需要执行阻塞来为特定字典构建关键字列表。哪个不好。 不明白为什么你需要阻止 - 收集一个到 Mono&lt;Map&gt;&gt; 然后你将 flatMap 另一个在。正如我所说,你不能发出一个字典直到您看到所有关键字 - 但您不需要阻止,只需暂停... 【参考方案1】:

按照 boris-the-spider 的建议,我最终使用了 .flatMap().zipWith()

    创建一个Mono&lt;Map&gt;的关键字(按dictionaryId分组)并缓存它,因为以后会被多次使用。 flatMap 字典的Fluxzip 具有上述关键字映射的单个字典。然后将“字典和关键字映射元组”映射到带有关键字的字典。

完整解决方案:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
    .collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
    .cache(); 

Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
    .map(dictionaryTOMapper:map) 
    .flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
    .map(tuple -> 
        Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
        DictionaryTO dic = tuple.getT1();
        dic.setKeywords(keywordsForDic);
        return dic;
    );

【讨论】:

以上是关于如何使用过滤压缩多个 Flux 流的主要内容,如果未能解决你的问题,请参考以下文章

如何连接两个或多个 gzip 文件/流

如何使用 Apache Flume 过滤多个源数据?

使用增强和过滤流解压缩档案

如何在 Flux / ReactJS 中处理多个相同类型的商店?

如何并行进行多个 Spring Webclient 调用并等待结果?

如何让多个 Flux 应用程序说话?