关于 Project Reactor 的 flatMap 中的线程的困惑
Posted
技术标签:
【中文标题】关于 Project Reactor 的 flatMap 中的线程的困惑【英文标题】:Confusion about threads in Project Reactor's flatMap 【发布时间】:2019-12-18 10:13:54 【问题描述】:我正在使用 Project Reactor 和响应式 MongoDB 存储库。我有以下代码:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person
@Id
Integer id;
String name;
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer>
和主@SpringBootApplication
类:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication
private final ReactivePersonRepository reactivePersonRepository;
public static void main(String[] args)
SpringApplication.run(ReactiveDatabaseApplication.class, args);
@PostConstruct
public void postConstruct()
Scheduler single = Schedulers.newSingle("single-scheduler");
IntStream.range(0, 10).forEach(i ->
Flux.just(Person.builder()
.id(i)
.name("PersonName")
.build())
.flatMap(personToSave ->
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.save(personToSave);
)
//.publishOn(single)
.flatMap(savedPerson ->
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.findById(savedPerson.getId());
)
//.publishOn(single)
.flatMap(foundPerson ->
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.deleteById(foundPerson.getId());
)
//.publishOn(single)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName()))));
Flux::subscribeOn
方法描述说:
因此,将此运算符放在链中的任何位置也会影响 onNext/onError/onComplete 信号的执行 * 上下文来自 链的开头直到 * 下一次出现的 @link publishOn(调度程序) publishOn
这让我有点困惑,因为当我没有在处理链中指定任何 publishOn
时,线程名称的打印值为:
从线程 single-scheduler-1 中保存人员 - 正如预期的那样
从线程 Thread-13 找人
从线程 Thread-6 找人
从线程 Thread-15 找人
从线程 Thread-6 中删除人员
从线程 Thread-5 中删除人员
从线程 Thread-4 中删除人员
我不明白为什么。 subscribeOn
方法中指定的调度程序不应该用于每个flatMap
执行吗?
当我取消注释 publishOn
行时,所有内容都由给定的单个调度程序执行,这是预期的。
谁能解释为什么flatMap
操作不使用单个调度程序,而没有publishOn
?
【问题讨论】:
在 flatMap 中,您订阅了由 Reactive Mongo 存储库提供的新 Mono,它可能会在与其订阅的线程不同的线程上发布数据。另一方面,publishOn 强制在哪个线程上将数据发送到下游。这就是为什么它在这种情况下按“预期”工作的原因。你想在线程之间切换的原因是什么?除非你做一些阻塞的事情,否则你根本不需要担心这个。 【参考方案1】:这个人为的例子可能会更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x ->
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
)
.flatMap(x ->
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
)
.flatMap(x ->
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
这将给出类似于:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,这是您看到您所做行为的原因。 “直到下一次出现publishOn()
”并不意味着下次你的代码调用publishOn()
- 它也可以在你的任何flatMap()
调用中的任何发布者中,您将无法控制。
【讨论】:
以上是关于关于 Project Reactor 的 flatMap 中的线程的困惑的主要内容,如果未能解决你的问题,请参考以下文章
Project Reactor 3 中的 publishOn 与 subscribeOn
Project Reactor 之 publishOn 与 subscribeOn
使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件
带有消息代理(例如 Kafka)的事件驱动微服务与反应式编程(RxJava、Project Reactor)以及改进的协议(RSocket)