Spring Batch -扩展和并行处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Batch -扩展和并行处理相关的知识,希望对你有一定的参考价值。
许多批处理问题可以通过单线程、单进程作业来解决, 因此,在考虑之前正确检查这是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际工作的表现,看看是否 最简单的实现首先满足您的需求。您可以读取和写入文件 不到一分钟就能达到几百兆字节,即使使用标准硬件也是如此。
当你准备好开始实现具有一些并行处理的作业时,Spring 批处理提供了一系列选项,本章将介绍这些选项,尽管有些 功能在别处介绍。在高级别上,有两种并行模式 加工:
- 单进程、多线程
- 多进程
这些也分为几类,如下所示:
- 多线程步骤(单进程)
- 并行步骤(单进程)
- 步骤的远程分块(多进程)
- 对步骤进行分区(单进程或多进程)
首先,我们回顾一下单进程选项。然后我们回顾多进程选项。
多线程步骤
开始并行处理的最简单方法是在步骤中添加 配置。TaskExecutor
例如,您可以向 添加一个属性 ,如下所示:tasklet
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
使用 Java 配置时,可以在步骤中添加一个, 如以下示例所示:TaskExecutor
爪哇配置
@Bean
public TaskExecutor taskExecutor()
return new SimpleAsyncTaskExecutor("spring_batch");
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager)
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
在此示例中,是对另一个 Bean 定义的引用 实现接口。TaskExecutor 是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用的详细信息 实现。最简单的多线程是 .taskExecutor
TaskExecutor
TaskExecutor
SimpleAsyncTaskExecutor
上述配置的结果是,通过读取、处理、 并将每个项目块(每个提交间隔)写入单独的执行线程中。 请注意,这意味着要处理的项目没有固定的顺序,并且块 可能包含与单线程大小写相比不连续的项目。在 除了任务执行程序设置的任何限制(例如它是否由 线程池),tasklet 配置具有限制(默认值:4)。 您可能需要增加此限制以确保线程池得到充分利用。Step
例如,您可以增加限制,如下所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
使用 Java 配置时,构建器提供对限制的访问,如 遵循:
爪哇配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager)
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
另请注意,在 中使用的任何池资源可能会对并发性施加限制 您的步骤,例如 .确保至少在这些资源中建立池 与步骤中所需的并发线程数一样大。DataSource
使用 多线程实现存在一些实际限制 一些常见的批处理用例。许多参与者(如读者和作家) 是有状态的。如果状态未按线程隔离,则这些组件不是 可在多线程中使用。特别是大多数读者和 Spring Batch 中的编写器不是为多线程使用而设计的。然而, 可以使用无状态或线程安全的读取器和写入器,并且有一个示例 (称为)在春天 显示使用进程指示器(请参阅防止状态持久性)进行跟踪的批处理示例 已在数据库输入表中处理的项目。Step
Step
Step
parallelJob
Spring Batch 提供了 和 的一些实现。通常 他们在 Javadoc 中说它们是否线程安全,或者你必须做些什么来避免 并发环境中的问题。如果 Javadoc 中没有信息,您可以 检查实现以查看是否有任何状态。如果读取器不是线程安全的, 您可以使用提供的装饰它或自己使用它 同步委托人。可以将调用同步到 ,并且,只要 处理和写入是块中最昂贵的部分,你的步骤可能仍然 完成速度比单线程配置快得多。ItemWriter
ItemReader
SynchronizedItemStreamReader
read()
并行步骤
只要需要并行化的应用逻辑可以拆分为不同的 职责并分配给各个步骤,它可以在 单一进程。并行步骤执行易于配置和使用。
例如,并行执行步骤很简单, 如下:(step1,step2)
step3
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
使用 Java 配置时,并行执行步骤非常简单,如下所示:(step1,step2)
step3
爪哇配置
@Bean
public Job job(JobRepository jobRepository)
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
@Bean
public Flow splitFlow()
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
@Bean
public Flow flow1()
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
@Bean
public Flow flow2()
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
@Bean
public TaskExecutor taskExecutor()
return new SimpleAsyncTaskExecutor("spring_batch");
可配置的任务执行器用于指定哪个实现应执行各个流。默认值为 ,但需要异步才能运行中的步骤 平行。请注意,作业可确保拆分中的每个流在之前完成 聚合退出状态和转换。TaskExecutor
SyncTaskExecutor
TaskExecutor
有关更多详细信息,请参阅拆分流部分。
远程分块
在远程分块中,处理被拆分到多个进程中, 通过一些中间件相互通信。下图显示了 模式:Step
图1.远程分块
管理器组件是单个进程,工作线程是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比阅读项目昂贵(在实践中经常出现这种情况)。
管理器是弹簧批处理的实现,其中被替换的 通过知道如何将项目块发送到中间件的通用版本作为 消息。工作线程是正在使用的任何中间件的标准侦听器(对于 例如,对于 JMS,它们将是实现),它们的作用是 通过接口使用标准或加号来处理项目块。使用它的优点之一 模式是读取器、处理器和写入器组件是现成的(相同 将用于步骤的本地执行)。项目是动态划分的, 并且工作通过中间件共享,因此,如果听众都渴望 使用者,负载平衡是自动的。Step
ItemWriter
MesssageListener
ItemWriter
ItemProcessor
ItemWriter
ChunkProcessor
中间件必须耐用,有保证交付,并且每个中间件都有一个消费者 消息。JMS是显而易见的候选者,但其他选项(如JavaSpaces)存在于 网格计算和共享内存产品空间。
有关更多详细信息,请参阅有关 Spring 批量集成 - 远程分块的部分。
分区
Spring Batch 还提供了用于对执行进行分区和执行的 SPI。 远程。在这种情况下,远程参与者是可以 易于配置并用于本地处理。下图显示了 模式:Step
Step
图2.分区
在左侧作为一系列实例运行,其中一个实例被标记为管理器。这张照片中的工人都是相同的 的实例,实际上可以代替管理器,导致 与 的结果相同。工作人员通常是远程服务,但 也可以是执行的本地线程。经理发送给工作人员的消息 在这种模式下不需要持久或有保证的交付。春季批次 中的元数据确保每个工作线程执行一次,并且只执行一次 每次执行。Job
Step
Step
Step
Job
JobRepository
Job
Spring Batch 中的 SPI 由一个特殊的实现(称为 )和两个策略接口组成,这两个接口需要针对特定的 环境。策略接口是 和 , 下面的序列图显示了它们的作用:Step
PartitionStep
PartitionHandler
StepExecutionSplitter
图3.分区 SPI
在这种情况下,右边是“远程”工作者,因此,可能存在 许多对象和/或进程扮演着这个角色,并且显示驱动 执行。Step
PartitionStep
以下示例显示了使用 XML 时的配置 配置:PartitionStep
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
以下示例显示了使用 Java 时的配置 配置:PartitionStep
爪哇配置
@Bean
public Step step1Manager()
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
与多线程步骤的属性类似,该属性可防止任务执行器被来自单个请求的请求饱和 步。throttle-limit
grid-size
与多线程步骤的方法类似,该方法可防止任务执行器被来自单个请求的请求饱和 步。throttleLimit
gridSize
春季单元测试套件 批处理示例(请参阅配置)有一个简单的示例,您可以复制和扩展。partition*Job.xml
Spring 批处理为调用的分区创建步骤执行,因此 上。许多人更喜欢调用经理步骤以保持一致性。您可以 为步骤使用别名(通过指定属性而不是属性)。step1:partition0
step1:manager
name
id
分区处理程序
PartitionHandler
是知道远程处理结构的组件,或者 网格环境。它能够向远程实例发送请求,以某种特定于结构的格式(如 DTO)包装。它不必知道 如何拆分输入数据或如何聚合多次执行的结果。 一般来说,它可能也不需要了解复原能力或故障转移, 因为在许多情况下,这些都是织物的特征。无论如何,弹簧批次总是 提供独立于结构的可重启性。失败的始终可以重新启动, 在这种情况下,只会重新执行失败的。StepExecution
Step
Step
Job
Steps
该接口可以具有针对各种 结构类型,包括简单的 RMI 远程处理、EJB 远程处理、定制 Web 服务、JMS、Java 空间、共享内存网格(如兵马俑或连贯)和网格执行结构 (如网格增益)。Spring Batch 不包含任何专有网格的实现 或远程织物。PartitionHandler
但是,Spring Batch确实提供了有用的实现。 使用 Spring 中的策略在单独的执行线程中本地执行实例。该实现称为 。PartitionHandler
Step
TaskExecutor
TaskExecutorPartitionHandler
是使用 XML 配置的步骤的默认值 前面显示的命名空间。您还可以显式配置它,如下所示:TaskExecutorPartitionHandler
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
您可以显式配置 与 Java 配置, 如下:TaskExecutorPartitionHandler
爪哇配置
@Bean
public Step step1Manager()
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
@Bean
public PartitionHandler partitionHandler()
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
该属性确定要创建的单独步骤执行的次数,因此 它可以与 中的线程池大小匹配。或者,它 可以设置为大于可用线程数,这使得块 工作更小。gridSize
TaskExecutor
对于 IO 密集型实例非常有用,例如 将大量文件拷贝或将文件系统复制到内容管理中 系统。它还可以通过提供实现来用于远程执行 这是远程调用的代理(例如使用 Spring Remoting)。TaskExecutorPartitionHandler
Step
Step
分区程序
具有更简单的职责:生成执行上下文作为输入 仅新步骤执行的参数(无需担心重新启动)。它有一个 单一方法,如以下接口定义所示:Partitioner
public interface Partitioner
Map<String, ExecutionContext> partition(int gridSize);
此方法的返回值将每个步骤执行的唯一名称 () 与 .名字显示 稍后在批处理元数据中作为分区中的步骤名称。这只是一袋名称-值对,因此可能包含一系列 主键、行号或输入文件的位置。遥控器然后 通常使用占位符绑定到上下文输入(步骤中的后期绑定 范围),如下一节所示。String
ExecutionContext
StepExecutions
ExecutionContext
Step
#…
步骤执行的名称(返回者 中的键)需要 在 A 的步骤执行中是唯一的,但没有任何其他特定 要求。执行此操作(并使名称对用户有意义)的最简单方法是 使用前缀+后缀命名约定,其中前缀是步骤 正在执行(它本身在 中是唯一的,后缀只是一个 计数器。在使用此约定的框架中有一个。Map
Partitioner
Job
Job
SimplePartitioner
您可以使用调用的可选接口来提供分区 与分区本身分开的名称。如果实现此 界面,则在重新启动时仅查询名称。如果分区成本高昂, 这可能是一个有用的优化。必须提供的名称 匹配 提供的那些。PartitionNameProvider
Partitioner
PartitionNameProvider
Partitioner
将输入数据绑定到步骤
对于由 执行的步骤来说,这是非常有效的 相同的配置,并且它们的输入参数在运行时从 .这很容易通过Spring Batch的StepScope功能来实现。 (在延迟绑定部分中有更详细的介绍)。为 例如,如果使用属性键创建实例 调用 ,指向每个步骤调用的不同文件(或目录), 输出可能类似于下表的内容:PartitionHandler
ExecutionContext
Partitioner
ExecutionContext
fileName
Partitioner
表 1.目标目录处理提供的执行上下文的示例步骤执行名称Partitioner
步骤执行名称(键) | 执行上下文(值) |
文件复制:分区0 | 文件名=/主页/数据/一 |
文件复制:分区 1 | 文件名=/主页/数据/二 |
文件复制:分区2 | 文件名=/主页/数据/三 |
然后,可以使用到执行上下文的后期绑定将文件名绑定到步骤。
下面的示例演示如何在 XML 中定义后期绑定:
XML 配置
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#stepExecutionContext[fileName]/*"/>
</bean>
以下示例演示如何在 Java 中定义后期绑定:
爪哇配置
@Bean
public MultiResourceItemReader itemReader(
@Value("#stepExecutionContext[fileName]/*") Resource [] resources)
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
以上是关于Spring Batch -扩展和并行处理的主要内容,如果未能解决你的问题,请参考以下文章
Spring boot spring.batch.job.enabled=false 无法识别
Spring boot spring.batch.job.enabled=false 无法识别
关于这个 Spring Batch @Scheduled() 注解以及如何手动启动 Spring Batch 作业的一些疑问?
当 Spring Batch 插入 BATCH_JOB_EXECUTION 时出现 BadSqlGrammarException