Spring批处理ItemWriter异常正在杀死工作
Posted
技术标签:
【中文标题】Spring批处理ItemWriter异常正在杀死工作【英文标题】:Spring batch ItemWriter exception is killing job 【发布时间】:2021-02-10 19:59:37 【问题描述】:我有一个spring批处理应用程序,它将通过rest API读取数据并将数据写入数据库。
下面是我的代码。
配置:
@Bean
@Qualifier("batchJob")
public Job batchJob()
return jobBuilderFactory.get("batchJob").incrementer(new RunIdIncrementer()).listener(batchJobExecutionListener)
.start(firstStep)
.next(secondStep)
.build();
@Bean
public Step firstStep()
return stepBuilderFactory.get("firstStep").<Input, Output>chunk(AppConst.BATCH_CHUNK)
.reader(firstReader)
.processor(firstProcessor)
.writer(firstWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(99999)
.listener(new FirstSkipListener())
.build();
FirstSkipListener.class:
@Component
@Slf4j
public class FirstSkipListener implements SkipListener<Input, Output>
@Override
public void onSkipInProcess(Input arg0, Throwable arg1)
// Do nothing in onSkipInProcess
@Override
public void onSkipInRead(Throwable arg0)
// Do nothing in onSkipInRead
@Override
public void onSkipInWrite(Output output, Throwable arg1)
log.error("Skipped output = ", output);
FirstWriter.class:
@Component
@Slf4j
public class FirstWriter implements ItemWriter<Output>
@Autowired
private OutputRepository outputRepository;
@Autowired
private BatchRunRepository batchRunRepository;
private Integer batchId;
private Integer updateCount;
/**
* This method will write output list in db.
* @param outputs
* @return void
* @throws Exception
*/
@Override
public void write(List<? extends Output> outputs)
log.info("Start writing output data.");
if(!CollectionUtils.isEmpty(outputs))
try
updateCount = outputs.size();
outputRepository.saveAll(outputs);
outputRepository.flush();
log.info("End writing output data.");
catch (Exception e)
log.error("Error occured while writing output data: ", e.getMessage());
else
log.info("No data to be writtern for output.");
OutputRepository.class:
@Repository
public interface OutputRepository extends JpaRepository<Output, Long>
application.yml:
spring:
jpa:
database-platform: org.hibernate.dialect.PostgreSQLDialect
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
default_schema: my_schema
format_sql: true
jdbc:
lob:
non_contextual_creation: true
batch:
initialize-schema: always
initializer:
enabled: false
job:
enabled: false
main:
allow-bean-definition-overriding: true
jackson:
serialization:
indent_output: true
profiles: local
datasource:
url: jdbc:postgresql://localhost:5432/mydb?currentSchema=my_schema
username: postgres
password: 12345
platform: postgres
jpa:
show-sql: false
hibernate:
ddl-auto: validate
现在当我运行我的工作时,我遇到了异常。
2020-10-28 17:56:01.520 INFO [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] c.m.c.d.m.b.job.writer.FirstWriter : Start writing output data.
2020-10-28 17:56:02.108 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper : ERROR: null value in column "output_name" violates not-null constraint
Detail: Failing row contains (486, null, 4, null, null, null, null, Need Description, data, null, 439e00ef-45d5-3a4b-ab65-f620f641b6d3, f, A, user, 2020-10-28 17:56:02.101, 2020-10-28 17:56:02.101, user, null, ).
2020-10-28 17:56:02.118 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] c.m.c.d.m.b.job.writer.FirstWriter : Error occured while writing output data: could not execute statement; SQL [n/a]; constraint [output_name]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement
2020-10-28 17:56:02.250 ERROR [my-app,5764b6867c680cf1,5764b6867c680cf1,false] 17548 --- [nio-8080-exec-1] o.s.batch.core.step.tasklet.TaskletStep : JobRepository failure forcing rollback
org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [UPDATE BATCH_STEP_EXECUTION_CONTEXT SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? WHERE STEP_EXECUTION_ID = ?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:89) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1443) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:862) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:917) ~[spring-jdbc-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.persistSerializedContext(JdbcExecutionContextDao.java:236) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:163) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:210) ~[spring-batch-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_261]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_261]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at com.sun.proxy.$Proxy158.updateExecutionContext(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_261]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_261]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_261]
在我的代码中,我正在设置跳过策略,如果我最初在输出列表中传递 10 个元素并且第 5 条记录有问题,那么 Spring Batch 应该抛出异常并重试调用 write 方法 10 次。这样只有一条记录会被跳过,其他 9 条记录会保留在数据库中。
但就我而言,我的批处理应用程序完全停止了。它甚至无法更新由 sprint 维护的 BATCH_STEP_EXECUTION_CONTEXT 表。
请注意,我正在使用 JPA 存储库,而不是尝试在任何地方管理事务。考虑到 spring 将代表我管理它,甚至没有提供事务管理器。
【问题讨论】:
根本原因是“错误:“output_name”列中的空值违反非空约束” 我无法确定来自 API 的数据。我需要插入数据库。当然会有一些记录可能会抛出异常。 好吧,事务的定义是“要么所有命令都成功,要么都不成功”。如果您的事务涉及一些失败的命令,您就不能指望其他命令成功。如果某些语句成功而其他语句不成功是可以接受的,那么您不能在单个事务中完成所有操作。您需要每次插入使用一个事务(例如 autocommit = on) 不,我并不是说 txn 应该成功。我是说我已经实现了跳过侦听器,当第一次调用 write 失败时它应该会出现。假设我在 write 方法中发送了 10 个项目,但由于第 5 条记录而失败,那么 spring batch 应该通过一项一项地传递来调用 write 方法。只有失败的项目应该被跳过。 这需要在每个语句周围使用保存点,我不知道如何在 Spring 或 JPA 中启用它 【参考方案1】:好吧,我得到了错误。在我的写类中,我有一个用于打印和异常的 catch 块。我删除了 try catch 块并让 spring 批处理找到异常并调用跳过侦听器。
【讨论】:
以上是关于Spring批处理ItemWriter异常正在杀死工作的主要内容,如果未能解决你的问题,请参考以下文章
Spring-Batch学习总结——重要概念,环境搭建,名词解释,第一个项目及异常处理