响应式编程(下):Spring 5

Posted 企鹅杏仁技术站

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了响应式编程(下):Spring 5相关的知识,希望对你有一定的参考价值。


作者 | Emac

杏仁医生架构师兼平台组负责人,关注微服务、DevOps领域。


上一篇:

引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括NoSQL、Hadoop、Spark、Storm、Kylin在内的大批新技术应运而生。其中以 RxJava 和 Reactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是下篇,对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用。

1. Spring 5 中的响应式编程

作为 Java 世界首个响应式 Web 框架,Spring 5 最大的亮点莫过于提供了完整的端到端响应式编程的支持。响应式编程(下):Spring 5图片出处:Spring Framework Reference Documentation

左侧是传统的基于 Servlet 的 Spring Web MVC 框架,右侧是 5.0 版本新引入的基于 Reactive Streams 的 Spring WebFlux 框架,从上到下依次是 Router Functions,WebFlux,Reactive Streams 三个新组件。

  • Router Functions: 对标 @Controller、@RequestMapping 等标准的 Spring MVC 注解,提供一套函数式风格的 API,用于创建 Router,Handler 和 Filter。

  • WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。

  • Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有 RxJava 和 Reactor,Spring WebFlux 默认集成的是 Reactor。

在Web容器的选择上,Spring WebFlux 既支持像 Tomcat,Jetty 这样的的传统容器(前提是支持 Servlet 3.1 Non-Blocking IO API),又支持像 Netty,Undertow 那样的异步容器。不管是何种容器,Spring WebFlux 都会将其输入输出流适配成 Flux<DataBuffer> 格式,以便进行统一处理。

值得一提的是,除了新的 Router Functions 接口,Spring WebFlux 同时支持使用老的 Spring MVC 注解声明Reactive Controller。和传统的 MVC Controller 不同,Reactive Controller 操作的是非阻塞的ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。

2. 实战

下面我将以一个简单的 Spring 5 应用为例,介绍如何使用 Spring 5 快速搭建一个响应式Web应用(以下简称 RP 应用)。

2.1 环境准备

然后,从 MongoDB 官网下载最新版本的MongoDB,然后在命令行下运行 mongod &启动服务。

现在,可以先试着跑一下项目中自带的测试用例。

./gradlew clean build

2.2 依赖介绍

接下来,看一下这个示例应用里的和响应式编程相关的依赖。

compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
  • spring-boot-starter-webflux: 启用 Spring 5 的 RP(Reactive Programming)支持,这是使用 Spring 5 开发 RP 应用的必要条件,就好比 spring-boot-starter-web 之于传统的 Spring MVC 应用。

  • spring-boot-starter-data-mongodb-reactive: Spring 5 中新引入的针对 MongoDB 的 Reactive Data 扩展库,允许通过统一的 RP 风格的API操作 MongoDB。

2.3 第一种方式:MVC 注解

Spring 5 提供了 Spring MVC 注解和 Router Functions 两种方式来编写 RP 应用。首先,我先用大家最熟悉的MVC注解来展示如何编写一个最简单的 RP Controller。

示例代码

@RestController
public class RestaurantController {
/**
    * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
    */
private final RestaurantRepository restaurantRepository;

/**
    * spring-boot-starter-data-mongodb-reactive提供的通用模板
    */
private final ReactiveMongoTemplate reactiveMongoTemplate;

public RestaurantController(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
this.restaurantRepository = restaurantRepository;
this.reactiveMongoTemplate = reactiveMongoTemplate;
}

@GetMapping("/reactive/restaurants")
public Flux<Restaurant> findAll() {
return restaurantRepository.findAll();
}

@GetMapping("/reactive/restaurants/{id}")
public Mono<Restaurant> get(@PathVariable String id) {
return restaurantRepository.findById(id);
}

@PostMapping("/reactive/restaurants")
public Flux<Restaurant> create(@RequestBody Flux<Restaurant> restaurants) {
return restaurants
.buffer(10000)
.flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
}

@DeleteMapping("/reactive/restaurants/{id}")
public Mono<Void> delete(@PathVariable String id) {
return restaurantRepository.deleteById(id);
}
}

可以看到,实现一个 RP Controller 和一个普通的 Controller 是非常类似的,最核心的区别是,优先使用 RP 中最基础的两种数据类型,Flux(对应多值)和Mono(单值),尤其是方法的参数和返回值。即便是空返回值,也应封装为Mono<Void>。这样做的目的是,使得应用能够以一种统一的符合 RP 规范的方式处理数据,最理想的情况是从最底层的数据库(或者其他系统外部调用),到最上层的 Controller 层,所有数据都不落地,经由各种 Flux  Mono 铺设的“管道”,直供调用端。就像农夫山泉那句著名的广告词,我们不生产水,我们只是大自然的搬运工。

单元测试

和非 RP 应用的单元测试相比,RP 应用的单元测试主要是使用了一个 Spring 5 新引入的测试工具类,WebTestClient,专门用于测试 RP 应用。

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantControllerTests {
@Test
public void testNormal() throws InterruptedException {
// start from scratch
restaurantRepository.deleteAll().block();

// prepare
WebTestClient webClient = WebTestClient.bindToController(new RestaurantController(restaurantRepository, reactiveMongoTemplate)).build();
Restaurant[] restaurants = IntStream.range(0, 100)
.mapToObj(String::valueOf)
.map(s -> new Restaurant(s, s, s))
.toArray(Restaurant[]::new);

// create
webClient.post().uri("/reactive/restaurants")
.accept(MediaType.APPLICATION_JSON_UTF8)
.syncBody(restaurants)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.expectBodyList(Restaurant.class)
.hasSize(100)
.consumeWith(rs -> Flux.fromIterable(rs.getResponseBody())
.log()
.subscribe(r1 -> {
// get
webClient.get()
.uri("/reactive/restaurants/{id}", r1.getId())
.accept(MediaType.APPLICATION_JSON_UTF8)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.expectBody(Restaurant.class)
.consumeWith(r2 -> Assert.assertEquals(r1, r2));
})
);
}
}

创建 WebTestClient 实例时,首先要绑定一下待测试的 RP Controller。可以看到,和业务类一样,编写 RP 应用的单元测试,同样也是数据不落地的流式风格。

2.4 第二种方式:Router Functions

接着介绍实现 RP 应用的另一种实现方式 —— Router Functions。

Router Functions 是 Spring 5 新引入的一套 Reactive 风格(基于 Flux 和 Mono)的函数式接口,主要包括RouterFunctionHandlerFunctionHandlerFilterFunction,分别对应 Spring MVC 中的@RequestMapping@Controller 和 HandlerInterceptor(或者 Servlet 规范中的 Filter)。

和 Router Functions 搭配使用的是两个新的请求/响应模型,ServerRequest和 ServerResponse,这两个模型同样提供了 Reactive 风格的接口

示例代码

自定义 RouterFunction 和 HandlerFilterFunction
@Configuration
public class RestaurantServer implements CommandLineRunner {
@Autowired
private RestaurantHandler restaurantHandler;

/**
    * 注册自定义RouterFunction
    */
@Bean
public RouterFunction<ServerResponse> restaurantRouter() {
RouterFunction<ServerResponse> router = route(GET("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAll)
.andRoute(GET("/reactive/delay/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAllDelay)
.andRoute(GET("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::get)
.andRoute(POST("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)).and(contentType(APPLICATION_JSON_UTF8)), restaurantHandler::create)
.andRoute(DELETE("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::delete)
// 注册自定义HandlerFilterFunction
.filter((request, next) -> {
if (HttpMethod.PUT.equals(request.method())) {
return ServerResponse.status(HttpStatus.BAD_REQUEST).build();
}
return next.handle(request);
});
return router;
}

@Override
public void run(String... args) throws Exception {
RouterFunction<ServerResponse> router = restaurantRouter();
// 转化为通用的Reactive HttpHandler
HttpHandler httpHandler = toHttpHandler(router);
// 适配成Netty Server所需的Handler
ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);
// 创建Netty Server
HttpServer server = HttpServer.create("localhost", 9090);
// 注册Handler并启动Netty Server
server.newHandler(httpAdapter).block();
}
}

可以看到,使用 Router Functions 实现 RP 应用时,你需要自己创建和管理容器,也就是说 Spring 5 并没有针对 Router Functions 提供 IoC 支持,这是 Router Functions 和 Spring MVC 相比最大的不同。除此之外,你需要通过RouterFunction 的 API(而不是注解)来配置路由表和过滤器。对于简单的应用,这样做问题不大,但对于上规模的应用,就会导致两个问题:1)Router 的定义越来越庞大;2)由于 URI 和 Handler 分开定义,路由表的维护成本越来越高。那为什么 Spring 5 会选择这种方式定义 Router 呢?接着往下看。

自定义 HandlerFunction
@Component
public class RestaurantHandler {
/**
    * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
    */
private final RestaurantRepository restaurantRepository;

/**
    * spring-boot-starter-data-mongodb-reactive提供的通用模板
    */
private final ReactiveMongoTemplate reactiveMongoTemplate;

public RestaurantHandler(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
this.restaurantRepository = restaurantRepository;
this.reactiveMongoTemplate = reactiveMongoTemplate;
}

public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<Restaurant> result = restaurantRepository.findAll();
return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
}

public Mono<ServerResponse> findAllDelay(ServerRequest request) {
Flux<Restaurant> result = restaurantRepository.findAll().delayElements(Duration.ofSeconds(1));
return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
}

public Mono<ServerResponse> get(ServerRequest request) {
String id = request.pathVariable("id");
Mono<Restaurant> result = restaurantRepository.findById(id);
return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
}

public Mono<ServerResponse> create(ServerRequest request) {
Flux<Restaurant> restaurants = request.bodyToFlux(Restaurant.class);
Flux<Restaurant> result = restaurants
.buffer(10000)
.flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
}

public Mono<ServerResponse> delete(ServerRequest request) {
String id = request.pathVariable("id");
Mono<Void> result = restaurantRepository.deleteById(id);
return ok().contentType(APPLICATION_JSON_UTF8).build(result);
}
}

对比前面的 RestaurantController,由于去除了路由信息,RestaurantHandler 变得非常函数化,可以说就是一组相关的HandlerFunction 的集合,同时各个方法的可复用性也大为提升。这就回答了上一小节提出的疑问,即以牺牲可维护性为代价,换取更好的函数特性。

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantHandlerTests extends BaseUnitTests {

@Autowired
private RouterFunction<ServerResponse> restaurantRouter;

@Override
protected WebTestClient prepareClient() {
WebTestClient webClient = WebTestClient.bindToRouterFunction(restaurantRouter)
.configureClient().baseUrl("http://localhost:9090").responseTimeout(Duration.ofMinutes(1)).build();
return webClient;
}
}

和针对 Controller 的单元测试相比,编写 Handler 的单元测试的主要区别在于初始化 WebTestClient 方式的不同,测试方法的主体可以完全复用。

3 小结

到此,有关响应式编程的介绍就暂且告一段落。回顾这两篇文章,我先是从响应式宣言说起,然后介绍了响应式编程的基本概念和关键特性,并且详解了 Spring 5 中和响应式编程相关的新特性,最后以一个示例应用结尾。希望读完这些文章,对你理解响应式编程能有所帮助。

4 参考

  • Spring Framework Reference - WebFlux framework

  • spring-framework Reactive Tests

  • poutsma/web-function-sample



全文完



以下文章您可能也会感兴趣:


我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。




杏仁技术站







以上是关于响应式编程(下):Spring 5的主要内容,如果未能解决你的问题,请参考以下文章

学习推荐5:Spring响应式编程实战

厉害了,Spring 5 面向响应式编程!

当RX遇上Spring——Spring 5新特性之响应式Web应用

html 将以编程方式附加外部脚本文件的javascript代码片段,并按顺序排列。用于响应式网站,其中ma

Kotlin结合Spring WebFlux实现响应式编程

Spring 5 响应式开发