Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库

Posted

技术标签:

【中文标题】Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库【英文标题】:Spring Batch/Data JPA application not persisting/saving data to Postgres database when calling JPA repository (save, saveAll) methods 【发布时间】:2021-08-10 08:22:02 【问题描述】:

我快要崩溃了。到目前为止,我无休止地阅读/搜索并尝试了所有具有此类似问题的 google/*** 帖子的解决方案(有很多)。有些看起来很有希望,但对我来说还没有任何效果;尽管我已经取得了一些进展,并且我相信我走在正确的轨道上(我相信此时它与事务管理器有关,并且可能与 Spring Batch 与 Spring Data JPA 发生冲突)。

参考资料:

    Spring boot repository does not save to the DB if called from scheduled job JpaItemWriter: no transaction is in progress

与上述帖子类似,我有一个使用 Spring BatchSpring Data JPASpring Boot 应用程序。它从 .csv 文件中读取逗号分隔的数据,然后进行一些处理/转换,并尝试使用 JPA 存储库方法持久化/保存到数据库,特别是这里 .saveAll()(我也尝试过 @ 987654325@ 方法和这做同样的事情),因为我正在保存用户定义数据类型的List<MyUserDefinedDataType>(批量插入)。

现在,我的代码在 Spring Boot 启动器 1.5.9.RELEASE 上运行良好,但我最近尝试升级到 2.XX,经过无数小时的调试,我发现只有版本 2.2.0.RELEASE 会持久/保存数据到数据库。所以升级到 >= 2.2.1.RELEASE 会破坏持久性。从.csv 中读取的所有内容都很好,就在代码流第一次遇到像.save().saveAll() 这样的JPA 存储库方法时,应用程序继续运行,但没有任何东西被持久化。我还注意到 Hikari 池日志"active=1 idle=4",但是当我在版本1.5.9.RELEASE 上查看相同的日志时,它在持久化数据后立即显示active=0 idle=5,因此应用程序肯定挂起。我进入调试器,甚至在跳转到存储库调用后看到,它通过 Spring AOP 库等(所有第三方)进入了几乎无限循环,我不相信会回到真正的应用程序/业务逻辑我写的。

3c22fb53ed64 2021-05-20 23:53:43.909 DEBUG
                    [HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=5, active=1, idle=4, waiting=0)

无论如何,我尝试了对其他人有用的最常见的解决方案:

    定义JpaTransactionManager @Bean 并将其注入Step 函数,同时使用PlatformTransactionManager 保留JobRepository。这没有没有工作。然后我也尝试在 JobRepository @Bean 中使用JpaTransactionManager,这也没有工作。 在我的应用程序中定义@RestController 端点以手动触发此作业,而不是从我的主Application.java 类手动执行。 (我会在下面详细讨论)。根据我在上面发布的一篇文章,即使在 spring >= 2.2.1 上,数据也能正确地保存到数据库中,我进一步怀疑 Spring Batch 持久性/实体/事务管理器的某些东西已经搞砸了。

代码基本上是这样的: BatchConfiguration.java

@Configuration
@EnableBatchProcessing
@Import(DatabaseConfiguration.class)
public class BatchConfiguration 

// Datasource is a Postgres DB defined in separate IntelliJ project that I add to my pom.xml
DataSource dataSource;

@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) 
    this.dataSource = dataSource;


@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() 
    final JpaTransactionManager tm = new JpaTransactionManager();
    tm.setDataSource(dataSource);
    return tm;



 @Bean
 public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception 
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    jobRepositoryFactoryBean.setDatabaseType("POSTGRES");
    return jobRepositoryFactoryBean.getObject();


@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) 
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    return simpleJobLauncher;


@Bean(name = "jobToLoadTheData")
 public Job jobToLoadTheData() 
    return jobBuilderFactory.get("jobToLoadTheData")
            .start(stepToLoadData())
            .listener(new CustomJobListener())
            .build();


@Bean
@StepScope
public TaskExecutor taskExecutor() 
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(maxThreads);
    threadPoolTaskExecutor.setThreadGroupName("taskExecutor-batch");
    return threadPoolTaskExecutor;


@Bean(name = "stepToLoadData")
public Step stepToLoadData() 
    TaskletStep step = stepBuilderFactory.get("stepToLoadData")
            .transactionManager(jpaTransactionManager())
            .<List<FieldSet>, List<myCustomPayloadRecord>>chunk(chunkSize)
            .reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
            .processor(myCustomPayloadRecordItemProcessor())
            .writer(myCustomerWriter())
            .faultTolerant()
            .skipPolicy(new AlwaysSkipItemSkipPolicy())
            .skip(DataValidationException.class)
            .listener(new CustomReaderListener())
            .listener(new CustomProcessListener())
            .listener(new CustomWriteListener())
            .listener(new CustomSkipListener())
            .taskExecutor(taskExecutor())
            .throttleLimit(maxThreads)
            .build();
    step.registerStepExecutionListener(stepExecutionListener());
    step.registerChunkListener(new CustomChunkListener());
    return step;

我的主要方法: Application.java

  @Autowired
    @Qualifier("jobToLoadTheData")
    private Job loadTheData;

    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void launchJob () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException
    
        JobParameters parameters = (new JobParametersBuilder()).addDate("random", new Date()).toJobParameters();
        jobLauncher.run(loadTheData, parameters);
    

 public static void main(String[] args) 
        SpringApplication.run(Application.class, args);

现在,通常我正在从 Amazon S3 存储桶中读取此 .csv ,但由于我在本地进行测试,因此我只是将 .csv 放在项目目录中并通过触发 @987654349 中的作业直接读取它@main 类(如上所示)。另外,我确实在这个BatchConfiguration 类中定义了一些其他bean,但我不想让这篇文章变得过于复杂,而且从我所做的谷歌搜索来看,问题可能出在我发布的方法上(希望如此)。

另外,我想指出,类似于 Google/*** 上的其他帖子之一,用户遇到类似问题,我创建了一个 @RestController 端点,它简单地调用 .run() 方法 JobLauncher我传入JobToLoadTheData Bean,它会触发批量插入。你猜怎么了? 数据可以很好地保存到数据库中即使在 spring >= 2.2.1

这里发生了什么?这是一个线索吗?某种类型的实体或事务管理器是否有问题?我会接受任何建议!我可以提供你们可能需要的更多信息,所以请尽管询问。

【问题讨论】:

【参考方案1】:

您正在定义一个JobRepository 类型的bean,并期望它被Spring Batch 拾取。这是不正确的。您需要提供BatchConfigurer 并覆盖getJobRepository。这在reference documentation中有解释:

You can customize any of these beans by creating a custom implementation of the
BatchConfigurer interface. Typically, extending the DefaultBatchConfigurer
(which is provided if a BatchConfigurer is not found) and overriding the required
getter is sufficient.

这也记录在@EnableBatchProcessing 的Javadoc 中。所以在你的情况下,你需要定义一个Batchconfigurer类型的bean并覆盖getJobRepositorygetTransactionManager,类似于:

@Bean
public BatchConfigurer batchConfigurer(EntityManagerFactory entityManagerFactory, DataSource dataSource) 
    return new DefaultBatchConfigurer(dataSource) 
        @Override
        public PlatformTransactionManager getTransactionManager() 
            return new JpaTransactionManager(entityManagerFactory);
        

        @Override
        public JobRepository getJobRepository() 
            JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
            jobRepositoryFactoryBean.setDataSource(dataSource);
            jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
            // set other properties
            return jobRepositoryFactoryBean.getObject();
        
    ;

在 Spring Boot 上下文中,如果需要,您还可以覆盖 org.springframework.boot.autoconfigure.batch.JpaBatchConfigurercreateTransactionManagercreateJobRepository 方法。

【讨论】:

我已经为您的帖子添加了“BatchConfigurer”bean,但它不起作用。然后我删除了它,并让我在上面发布的“BatchConfiguration”类扩展了“DefaultBatchConfigurer”,然后我以这种方式覆盖了这些方法,但这不起作用。我尝试了与 PlatformTransactionManager 不同的事务管理器,并在 Step 与 JobRepository 中使用了不同的事务管理器,并且在 Spring >= 2.2.1 上几乎没有任何工作(它在 Spring 2.2.0 上工作正常)。 我对您关于 JobRepository 没有被我的 Spring 拾取的评论感到有些困惑,因为我已经完全按照这种方式定义了它,并且在 spring boot starter 2.2.0 上,数据可以很好地保存到数据库中?不需要定义任何 BatchConfigurer 匿名类或扩展任何类?我现在正在浏览文档..... 另外,根据文档“配置 JobRepository 使用 @EnableBatchProcessing 时,会为您提供开箱即用的 JobRepository。本节介绍如何配置您自己的。”这进一步解释了为什么我上面发布的配置确实有效,并且 JobRepository bean 定义为我所做的...... 请提供重现问题的minimal complete example。 嗨@Mahmoud Ben Hassine,我根据您的要求制作了一个非常干净且最小化的MVP/POC - github.com/alpizano/spring-batch-data-jpa-persistence-issue-mvp您能帮忙吗?我为 Postgres 制作了一个 Spring Profile,您可以运行 -Dspring.profiles.active=postgres,我现在将添加到 README.md 中

以上是关于Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时不会将数据持久化/保存到 Postgres 数据库的主要内容,如果未能解决你的问题,请参考以下文章

Spring Data JPA在Spring Boot中的应用

运行 Spring Boot 简单应用程序时找不到 Oracle 驱动程序

spring boot 应用程序:jpa 查询返回旧数据

从控制台应用程序使用带有休眠功能的spring-data-jpa时如何延迟加载收集

文档未保存在 Spring jpa 文档管理器应用程序中

JPA/Hibernate 在 Spring Boot 应用程序中插入不存在的表