Kotlin结合Spring WebFlux实现响应式编程
Posted 零壹技术栈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin结合Spring WebFlux实现响应式编程相关的知识,希望对你有一定的参考价值。
前言
简单的来说,响应式编程是基于事件流的编程方式,而事件流是将要发生事件按照时间顺序排序成一种单向数据流。其中,RxJava 和 Reactor 是 Java 生态对 Reactive 响应式编程规范的具体实现,而 Spring 在 5.0 版本中引入了响应式编程框架 WebFlux,它就是基于 Reactor 实现的异步 Web 框架 。本文就要介绍如何使用 Kotlin 和 Spring WebFlux 实现基于流的异步编程。
正文
Spring 5.0 响应式Web框架
其中,左侧是传统的基于 Servlet 的 Spring Web MVC 框架,右侧是 5.0 版本新引入的基于 Reactive Streams 的 Spring WebFlux 框架。
从上到下依次是以下三个新组件:
Router Functions
WebFlux
Reactive Streams
Router Functions
对标 @Controller,@RequestMapping 等一系列标准的 Spring MVC 注解,提供一套函数式风格的 API,用于创建 Router,Handler 和 Filter。
WebFlux
最核心的组件,协调上下游各个组件提供响应式编程支持。
Reactive Streams
一种支持背压(Backpressure)的异步数据流处理标准,主流实现有 RxJava 和 Reactor,Spring WebFlux 默认集成的是 Reactor。
Web Container
在 Web 容器的选择上,Spring WebFlux 既支持像 Tomcat,Jetty 这样的的同步容器(前提是支持 Servlet 3.1 Non-Blocking IO API),又支持像 Netty,Undertow 这样的异步容器。不管是何种容器,Spring WebFlux 都会将其输入输出流适配成 Flux
值得一提的是,除了新的 Router Functions 接口,Spring WebFlux 同时支持使用老的 Spring MVC 注解声明 API。和传统的 Controller 不同的是,Reactive Controller 使用的是非阻塞的 ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。
示例工程详解
在线使用 Spring Initializr 配置一个 Kotlin 的 Web 项目,配置 Reactive Web 依赖,如图所示:
下载 Gradle 项目压缩包,导入 IDEA 编辑器中,查看项目的依赖结构如下:
工程目录结构如下:
├── build.gradle
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── src
│ ├── main
│ │ ├── java
│ │ ├── kotlin
│ │ │ └── com
│ │ │ └── easy
│ │ │ └── kotlin
│ │ │ └── webflux
│ │ │ └── WebfluxApplication.kt
│ │ └── resources
│ │ └── application.properties
│ └── test
│ ├── java
│ ├── kotlin
│ │ └── com
│ │ └── easy
│ │ └── kotlin
│ │ └── webflux
│ │ └── WebfluxApplicationTests.kt
│ └── resources
└── webflux.iml
build.gradle
buildscript {
ext {
kotlinVersion = '1.1.51'
springBootVersion = '2.0.0.BUILD-SNAPSHOT'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
}
}
apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.easy.kotlin'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")
compile("org.jetbrains.kotlin:kotlin-reflect")
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
}
这是 Spring Initializr 帮我们自动生成的样板工程。下面我们分别来加入 model 、dao 、 service 、 handler 等模块的内容。
源码目录结构设计如下:
├── src
│ ├── main
│ │ ├── java
│ │ ├── kotlin
│ │ │ └── com
│ │ │ └── easy
│ │ │ └── kotlin
│ │ │ └── webflux
│ │ │ ├── WebfluxApplication.kt
│ │ │ ├── dao
│ │ │ │ └── PersonRepository.kt
│ │ │ ├── handler
│ │ │ │ └── PersonHandler.kt
│ │ │ ├── model
│ │ │ │ └── Person.kt
│ │ │ ├── router
│ │ │ │ └── RouterConfig.kt
│ │ │ ├── server
│ │ │ │ └── HttpServerConfig.kt
│ │ │ └── service
│ │ │ └── PersonService.kt
│ │ └── resources
│ │ └── application.properties
Person.kt
class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {
override fun toString(): String {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}'
}
}
PersonRepository.kt
interface PersonRepository {
fun getPerson(id: Int): Mono<Person>
fun allPeople(): Flux<Person>
fun savePerson(person: Mono<Person>): Mono<Void>
}
PersonService.kt
@Service
class PersonService : PersonRepository {
var persons: MutableMap<Int, Person> = hashMapOf()
constructor() {
this.persons[1] = Person("Jack", 20)
this.persons[2] = Person("Rose", 16)
}
override fun getPerson(id: Int): Mono<Person> {
return Mono.justOrEmpty(this.persons[id])
}
override fun allPeople(): Flux<Person> {
return Flux.fromIterable(this.persons.values)
}
override fun savePerson(person: Mono<Person>): Mono<Void> {
return person.doOnNext {
val id = this.persons.size + 1
persons.put(id, it)
println("Saved ${person} with ${id}")
}.thenEmpty(Mono.empty())
}
}
PersonHandler.kt
@Service
class PersonHandler {
@Autowired lateinit var repository: PersonRepository
fun getPerson(request: ServerRequest): Mono<ServerResponse> {
val personId = Integer.valueOf(request.pathVariable("id"))!!
val notFound = ServerResponse.notFound().build()
val personMono = this.repository.getPerson(personId)
return personMono
.flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
.switchIfEmpty(notFound)
}
fun createPerson(request: ServerRequest): Mono<ServerResponse> {
val person = request.bodyToMono(Person::class.java)
return ServerResponse.ok().build(this.repository.savePerson(person))
}
fun listPeople(request: ServerRequest): Mono<ServerResponse> {
val people = this.repository.allPeople()
return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
}
}
RouterConfig.kt
@Configuration
class RouterConfig {
@Autowired lateinit var personHandler: PersonHandler
@Bean
fun routerFunction(): RouterFunction<*> {
return route(GET("/api/person").and(accept(APPLICATION_JSON)),
HandlerFunction { personHandler.listPeople(it) })
.and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
HandlerFunction { personHandler.getPerson(it) }))
}
}
HttpServerConfig.kt
@Configuration
class HttpServerConfig {
@Autowired
lateinit var environment: Environment
@Bean
fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
val adapter = ReactorHttpHandlerAdapter(httpHandler)
val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
server.newHandler(adapter)
return server
}
}
WebfluxApplication.kt
@SpringBootApplication
class WebfluxApplication {
fun main(args: Array<String>) {
runApplication<WebfluxApplication>(*args)
}
}
启动入口类运行以后,输出的日志如下图所示:
在输出的日志我们注意到这两行:
Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
测试输出
$ curl http://127.0.0.1:9000/api/person
[{"name":"Jack","age":20},{"name":"Rose","age":16}]
$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}
$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}
小结
Spring Web 是一个命令式的编程框架,可以很方便的进行开发和调试。你需要根据实际情况去决定采用 Spring 5 Reactive 或者是 Spring Web 命令式框架。在很多情况下,命令式的编程风格就可以满足,但当你的应用需要高可伸缩性,那么 Reactive 非堵塞方式是最适合的。
参考资料
《 Kotlin + Spring Boot : K2EE 服务端开发实战 》
《 Kotlin 极简教程》
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。
以上是关于Kotlin结合Spring WebFlux实现响应式编程的主要内容,如果未能解决你的问题,请参考以下文章
Spring WebFlux Reactive 和 Kotlin Coroutines 启动错误
Kotlin Spring Boot Webflux 使用 @Valid 注解验证 @RequestBody
如何在 Spring Boot 和 Spring WebFlux 中使用“功能 bean 定义 Kotlin DSL”?
Spring WebFlux 使用 RSocket:Kotlin 协程 Flow 与 Reactor Flux 消息格式