Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库
Posted
技术标签:
【中文标题】Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库【英文标题】:Spring Batch/Data JPA application not persisting/saving data to Postgres database when calling JPA repository (save, saveAll) methods 【发布时间】:2021-08-10 08:22:02 【问题描述】:我快要崩溃了。到目前为止,我无休止地阅读/搜索并尝试了所有具有此类似问题的 google/*** 帖子的解决方案(有很多)。有些看起来很有希望,但对我来说还没有任何效果;尽管我已经取得了一些进展,并且我相信我走在正确的轨道上(我相信此时它与事务管理器有关,并且可能与 Spring Batch 与 Spring Data JPA 发生冲突)。
参考资料:
-
Spring boot repository does not save to the DB if called from scheduled job
JpaItemWriter: no transaction is in progress
与上述帖子类似,我有一个使用 Spring Batch 和 Spring Data JPA 的 Spring Boot 应用程序。它从 .csv
文件中读取逗号分隔的数据,然后进行一些处理/转换,并尝试使用 JPA 存储库方法持久化/保存到数据库,特别是这里 .saveAll()
(我也尝试过 @ 987654325@ 方法和这做同样的事情),因为我正在保存用户定义数据类型的List<MyUserDefinedDataType>
(批量插入)。
现在,我的代码在 Spring Boot 启动器 1.5.9.RELEASE
上运行良好,但我最近尝试升级到 2.XX,经过无数小时的调试,我发现只有版本 2.2.0.RELEASE
会持久/保存数据到数据库。所以升级到 >= 2.2.1.RELEASE
会破坏持久性。从.csv
中读取的所有内容都很好,就在代码流第一次遇到像.save()
.saveAll()
这样的JPA 存储库方法时,应用程序继续运行,但没有任何东西被持久化。我还注意到 Hikari 池日志"active=1 idle=4"
,但是当我在版本1.5.9.RELEASE
上查看相同的日志时,它在持久化数据后立即显示active=0 idle=5
,因此应用程序肯定挂起。我进入调试器,甚至在跳转到存储库调用后看到,它通过 Spring AOP 库等(所有第三方)进入了几乎无限循环,我不相信会回到真正的应用程序/业务逻辑我写的。
3c22fb53ed64 2021-05-20 23:53:43.909 DEBUG
[HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=5, active=1, idle=4, waiting=0)
无论如何,我尝试了对其他人有用的最常见的解决方案:
-
定义
JpaTransactionManager
@Bean
并将其注入Step
函数,同时使用PlatformTransactionManager
保留JobRepository
。这没有没有工作。然后我也尝试在 JobRepository @Bean
中使用JpaTransactionManager
,这也没有不工作。
在我的应用程序中定义@RestController
端点以手动触发此作业,而不是从我的主Application.java
类手动执行。 (我会在下面详细讨论)。根据我在上面发布的一篇文章,即使在 spring >= 2.2.1 上,数据也能正确地保存到数据库中,我进一步怀疑 Spring Batch 持久性/实体/事务管理器的某些东西已经搞砸了。
代码基本上是这样的: BatchConfiguration.java
@Configuration
@EnableBatchProcessing
@Import(DatabaseConfiguration.class)
public class BatchConfiguration
// Datasource is a Postgres DB defined in separate IntelliJ project that I add to my pom.xml
DataSource dataSource;
@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource)
this.dataSource = dataSource;
@Bean
@Primary
public JpaTransactionManager jpaTransactionManager()
final JpaTransactionManager tm = new JpaTransactionManager();
tm.setDataSource(dataSource);
return tm;
@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType("POSTGRES");
return jobRepositoryFactoryBean.getObject();
@Bean
public JobLauncher jobLauncher(JobRepository jobRepository)
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
return simpleJobLauncher;
@Bean(name = "jobToLoadTheData")
public Job jobToLoadTheData()
return jobBuilderFactory.get("jobToLoadTheData")
.start(stepToLoadData())
.listener(new CustomJobListener())
.build();
@Bean
@StepScope
public TaskExecutor taskExecutor()
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(maxThreads);
threadPoolTaskExecutor.setThreadGroupName("taskExecutor-batch");
return threadPoolTaskExecutor;
@Bean(name = "stepToLoadData")
public Step stepToLoadData()
TaskletStep step = stepBuilderFactory.get("stepToLoadData")
.transactionManager(jpaTransactionManager())
.<List<FieldSet>, List<myCustomPayloadRecord>>chunk(chunkSize)
.reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
.processor(myCustomPayloadRecordItemProcessor())
.writer(myCustomerWriter())
.faultTolerant()
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.skip(DataValidationException.class)
.listener(new CustomReaderListener())
.listener(new CustomProcessListener())
.listener(new CustomWriteListener())
.listener(new CustomSkipListener())
.taskExecutor(taskExecutor())
.throttleLimit(maxThreads)
.build();
step.registerStepExecutionListener(stepExecutionListener());
step.registerChunkListener(new CustomChunkListener());
return step;
我的主要方法: Application.java
@Autowired
@Qualifier("jobToLoadTheData")
private Job loadTheData;
@Autowired
private JobLauncher jobLauncher;
@PostConstruct
public void launchJob () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException
JobParameters parameters = (new JobParametersBuilder()).addDate("random", new Date()).toJobParameters();
jobLauncher.run(loadTheData, parameters);
public static void main(String[] args)
SpringApplication.run(Application.class, args);
现在,通常我正在从 Amazon S3 存储桶中读取此 .csv
,但由于我在本地进行测试,因此我只是将 .csv 放在项目目录中并通过触发 @987654349 中的作业直接读取它@main 类(如上所示)。另外,我确实在这个BatchConfiguration
类中定义了一些其他bean,但我不想让这篇文章变得过于复杂,而且从我所做的谷歌搜索来看,问题可能出在我发布的方法上(希望如此)。
另外,我想指出,类似于 Google/*** 上的其他帖子之一,用户遇到类似问题,我创建了一个 @RestController
端点,它简单地调用 .run()
方法 JobLauncher
我传入JobToLoadTheData
Bean,它会触发批量插入。你猜怎么了? 数据可以很好地保存到数据库中,即使在 spring >= 2.2.1。
这里发生了什么?这是一个线索吗?某种类型的实体或事务管理器是否有问题?我会接受任何建议!我可以提供你们可能需要的更多信息,所以请尽管询问。
【问题讨论】:
【参考方案1】:您正在定义一个JobRepository
类型的bean,并期望它被Spring Batch 拾取。这是不正确的。您需要提供BatchConfigurer
并覆盖getJobRepository
。这在reference documentation中有解释:
You can customize any of these beans by creating a custom implementation of the
BatchConfigurer interface. Typically, extending the DefaultBatchConfigurer
(which is provided if a BatchConfigurer is not found) and overriding the required
getter is sufficient.
这也记录在@EnableBatchProcessing
的Javadoc 中。所以在你的情况下,你需要定义一个Batchconfigurer
类型的bean并覆盖getJobRepository
和getTransactionManager
,类似于:
@Bean
public BatchConfigurer batchConfigurer(EntityManagerFactory entityManagerFactory, DataSource dataSource)
return new DefaultBatchConfigurer(dataSource)
@Override
public PlatformTransactionManager getTransactionManager()
return new JpaTransactionManager(entityManagerFactory);
@Override
public JobRepository getJobRepository()
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
// set other properties
return jobRepositoryFactoryBean.getObject();
;
在 Spring Boot 上下文中,如果需要,您还可以覆盖 org.springframework.boot.autoconfigure.batch.JpaBatchConfigurer
的 createTransactionManager
和 createJobRepository
方法。
【讨论】:
我已经为您的帖子添加了“BatchConfigurer”bean,但它不起作用。然后我删除了它,并让我在上面发布的“BatchConfiguration”类扩展了“DefaultBatchConfigurer”,然后我以这种方式覆盖了这些方法,但这不起作用。我尝试了与 PlatformTransactionManager 不同的事务管理器,并在 Step 与 JobRepository 中使用了不同的事务管理器,并且在 Spring >= 2.2.1 上几乎没有任何工作(它在 Spring 2.2.0 上工作正常)。 我对您关于 JobRepository 没有被我的 Spring 拾取的评论感到有些困惑,因为我已经完全按照这种方式定义了它,并且在 spring boot starter 2.2.0 上,数据可以很好地保存到数据库中?不需要定义任何 BatchConfigurer 匿名类或扩展任何类?我现在正在浏览文档..... 另外,根据文档“配置 JobRepository 使用 @EnableBatchProcessing 时,会为您提供开箱即用的 JobRepository。本节介绍如何配置您自己的。”这进一步解释了为什么我上面发布的配置确实有效,并且 JobRepository bean 定义为我所做的...... 请提供重现问题的minimal complete example。 嗨@Mahmoud Ben Hassine,我根据您的要求制作了一个非常干净且最小化的MVP/POC - github.com/alpizano/spring-batch-data-jpa-persistence-issue-mvp您能帮忙吗?我为 Postgres 制作了一个 Spring Profile,您可以运行 -Dspring.profiles.active=postgres,我现在将添加到 README.md 中以上是关于Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库的主要内容,如果未能解决你的问题,请参考以下文章
Spring Data JPA在Spring Boot中的应用
运行 Spring Boot 简单应用程序时找不到 Oracle 驱动程序