Spring批处理ItemWriter异常正在杀死工作

Posted

技术标签:

【中文标题】Spring批处理ItemWriter异常正在杀死工作【英文标题】:Spring batch ItemWriter exception is killing job 【发布时间】:2021-02-10 19:59:37 【问题描述】:

我有一个spring批处理应用程序,它将通过rest API读取数据并将数据写入数据库。

下面是我的代码。

配置:

@Bean
@Qualifier("batchJob")
public Job batchJob() 
    return jobBuilderFactory.get("batchJob").incrementer(new RunIdIncrementer()).listener(batchJobExecutionListener)
            .start(firstStep)
            .next(secondStep)
            .build();


@Bean
public Step firstStep() 
    return stepBuilderFactory.get("firstStep").<Input, Output>chunk(AppConst.BATCH_CHUNK)
            .reader(firstReader)
            .processor(firstProcessor)
            .writer(firstWriter)
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(99999)
            .listener(new FirstSkipListener())
            .build();

FirstSkipListener.class:

@Component
@Slf4j
public class FirstSkipListener implements SkipListener<Input, Output> 

    @Override
    public void onSkipInProcess(Input arg0, Throwable arg1) 
        // Do nothing in onSkipInProcess
    

    @Override
    public void onSkipInRead(Throwable arg0) 
        // Do nothing in onSkipInRead
    

    @Override
    public void onSkipInWrite(Output output, Throwable arg1) 
        log.error("Skipped output =  ", output);
    


FirstWriter.class:

@Component
@Slf4j
public class FirstWriter implements ItemWriter<Output> 

    @Autowired
    private OutputRepository outputRepository;
    
    @Autowired
    private BatchRunRepository batchRunRepository;
    
    private Integer batchId;
    
    private Integer updateCount;
    
    /**
     * This method will write output list in db.
     * @param outputs
     * @return void
     * @throws Exception
     */
    @Override
    public void write(List<? extends Output> outputs) 
        log.info("Start writing output data.");
        if(!CollectionUtils.isEmpty(outputs)) 
            try 
                updateCount = outputs.size();
                outputRepository.saveAll(outputs);
                outputRepository.flush();
                log.info("End writing output data.");
             catch (Exception e) 
                log.error("Error occured while writing output data: ", e.getMessage());
            
         else 
            log.info("No data to be writtern for output.");
        
    

OutputRepository.class:

@Repository
public interface OutputRepository extends JpaRepository<Output, Long> 

application.yml:

spring:
  jpa:
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
        default_schema: my_schema
        format_sql: true
        jdbc: 
          lob:
            non_contextual_creation: true
  batch:
    initialize-schema: always
    initializer:
      enabled: false
    job:
      enabled: false
  main:
    allow-bean-definition-overriding: true
  jackson:
    serialization:
      indent_output: true
  profiles: local
    
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb?currentSchema=my_schema
    username: postgres
    password: 12345
    platform: postgres
  jpa:             
    show-sql: false
    hibernate:
      ddl-auto: validate

现在当我运行我的工作时,我遇到了异常。

2020-10-28 17:56:01.520  INFO [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] c.m.c.d.m.b.job.writer.FirstWriter     : Start writing output data.
2020-10-28 17:56:02.108 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : ERROR: null value in column "output_name" violates not-null constraint
  Detail: Failing row contains (486, null, 4, null, null, null, null, Need Description, data, null, 439e00ef-45d5-3a4b-ab65-f620f641b6d3, f, A, user, 2020-10-28 17:56:02.101, 2020-10-28 17:56:02.101, user, null, ).
2020-10-28 17:56:02.118 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] c.m.c.d.m.b.job.writer.FirstWriter     : Error occured while writing output data: could not execute statement; SQL [n/a]; constraint [output_name]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement
2020-10-28 17:56:02.250 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] o.s.batch.core.step.tasklet.TaskletStep  : JobRepository failure forcing rollback

org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [UPDATE BATCH_STEP_EXECUTION_CONTEXT SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? WHERE STEP_EXECUTION_ID = ?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:89) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1443) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:862) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:917) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.persistSerializedContext(JdbcExecutionContextDao.java:236) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:163) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:210) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_261]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_261]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at com.sun.proxy.$Proxy158.updateExecutionContext(Unknown Source) ~[na:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_261]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_261]

在我的代码中,我正在设置跳过策略,如果我最初在输出列表中传递 10 个元素并且第 5 条记录有问题,那么 Spring Batch 应该抛出异常并重试调用 write 方法 10 次。这样只有一条记录会被跳过,其他 9 条记录会保留在数据库中。

但就我而言,我的批处理应用程序完全停止了。它甚至无法更新由 sprint 维护的 BATCH_STEP_EXECUTION_CONTEXT 表。

请注意,我正在使用 JPA 存储库,而不是尝试在任何地方管理事务。考虑到 spring 将代表我管理它,甚至没有提供事务管理器。

【问题讨论】:

根本原因是“错误:“output_name”列中的空值违反非空约束 我无法确定来自 API 的数据。我需要插入数据库。当然会有一些记录可能会抛出异常。 好吧,事务的定义是“要么所有命令都成功,要么都不成功”。如果您的事务涉及一些失败的命令,您就不能指望其他命令成功。如果某些语句成功而其他语句不成功是可以接受的,那么您不能在单个事务中完成所有操作。您需要每次插入使用一个事务(例如 autocommit = on) 不,我并不是说 txn 应该成功。我是说我已经实现了跳过侦听器,当第一次调用 write 失败时它应该会出现。假设我在 write 方法中发送了 10 个项目,但由于第 5 条记录而失败,那么 spring batch 应该通过一项一项地传递来调用 write 方法。只有失败的项目应该被跳过。 这需要在每个语句周围使用保存点,我不知道如何在 Spring 或 JPA 中启用它 【参考方案1】:

好吧,我得到了错误。在我的写类中,我有一个用于打印和异常的 catch 块。我删除了 try catch 块并让 spring 批处理找到异常并调用跳过侦听器。

【讨论】:

以上是关于Spring批处理ItemWriter异常正在杀死工作的主要内容,如果未能解决你的问题,请参考以下文章

在itemwriter中,春季批处理未在运行时异常上滚动

Spring-Batch学习总结——重要概念,环境搭建,名词解释,第一个项目及异常处理

ItemWriter的Spring Boot多线程

Spring批处理中的多个项目编写器

Spring-batch(ItemWriter)数据写入数据库,普通文件,xml文件,多文件分类写入

使用 JPA 存储库的春季批处理 ItemWriter 存在问题