我们如何在Spring Batch的作业的不同步骤之间共享数据?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了我们如何在Spring Batch的作业的不同步骤之间共享数据?相关的知识,希望对你有一定的参考价值。
深入研究Spring Batch,我想知道如何在作业的不同步骤之间共享数据?
我们可以使用JobRepository吗?如果是的话,我们怎么做?
有没有其他方法可以做到/达到同样的目的?
作业存储库间接用于在步骤之间传递数据(Jean-Philippe是正确的,最好的方法是将数据放入StepExecutionContext
,然后使用详细命名的ExecutionContextPromotionListener
将步骤执行上下文键提升为JobExecutionContext
。
值得注意的是,有一个倾向于将JobParameter
密钥推广到StepExecutionContext
(更加详细地命名为JobParameterExecutionContextCopyListener
);如果你的工作步骤不完全相互独立,你会发现你经常使用这些。
否则,您将使用更复杂的方案(如JMS队列或(天堂禁止)硬编码文件位置)在步骤之间传递数据。
至于在上下文中传递的数据的大小,我还建议你保持小的(但我没有任何具体细节)
正如Nenad Bozic在他的第3个选项中所说,使用临时表来共享步骤之间的数据,使用上下文共享也做同样的事情,它写入表并在下一步加载回来,但是如果你写入临时表你可以清理工作结束了。
另一个非常简单的方法,留待将来参考:
class MyTasklet implements Tasklet
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
getExecutionContext.put("foo", "bar");
class MyOtherTasklet implements Tasklet
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
getExecutionContext.get("foo");
getExecutionContext
here是:
ExecutionContext getExecutionContext(ChunkContext chunkContext)
return chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
把它放在超类中,作为default
方法在界面中,或者只是粘贴在你的Tasklet
s中。
从一个步骤,您可以将数据放入StepExecutionContext
。然后,通过监听器,您可以将数据从StepExecutionContext
提升到JobExecutionContext
。
这个JobExecutionContext
可用于以下所有步骤。
笨拙:数据必须简短。这些上下文通过序列化保存在JobRepository
中,长度有限(如果我记得很清楚,则为2500个字符)。
因此,这些上下文很适合共享字符串或简单值,但不适用于共享集合或大量数据。
共享大量数据并不是Spring Batch的理念。 Spring Batch是一组不同的操作,而不是一个巨大的业务处理单元。
我会说你有3个选择:
- 使用
StepContext
并将其推广到JobContext
并且您可以从每个步骤访问它,您必须遵守规定服从限制 - 创建
@JobScope
bean并将数据添加到该bean,@Autowire
在需要的地方使用它(缺点是它是内存中的结构,如果作业失败,数据丢失,migh导致可重启性问题) - 我们需要跨步骤处理更大的数据集(读取csv中的每一行并写入DB,从DB读取,聚合并发送到API),因此我们决定在新表中将数据建模在与弹簧批量元表相同的数据库中,保持
ids
中的JobContext
并在需要时访问,并在作业成功完成后删除该临时表。
您可以使用Java Bean对象
- 执行一步
- 将结果存储在Java对象中
- 下一步将引用相同的java对象来获取步骤1存储的结果
通过这种方式,您可以根据需要存储大量数据
这是我为保存可通过步骤访问的对象所做的操作。
- 创建了一个用于在作业上下文中设置对象的侦听器
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener
public void beforeJob(JobExecution jobExecution)
String myValue = someService.getValue();
jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
- 在作业上下文中定义了侦听器
<listeners>
<listener ref="myJobListener"/>
</listeners>
- 使用BeforeStep注释在步骤中消耗该值
@BeforeStep
public void initializeValues(StepExecution stepExecution)
String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
您可以将数据存储在简单对象中。喜欢:
AnyObject yourObject = new AnyObject();
public Job build(Step step1, Step step2)
return jobBuilderFactory.get("jobName")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.build();
public Step step1()
return stepBuilderFactory.get("step1Name")
.<Some, Any> chunk(someInteger1)
.reader(itemReader1())
.processor(itemProcessor1())
.writer(itemWriter1(yourObject))
.build();
public Step step2()
return stepBuilderFactory.get("step2Name")
.<Some, Any> chunk(someInteger2)
.reader(itemReader2())
.processor(itemProcessor2(yourObject))
.writer(itemWriter2())
.build();
只需将数据添加到编写器中的对象或任何其他方法,并在下一步的任何阶段获取它
使用`ExecutionContextPromotionListener。
public class YourItemWriter implements ItemWriter<Object>
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception
// Some Business Logic
// put your data into stepexecution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
@BeforeStep
public void saveStepExecution(Final StepExecution stepExecution)
this.stepExecution = stepExecution;
现在您需要将promotionListener添加到您的作业中
@Bean
public Step step1()
return stepBuilder
.get("step1")<Company,Company> chunk(10)
.reader(reader()).processor(processor()).writer(writer())
.listener(promotionListener()).build();
@Bean
public ExecutionContextPromotionListener promotionListener()
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] "someKey");
listener.setStrict(true);
return listener;
现在,在step2中从作业ExecutionContext获取数据
public class RetrievingItemWriter implements ItemWriter<Object>
private Object someObject;
public void write(List<? extends Object> items) throws Exception
// ...
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution)
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
如果您正在使用tasklet,那么使用以下来获取或放置ExecutionContext
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");
我被赋予了一个逐个调用批处理作业的任务。每个工作都依赖于另一个。第一个工作结果需要执行后续的工作程序。我正在搜索如何在执行作业后传递数据。我发现这个ExecutionContextPromotionListener派上用场了。
1)我为“ExecutionContextPromotionListener”添加了一个bean,如下所示
@Bean
public ExecutionContextPromotionListener promotionListener()
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys( new String[] "entityRef" );
return listener;
2)然后我将一个听众附加到我的步骤
Step step = builder.faultTolerant()
.skipPolicy( policy )
.listener( writer )
.listener( promotionListener() )
.listener( skiplistener )
.stream( skiplistener )
.build();
3)我在我的Writer步骤实现中添加了stepExecution作为参考,并在Beforestep中填充
@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
this.stepExecution = stepExecution;
4)在我的编写步骤结束时,我将stepexecution中的值填充为如下所示的键
lStepContext.put( "entityRef", lMap );
5)执行作业后,我从lExecution.getExecutionContext()
中检索了值并填充为作业响应。
6)从作业响应对象中,我将获取值并在其余作业中填充所需的值。
上面的代码用于使用ExecutionContextPromotionListener将步骤中的数据提升为ExecutionContext。它可以在任何步骤中完成。
Spring Batch为自己创建元数据表(如 以上是关于我们如何在Spring Batch的作业的不同步骤之间共享数据?的主要内容,如果未能解决你的问题,请参考以下文章 Spring Batch 从相同的执行和步骤重新启动未完成的作业 从 tasklet 步骤将参数添加到作业上下文并在 Spring Batch 的后续步骤中使用 如何将 Spring Batch Cron 作业迁移到 Spring Cloud 任务 关于这个 Spring Batch @Scheduled() 注解以及如何手动启动 Spring Batch 作业的一些疑问?batch_job_execution
,batch_job_execution_context
,batch_step_instan