使用 Spring boot webflux reactive 记录未持久化到 R2DB

Posted

技术标签:

【中文标题】使用 Spring boot webflux reactive 记录未持久化到 R2DB【英文标题】:Record not persisted into R2DB using Spring boot webflux reactive 【发布时间】:2021-10-02 01:40:22 【问题描述】:

我正在尝试使用响应式构建应用程序。最初我曾经得到正确的响应,但是在修改代码后,记录没有持久化到数据库中,但是当我更改逻辑以修改响应正文时,我看到了成功响应,但在数据库中没有找到记录,我也看不到日志中的错误。

修改前的代码:

public Mono<ServerResponse> createCustomer(ServerRequest serverRequest) 
        return serverRequest.bodyToMono(Customer.class).flatMap(customer -> 
            ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(customerRepository.save(customer), Customer.class);

        );
    

但我想为所有 API 调用返回一个通用 API 响应并修改代码如下:

public Mono<ServerResponse> createCustomer(ServerRequest serverRequest) 
        Response response = new Response();
        return serverRequest.bodyToMono(Customer.class).flatMap(customer -> 
            saveCustomer(customer,apiResponse);
            return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(apiResponse), Response.class);

        ).doOnError(err -> 
                    log.error("Exception while creating customr record", err);
        ).onErrorResume(err ->  
            apiResponse.setError(new Error(err.getMessage(),err.getCause()));
            return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(Mono.just(apiResponse), ApiResponse.class);
        );
    

    public Response saveCustomer(Customer customer,Response apiResponse)
        customerRepository.save(customer);
        apiResponse.setCode("0");
        apiResponse.setMessage("Successfully Created customer");
        return apiResponse;
    

任何想法都欢迎。

【问题讨论】:

【参考方案1】:

我怀疑这个代码 - customerRepository.save(customer);

如果你使用的是 R2DBC,save 方法不会直接保存。它将返回一个发布者类型。必须订阅才能使其工作。否则不会插入记录。

你需要这样做!

customerRepository.save(customer).subscribe();

但是,像这样直接订阅并不是一个好习惯。相反,您应该这样做。

public Mono<Response> saveCustomer(Customer customer,Response apiResponse)
    return customerRepository.save(customer)
             .map(c -> 
                  apiResponse.setCode("0");
                  apiResponse.setMessage("Successfully Created customer");
                  return apiResponse;
              );

然后修改你的

return serverRequest.bodyToMono(Customer.class).flatMap(customer -&gt; .....

这样的代码。

return serverRequest.bodyToMono(Customer.class)
                     .flatMap(customer -> saveCustomer(customer, response))
                     .flatMap(r -> ServerResponse.ok()
                                 .contentType(MediaType.APPLICATION_JSON)
                                 .body(Mono.just(r), Response.class))

【讨论】:

就是这样。但是仍然试图理解为什么 .save 如果记录没有保留,为什么没有抛出任何错误。任何可以解释这一点的文档?我对 webflux 相当陌生,并试图通过构建示例应用程序来获得一些知识。 @vins。感谢您的帮助。 嗨..它不会抛出任何错误。在有人订阅之前,甚至不会执行保存操作。如果我这样说,请不要误会我 - 在你学习 reactive programming 之前不要在 webflux 中做任何事情。您必须首先熟悉反应器库。 vinsguru.com/topics - 检查 webflux 部分 谢谢。如前所述,我只是在学习反应式。感谢您的建议,在进一步了解之前会更好地了解。 我相信你的教程中的例子很清楚主题。感谢分享。【参考方案2】:

您是否使用像 R2DBC 这样的反应式 JDBC 驱动程序,以及反应式存储库?如果不是更好,你应该这样做,因为它会使完整的堆栈反应。 您可以将 R2DBC 驱动程序 maven 和 Spring Data R2DBC 用于 Reactive Repository

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
</dependency>

然后您将不得不使用使用 r2dbc 连接而不是 JDBC 的连接工厂。

@Bean
  public ConnectionFactory connectionFactory() 
    ConnectionFactory connectionFactory = ConnectionFactories.get(
        "r2dbcs:mysql://localhost:3306/dbname?"+
            "zeroDate=use_round&"+
            "sslMode=disabled");

    return connectionFactory;
  

完成此配置后,您可以从 Spring Data R2DBC 扩展 ReactiveCrudRepository 并创建自己的存储库,如下所示

public interface UserRepository extends ReactiveCrudRepository<User, Long> 

  @Query("SELECT * FROM user WHERE firstname = :firstname")
  Flux<User> findByFirstName(String firstname);

请查看this博客了解更多信息。

【讨论】:

以上是关于使用 Spring boot webflux reactive 记录未持久化到 R2DB的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spring boot + WebFlux 进行全局错误处理

Spring Boot Webflux/Netty - 检测关闭的连接

spring-boot-starter-web 和 spring-boot-starter-webflux 不能一起工作吗?

如何使用 Spring Boot 对 WebFlux 进行异常处理?

如何使用 WebFlux 在 Spring Boot 2 中设置登录页面?

如何使用WebFlux在Spring Boot 2中设置登录页面?