EIP/Apache Camel - 如何同时处理消息,但每个组原子处理?

Posted

技术标签:

【中文标题】EIP/Apache Camel - 如何同时处理消息,但每个组原子处理?【英文标题】:EIP/Apache Camel - How to process message concurrently, but atomically per group? 【发布时间】:2019-05-16 01:07:52 【问题描述】:

我有以下情况:

有固定数量的组。 有传入消息的 TCP 流。每条消息都与一个组相关。

我开始骆驼路线如下:

public class MyMessage implements Runnable 
    public void run() 
        // omitted here
    


from("netty:tcp://localhost:7777?textline=true&sync=false")
   ... // omitted here: parse message to pojo MyMessage, set header "group-identifier"
   .to(seda:process);

此 Camel 路由使用 TCP 流,解析每个传入消息的有效负载并将其转换为 MyMessage pojo,并在与消息对应的交换器上设置 group-identifier 标头...

现在我想消费seda:process如下:

不能同时执行属于同一组的消息。 属于不同组的消息可以同时执行。 应通过调用run() 来执行每条消息。我想为此提供/定义一个ExecutorService,这样我就可以控制线程数。

我可以在这里应用哪些企业集成模式?如何将这些概念映射到 Camel?

了解到 ActiveMQ 有消息组的概念(http://activemq.apache.org/message-groups.html)。这可能会提供一种方法来确保同一组的两条消息永远不会同时执行。不过,我不确定仅为此引入 ActiveMQ 是不是矫枉过正。这也可以通过“核心”Camel/Java 来实现吗?

【问题讨论】:

【参考方案1】:

在 ActiveMQ 中很容易做到这一点。以下代码sn -p根据需要模拟执行消息:

属于同一组的消息按顺序执行。 属于不同组的消息同时执行。

这依赖于 http://activemq.apache.org/message-groups.html 中解释的 ActiveMQ 消息组。

final CamelContext context = new DefaultCamelContext();

context.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
context.addRoutes(new RouteBuilder() 
    @Override
    public void configure() 
        from("activemq:queue:q?concurrentConsumers=5")
                .process(exchange -> 
                    System.out.println(Thread.currentThread() + " - " + exchange.getIn().getBody());
                    Thread.sleep(5000);
                );
    
);
context.start();

for (int i = 0; i < 1000; ++i) 
    context.createFluentProducerTemplate()
            .withBody("This is a message from group : " + (i % 5))
            .withHeader("JMSXGroupID", "" + (i % 5))
            .to("activemq:queue:q")
            .send();

也就是说,我(仍然)想知道这是否可以使用纯 EIP/Camel-core 来完成。

【讨论】:

以上是关于EIP/Apache Camel - 如何同时处理消息,但每个组原子处理?的主要内容,如果未能解决你的问题,请参考以下文章

如何在JUnit测试中模拟Camel处理器

Apache camel 错误处理如何与多播和事务一起使用

使用 Camel 并行处理大型 SQL 表

如何在 apache camel 中执行 gcp pubsub 消息的并行处理

如何在有条件的 apache camel 中添加后处理拦截器?

如何在apache camel 2.23.1版本的处理器交换对象中获取RouteId?