如何通过 Flux 使用不同的调度程序运行两个任务

Posted

技术标签:

【中文标题】如何通过 Flux 使用不同的调度程序运行两个任务【英文标题】:How to run two tasks with different Schedulers via Flux 【发布时间】:2020-04-07 00:55:24 【问题描述】:

我尝试使用两种策略(串行和并行)存储、解析和存储一些原始数据

    Flux<PanasonicData> f = Flux.create(sink -> dataRepo.addConsumer(sink::next));
    Flux.from(f).publishOn(Schedulers.single()).subscribe(this::save1);
    Flux.from(f).publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

或者

    ConnectableFlux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish();
    cf.autoConnect().publishOn(Schedulers.single()).subscribe(this::save1);
    cf.autoConnect().publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

但是第二个任务从来没有运行过!!! 我如何使用这两种不同的策略来运行这两个任务?

【问题讨论】:

我找到了原因:其中一个订阅者在执行过程中被挂起。 【参考方案1】:

您可以通过autoConnect(int minSubscribers)指定最小订阅人数:

Flux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish().autoConnect(2);
cf.publishOn(Schedulers.single()).subscribe(this::save1);
cf.publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

【讨论】:

感谢您的回答,但我找到了原因:其中一个订阅者在执行期间被暂停。 好的,你找到了解决方案真是太好了!

以上是关于如何通过 Flux 使用不同的调度程序运行两个任务的主要内容,如果未能解决你的问题,请参考以下文章

如何等待从另一家商店在一家商店发生的调度

在 Flux/React 中调度级联/依赖异步请求

如何使用 Messagebox 在 Windows 7 任务调度程序中启动 VBS 脚本?

React Native + Flux,无响应的调度程序

React Flux - 在调度中调度 - 如何避免?

如何使用 Laravel 在 Windows 10 中运行任务调度程序