当spring批处理远程分块工作者应用程序启动时不要运行IntegrationFlow

Posted

技术标签:

【中文标题】当spring批处理远程分块工作者应用程序启动时不要运行IntegrationFlow【英文标题】:Do not run the IntegrationFlow when spring batch remote chunking worker application starts 【发布时间】:2022-01-17 05:59:12 【问题描述】:

我是 Spring 集成和批处理的新手,我想开发一个带有 master 和 worker 的远程分块批处理应用程序。我使用 spring 集成和 RabbitMQ 作为消息队列,应用程序运行良好但 worker itemProccessor 自动启动,但是我需要控制何时启动它。

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig 


    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    @Bean
    public DirectChannel requestsChannel() 
        return new DirectChannel();
    

    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) 
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests"))
                .channel(requestsChannel())
                .get();
    

    @Bean
    public DirectChannel repliesChannel() 
        System.out.println("repliesChannel 3 ");
        return new DirectChannel();
    

    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) 
        return IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
    

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() 
       ....
    

    @Bean
    public ItemWriter<Integer> itemWriter() 
       ...
    

    @Bean
    public IntegrationFlow workerIntegrationFlow() 
        return this.remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
    



那么我可以做些什么来手动启动工人部分?

【问题讨论】:

【参考方案1】:

给适配器一个id 并将自动启动设置为false。

@Bean
public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) 
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory,"requests")
                .id("inbound")
                .autoStartup(false))
            .channel(requestsChannel())
            .get();

然后@Autowire适配器并启动它...

@Autowired
AmqpInboundChannelAdapter inbound;

...
    inbound.start();

【讨论】:

嗨,感谢您的回复,但我认为它仅适用于 inboundPolledAdapter 而不是 inboundAdapter 我认为我错过了一些东西,因为在编译时它给了我:“无法解析方法'from(org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec,)'” .任何解决方案请 对不起,我的错误; idautoStartup 直接进入适配器规范 - 请参阅编辑。 谢谢,现在很好,但我收到以下错误:“需要一个找不到的 'org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter' 类型的 bean”...如果你能指出什么是错的,谢谢你非常匹配 这没有意义;集成流程使用 id 作为名称注册 bean。您可能需要将 @DependsOn("inboundFlow") 添加到自动连接适配器的 bean。

以上是关于当spring批处理远程分块工作者应用程序启动时不要运行IntegrationFlow的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch 远程分区和远程分块的区别

Spring Boot异常处理详解

Flyway Spring Boot应用程序在启动时不应用插入脚本

使用@Cacheable 的Spring 缓存在启动@PostConstruct 时不起作用

在 Windows Phone 中启动应用程序时不显示 Toast

远程 Spring 启动应用程序仍然连接到我的本地 mysql