springboot3+r2dbc——响应式编程实践
Posted 麒思妙想
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot3+r2dbc——响应式编程实践相关的知识,希望对你有一定的参考价值。
Spring boot3
已经M1
了,最近群佬们也开始蠢蠢欲动的开始整活Reactive
+Spring Boot3
,跟着大家的步伐,我也来整一篇工程入门,我们将用java17
+Spring Boot3
+r2dbc
+Reactive
栈来讲述,欢迎大家来讨论。(关于响应式,请大家异步到之前的文章里,有详细介绍。)
r2dbc
Reactor
还有基于其之上的Spring WebFlux
框架。包括vert.x
,rxjava
等等reactive
技术。我们实际上在应用层已经有很多优秀的响应式处理框架。
但是有一个问题就是所有的框架都需要获取底层的数据,而基本上关系型数据库的底层读写都还是同步的。
为了解决这个问题,出现了两个标准,一个是oracle
提出的 ADBC
(Asynchronous Database Access API),另一个就是Pivotal
提出的R2DBC
(Reactive Relational Database Connectivity)。
R2DBC
是基于Reactive Streams
标准来设计的。通过使用R2DBC
,你可以使用reactive API
来操作数据。
同时R2DBC
只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。
今天我们以r2dbc-h2
为例,讲解一下r2dbc
在Spring webFlux
中的使用。
工程依赖
以下是 pom.xml
清单
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.0-M1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>wang.datahub</groupId>
<artifactId>springboot3demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot3demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-groovy-templates</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<!-- <version>3.4.14</version>-->
<!-- <scope>compile</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</project>
配置文件
这里我们只配置了r2dbc链接信息
r2dbc:
url: r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
配置类
用于配置默认链接,创建初始化数据
package wang.datahub.springboot3demo.config;
import io.netty.util.internal.StringUtil;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;
@Configuration
@ConfigurationProperties(prefix = "r2dbc")
public class DBConfig
private String url;
private String user;
private String password;
public String getUrl()
return url;
public void setUrl(String url)
this.url = url;
public String getUser()
return user;
public void setUser(String user)
this.user = user;
public String getPassword()
return password;
public void setPassword(String password)
this.password = password;
@Bean
public ConnectionFactory connectionFactory()
System.out.println("url ==> "+url);
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url);
ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(user))
ob = ob.option(USER, user);
if (!StringUtil.isNullOrEmpty(password))
ob = ob.option(PASSWORD, password);
return ConnectionFactories.get(ob.build());
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf)
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Users")
.add("create table Users(" +
"id IDENTITY(1,1)," +
"firstname varchar(80) not null," +
"lastname varchar(80) not null)")
.add("insert into Users(firstname,lastname)" +
"values('Jacky','Li')")
.add("insert into Users(firstname,lastname)" +
"values('Doudou','Li')")
.add("insert into Users(firstname,lastname)" +
"values('Maimai','Li')")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
bean
创建用户bean
package wang.datahub.springboot3demo.bean;
import org.springframework.data.annotation.Id;
public class Users
@Id
private Long id;
private String firstname;
private String lastname;
public Users()
public Users(Long id, String firstname, String lastname)
this.id = id;
this.firstname = firstname;
this.lastname = lastname;
public Long getId()
return id;
public void setId(Long id)
this.id = id;
public String getFirstname()
return firstname;
public void setFirstname(String firstname)
this.firstname = firstname;
public String getLastname()
return lastname;
public void setLastname(String lastname)
this.lastname = lastname;
@Override
public String toString()
return "User" +
"id=" + id +
", firstname='" + firstname + '\\'' +
", lastname='" + lastname + '\\'' +
'';
DAO
dao代码清单如下,包含查询列表、按id查询,以及创建用户等操作
package wang.datahub.springboot3demo.dao;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.relational.core.query.Query;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.springboot3demo.bean.Users;
import static org.springframework.data.r2dbc.query.Criteria.where;
import static org.springframework.data.relational.core.query.Query.query;
@Component
public class UsersDao
private ConnectionFactory connectionFactory;
private R2dbcEntityTemplate template;
public UsersDao(ConnectionFactory connectionFactory)
this.connectionFactory = connectionFactory;
this.template = new R2dbcEntityTemplate(connectionFactory);
public Mono<Users> findById(long id)
return this.template.selectOne(query(where("id").is(id)),Users.class);
// return Mono.from(connectionFactory.create())
// .flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1")
// .bind("$1", id)
// .execute())
// .doFinally((st) -> close(c)))
// .map(result -> result.map((row, meta) ->
// new Users(row.get("id", Long.class),
// row.get("firstname", String.class),
// row.get("lastname", String.class))))
// .flatMap( p -> Mono.from(p));
public Flux<Users> findAll()
return this.template.select(Users.class).all();
// return Mono.from(connectionFactory.create())
// .flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users")
// .execute())
// .doFinally((st) -> close(c)))
// .flatMapMany(result -> Flux.from(result.map((row, meta) ->
// Users acc = new Users();
// acc.setId(row.get("id", Long.class));
// acc.setFirstname(row.get("firstname", String.class));
// acc.setLastname(row.get("lastname", String.class));
// return acc;
// )));
public Mono<Users> createAccount(Users account) 以上是关于springboot3+r2dbc——响应式编程实践的主要内容,如果未能解决你的问题,请参考以下文章