关于 Project Reactor 的 flatMap 中的线程的困惑

Posted

技术标签:

【中文标题】关于 Project Reactor 的 flatMap 中的线程的困惑【英文标题】:Confusion about threads in Project Reactor's flatMap 【发布时间】:2019-12-18 10:13:54 【问题描述】:

我正在使用 Project Reactor 和响应式 MongoDB 存储库。我有以下代码:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person 
    @Id
    Integer id;
    String name;

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> 

和主@SpringBootApplication类:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication 

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) 
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    

    @PostConstruct
    public void postConstruct() 
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> 
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        )
                        //.publishOn(single)
                        .flatMap(savedPerson -> 
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        )
                        //.publishOn(single)
                        .flatMap(foundPerson -> 
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        )
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    

Flux::subscribeOn 方法描述说:

因此,将此运算符放在链中的任何位置也会影响 onNext/onError/onComplete 信号的执行 * 上下文来自 链的开头直到 * 下一次出现的 @link publishOn(调度程序) publishOn

这让我有点困惑,因为当我没有在处理链中指定任何 publishOn 时,线程名称的打印值为:

从线程 single-scheduler-1 中保存人员 - 正如预期的那样

从线程 Thread-13 找人

从线程 Thread-6 找人

从线程 Thread-15 找人

从线程 Thread-6 中删除人员

从线程 Thread-5 中删除人员

从线程 Thread-4 中删除人员

我不明白为什么。 subscribeOn 方法中指定的调度程序不应该用于每个flatMap 执行吗?

当我取消注释 publishOn 行时,所有内容都由给定的单个调度程序执行,这是预期的。

谁能解释为什么flatMap 操作不使用单个调度程序,而没有publishOn

【问题讨论】:

在 flatMap 中,您订阅了由 Reactive Mongo 存储库提供的新 Mono,它可能会在与其订阅的线程不同的线程上发布数据。另一方面,publishOn 强制在哪个线程上将数据发送到下游。这就是为什么它在这种情况下按“预期”工作的原因。你想在线程之间切换的原因是什么?除非你做一些阻塞的事情,否则你根本不需要担心这个。 【参考方案1】:

这个人为的例子可能会更清楚:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> 
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        )
        .flatMap(x -> 
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        )
        .flatMap(x -> 
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        )
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));

这将给出类似于:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,是您看到您所做行为的原因。 “直到下一次出现publishOn()”并不意味着下次你的代码调用publishOn() - 它也可以在你的任何flatMap()调用中的任何发布者中,您将无法控制。

【讨论】:

以上是关于关于 Project Reactor 的 flatMap 中的线程的困惑的主要内容,如果未能解决你的问题,请参考以下文章

Project Reactor 3 中的 publishOn 与 subscribeOn

使用Project Reactor对反应流进行递归

Project Reactor 之 publishOn 与 subscribeOn

使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件

Project Reactor:仅在未发出第一项时通量超时

带有消息代理(例如 Kafka)的事件驱动微服务与反应式编程(RxJava、Project Reactor)以及改进的协议(RSocket)