Spring Batch 从相同的执行和步骤重新启动未完成的作业

Posted

技术标签:

【中文标题】Spring Batch 从相同的执行和步骤重新启动未完成的作业【英文标题】:Spring Batch restart uncompleted jobs from the same execution and step 【发布时间】:2019-01-05 05:21:15 【问题描述】:

我使用以下逻辑重启Spring Batch未完成的(例如应用程序异常终止后)作业:

public void restartUncompletedJobs() 

    LOGGER.info("Restarting uncompleted jobs");

    try 
        jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) 
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);

            for (JobExecution runningJob : runningJobs) 
                runningJob.setStatus(BatchStatus.FAILED);
                runningJob.setEndTime(new Date());
                jobRepository.update(runningJob);
                jobOperator.restart(runningJob.getId());
                LOGGER.info("Job restarted: " + runningJob);
            
        
     catch (Exception e) 
        LOGGER.error(e.getMessage(), e);
    

这很好用,但有一个副作用——它不会重新启动失败的作业执行,而是创建一个新的执行实例。如何更改此逻辑以便从失败的步骤重新开始失败的执行而不创建新的执行?

更新

当我尝试以下代码时:

public void restartUncompletedJobs() 
try 
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) 

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) 
        jobOperator.restart(jobExecution.getId());
    
    
 catch (Exception e) 
    LOGGER.error(e.getMessage(), e);


它失败并出现以下异常:

2018-07-30 06:50:47.090 ERROR 1588 --- [           main] c.v.p.d.service.batch.BatchServiceImpl   : Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters=ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL

org.springframework.batch.core.UnexpectedJobExecutionException: Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters=ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL
    at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684) [spring-aop-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$7659d4c.restart(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at com.example.pipeline.domain.service.batch.BatchServiceImpl.restartUncompletedJobs(BatchServiceImpl.java:143) ~[domain-0.0.1.jar!/:0.0.1]

以下代码在作业存储数据库中创建新的执行:

public void restartUncompletedJobs() 
try 
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) 

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) 

        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());
        jobRepository.update(jobExecution);

        Long jobExecutionId = jobExecution.getId();
        jobOperator.restart(jobExecutionId);
    
    
 catch (Exception e) 
    LOGGER.error(e.getMessage(), e);



问题是 - 如何在应用程序重启后继续运行旧的未完成的执行而不创建新的执行?

【问题讨论】:

请不要重复问题。在***.com/questions/51568654/…中查看我的回答 这不是重复 - 这个问题涉及从相同的执行和步骤简单地重新启动未完成的作业。链接的问题与集群环境中未完成作业的重新启动有关。我很抱歉,但你的回答并没有解决那里描述的问题。目前尚不清楚如何区分未完成且未运行的作业与未完成但正在运行的作业。 在单节点集群上,应用程序重启后,我很确定此时没有一个正在运行的作业,可以重新启动所有作业,但在具有共享作业的多节点集群中存储库 - 我不知道正在运行的确切作业是什么 - 不是。 好吧,如果你改变那里的问题,你希望答案如何仍然解决那里描述的问题? “如何在应用重启后继续运行旧的未完成的执行而不创建新的执行?” > Spring批处理的设计不是这样工作的。每当您提交作业时,它都会创建一个新的执行。但是,如果您使用失败作业实例的参数提交作业,则会从先前失败的位置开始创建新的作业执行。 【参考方案1】:

TL;DR:Spring Batch 将始终创建新的作业执行,并且不会重用先前失败的作业执行来继续执行。

更长的答案:首先你需要了解 Spring Batch 中三个相似但不同的概念:Job、Job Instance、Job Execution

我总是用这个例子:

工作:日终批处理 作业实例:2018-01-01 的日终批次 作业执行:2018-01-01 的日终批处理,执行 #1

概括地说,这就是 Spring Batch 的恢复工作原理:

假设您在第 3 步中的第一次执行失败。您可以使用相同的参数 (2018-01-01) 提交相同的作业(日后批处理)。 Spring Batch 将尝试查找提交的 Job Instance(End- of-Day Batch for 2018-01-01),发现之前在第 3 步失败了。Spring Batch 然后会创建一个 NEW 执行,[End-Of-Day Batch for 2018-01-01, execution #2],然后从第 3 步开始执行。

所以按照设计,Spring 试图恢复的是以前失败的 Job Instance(而不是 Job Execution)。当您重新运行先前失败的执行时,Spring Batch 不会重用执行。

【讨论】:

@Adian 你能帮帮我吗:***.com/questions/63713450/…

以上是关于Spring Batch 从相同的执行和步骤重新启动未完成的作业的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch - 使用相同的作业参数重新运行作业

我们如何在 Spring Batch 中 Job 的不同步骤之间共享数据?

Spring Batch:根据 itemReader 中的结果执行作业

为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?

从 tasklet 步骤将参数添加到作业上下文并在 Spring Batch 的后续步骤中使用

在 Spring Batch 步骤中移动 HashMap