Webclient 与简单 Flux.just 的 Flux 行为不同

Posted

技术标签:

【中文标题】Webclient 与简单 Flux.just 的 Flux 行为不同【英文标题】:Flux behaving differently for Webclient vs simple Flux.just 【发布时间】:2021-09-11 18:34:09 【问题描述】:

我在 Spring 中有基本的 rest 控制器。为了试用 spring webflux 并了解它的非阻塞性质。我创建了两个控制器映射,一个用于读取,一个用于服务 webclient 调用(如下所示)

@GetMapping("/slow-service-tweets")
private List<String> getAllTweets() 
    try 
        Thread.sleep(2000L); // delay
     catch (InterruptedException e) 
        e.printStackTrace();
    
    return Arrays.asList(
            "Item1", "Item2","Item3");


这是我的测试get api,它只是触发下面给出的代码(第一个版本)

 @GetMapping("/test")
public void doSomething()
    
   log.info("Starting NON-BLOCKING Controller!");
    Flux<String> tweetFlux = WebClient.create()
            .get()
            .uri("http://localhost:9090/slow-service-tweets")
            .retrieve()
            .bodyToFlux(String.class);

    tweetFlux.subscribe(tweet ->
        try 
            log.info("i am back");
            Thread.sleep(6000L);
         catch (InterruptedException e) 
            e.printStackTrace();
        
       log.info(tweet.toString()););

  
   log.info("Exiting NON-BLOCKING Controller!");

以上代码的行为与它应该的完全一样。输出是

Starting NON-BLOCKING Controller!
Exiting NON-BLOCKING Controller!
Item1
Item2
Item3

原因是线程不会阻止订阅通量并继续前进。 现在请看下面的第二个版本。

@GetMapping("/test")
    public void doSomething()
        
        System.out.println("i am here");

        Flux<Integer> f= Flux.just(1,2,3,4,5);
        //  Flux<Integer> f= Flux.fromIterable(testService.returnStaticList());



        f.subscribe(consumer->
            try 
                log.info("consuming");
                Thread.sleep(2000L);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            log.info(consumer);
        );

        log.info("doing something else");
    

最好像前面的例子 “做其他事情”必须立即打印。 但无论我做什么,打印所有元素都需要 10 秒,然后打印“做其他事情”。输出如下:

i am here
consuming
1
2
3
4
5
doing something else

谁能解释一下我在这里缺少什么?

【问题讨论】:

【参考方案1】:

我觉得我需要从一个警告开始 - 这肯定是不是如何使用 Webflux。任何时候你调用订阅,你几乎肯定做错了什么——这应该留给框架。相反,您应该使用:

doOnNext() 用于日志记录等副作用; delayElements() 如果你想延迟 Flux 中的每个元素,而不是使用 Thread.sleep(); 返回你的Flux(或返回一些通过使用运算符链接你的 Flux 创建的发布者)从你的 doSomething() 方法,以允许框架订阅并因此执行你的反应链。

如果您遵循上述“正常”的做事方式,您可能不会遇到这些阻塞/线程导致意外问题的问题。如果你做更多的小众事情,比如自己订阅、屏蔽线程而不考虑它们是否应该被屏蔽等等——那么你可能会给自己带来很多问题。

但是,要直接回答为什么会发生这种行为的问题 - 这是因为线程。当您使用Flux.just() 时,您使用的是即时调度程序(这意味着首先实际执行您的doSomething() 方法的线程。)因为这里只有一个线程在起作用,所以您的订阅方法会阻止这个线程它直到它完成,所以没有其他东西可以执行。例如,如果您告诉您的 FluxboundedElastic() 线程池上发布,您会发现它的行为符合您的预期:

Flux<Integer> f = Flux.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.boundedElastic());

在您的另一个示例中,当您使用 WebClient 时,它在与执行您的方法的线程不同的线程上发布 - 因此在订阅者中阻止这个不同的线程不会阻止您的 doSomething() 方法的整体执行。

【讨论】:

迈克尔,谢谢你的回答。无需警告,因为我提供了一个代码 sn-p,我试图通过 poc 而不是生产代码来了解有关通量的所有内容。 @gooner 很高兴听到这个消息,我不确定何时涉及 Webflux - 我已经看到一些来自生产环境的有趣代码,我们应该说 :-) 是的@michael,我明白了... :D.另外,我正在与另一个我在生产中广泛使用的反应式框架 Vertx 进行比较。那里的事情几乎没有必要。我只关注官方文档:projectreactor.io/docs/core/release/reference/…。并尝试看看最好的行为是什么。同样从后端的角度来看,我将使用 webclient 进行交互以阻止 api 调用,但不确定我是否会为我自己的服务控制器返回 Flux。只有当我的调用者本身是非阻塞客户端时才有意义。

以上是关于Webclient 与简单 Flux.just 的 Flux 行为不同的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Flux<String> 上执行 flatMap() 时获取索引号

Reactor详解之:异常处理

C# WebClient 禁用缓存

java合并两个通量没有重复

如何在 Postman 中查看 Spring 5 Reactive API 的响应?

websocket通信 实现java模拟一个client与webclient通信