Spring Data R2DBC 响应式数据库操作使用
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Data R2DBC 响应式数据库操作使用相关的知识,希望对你有一定的参考价值。
一、R2DBC
R2DBC(Reactive Relational Database Connectivity)是在 2018 年 Spring One Platform 大会被提出来的,它旨在使用完全无阻塞驱动程序创建数据库链接,为 SQL 数据库创建响应式 API。
Spring Data R2DBC项目是Spring Data家族的一部分,可轻松实现基于R2DBC的存储库。R2DBC(Reactive Relational Database Connectivity)是一个使用反应式驱动集成关系数据库的孵化器。Spring Data R2DBC运用熟悉的Spring抽象和repository 支持R2DBC。基于此,在响应式程序栈上使用关系数据访问技术,构建由Spring驱动的程序将变得非常简单。
目前,Spring Data R2DBC 支持 Postgres、H2、Microsoft SQL Server 、mysql、MariaDB。
本篇文章基于 MySql ,做相关演示。
二、Spring Data R2DBC 使用
首先创建一个 SpringBoot 项目,确保 Spring Boot 的版本大于等于 2.3.0,在此版本之后才开始支持 MYSQL 的响应式驱动。
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<!--mysql 驱动-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
配置文件添加mysql的连接:
spring.r2dbc.name= r2dbc
spring.r2dbc.url= r2dbcs:mysql://127.0.0.1:3306/testdb?serverTimezone=GMT%2B8
spring.r2dbc.username=root
spring.r2dbc.password=root
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.validation-query= SELECT 1
引入上面的配置后,r2dbc 的环境就已经搭建完成了,使用 r2dbc 操作数据库,有多种方式,其中可以通过 ConnectionFactory 、DatabaseClient、R2dbcEntityTemplate、ReactiveCrudRepository
,都可以进行数据库的操作。下面对这几种方式逐一进行介绍使用。
在操作之前,现在数据库中创建一张测试表,下面就针对该表进行操作:
CREATE TABLE `user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
`mail` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
再创建一个实体类:
@Data
public class UserEntity
private Long id;
private String name;
private String phone;
private String mail;
1. ConnectionFactory
在配置好上述环境后,直接就可以注入 ConnectionFactory
对象:
@Autowired
ConnectionFactory connectionFactory;
写入数据:
Mono.from(connectionFactory.create())
.flatMap(connection -> Mono.from(connection.createStatement("INSERT INTO `testdb`.`user`(`name`, `phone`, `mail`) VALUES ( ?name, ?phone, ?mail)")
.bind("name", "张三")
.bind("phone", "111110")
.bind("mail", "111110@qq.com")
.returnGeneratedValues("id")
.execute())
.map(result -> result.map((row, meta) -> row.get("id")))
.flatMap(id -> Mono.from(id))
).subscribe(id -> System.out.println("写入:" + id));
上面只是一个简单的写入,在项目中,有可能还有其他表的操作,或者其他业务逻辑,这种情况肯定需要事物,来保证数据的一致性了,下面对上面的操作加上事物:
在程序中故意加一个 int a = 1 / 0
来模拟异常:
Mono.from(connectionFactory.create())
.flatMap(connection -> Mono.from(connection.beginTransaction())
.then(Mono.from(connection.createStatement("INSERT INTO `testdb`.`user`(`name`, `phone`, `mail`) VALUES ( ?name, ?phone, ?mail)")
.bind("name", "张三")
.bind("phone", "111110")
.bind("mail", "111110@qq.com")
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) -> row.get("id")))
.flatMap(id -> Mono.from(id))
.flatMap(id -> Mono.fromCallable(() ->
//其他业务逻辑
System.out.println("写入的ID:" + id);
int a = 1 / 0;
return id;
))
.delayUntil(i -> connection.commitTransaction())
.doOnError(e ->
System.out.println("异常回滚事物:" + e.getMessage());
connection.rollbackTransaction();
)
).subscribe(id -> System.out.println("写入:" + id));
可以看到,ID为24的并没有写入数据库中。
下面将 int a = 1 / 0
去除:
ID为26的也写入了数据库中。
查询数据
Mono.from(connectionFactory.create())
.flatMap(connection -> Mono.from(
connection.createStatement("SELECT id,name,phone,mail FROM user WHERE name = ?name")
.bind("name", "张三")
.execute())
).flatMapMany(result -> Flux.from(result.map((row, meta) ->
UserEntity entity = new UserEntity();
entity.setId(Long.parseLong(String.valueOf(row.get("id"))));
entity.setName(String.valueOf(row.get("name")));
entity.setPhone(String.valueOf(row.get("phone")));
entity.setMail(String.valueOf(row.get("mail")));
return entity;
))).subscribe(u -> System.out.println(u.toString()));
2. DatabaseClient
上面可以看到 ConnectionFactory
提供的 api 非常全,但使用起来有些麻烦,相比 DatabaseClient 就简单许多了。同样配置好环境后直接注入:
@Autowired
DatabaseClient databaseClient;
修改下上面的实体类,加上几个注解,标识出表和字段:
@Data
@Table("user")
public class UserEntity
@Id
private Long id;
@Column("name")
private String name;
private String phone;
private String mail;
执行SQL
databaseClient.execute("SELECT * FROM user")
.as(UserEntity.class)
.fetch()
.all()
.subscribe(System.out::println);
相比之下确实简化了许多吧,不过DatabaseClient还为常用操作提供了针对的方法,比如:
写入数据
UserEntity entity = new UserEntity();
entity.setName("王五");
entity.setPhone("11111");
entity.setMail("111@qq.com");
databaseClient.insert()
.into(UserEntity.class)
.using(entity)
.fetch().all()
.subscribe(map -> System.out.println("写入ID:" + map.get("LAST_INSERT_ID")));
查询数据
databaseClient.
select().
from(UserEntity.class)
.matching(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110")))
.orderBy(Sort.Order.desc("id"))
.fetch()
.all()
.subscribe(System.out::println);
更新数据
databaseClient.update()
.table("user")
.using(Update.update("name", "小王").set("phone", "110"))
.matching(Criteria.where("name").is("张三").and(Criteria.where("phone").is("111110")))
.fetch()
.rowsUpdated()
.subscribe(i -> System.out.println("更新个数:" + i));
删除数据
databaseClient.delete()
.from("user")
.matching(Criteria.where("name").is("小王"))
.fetch()
.rowsUpdated()
.subscribe(i -> System.out.println("删除个数:" + i));
3. R2dbcEntityTemplate
R2dbcEntityTemplate 的使用方式和 DatabaseClient 差不多,但创建 R2dbcEntityTemplate 需要一个 DatabaseClient 对象:
@Configuration
public class R2dbcConfig
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient)
return new R2dbcEntityTemplate(databaseClient);
写入数据
UserEntity entity = new UserEntity();
entity.setName("王五");
entity.setPhone("11111");
entity.setMail("111@qq.com");
r2dbcEntityTemplate.insert(entity).subscribe(e-> System.out.println(e));
可以看出又比 DatabaseClient 简化了些。
查询数据
r2dbcEntityTemplate
.select(Query.query(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110"))),UserEntity.class)
.subscribe(e-> System.out.println(e));
修改数据
r2dbcEntityTemplate.update(
Query.query(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110"))),
Update.update("name", "张三").set("phone", "22220"),
UserEntity.class)
.subscribe(i -> System.out.println("更新数:" + i));
删除数据
r2dbcEntityTemplate.delete(
Query.query(Criteria.where("name").is("张三").and(Criteria.where("phone").is("22220"))),
UserEntity.class)
.subscribe(i -> System.out.println("删除数:" + i));
4. ReactiveCrudRepository
前面的使用都是基于提供的 api 进行调用,有使用过mybatis-plus,应该知道,经常用一个 BaseMapper类,帮我们实现了基本的操作,同时也可以在自己的Mapper 下,用 Mybatis 的注解进行操作,这里的ReactiveCrudRepository 也可以实现类似的功能。
新建一个 Dao ,实现 ReactiveCrudRepository
@Repository
public interface UserDao extends ReactiveCrudRepository<UserEntity, Long>
@Query("SELECT * FROM user WHERE id = :id")
Flux<UserEntity> findDateById(Long id);
@Query("UPDATE user SET name = :name WHERE id = :id")
Mono<Integer> updateNameById(String name, Long id);
我们可以在其中 使用@Query
执行特定的SQL,同时ReactiveCrudRepository
提供给我们的有:
基本的操作都还是有的:
写入数据
UserEntity entity = new UserEntity();
entity.setName("王五");
entity.setPhone("11111");
entity.setMail("111@qq.com");
userDao.save(entity).subscribe(e-> System.out.println(e));
使用自己的SQL查询
userDao.findDateById(36L).subscribe(e-> System.out.println(e));
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
以上是关于Spring Data R2DBC 响应式数据库操作使用的主要内容,如果未能解决你的问题,请参考以下文章
是时候考虑Spring非阻塞编程模式?R2DBC pk JDBC 和 WebFlux pk Web MVC 评测数据