为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?
Posted
技术标签:
【中文标题】为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?【英文标题】:Why does subscribe work and block doesn't in Spring reactive Mongo?为什么在 Spring 响应式 Mongo 中订阅有效而阻止无效? 【发布时间】:2018-06-04 03:04:14 【问题描述】:我通过选择Kotlin
、Gradle
、M7
和Web-reactive
,从Spring Initializr 中新建了一个项目。
我做了一个小项目:
data class Person (val id: String)
@Component class PersonHandler(val template: ReactiveMongoTemplate)
init
println("Initializing")
val jim: Mono<Person> = template.save(Person("Jim"))
val john: Mono<Person> = template.save(Person("John"))
val jack: Mono<Person> = template.save(Person("Jack"))
launch(jim)
launch(john)
launch(jack)
println("Finished Initializing")
fun launch(mono: Mono<Person>)
mono.subscribe(println(it.id), println("Error")) // This works
// mono.block() This just hangs
我尝试将三个人保存到数据库中。 save
方法只返回一个需要执行的Mono
。如果我尝试通过简单的订阅来执行它,一切都很好:
Initializing
Finished Initializing
2017-12-21 13:14:39.513 INFO 17278 --- [ Thread-13] org.mongodb.driver.connection : Opened connection [connectionIdlocalValue:3, serverValue:158] to localhost:27017
2017-12-21 13:14:39.515 INFO 17278 --- [ Thread-12] org.mongodb.driver.connection : Opened connection [connectionIdlocalValue:4, serverValue:159] to localhost:27017
2017-12-21 13:14:39.520 INFO 17278 --- [ Thread-14] org.mongodb.driver.connection : Opened connection [connectionIdlocalValue:5, serverValue:160] to localhost:27017
Jim
Jack
John
但是,当我使用 block
而不是 subscribe
时,应用程序挂起:
Initializing
2017-12-21 13:16:47.200 INFO 17463 --- [ Thread-14] org.mongodb.driver.connection : Opened connection [connectionIdlocalValue:3, serverValue:163] to localhost:27017
如果我手动查询数据库,我看到 Jim 已保存,但 Jack 和 John 未保存。
这是一个错误,还是我做错了什么?我想保证用户在代码进一步运行之前就在数据库中,所以我真的很想使用block
。
我不确定它是否相关,但我收到编译器警告
在构造函数中访问非最终属性
template
有一个最小的工作示例。它包含两个分支。一种是解决该问题的方法。
https://github.com/martin-drozdik/spring-mongo-bug-example
【问题讨论】:
【参考方案1】:我认为这可能是 Spring Framework 错误/可用性问题。
首先,让我强调一下subscribe
和block
之间的区别:
subscribe
方法启动工作并立即返回。因此,当您的应用程序的其他部分运行时,您无法保证操作已完成。
block
是一个阻塞操作:它触发操作并等待其完成。
对于初始化工作,组合操作和调用块一次可能是最好的选择:
val jim: Mono<Person> = template.save(Person("Jim"))
val john: Mono<Person> = template.save(Person("John"))
val jack: Mono<Person> = template.save(Person("Jack"))
jim.then(john).then(jack).block();
正如您所说,使用block
会挂起应用程序。我怀疑这可能是 Spring 上下文初始化问题 - 如果我没记错的话,这个过程可能会在某些部分假设一个线程,并使用反应式管道安排在许多线程上工作。
您能否创建一个最小的示例应用程序(仅使用 Java/Spring Boot/Spring Data Reactive Mongo)并在 https://jira.spring.io 上报告?
【讨论】:
感谢您的回答!我已经在 Github github.com/spring-projects/spring-boot/issues/… 上打开了一个问题,在 github.com/martin-drozdik/spring-mongo-bug-example 上使用 MWE(也带有解决方法分支),但它已关闭。开发人员告诉我,将它放在 SO 上就足够了。我应该把它发布到 Jira 上吗? 是的,我认为这是一个 Spring Framework 问题,而不是 Spring Boot 问题。所以我给你的 Jira 链接应该没问题。【参考方案2】:我遇到过类似的情况,通过调用“reactiveMongoTemplate.save(model).block()”,应用程序挂起。
问题是由我的一个类中的@PostConstruct 引起的,该类旨在在应用程序初始化后创建我的系统用户。我认为它是在完整的 Spring 上下文初始化之前被调用的。
@Configuration
public class InitialDataPostLoader
private Logger logger = LogManager.getLogger(this.getClass());
@PostConstruct
public void init()
logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
createDefaultUsers();
通过将@PostConstruct 替换为 ContextRefreshEvent 监听器,问题得到了解决。
@Configuration
public class InitialDataPostLoader implements
ApplicationListener<ContextRefreshedEvent>
private Logger logger = LogManager.getLogger(this.getClass());
@Override
public void onApplicationEvent(ContextRefreshedEvent arg0)
logger.info(String.format(MSG_SERVICE_JOB, "System Metadata initialization"));
createDefaultUsers();
【讨论】:
我认为这与问题无关,但已经解决了我的问题。以上是关于为啥在 Spring 响应式 Mongo 中订阅有效而阻止无效?的主要内容,如果未能解决你的问题,请参考以下文章
基于 Spring WebFlux/Reactive Mongo 的应用程序打开到 mongo db 的多个连接
(19)Reactor Processors——响应式Spring的道法术器
(18)Hot vs Cold——响应式Spring的道法术器