自学Spring WebFlux

Posted 舒城豪杰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自学Spring WebFlux相关的知识,希望对你有一定的参考价值。

一、什么是响应式编程(Reactive Progarmming)



响应式编程(Reactive Progarmming)是一种面向数据流和变化传播的编程范式。



响应式编程主要处理二个问题:

1、异步非阻塞

2、流速控制



Reactive Progarmming模型:


二、响应式编程的优势



Java 提供了两种异步编程方式:


1、回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener

2、Futures(这个方法是阻塞的) :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过Future`封装。这个结果并不是 *立刻* 可以拿到,而是等实际处理结束才可用。比如, `ExecutorService 执行 Callable<T> 任务时会返回 Future 对象。



官方示例:

回调地狱 Callback Hell:获取前5的建议,如果没有建议,则采用默认的建议,传统的回调写法

userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() { public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); }});



Reactor的写法

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);



Reactor 中增加超时控制的例子

userService.getFavorites(userId) // 数据在800ms内获得 .timeout(Duration.ofMillis(800)) // 如果超时/异常,从缓存中获取 .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);



CompletableFuture的写法(异步阻塞)

CompletableFuture<List<String>> ids = ifhIds();CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip = l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList()));});List<String> results = result.join();assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");



Reactor 实现与 Future 同样功能的代码

Flux<String> ids = ifhrIds();Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });Mono<List<String>> result = combinations.collectList();List<String> results = result.block();assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");



你可以通过代码对比,发现Reactor的代码可读性、可编排性更强、并且提供了背压的支持。单次请求的处理耗时并不能得到有效提升,但是你可以用固定数量的线程和较少的内存实现扩展



三、Reactor基础特性


Reactor 引入了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作方式。一个 Flux 对象代表一个包含 0..N 个元素的响应式序列,而一个 Mono 对象代表一个包含 零/一个(0..1)元素的结果。



Flux 对象代表一个包含 0..N 个元素的响应式序列



Mono 对象代表一个包含 零/一个(0..1)元素的结果


看完这篇文章,你应该要对Reactor有个基础认知,为后期的Spring WebFlux的实战打下基础



参考: 

https://projectreactor.io/

https://blog.csdn.net/get_set/article/details/79480233

以上是关于自学Spring WebFlux的主要内容,如果未能解决你的问题,请参考以下文章

Spring WebFlux 没有流式响应

一文带您搞清什么是 Spring Boot 2.x WebFlux

spring5 webflux,如何返回自定义json数据?

Spring Boot 2.0 WebFlux 教程 | 入门篇

Spring webflux请求回调不起作用

如何使用 Spring Boot 对 WebFlux 进行异常处理?