如何使用过滤压缩多个 Flux 流
Posted
技术标签:
【中文标题】如何使用过滤压缩多个 Flux 流【英文标题】:How to zip multiple Flux streams with filtering 【发布时间】:2021-02-08 02:33:43 【问题描述】:我有 2 个源 Flux 流,它们返回所有关键字和所有字典的流:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Keyword
具有对 Dictionary
对象的引用,如下所示:
public class Keyword
private String id;
private String dictionaryId;
目标是转换后的Flux<DictionaryTO>
,其中包含Dictionary
的所有属性以及属于字典的关键字列表:
public class DictionaryTO
private String id;
private Collection<KeywordTO> keywords;
public class KeywordTO
private String id;
问题是如何在不阻塞任何源流的情况下以反应方式压缩/合并这 2 个 Flux 流。
请注意keywordFlux
包含所有关键字,因此应根据Keyword.dictionaryId
应用一些过滤。
【问题讨论】:
您必须至少缓存其中一个输入才能加入它们。很明显,如果您想要 all 字典的关键字,您将不得不扫描整个关键字源 - 除非这里有更多的结构,您还没有包含在您的问题陈述中。跨度> 感谢您的回复。但即使我在dictionaryFlux.map
中执行keywordFlux.cache()
和keywordFlux.filter()
,我也需要执行阻塞来为特定字典构建关键字列表。哪个不好。
不明白为什么你需要阻止 - 收集一个到 Mono<Map>>
然后你将 flatMap
另一个在。正如我所说,你不能发出一个字典直到您看到所有关键字 - 但您不需要阻止,只需暂停...
【参考方案1】:
按照 boris-the-spider 的建议,我最终使用了 .flatMap()
和 .zipWith()
。
-
创建一个
Mono<Map>
的关键字(按dictionaryId
分组)并缓存它,因为以后会被多次使用。
flatMap
字典的Flux
和zip
具有上述关键字映射的单个字典。然后将“字典和关键字映射元组”映射到带有关键字的字典。
完整解决方案:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
.collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
.cache();
Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
.map(dictionaryTOMapper:map)
.flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
.map(tuple ->
Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
DictionaryTO dic = tuple.getT1();
dic.setKeywords(keywordsForDic);
return dic;
);
【讨论】:
以上是关于如何使用过滤压缩多个 Flux 流的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Flux / ReactJS 中处理多个相同类型的商店?