尝试传递参数时,Spring批处理中的@StepScope抛出异常

Posted

技术标签:

【中文标题】尝试传递参数时,Spring批处理中的@StepScope抛出异常【英文标题】:@StepScope within Spring batch throw exception when trying pass param 【发布时间】:2015-03-04 09:06:18 【问题描述】:

我很难使用 Spring-boot 将 Spring-batch 与 Spring-batch-admin 结合起来。

因为我使用的是 Spring-batch-admin,所以我不得不禁用 @EnableBatchProcessing,并手动配置了它提供的两个构建器。

这来自我的问题:

Error Integrating spring-batch and Spring-Batch-admin

现在,当我想使用 @StepScope 并注入属性将参数从 jobContext 参数传递到 ItemReader 时,我正在尝试做一个简单的场景

我按照这里的例子(这个例子没有spring-batch-admin和spring boot)

using @StepScope explanation

我得到了这个例外:

2015-01-06 18:22:09.852 ERROR 10260 --- [nio-8080-exec-1] o.s.batch.core.step.AbstractStep         : Exception while closing step execution resources in step step2 in job MyTestJob

java.lang.ClassCastException: com.sun.proxy.$Proxy74 cannot be cast to org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:717)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
    at org.springframework.batch.item.file.FlatFileItemReader$$EnhancerBySpringCGLIB$$e99a9c6b.close(<generated>)
    at org.springframework.batch.item.support.CompositeItemStream.close(CompositeItemStream.java:85)
    at org.springframework.batch.core.step.tasklet.TaskletStep.close(TaskletStep.java:305)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:267)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:386)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at com.mycompany.notification.processor.service.controller.BatchJobController.job1(BatchJobController.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:777)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:706)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:943)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:877)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:618)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:725)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration$ApplicationContextHeaderFilter.doFilterInternal(EndpointWebMvcAutoConfiguration.java:288)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.batch.admin.web.filter.ParameterUnpackerFilter.doFilterInternal(ParameterUnpackerFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.autoconfigure.MetricFilterAutoConfiguration$MetricsFilter.doFilterInternal(MetricFilterAutoConfiguration.java:90)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:219)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:537)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1085)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:658)
    at org.apache.coyote.http11.Http11NioProtocol$Http11ConnectionHandler.process(Http11NioProtocol.java:222)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1556)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1513)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)

2015-01-06 18:22:09.866  INFO 10260 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=MyTestJob]] completed with the following parameters: [pathToFile=snids.csv, date=1420561328719] and the following status: [FAILED]

这就是我通过控制器创建作业的方式:

 @RequestMapping(value = "/job1")
    public @ResponseBody StatusResponse job1() 
        try 
            JobParameters jobParameters = new JobParametersBuilder().addString("pathToFile", "snids.csv").addDate("date", new Date()).toJobParameters();
            jobLauncher.run(job1,jobParameters);
            return new StatusResponse(true);

         catch (JobInstanceAlreadyCompleteException ex) 
            return new StatusResponse(false, "This job has been completed already!");

         catch (Exception e) 
            throw new RuntimeException(e);
        
    

Application.class:

@ComponentScan("com.mycompany.notification.processor.service")
@EnableAutoConfiguration
@Configuration
@ImportResource(
        "classpath:integration-context.xml","classpath:launch-context.xml")
@PropertySource("file:///d://etc/mycompany/services/pushExecuterService/pushExecuterServices.properties")
//@Import( ServletConfiguration.class, WebappConfiguration.class )
public class Application 
    public static void main(String[] args) 
        SpringApplication.run(Application.class, args);
        System.out.printf("hello man");
    

和 BatchConfiguration 类:

@Configuration
public class BatchConfiguration 

    private static final Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Inject
    private WriteToKafkaTasklet writeToKafkaTasklet;

    @Inject
    private StepBuilderFactory steps;

    @Autowired
    private DownloadFileTasklet downloadFileTasklet;

    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() throws Exception 
        return this.jobs.get("MyTestJob").start(downloadFileFromETStep()).next(processFileStep()).next(pushToKafkaStep()).build();

    


    private Step pushToKafkaStep() 
        return this.steps.get("step3").tasklet(writeToKafkaTasklet).build();
    

    private Step downloadFileFromETStep() 
        return this.steps.get("step1").tasklet(downloadFileTasklet).build();
    

    private static final String OVERRIDDEN_BY_EXPRESSION = null;


    @Bean
    public Step processFileStep() 
        return stepBuilderFactory.get("step2")
                .<PushItemDTO, PushItemDTO>chunk(1) //important to be one in this case to commit after every line read
                .reader(reader(OVERRIDDEN_BY_EXPRESSION))
                       //  .processor(processor())
                .writer(writer())
                        //    .listener(logProcessListener())
                        //     .faultTolerant()
                        //   .skipLimit(10) //default is set to 0
                        //     .skip(mysqlIntegrityConstraintViolationException.class)
                .build();
    

    @Bean
    public ItemWriter writer() 
        return new ItemWriter() 

            @Override
            public void write(List items) throws Exception 

                logger.debug("hello");
            
        ;
    
    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public FlatFileItemReader<PushItemDTO> reader(@Value("#jobParameters[pathToFile]") String pathToFile)
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setResource(new ClassPathResource(pathToFile));
        return itemReader;
    

    @Bean
    public LineMapper<PushItemDTO> lineMapper() 
        DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        //  lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);

        lineTokenizer.setNames(new String[]"SNID");
        BeanWrapperFieldSetMapper<PushItemDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<PushItemDTO>();
        fieldSetMapper.setTargetType(PushItemDTO.class);


        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());


        return lineMapper;
    

Servlet 配置:

@Configuration
@ImportResource("classpath:/org/springframework/batch/admin/web/resources/servlet-config.xml")
public class ServletConfiguration 

网页应用配置:

@Configuration
@ImportResource("classpath:/org/springframework/batch/admin/web/resources/webapp-config.xml")
public class WebappConfiguration 

谢谢

我编辑了我的代码,而不是添加了@StepScope

@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)

按照建议。

现在我得到不同的异常:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'batchJobController': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:334)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1202)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:537)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:762)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:109)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:691)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:321)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:961)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:950)
    at com.mycompany.notification.processor.service.main.Application.main(Application.java:20)
Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:558)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
    ... 16 common frames omitted

【问题讨论】:

【参考方案1】:

Spring Batch Admin 通过 XML 配置 StepScope,它提供 java 代理作为代理机制。但是,@StepScope 使用动态子类。为了使它工作,而不是使用@StepScope 快捷方式,使用以下:

@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)

除了上述更新之外,您还需要返回接口(而不是实现),以便正确代理它。因此,在这种情况下,您需要返回 ItemStreamReader 而不是 FlatFileItemReader

【讨论】:

我按照你的建议做了,我得到了 dif 异常:原因:org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.batch.core.Step]:工厂方法 'processFileStep'抛出异常;嵌套异常是 java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader at ..... 原因:java.lang.ClassCastException: com.sun .proxy.$Proxy59 无法转换为 org.springframework.batch.item.file.FlatFileItemReader 编辑了我的问题,以便您更清楚地看到它。谢谢。 你能更新你的配置吗?该堆栈跟踪对于发布的配置没有意义。 好的,我更新了问题。我已经更新了 Application.class、BatchConfiguration.class、ServletConfiguration.class、WebappConfiguration.class 。请检查出来,让我知道是否还有其他需要。谢谢。 我在配置中看不到任何明显的东西。如果您可以将项目(或重新创建问题的示例)扔到 github 上,我可以更深入地了解。

以上是关于尝试传递参数时,Spring批处理中的@StepScope抛出异常的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Dataflow - 处理任务中的参数

Spring Boot中的JPA @query中没有传递参数[重复]

spring mvc requestparam value可以写多个值吗

Spring Data JPA如何传递日期参数

如何获取Spring批处理作业参数并传递给sql

如何将 JobParameters 传递给 Spring Cloud Task 启动的批处理作业?