如果从计划的作业中调用,Spring Boot 存储库不会保存到数据库

Posted

技术标签:

【中文标题】如果从计划的作业中调用,Spring Boot 存储库不会保存到数据库【英文标题】:Spring boot repository does not save to the DB if called from scheduled job 【发布时间】:2019-06-24 16:55:17 【问题描述】:

我有一个 Spring Boot 应用程序,我需要在其中安排一个作业来从特定目录读取文件并将数据存储到数据库中。

我使用 Spring 批处理来处理文件部分,因为文件的数量非常大。

应用程序有一个名为PraserStarer 的组件,它有一个名为startParsing 的方法。该方法使用@scheduled注解进行注解。

@scheduled(fixedDelay = 60 * 1000)
public startParsing()
    // start spring batch job

我有一个repository接口NewsRepositry注入到spring批处理第一步的writer中。

应用程序有一个简单的控制器来手动调用startParsing 方法。从控制器调用 startParsing 方法时,一切正常。 spring批处理作业正常启动,读取文件,将数据写入DB,归档文件。

当从调度框架调用startParsing方法时,spring批处理作业正常启动,并读取文件但没有存储数据库。

我怀疑这里的问题是有两种不同的上下文,一种用于调度部分,另一种用于应用程序的其余部分。

由于某种原因,调度上下文中没有事务管理器导致没有任何东西进入数据库。

1- 我的怀疑正确吗?

2- 如果是,如何强制将事务管理器加载到其他上下文?

编辑

解析器入门类的代码如下

@Component
public class ParserStarter 

    @Autowired
    JobLauncher jobLauncher;

    @Value("$app.data_directory")
    private String dataDir;

    @Autowired
    private ParserJobListener jobListener;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    public Resource[] getResources() throws IOException 
        // return array of file resource to be processed
    

//  @Scheduled(fixedDelay = 60 * 1000)
    public void startParsing() throws Exception 
        String jobName = System.currentTimeMillis() + " New Parser Job";

        JobParameters jobParameters = new JobParametersBuilder().addString("source", jobName).toJobParameters();

        jobLauncher.run(getParsingJob(), jobParameters);
    

    @Bean(name="getParsingJob")
    private Job getParsingJob() throws IOException 

        jobListener.setResources(getResources());

        Step processingStep = jobListener.processingStep();

        Step archivingStep = jobListener.archivingStep();

        Job job = jobBuilderFactory.get("Store News").incrementer(new RunIdIncrementer())
                .listener(jobListener).start(processingStep).next(archivingStep).build();

        return job;
    

作业监听器的代码如下

@Component
public class ParserJobListener extends JobExecutionListenerSupport 

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Resource[] resources;

    @Value("$app.archive_directory")
    private String archiveDirectory;

    @Autowired
    private Writer writer;

    public MultiResourceItemReader<DataRecord> multiResourceItemReader() 
        MultiResourceItemReader<DataRecord> resourceItemReader = new MultiResourceItemReader<DataRecord>();
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(new Reader());
        return resourceItemReader;
    

    public Step archivingStep() 
        FileArchivingTask archivingTask = new FileArchivingTask(resources, archiveDirectory);
        return stepBuilderFactory.get("Archiving step").tasklet(archivingTask).build();
    

    public Step processingStep() 
        return stepBuilderFactory.get("Process news file").<DataRecord, DataRecord>chunk(1000)
                .reader(multiResourceItemReader()).writer(writer).build();
    

    @Override
    public void afterJob(JobExecution jobExecution) 
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) 
            System.out.println("Job finished")
        
    

    public void setResources(Resource[] resources) 
        this.resources = resources;
    


剩下的是作家,它在下面

@Component
public class Writer implements ItemWriter<DataRecord>

    @Autowired
    private DataRepository dataRepo;

    @Override
    public void write(List<? extends DataRecord> items) throws Exception 
        dataRepo.saveAll(items);
    


编辑 2

我已经改变了 writer 的 write 方法,如下所示分别保存和刷新每个项目

@Transactional
    public void write(List<? extends GdeltRecord> items) throws Exception 
        for (GdeltRecord gdeltRecord : items) 
            dataRepo.saveAndFlush(gdeltRecord);
        
//      dataRepo.saveAll(items);
 

这一次应用程序抛出了一个TransactionRequiredException: no transaction is in progress 异常。

这是异常的完整堆栈跟踪

Caused by: javax.persistence.TransactionRequiredException: no transaction is in progress
    at org.hibernate.internal.SessionImpl.checkTransactionNeeded(SessionImpl.java:3552) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1444) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1440) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerInvocationHandler.invoke(ExtendedEntityManagerCreator.java:350) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:308) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
    at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:533) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:504) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:359) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:200) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:644) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:608) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.lambda$invoke$3(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:59) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    ... 66 common frames omitted

【问题讨论】:

您的配置不正确,我会从监听器中取出步骤定义,并将它们放在单独的类中的作业定义旁边(典型的作业配置可以在getting started guide 中找到)。关于事务,可以分享一下你的事务管理器的配置吗? 我认为问题不在于步骤。我尝试了你的建议,情况是一样的。关于事务管理器配置,我没有做任何配置。它是使用 spring 自动配置来配置的。 我也尝试手动配置数据源、事务管理器和实体管理器,但出现两个事务管理器 bean 异常。这两个 bean 之一是由 spring batch 提供的,我认为这可能是问题所在。 ***.com/questions/22509529/…的可能重复 @MahmoudBenHassine +1 【参考方案1】:

我们必须明确提及 JpaTransactionManager 而不是默认的 spring 批处理事务。

@Configuration
@EnableBatchProcessing
public class MyJob extends DefaultBatchConfigurer 
       
       @Autowired
       private DataSource dataSource;

       @Bean
       @Primary
       public JpaTransactionManager jpaTransactionManager() 
            final JpaTransactionManager tm = new JpaTransactionManager();
            tm.setDataSource(dataSource);
            return tm;
       

【讨论】:

我们是否将它与 STEP 和 JOB REPOSITORY 的单独事务管理器一起使用?实体管理器呢?我看到人们将它传递给 JpaTransactionManager 构造函数【参考方案2】:

我尝试了这个问题 (JpaItemWriter: no transaction is in progress) 中描述的方法,它对我有用。

我定义了一个JpaTransactionManager bean 并将其与步骤配置一起使用。

    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager() 
        final JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    

并在步骤配置中

    @Autowired
    JpaTransactionManager trxm;

    public Step processingStep(Resource[] resources) throws IOException 
        return stepBuilderFactory.get("Process CSV File")
                .transactionManager(trxm)
                .<DataRecord, DataRecord>chunk(1000)
                .reader(multiResourceItemReader()).writer(writer).build();
    

【讨论】:

好吧,我尝试了这个确切的东西,我的代码 JPA save 和 saveAll 方法只是“挂起”并且什么都不做,这对我不起作用,但这一切在 spring 1.xx 上都很好(现在我正在尝试 Spring 2.2.1)【参考方案3】:

如果不查看完整代码,就很难分析到底发生了什么。但是,基于documentation,spring 使用了面向块的处理。它是这样说的:

一旦读取的项目数等于提交间隔,就会通过 ItemWriter 写出整个块,然后提交事务。

这可能是您没有立即看到任何数据库写入的原因。

关于事务管理器,可以这样定义(解释here):

@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) 
    return this.jobBuilderFactory.get("sampleJob")
                            .repository(jobRepository)
                .start(sampleStep)
                .build();


/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) 
        return this.stepBuilderFactory.get("sampleStep")
                                .transactionManager(transactionManager)
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();

【讨论】:

Reagrding 项目数和提交间隔,不幸的是,情况并非如此。当从控制器启动作业时,记录立即出现在数据库中,但是当从预定方法启动时,作业开始和结束,没有任何内容存储到数据库中 那么,当你从控制器调用startParsing时它工作,当你从调度程序调用它时它不起作用? 当我从控制器调用startParsing时有效,当调度框架触发startParsing方法时无效。 试一试,你能不能在控制器中写另一个方法,用@Schedule注解,然后从那里调用startParser 我已经这样做了。我用预定的方法注释了相同的方法,情况仍然相同。我现在将对该问题添加新的编辑。请检查一下。

以上是关于如果从计划的作业中调用,Spring Boot 存储库不会保存到数据库的主要内容,如果未能解决你的问题,请参考以下文章

springboot stringredistemplate 能存对象吗

Spring Boot教程11——计划任务

Spring Boot实战笔记-- Spring高级话题(计划任务)

如何从另一个新的 Spring Boot 项目调用一个 Spring Boot 项目中存在的 Spring Boot api

Spring Boot,Cron作业同步

Spring Boot 批处理作业识别