Spring Data R2DBC 响应式数据库操作使用

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Data R2DBC 响应式数据库操作使用相关的知识,希望对你有一定的参考价值。

一、R2DBC

R2DBC(Reactive Relational Database Connectivity)是在 2018 年 Spring One Platform 大会被提出来的,它旨在使用完全无阻塞驱动程序创建数据库链接,为 SQL 数据库创建响应式 API。

Spring Data R2DBC项目是Spring Data家族的一部分,可轻松实现基于R2DBC的存储库。R2DBC(Reactive Relational Database Connectivity)是一个使用反应式驱动集成关系数据库的孵化器。Spring Data R2DBC运用熟悉的Spring抽象和repository 支持R2DBC。基于此,在响应式程序栈上使用关系数据访问技术,构建由Spring驱动的程序将变得非常简单。

目前,Spring Data R2DBC 支持 Postgres、H2、Microsoft SQL Server 、mysql、MariaDB。

本篇文章基于 MySql ,做相关演示。

二、Spring Data R2DBC 使用

首先创建一个 SpringBoot 项目,确保 Spring Boot 的版本大于等于 2.3.0,在此版本之后才开始支持 MYSQL 的响应式驱动。

引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<!--   连接池 -->
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
</dependency>

<!--mysql 驱动-->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
</dependency>

配置文件添加mysql的连接:

spring.r2dbc.name= r2dbc
spring.r2dbc.url= r2dbcs:mysql://127.0.0.1:3306/testdb?serverTimezone=GMT%2B8
spring.r2dbc.username=root
spring.r2dbc.password=root
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.validation-query= SELECT 1

引入上面的配置后,r2dbc 的环境就已经搭建完成了,使用 r2dbc 操作数据库,有多种方式,其中可以通过 ConnectionFactory 、DatabaseClient、R2dbcEntityTemplate、ReactiveCrudRepository,都可以进行数据库的操作。下面对这几种方式逐一进行介绍使用。

在操作之前,现在数据库中创建一张测试表,下面就针对该表进行操作:

CREATE TABLE `user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `mail` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

再创建一个实体类:

@Data
public class UserEntity 
    private Long id;
    private String name;
    private String phone;
    private String mail;

1. ConnectionFactory

在配置好上述环境后,直接就可以注入 ConnectionFactory 对象:

@Autowired
ConnectionFactory connectionFactory;

写入数据:

Mono.from(connectionFactory.create())
        .flatMap(connection -> Mono.from(connection.createStatement("INSERT INTO `testdb`.`user`(`name`, `phone`, `mail`) VALUES ( ?name, ?phone, ?mail)")
                .bind("name", "张三")
                .bind("phone", "111110")
                .bind("mail", "111110@qq.com")
                .returnGeneratedValues("id")
                .execute())
                .map(result -> result.map((row, meta) -> row.get("id")))
                .flatMap(id -> Mono.from(id))
        ).subscribe(id -> System.out.println("写入:" + id));



上面只是一个简单的写入,在项目中,有可能还有其他表的操作,或者其他业务逻辑,这种情况肯定需要事物,来保证数据的一致性了,下面对上面的操作加上事物:

在程序中故意加一个 int a = 1 / 0 来模拟异常:

Mono.from(connectionFactory.create())
        .flatMap(connection -> Mono.from(connection.beginTransaction())
                .then(Mono.from(connection.createStatement("INSERT INTO `testdb`.`user`(`name`, `phone`, `mail`) VALUES ( ?name, ?phone, ?mail)")
                        .bind("name", "张三")
                        .bind("phone", "111110")
                        .bind("mail", "111110@qq.com")
                        .returnGeneratedValues("id")
                        .execute()))
                .map(result -> result.map((row, meta) -> row.get("id")))
                .flatMap(id -> Mono.from(id))
                .flatMap(id -> Mono.fromCallable(() -> 
                    //其他业务逻辑
                    System.out.println("写入的ID:" + id);
                    int a = 1 / 0;
                    return id;
                ))
                .delayUntil(i -> connection.commitTransaction())
                .doOnError(e -> 
                    System.out.println("异常回滚事物:" + e.getMessage());
                    connection.rollbackTransaction();
                )
        ).subscribe(id -> System.out.println("写入:" + id));



可以看到,ID为24的并没有写入数据库中。

下面将 int a = 1 / 0去除:


ID为26的也写入了数据库中。

查询数据

Mono.from(connectionFactory.create())
        .flatMap(connection -> Mono.from(
                connection.createStatement("SELECT id,name,phone,mail FROM user WHERE name = ?name")
                        .bind("name", "张三")
                        .execute())

        ).flatMapMany(result -> Flux.from(result.map((row, meta) -> 
            UserEntity entity = new UserEntity();
            entity.setId(Long.parseLong(String.valueOf(row.get("id"))));
            entity.setName(String.valueOf(row.get("name")));
            entity.setPhone(String.valueOf(row.get("phone")));
            entity.setMail(String.valueOf(row.get("mail")));
            return entity;
        ))).subscribe(u -> System.out.println(u.toString()));

2. DatabaseClient

上面可以看到 ConnectionFactory 提供的 api 非常全,但使用起来有些麻烦,相比 DatabaseClient 就简单许多了。同样配置好环境后直接注入:

 @Autowired
 DatabaseClient databaseClient;

修改下上面的实体类,加上几个注解,标识出表和字段:

@Data
@Table("user")
public class UserEntity 
    @Id
    private Long id;
    @Column("name")
    private String name;
    private String phone;
    private String mail;

执行SQL

databaseClient.execute("SELECT * FROM user")
        .as(UserEntity.class)
        .fetch()
        .all()
        .subscribe(System.out::println);


相比之下确实简化了许多吧,不过DatabaseClient还为常用操作提供了针对的方法,比如:

写入数据

UserEntity entity = new UserEntity();
entity.setName("王五");
entity.setPhone("11111");
entity.setMail("111@qq.com");

databaseClient.insert()
        .into(UserEntity.class)
        .using(entity)
        .fetch().all()
        .subscribe(map -> System.out.println("写入ID:" + map.get("LAST_INSERT_ID")));

查询数据

databaseClient.
        select().
        from(UserEntity.class)
        .matching(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110")))
        .orderBy(Sort.Order.desc("id"))
        .fetch()
        .all()
        .subscribe(System.out::println);


更新数据

databaseClient.update()
        .table("user")
        .using(Update.update("name", "小王").set("phone", "110"))
        .matching(Criteria.where("name").is("张三").and(Criteria.where("phone").is("111110")))
        .fetch()
        .rowsUpdated()
        .subscribe(i -> System.out.println("更新个数:" + i));


删除数据

databaseClient.delete()
        .from("user")
        .matching(Criteria.where("name").is("小王"))
        .fetch()
        .rowsUpdated()
        .subscribe(i -> System.out.println("删除个数:" + i));

3. R2dbcEntityTemplate

R2dbcEntityTemplate 的使用方式和 DatabaseClient 差不多,但创建 R2dbcEntityTemplate 需要一个 DatabaseClient 对象:

@Configuration
public class R2dbcConfig 
    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) 
        return new R2dbcEntityTemplate(databaseClient);
    

写入数据

UserEntity entity = new UserEntity();
entity.setName("王五");
entity.setPhone("11111");
entity.setMail("111@qq.com");

r2dbcEntityTemplate.insert(entity).subscribe(e-> System.out.println(e));


可以看出又比 DatabaseClient 简化了些。

查询数据

r2dbcEntityTemplate
        .select(Query.query(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110"))),UserEntity.class)
        .subscribe(e-> System.out.println(e));


修改数据

r2dbcEntityTemplate.update(
        Query.query(Criteria.where("name").is("李四").and(Criteria.where("phone").is("111110"))),
        Update.update("name", "张三").set("phone", "22220"),
        UserEntity.class)
        .subscribe(i -> System.out.println("更新数:" + i));


删除数据

r2dbcEntityTemplate.delete(
        Query.query(Criteria.where("name").is("张三").and(Criteria.where("phone").is("22220"))),
        UserEntity.class)
        .subscribe(i -> System.out.println("删除数:" + i));

4. ReactiveCrudRepository

前面的使用都是基于提供的 api 进行调用,有使用过mybatis-plus,应该知道,经常用一个 BaseMapper类,帮我们实现了基本的操作,同时也可以在自己的Mapper 下,用 Mybatis 的注解进行操作,这里的ReactiveCrudRepository 也可以实现类似的功能。

新建一个 Dao ,实现 ReactiveCrudRepository

@Repository
public interface UserDao extends ReactiveCrudRepository<UserEntity, Long> 

    @Query("SELECT * FROM user WHERE id = :id")
    Flux<UserEntity> findDateById(Long id);

    @Query("UPDATE user SET name = :name WHERE id = :id")
    Mono<Integer> updateNameById(String name, Long id);

我们可以在其中 使用@Query执行特定的SQL,同时ReactiveCrudRepository提供给我们的有:

基本的操作都还是有的:

写入数据

 UserEntity entity = new UserEntity();
 entity.setName("王五");
 entity.setPhone("11111");
 entity.setMail("111@qq.com");
 
 userDao.save(entity).subscribe(e-> System.out.println(e));


使用自己的SQL查询

userDao.findDateById(36L).subscribe(e-> System.out.println(e));



喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!

以上是关于Spring Data R2DBC 响应式数据库操作使用的主要内容,如果未能解决你的问题,请参考以下文章

响应式MySql交互

是时候考虑Spring非阻塞编程模式?R2DBC pk JDBC 和 WebFlux pk Web MVC 评测数据

#yyds干货盘点#Spring认证中国教育管理中心-Spring Data R2DBC框架教程六

Spring Data(数据)R2DBC

springboot3+r2dbc——响应式编程实践

springboot3+r2dbc——响应式编程实践