Webclient 与简单 Flux.just 的 Flux 行为不同
Posted
技术标签:
【中文标题】Webclient 与简单 Flux.just 的 Flux 行为不同【英文标题】:Flux behaving differently for Webclient vs simple Flux.just 【发布时间】:2021-09-11 18:34:09 【问题描述】:我在 Spring 中有基本的 rest 控制器。为了试用 spring webflux 并了解它的非阻塞性质。我创建了两个控制器映射,一个用于读取,一个用于服务 webclient 调用(如下所示)
@GetMapping("/slow-service-tweets")
private List<String> getAllTweets()
try
Thread.sleep(2000L); // delay
catch (InterruptedException e)
e.printStackTrace();
return Arrays.asList(
"Item1", "Item2","Item3");
这是我的测试get api,它只是触发下面给出的代码(第一个版本)
@GetMapping("/test")
public void doSomething()
log.info("Starting NON-BLOCKING Controller!");
Flux<String> tweetFlux = WebClient.create()
.get()
.uri("http://localhost:9090/slow-service-tweets")
.retrieve()
.bodyToFlux(String.class);
tweetFlux.subscribe(tweet ->
try
log.info("i am back");
Thread.sleep(6000L);
catch (InterruptedException e)
e.printStackTrace();
log.info(tweet.toString()););
log.info("Exiting NON-BLOCKING Controller!");
以上代码的行为与它应该的完全一样。输出是
Starting NON-BLOCKING Controller!
Exiting NON-BLOCKING Controller!
Item1
Item2
Item3
原因是线程不会阻止订阅通量并继续前进。 现在请看下面的第二个版本。
@GetMapping("/test")
public void doSomething()
System.out.println("i am here");
Flux<Integer> f= Flux.just(1,2,3,4,5);
// Flux<Integer> f= Flux.fromIterable(testService.returnStaticList());
f.subscribe(consumer->
try
log.info("consuming");
Thread.sleep(2000L);
catch (InterruptedException e)
e.printStackTrace();
log.info(consumer);
);
log.info("doing something else");
最好像前面的例子 “做其他事情”必须立即打印。 但无论我做什么,打印所有元素都需要 10 秒,然后打印“做其他事情”。输出如下:
i am here
consuming
1
2
3
4
5
doing something else
谁能解释一下我在这里缺少什么?
【问题讨论】:
【参考方案1】:我觉得我需要从一个警告开始 - 这肯定是不是如何使用 Webflux。任何时候你调用订阅,你几乎肯定做错了什么——这应该留给框架。相反,您应该使用:
doOnNext()
用于日志记录等副作用;
delayElements()
如果你想延迟 Flux 中的每个元素,而不是使用 Thread.sleep();
返回你的Flux
(或返回一些通过使用运算符链接你的 Flux 创建的发布者)从你的 doSomething()
方法,以允许框架订阅并因此执行你的反应链。
如果您遵循上述“正常”的做事方式,您可能不会遇到这些阻塞/线程导致意外问题的问题。如果你做更多的小众事情,比如自己订阅、屏蔽线程而不考虑它们是否应该被屏蔽等等——那么你可能会给自己带来很多问题。
但是,要直接回答为什么会发生这种行为的问题 - 这是因为线程。当您使用Flux.just()
时,您使用的是即时调度程序(这意味着首先实际执行您的doSomething()
方法的线程。)因为这里只有一个线程在起作用,所以您的订阅方法会阻止这个线程它直到它完成,所以没有其他东西可以执行。例如,如果您告诉您的 Flux
在 boundedElastic()
线程池上发布,您会发现它的行为符合您的预期:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.boundedElastic());
在您的另一个示例中,当您使用 WebClient 时,它在与执行您的方法的线程不同的线程上发布 - 因此在订阅者中阻止这个不同的线程不会阻止您的 doSomething()
方法的整体执行。
【讨论】:
迈克尔,谢谢你的回答。无需警告,因为我提供了一个代码 sn-p,我试图通过 poc 而不是生产代码来了解有关通量的所有内容。 @gooner 很高兴听到这个消息,我不确定何时涉及 Webflux - 我已经看到一些来自生产环境的有趣代码,我们应该说 :-) 是的@michael,我明白了... :D.另外,我正在与另一个我在生产中广泛使用的反应式框架 Vertx 进行比较。那里的事情几乎没有必要。我只关注官方文档:projectreactor.io/docs/core/release/reference/…。并尝试看看最好的行为是什么。同样从后端的角度来看,我将使用 webclient 进行交互以阻止 api 调用,但不确定我是否会为我自己的服务控制器返回 Flux。只有当我的调用者本身是非阻塞客户端时才有意义。以上是关于Webclient 与简单 Flux.just 的 Flux 行为不同的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Flux<String> 上执行 flatMap() 时获取索引号