如何通过 Flux 使用不同的调度程序运行两个任务
Posted
技术标签:
【中文标题】如何通过 Flux 使用不同的调度程序运行两个任务【英文标题】:How to run two tasks with different Schedulers via Flux 【发布时间】:2020-04-07 00:55:24 【问题描述】:我尝试使用两种策略(串行和并行)存储、解析和存储一些原始数据
Flux<PanasonicData> f = Flux.create(sink -> dataRepo.addConsumer(sink::next));
Flux.from(f).publishOn(Schedulers.single()).subscribe(this::save1);
Flux.from(f).publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);
或者
ConnectableFlux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish();
cf.autoConnect().publishOn(Schedulers.single()).subscribe(this::save1);
cf.autoConnect().publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);
但是第二个任务从来没有运行过!!! 我如何使用这两种不同的策略来运行这两个任务?
【问题讨论】:
我找到了原因:其中一个订阅者在执行过程中被挂起。 【参考方案1】:您可以通过autoConnect(int minSubscribers)
指定最小订阅人数:
Flux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish().autoConnect(2);
cf.publishOn(Schedulers.single()).subscribe(this::save1);
cf.publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);
【讨论】:
感谢您的回答,但我找到了原因:其中一个订阅者在执行期间被暂停。 好的,你找到了解决方案真是太好了!以上是关于如何通过 Flux 使用不同的调度程序运行两个任务的主要内容,如果未能解决你的问题,请参考以下文章