Spring Batch 中没有可用于作业范围的上下文持有者

Posted

技术标签:

【中文标题】Spring Batch 中没有可用于作业范围的上下文持有者【英文标题】:No context holder available for job scope in Spring Batch 【发布时间】:2021-11-13 19:42:13 【问题描述】:

我正在尝试在 Spring Batch 作业中使用多线程步骤,但我得到一个“Scope 'job' is not active for current thread...”。我在 Spring 中尝试了一些方法,但此时我正在使用我认为是 OOTB Spring 构造的东西,但它仍然失败。

错误是:

2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
    at com.sun.proxy.$Proxy126.read(Unknown Source)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
    ... 19 common frames omitted

基本工作结构简化: 作业 SoftLayer 上传作业 步骤:softlayerUploadFileStep(不能多线程) 从 Excel 文件中读取 写入 SoftLayerDataItemQueue) bean,最终写入 java.util.Queue 步骤:writeToDatabaseStep 从 SoftLayerDataItemQueue bean 读取 使用 JpaWriter 写入数据库

SoftLayerJobConfiguration.java

public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> 
    private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);

    private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();

//  private Queue<SoftLayerData> queue = new LinkedList<>();

    // @Value("#stepExecution.jobExecution.jobInstance.instanceId")
    @Value("#jobExecution.jobInstance.instanceId")
    private int jobInstanceId;

    public Queue<SoftLayerData> getQueue() 
        Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
        logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
        if (result == null) 
            result = new LinkedList<>();
            queueMap.put(jobInstanceId, result);
            
        
        logger.info("Returning queue with item count=" + result.size());
        return result;
    

    @Override
    public void write(List<? extends SoftLayerData> items) throws Exception 
        logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
                + jobInstanceId + " ");
        logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
        if (logger.isDebugEnabled()) 
            for (SoftLayerData item : items) 
                logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
            
        
        getQueue().addAll(items);
    

    @Override
    public SoftLayerData read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException 
        logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
                + " job instanceid=" + jobInstanceId + " ");
        SoftLayerData result = null;
        if (getQueue() != null && getQueue().size() > 0) 
            result = getQueue().remove();
            logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
         else 
            logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
        
        
        return result;
    

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: open()");
        /* Unused method */
    

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: update()");
        /* Unused method */
    

    @Override
    public void close() throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: close()");
        /* Unused method */
    



SoftLayerDataItemQueue.java

public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> 
    private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);

    private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();

    @Value("#jobExecution.jobInstance.instanceId")
    private int jobInstanceId;

    public Queue<SoftLayerData> getQueue() 
        Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
        logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
        if (result == null) 
            result = new LinkedList<>();
            queueMap.put(jobInstanceId, result);
            
        
        logger.info("Returning queue with item count=" + result.size());
        return result;
    

    @Override
    public void write(List<? extends SoftLayerData> items) throws Exception 
        logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
        if (logger.isDebugEnabled()) 
            for (SoftLayerData item : items) 
                logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
            
        
        getQueue().addAll(items);
    

    @Override
    public SoftLayerData read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException 
                + " job instanceid=" + jobInstanceId + " ");
        SoftLayerData result = null;
        if (getQueue() != null && getQueue().size() > 0) 
            result = getQueue().remove();
            logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
         else 
            logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
        
        
        return result;
    

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: open()");
        /* Unused method */
    

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: update()");
        /* Unused method */
    

    @Override
    public void close() throws ItemStreamException 
        logger.info("SoftLayerDataItemQueue: close()");
        /* Unused method */
    


注意:我不喜欢使用 SoftLayerDataItemQueue,但我想不出任何其他方法来编写在一个步骤中处理的项目并在另一个步骤中读取它们,特别是对于大容量和并行处理。我希望 Spring 有某种方法可以将数据从一个步骤写入另一个步骤,但我找不到它。 SO 上的其他人建议写入文件或工作或步骤上下文。

【问题讨论】:

我不明白为什么你认为你需要多个步骤?为什么首先在 1 步中读取,然后在下一步中写入,这应该是带有读取器和写入器的 1 步(中间没有队列)。也许是处理器。在我看来,你让事情变得比必要的更复杂。 我和 M. Deinum 有同样的问题,我在这里问过这个问题:***.com/questions/69149100/…。我认为不需要中间队列。单个(多线程或分区)面向块的步骤就足够了 IMO。 【参考方案1】:

从某种意义上说,这只是部分答案,因为这解决了我的问题,但我无法解释为什么这是必要的。以下建议 https://github.com/spring-projects/spring-batch/issues/1335

我创建了自己的 SimpleAsyncTaskExecutor,如下所示:

public class ParallelSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor 
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public ParallelSimpleAsyncTaskExecutor(String prefix) 
        super(prefix);
    

    @Override
    protected void doExecute(Runnable task) 
        JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
        super.doExecute(new Runnable() 
            @Override
            public
            void run() 
                JobSynchronizationManager.register(jobExecution);
                try 
                    task.run();
                 finally 
                    JobSynchronizationManager.release();
                
                
            
        );
    



【讨论】:

以上是关于Spring Batch 中没有可用于作业范围的上下文持有者的主要内容,如果未能解决你的问题,请参考以下文章