为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?

Posted

技术标签:

【中文标题】为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?【英文标题】:Why does subscribe work and block doesn't in Spring reactive Mongo?为什么在 Spring 响应式 Mongo 中订阅有效而阻止无效? 【发布时间】:2018-06-04 03:04:14 【问题描述】:

我通过选择KotlinGradleM7Web-reactive,从Spring Initializr 中新建了一个项目。

我做了一个小项目:

data class Person (val id: String)

@Component class PersonHandler(val template: ReactiveMongoTemplate) 

    init
    
        println("Initializing")

        val jim: Mono<Person> =  template.save(Person("Jim"))
        val john: Mono<Person> = template.save(Person("John"))
        val jack: Mono<Person> = template.save(Person("Jack"))

        launch(jim)
        launch(john)
        launch(jack)

        println("Finished Initializing")
    

    fun launch(mono: Mono<Person>)
    
        mono.subscribe(println(it.id), println("Error")) // This works
        // mono.block()  This just hangs
     

我尝试将三个人保存到数据库中。 save 方法只返回一个需要执行的Mono。如果我尝试通过简单的订阅来执行它,一切都很好:

Initializing
Finished Initializing
2017-12-21 13:14:39.513  INFO 17278 --- [      Thread-13] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:3, serverValue:158] to localhost:27017
2017-12-21 13:14:39.515  INFO 17278 --- [      Thread-12] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:4, serverValue:159] to localhost:27017
2017-12-21 13:14:39.520  INFO 17278 --- [      Thread-14] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:5, serverValue:160] to localhost:27017
Jim
Jack
John

但是,当我使用 block 而不是 subscribe 时,应用程序挂起:

Initializing
2017-12-21 13:16:47.200  INFO 17463 --- [      Thread-14] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:3, serverValue:163] to localhost:27017

如果我手动查询数据库,我看到 Jim 已保存,但 Jack 和 John 未保存。

这是一个错误,还是我做错了什么?我想保证用户在代码进一步运行之前就在数据库中,所以我真的很想使用block

我不确定它是否相关,但我收到编译器警告

在构造函数中访问非最终属性template

有一个最小的工作示例。它包含两个分支。一种是解决该问题的方法。

https://github.com/martin-drozdik/spring-mongo-bug-example

【问题讨论】:

【参考方案1】:

我认为这可能是 Spring Framework 错误/可用性问题。

首先,让我强调一下subscribeblock 之间的区别:

subscribe 方法启动工作并立即返回。因此,当您的应用程序的其他部分运行时,您无法保证操作已完成。 block 是一个阻塞操作:它触发操作并等待其完成。

对于初始化工作,组合操作和调用块一次可能是最好的选择:

val jim: Mono<Person> =  template.save(Person("Jim"))
val john: Mono<Person> = template.save(Person("John"))
val jack: Mono<Person> = template.save(Person("Jack"))
jim.then(john).then(jack).block();

正如您所说,使用block 会挂起应用程序。我怀疑这可能是 Spring 上下文初始化问题 - 如果我没记错的话,这个过程可能会在某些部分假设一个线程,并使用反应式管道安排在许多线程上工作。

您能否创建一个最小的示例应用程序(仅使用 Java/Spring Boot/Spring Data Reactive Mongo)并在 https://jira.spring.io 上报告?

【讨论】:

感谢您的回答!我已经在 Github github.com/spring-projects/spring-boot/issues/… 上打开了一个问题,在 github.com/martin-drozdik/spring-mongo-bug-example 上使用 MWE(也带有解决方法分支),但它已关闭。开发人员告诉我,将它放在 SO 上就足够了。我应该把它发布到 Jira 上吗? 是的,我认为这是一个 Spring Framework 问题,而不是 Spring Boot 问题。所以我给你的 Jira 链接应该没问题。【参考方案2】:

我遇到过类似的情况,通过调用“reactiveMongoTemplate.save(model).block()”,应用程序挂起。

问题是由我的一个类中的@PostConstruct 引起的,该类旨在在应用程序初始化后创建我的系统用户。我认为它是在完整的 Spring 上下文初始化之前被调用的。

@Configuration
public class InitialDataPostLoader  
    private Logger logger = LogManager.getLogger(this.getClass());


    @PostConstruct
    public void init() 
        logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
        createDefaultUsers();
    

通过将@PostConstruct 替换为 ContextRefreshEvent 监听器,问题得到了解决。

@Configuration 
public class InitialDataPostLoader implements
 ApplicationListener<ContextRefreshedEvent> 
     private Logger logger = LogManager.getLogger(this.getClass());

     @Override
     public void onApplicationEvent(ContextRefreshedEvent arg0) 

         logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
         createDefaultUsers();

     

【讨论】:

我认为这与问题无关,但已经解决了我的问题。

以上是关于为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?的主要内容,如果未能解决你的问题,请参考以下文章

基于 Spring WebFlux/Reactive Mongo 的应用程序打开到 mongo db 的多个连接

(19)Reactor Processors——响应式Spring的道法术器

(18)Hot vs Cold——响应式Spring的道法术器

为啥 Spring 不为关系数据库提供响应式(非阻塞)客户端?

CTO 首次分享|为啥我选择 Spring 响应式编程?

Spring 响应式编程,真香!!!