Spring Batch - 循环读取器、处理器和写入器 N 次

Posted

技术标签:

【中文标题】Spring Batch - 循环读取器、处理器和写入器 N 次【英文标题】:Spring Batch - Loop reader, processor and writer for N times 【发布时间】:2018-02-06 16:11:50 【问题描述】:

在Spring Batch中,如何循环读取器、处理器和写入器N次?

我的要求是:

我有“N”个。客户/客户。 对于每个客户/客户,我需要从数据库(读取器)中获取记录,然后我必须处理(处理器)客户/客户的所有记录,然后我必须将记录写入文件(写入器)。

如何将spring批处理作业循环N次?

【问题讨论】:

到目前为止你尝试过什么?你能发布你的代码/配置文件吗? 我为一位客户做过。所以我想找到一种方法为 N 个客户循环它。 【参考方案1】:

AFAIK 我担心这种情况没有框架支持。至少不是你想要解决它的方式。 我建议以不同的方式解决问题:

选项 1

一次读取/处理/写入所有客户的所有记录。只有当它们都在同一个数据库中时,您才能执行此操作。否则我不会推荐它,因为您必须配置 JTA/XA 事务,这不值得。

选项 2

为每个客户运行一次工作(我认为最好的选择)。将每个客户端的必要信息保存在不同的属性文件中(数据库数据连接、按客户端过滤记录的值、您可能需要特定于客户端的任何其他数据)并通过参数传递给它必须使用的客户端的作业。通过这种方式,您可以控制处理哪个客户端以及何时使用 bash 文件和/或 cron。如果您使用 Spring Boot + Spring Batch,您可以将客户端配置存储在配置文件(application-clientX.properties)中并运行如下过程:

$>  java -Dspring.profiles.active="clientX"  \
     -jar "yourBatch-1.0.0-SNAPSHOT.jar"     \
     -next

奖金 - 选项 3

如果上面没有一个适合您的需求,或者您坚持以他们提出的方式解决问题,那么您可以根据参数动态配置作业并使用 JavaConf 为每个客户端创建一个步骤:

@Bean
public Job job()
    JobBuilder jb = jobBuilders.get("job");
    for(Client c : clientsToProcess) 
            jb.flow(buildStepByClient(c));
    ;
    return jb.build();

再次,我强烈建议您不要这样做:丑陋,违反框架哲学,难以维护,调试,您可能还必须在这里使用 JTA/XA,...

希望对你有所帮助!

【讨论】:

感谢您的建议【参考方案2】:

Local Partitioning 将解决您的问题。

在您的分区器中,您将把所有客户端 ID 放入映射中,如下所示(只是伪代码),

public class PartitionByClient implements Partitioner 

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) 
            Map<String, ExecutionContext> result = new HashMap<>();
            int partitionNumber = 1;
            for (String client: allClients) 
            ExecutionContext value = new ExecutionContext();
            value.putString("client", client);
            result.put("Client [" + client+ "] : THREAD " + partitionNumber, value);
            partitionNumber++;
            

         

        return result;
        
    

这只是一个伪代码。您必须查看分区的详细文档。

您必须在@StepScope 中标记您的阅读器、处理器和编写器(即哪个部分需要您的client 的值)。读者将在 SQL 的WHERE 子句中使用这个client。您将在阅读器等定义中使用@Value("#stepExecutionContext[client]") String client 来注入此值。

现在是最后一部分,如果您在主分区器步骤配置中设置此任务执行器,您将需要一个任务执行器,并且等于 concurrencyLimit 的客户端将并行启动。

@Bean
    public TaskExecutor taskExecutor() 
    SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
    return simpleTaskExecutor;
    

如果您希望一次只运行一个客户端,concurrencyLimit 将是 1

【讨论】:

感谢您的建议

以上是关于Spring Batch - 循环读取器、处理器和写入器 N 次的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch SkipPolicy在处理异常时陷入无限循环

在 Spring Batch Step、Tasklet 或 Chunks 之间做出决定

在 Spring Batch 中对字段进行处理时读取新文件

如何在 Spring Batch 中分别读取平面文件头和正文

Spring Batch中如何读取多个CSV文件合并数据进行处理?

Spring Batch:ItemProcessor 不处理所有记录