Spring Integration - 集成流模板

Posted

技术标签:

【中文标题】Spring Integration - 集成流模板【英文标题】:Spring Integration - Integration Flow Template 【发布时间】:2020-11-12 00:42:51 【问题描述】:

我已经在 DSL 中成功实现了一些集成流程,在 PROD 中一切运行良好。 对于我的几乎所有流(+-10),我只有一个消息源和一个处理程序以及一些对于每个流始终相同的额外功能:

创建发送电子邮件的错误流 添加断路器 添加 httpinbound 以强制运行 ...

我想知道是否有一些优雅的方式来分解它,比如抽象配置类或模板(可能使用 flowIntegrationAdatper ?)

鉴于这个抽象级别,在每个流配置类中我只想提供/覆盖两个方法:

消息源 处理程序

@Configuration
@ConfigurationProperties("app.flows.sample")
public class SampleFlowConfiguration 

    public static final String FLOW_NAME = "SampleFlow";
    public static final String POLLER = "poller";

    private final Service service;
    private final TaskExecutorFactory taskExecutorFactory;
    private final ErrorFlowFactory errorFlowFactory;

    public SampleFlowConfiguration(Service service,
                                   TaskExecutorFactory taskExecutorFactory,
                                   ErrorFlowFactory errorFlowFactory) 
        this.service = service;
        this.taskExecutorFactory = taskExecutorFactory;
        this.errorFlowFactory = errorFlowFactory;
    

    @Bean
    public IntegrationFlow sampleFlow() 
        return IntegrationFlows
                .from(objectToTreatsSource(), sampleProducer())
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, sampleErrorChannel()))
                .channel(MessageChannels.executor(sampleConsumerTaskExecutor()))
                .handle(handler())
                .get();
    

    @Bean
    public MessageSource objectToTreatsSource() 
        return service.getObjectToTreat();
    

    @Bean
    public Consumer<SourcePollingChannelAdapterSpec> sampleProducer() 
        return c -> c.poller(Pollers.cron("* * * * * *")
                .maxMessagesPerPoll(10)
                .errorChannel(sampleErrorChannel())
                .taskExecutor(samplePollerTaskExecutor()))
                .autoStartup(false)
                .id(POLLER);
    

    @Bean
    public MessageHandler objectHandler() 
        return new AbstractReplyProducingMessageHandler() 
            @Override
            protected Object handleRequestMessage(Message<?> message) 
                service.handle(message.getPayload());
                return message;
            
        ;
    

    @Bean
    public Executor samplePollerTaskExecutor() 
        return taskExecutorFactory.getTaskExecutor(10, "sampleProducerExec");
    

    @Bean
    public Executor sampleConsumerTaskExecutor() 
        return taskExecutorFactory.getTaskExecutor(10, "sampleConsumerExec");
    

    @Bean
    public DirectChannel sampleErrorChannel() 
        return MessageChannels.direct().get();
    

    @Bean
    public IntegrationFlow samplesExpirationErrorFlow() 
        return errorFlowFactory.getDefaultErrorFlow(
                sampleErrorChannel(),
                m -> POLLER,
                mailTransformer());
    

    @Bean
    public MessagingExceptionToMailTransformer mailTransformer() 
        FlowErrorMessageBuilder flowErrorMessageBuilder = messageException ->
                "Error while processing sample";
        return new MessagingExceptionToMailTransformer(FLOW_NAME, flowErrorMessageBuilder, true);
    



谢谢

【问题讨论】:

【参考方案1】:

IntegrationFlows 是一个工厂,它确实可以从一些通用方法中使用:

@Bean
public IntegrationFlow sampleFlow() 
    return myFlowBuilder(objectToTreatsSource(), handler());


private IntegrationFlow myFlowBuilder(MessageSource<?> messageSource, MessageHandler  handler) 
 return IntegrationFlows
            .from(messageSource, sampleProducer())
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, sampleErrorChannel()))
            .channel(MessageChannels.executor(sampleConsumerTaskExecutor()))
            .handle(handler)
            .get();

另一方面,您可以将这些流的公共部分提取到一个独立的流中,并使用gateway() 对该流执行请求并等待回复。

请记住:MessageChannel 抽象是框架中的一等公民之一,因此您始终可以从某个地方发送到某个频道。

【讨论】:

谢谢,我会试试这个,如果需要,我会回复你。

以上是关于Spring Integration - 集成流模板的主要内容,如果未能解决你的问题,请参考以下文章

Spring批量集成:java.lang.ClassCastException:org.springframework.integration.file.FileReadingMessageSourc

集成框架Spring Integration, Mule ESB or Apache Camel比较

Spring Integration运行时配置

Spring Integration 5.1 - 使用 @IntegrationConverter 的集成流转换不起作用

Spring Integration Dispatcher 没有频道订阅者

使用 Spring Integration JDBC Outbound Gateway 更新数据