Kotlin结合Spring WebFlux实现响应式编程

Posted 零壹技术栈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin结合Spring WebFlux实现响应式编程相关的知识,希望对你有一定的参考价值。

前言

简单的来说,响应式编程是基于事件流的编程方式,而事件流是将要发生事件按照时间顺序排序成一种单向数据流。其中,RxJava 和 Reactor 是 Java 生态对 Reactive 响应式编程规范的具体实现,而 Spring 在 5.0 版本中引入了响应式编程框架 WebFlux,它就是基于 Reactor 实现的异步 Web 框架 。本文就要介绍如何使用 Kotlin 和 Spring WebFlux 实现基于流的异步编程。

正文

Spring 5.0 响应式Web框架

其中,左侧是传统的基于 Servlet 的 Spring Web MVC 框架,右侧是 5.0 版本新引入的基于 Reactive Streams 的 Spring WebFlux 框架。

Kotlin结合Spring WebFlux实现响应式编程

从上到下依次是以下三个新组件:

  • Router Functions

  • WebFlux

  • Reactive Streams

Router Functions

对标 @Controller,@RequestMapping 等一系列标准的 Spring MVC 注解,提供一套函数式风格的 API,用于创建 Router,Handler 和 Filter。

WebFlux

最核心的组件,协调上下游各个组件提供响应式编程支持。

Reactive Streams

一种支持背压(Backpressure)的异步数据流处理标准,主流实现有 RxJava 和 Reactor,Spring WebFlux 默认集成的是 Reactor。

Web Container

在 Web 容器的选择上,Spring WebFlux 既支持像 Tomcat,Jetty 这样的的同步容器(前提是支持 Servlet 3.1 Non-Blocking IO API),又支持像 Netty,Undertow 这样的异步容器。不管是何种容器,Spring WebFlux 都会将其输入输出流适配成 Flux 格式,以便进行统一处理。

值得一提的是,除了新的 Router Functions 接口,Spring WebFlux 同时支持使用老的 Spring MVC 注解声明 API。和传统的 Controller 不同的是,Reactive Controller 使用的是非阻塞的 ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。

示例工程详解

在线使用 Spring Initializr 配置一个 Kotlin 的 Web 项目,配置 Reactive Web 依赖,如图所示:

Kotlin结合Spring WebFlux实现响应式编程

下载 Gradle 项目压缩包,导入 IDEA 编辑器中,查看项目的依赖结构如下:

Kotlin结合Spring WebFlux实现响应式编程

工程目录结构如下:

 
   
   
 
  1. ├── build.gradle

  2. ├── gradle

  3. └── wrapper

  4. ├── gradle-wrapper.jar

  5. └── gradle-wrapper.properties

  6. ├── gradlew

  7. ├── gradlew.bat

  8. ├── src

  9. ├── main

  10. ├── java

  11. ├── kotlin

  12. └── com

  13. └── easy

  14. └── kotlin

  15. └── webflux

  16. └── WebfluxApplication.kt

  17. └── resources

  18. └── application.properties

  19. └── test

  20. ├── java

  21. ├── kotlin

  22. └── com

  23. └── easy

  24. └── kotlin

  25. └── webflux

  26. └── WebfluxApplicationTests.kt

  27. └── resources

  28. └── webflux.iml

build.gradle

 
   
   
 
  1. buildscript {

  2. ext {

  3. kotlinVersion = '1.1.51'

  4. springBootVersion = '2.0.0.BUILD-SNAPSHOT'

  5. }

  6. repositories {

  7. mavenCentral()

  8. maven { url "https://repo.spring.io/snapshot" }

  9. maven { url "https://repo.spring.io/milestone" }

  10. }

  11. dependencies {

  12. classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")

  13. classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")

  14. classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")

  15. }

  16. }


  17. apply plugin: 'kotlin'

  18. apply plugin: 'kotlin-spring'

  19. apply plugin: 'eclipse'

  20. apply plugin: 'org.springframework.boot'

  21. apply plugin: 'io.spring.dependency-management'


  22. group = 'com.easy.kotlin'

  23. version = '0.0.1-SNAPSHOT'

  24. sourceCompatibility = 1.8

  25. compileKotlin {

  26. kotlinOptions.jvmTarget = "1.8"

  27. }

  28. compileTestKotlin {

  29. kotlinOptions.jvmTarget = "1.8"

  30. }


  31. repositories {

  32. mavenCentral()

  33. maven { url "https://repo.spring.io/snapshot" }

  34. maven { url "https://repo.spring.io/milestone" }

  35. }


  36. dependencies {

  37. compile('org.springframework.boot:spring-boot-starter-webflux')

  38. compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")

  39. compile("org.jetbrains.kotlin:kotlin-reflect")

  40. testCompile('org.springframework.boot:spring-boot-starter-test')

  41. testCompile('io.projectreactor:reactor-test')

  42. }

这是 Spring Initializr 帮我们自动生成的样板工程。下面我们分别来加入 model 、dao 、 service 、 handler 等模块的内容。

源码目录结构设计如下:

 
   
   
 
  1. ├── src

  2. ├── main

  3. ├── java

  4. ├── kotlin

  5. └── com

  6. └── easy

  7. └── kotlin

  8. └── webflux

  9. ├── WebfluxApplication.kt

  10. ├── dao

  11. └── PersonRepository.kt

  12. ├── handler

  13. └── PersonHandler.kt

  14. ├── model

  15. └── Person.kt

  16. ├── router

  17. └── RouterConfig.kt

  18. ├── server

  19. └── HttpServerConfig.kt

  20. └── service

  21. └── PersonService.kt

  22. └── resources

  23. └── application.properties

Person.kt

 
   
   
 
  1. class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {


  2. override fun toString(): String {

  3. return "Person{" +

  4. "name='" + name + '\'' +

  5. ", age=" + age +

  6. '}'

  7. }

  8. }

PersonRepository.kt

 
   
   
 
  1. interface PersonRepository {

  2. fun getPerson(id: Int): Mono<Person>

  3. fun allPeople(): Flux<Person>

  4. fun savePerson(person: Mono<Person>): Mono<Void>

  5. }

PersonService.kt

 
   
   
 
  1. @Service

  2. class PersonService : PersonRepository {

  3. var persons: MutableMap<Int, Person> = hashMapOf()


  4. constructor() {

  5. this.persons[1] = Person("Jack", 20)

  6. this.persons[2] = Person("Rose", 16)

  7. }


  8. override fun getPerson(id: Int): Mono<Person> {

  9. return Mono.justOrEmpty(this.persons[id])

  10. }


  11. override fun allPeople(): Flux<Person> {

  12. return Flux.fromIterable(this.persons.values)

  13. }


  14. override fun savePerson(person: Mono<Person>): Mono<Void> {

  15. return person.doOnNext {

  16. val id = this.persons.size + 1

  17. persons.put(id, it)

  18. println("Saved ${person} with ${id}")

  19. }.thenEmpty(Mono.empty())


  20. }

  21. }

PersonHandler.kt

 
   
   
 
  1. @Service

  2. class PersonHandler {

  3. @Autowired lateinit var repository: PersonRepository


  4. fun getPerson(request: ServerRequest): Mono<ServerResponse> {

  5. val personId = Integer.valueOf(request.pathVariable("id"))!!

  6. val notFound = ServerResponse.notFound().build()

  7. val personMono = this.repository.getPerson(personId)

  8. return personMono

  9. .flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }

  10. .switchIfEmpty(notFound)

  11. }


  12. fun createPerson(request: ServerRequest): Mono<ServerResponse> {

  13. val person = request.bodyToMono(Person::class.java)

  14. return ServerResponse.ok().build(this.repository.savePerson(person))

  15. }


  16. fun listPeople(request: ServerRequest): Mono<ServerResponse> {

  17. val people = this.repository.allPeople()

  18. return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)

  19. }

  20. }

RouterConfig.kt

 
   
   
 
  1. @Configuration

  2. class RouterConfig {

  3. @Autowired lateinit var personHandler: PersonHandler


  4. @Bean

  5. fun routerFunction(): RouterFunction<*> {

  6. return route(GET("/api/person").and(accept(APPLICATION_JSON)),

  7. HandlerFunction { personHandler.listPeople(it) })


  8. .and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),

  9. HandlerFunction { personHandler.getPerson(it) }))

  10. }

  11. }

HttpServerConfig.kt

 
   
   
 
  1. @Configuration

  2. class HttpServerConfig {

  3. @Autowired

  4. lateinit var environment: Environment


  5. @Bean

  6. fun httpServer(routerFunction: RouterFunction<*>): HttpServer {

  7. val httpHandler = RouterFunctions.toHttpHandler(routerFunction)

  8. val adapter = ReactorHttpHandlerAdapter(httpHandler)

  9. val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())

  10. server.newHandler(adapter)

  11. return server

  12. }

  13. }

WebfluxApplication.kt

 
   
   
 
  1. @SpringBootApplication

  2. class WebfluxApplication {

  3. fun main(args: Array<String>) {

  4. runApplication<WebfluxApplication>(*args)

  5. }

  6. }

启动入口类运行以后,输出的日志如下图所示:

在输出的日志我们注意到这两行:

 
   
   
 
  1. Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372

  2. ((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319

测试输出

 
   
   
 
  1. $ curl http://127.0.0.1:9000/api/person

  2. [{"name":"Jack","age":20},{"name":"Rose","age":16}]


  3. $ curl http://127.0.0.1:9000/api/person/1

  4. {"name":"Jack","age":20}


  5. $ curl http://127.0.0.1:9000/api/person/2

  6. {"name":"Rose","age":16}

小结

Spring Web 是一个命令式的编程框架,可以很方便的进行开发和调试。你需要根据实际情况去决定采用 Spring 5 Reactive 或者是 Spring Web 命令式框架。在很多情况下,命令式的编程风格就可以满足,但当你的应用需要高可伸缩性,那么 Reactive 非堵塞方式是最适合的。

参考资料

  • 《 Kotlin + Spring Boot : K2EE 服务端开发实战 》

  • 《 Kotlin 极简教程》

本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。


以上是关于Kotlin结合Spring WebFlux实现响应式编程的主要内容,如果未能解决你的问题,请参考以下文章

Spring WebFlux Reactive 和 Kotlin Coroutines 启动错误

Kotlin Spring Boot Webflux 使用 @Valid 注解验证 @RequestBody

如何在 Spring Boot 和 Spring WebFlux 中使用“功能 bean 定义 Kotlin DSL”?

Spring WebFlux 使用 RSocket:Kotlin 协程 Flow 与 Reactor Flux 消息格式

Spring WebFlux:只允许一个连接接收订阅者

使用 Kotlin 协程的 Spring Boot Rest 服务