一个Camel Multicast组件聚合策略问题的解决过程

Posted 华为云开发者社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个Camel Multicast组件聚合策略问题的解决过程相关的知识,希望对你有一定的参考价值。

摘要:本文通过案例,发现了一个Camel Multicast组件聚合策略相关的问题。通过查看Camel源代码,找到了问题原因并给出了解决方案。希望本文可以帮助到遇到同样问题的Camel用户。

本文分享自华为云社区《使用Apache Camel Multicast组件遇到的一个问题》,作者:中间件小哥。

1 前言

本文翻译自华为加拿大研究所的Reji Mathews发表于Apache Camel社区的《ROUTING MULTICAST OUTPUT AFTER ENCOUNTERING PARTIAL FAILURES》一文。在征得原作者同意后,本文对原文的部分内容作了少许修改。

2 Multicast组件简介

Multicast是Apache Camel(以下简称“Camel”)中一个功能强大的EIP组件,可以将消息发送至多条子路径,然后并行地执行它们。

参考官网文档,我们可以使用两种方式配置Multicast组件:

  • 独立执行所有子路径,并将最后响应的子路径的结果作为最终输出。这也是Multicast组件的默认配置。
  • 通过实现Camel的聚合策略(Aggregation Strategy),使用自定义的聚合器来处理所有子路径的输出。

3 问题描述

本文使用案例如下:使用Jetty组件发布一个API,调用该API后,消息会分别发送至"direct:A"和"direct:B"两条子路径。在使用自定义的聚合策略处理后,继续执行后续步骤。其中在"direct:A"中抛出一个异常,来模拟运行失败;"direct:B"正常运行。同时在onException中定义了异常处理策略。

本文使用的Camel版本为3.8.0

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .useOriginalMessage()
        .handled(true)
        .log("Exception handler invoked")
        .transform().constant("{\\"data\\" : \\"err\\"}")
        .end();
 
 from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
        .log("received request")
        .log("Entering multicast")
        .multicast(new SimpleFlowMergeAggregator())
        .parallelProcessing().to("direct:A", "direct:B")
        .end()
        .log("Aggregated results ${body}")
        .log("Another log")
        .transform(simple("{\\"result\\" : \\"success\\"}"))
        .end();
 
    from("direct:A")
        .log("Executing PATH_1 - exception path")
        .transform(constant("DATA_FROM_PATH_1"))
        .log("Starting exception throw")
        .throwException(new Exception("USER INITIATED EXCEPTION"))
        .log("PATH_1")
        .end();
 
    from("direct:B")
        .log("Executing PATH_2 - success path")
        .delayer(1000)
        .transform(constant("DATA_FROM_PATH_2"))
        .log("PATH_2")
        .end();
}

自定义聚合器SimpleFlowMergeAggregator定义如下,其中我们将所有子路径的结果放入一个list对象。

public class SimpleFlowMergeAggregator implements AggregationStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        if(oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }
 
        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);
 
        return oldExchange;
    }
}

基于对Multicast组件执行逻辑的理解,我们认为存在多个子路径时,其运行结果应该为:如果其中有一条子路径能运行成功,则使用聚合的结果继续执行后续步骤;如果所有子路径都运行失败,则停止整个路由(route)。本案例中,由于子路径"direct:A"运行异常,子路径"direct:B"运行正常,则应该正常执行后续两个步骤日志(log)和转换(transform)。

运行上述案例,日志信息如下:

2021-05-06 12:43:18.565 INFO 13956 --- [qtp916897446-42] route1 : received request
2021-05-06 12:43:18.566 INFO 13956 --- [qtp916897446-42] route1 : Entering multicast
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Starting exception throw
2021-05-06 12:43:18.578 INFO 13956 --- [ #4 - Multicast] route2 : Exception handler invoked
2021-05-06 12:43:18.579 INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:43:19.575 INFO 13956 --- [ #3 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] route3 : PATH_2
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2

观察上述日志,我们发现完成两条子路径结果的聚合后,后续的两个步骤日志(log)和转换(transform)并未执行。这并不符合我们期望的结果。

经过多次测试,我们还发现,只有当到达聚合器SimpleFlowMergeAggregator的第一个子路径("direct:A")执行异常时,便会发生这种后续步骤未执行的情况;而如果第一个子路径("direct:A")执行成功,即使另一个子路径("direct:B")执行失败,也会继续执行后续的步骤。

4 问题分析

接下来,我们通过查看Camel源代码,来找出上述现象的原因。

在camel-core-processors模块的Pipeline.java 中,其run()方法中有这样一段代码:

@Override
public void run() {
    boolean stop = exchange.isRouteStop();
    int num = index;
    boolean more = num < size;
    boolean first = num == 0;
 
    if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
 
 // prepare for next run
        if (exchange.hasOut()) {
            exchange.setIn(exchange.getOut());
            exchange.setOut(null);
        }
 
 // get the next processor
        AsyncProcessor processor = processors.get(index++);
 
        processor.process(exchange, this);
    } else {
 // copyResults is needed in case MEP is OUT and the message is not an OUT message
        ExchangeHelper.copyResults(exchange, exchange);
 
 // logging nextExchange as it contains the exchange that might have altered the payload and since
 // we are logging the completion if will be confusing if we log the original instead
 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
        }
 
        AsyncCallback cb = callback;
        taskFactory.release(this);
        reactiveExecutor.schedule(cb);
    }
}

其中,这个if判断决定了是否继续执行后续步骤:

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))

可以看出,在如下三种情况下,后续步骤将不会被执行:

1. 之前的步骤已经将exchange 对象标记为停止状态。

boolean stop = exchange.isRouteStop();

2. 后续没有步骤可执行。

boolean more = num < size;

3. continueProcessing()方法返回false。

我们来看看continueProcessing()方法的代码。

public final class PipelineHelper {
    public static boolean continueProcessing(Exchange exchange, String message, Logger log) {
        ExtendedExchange ee = (ExtendedExchange) exchange;
        boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()
                || (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());
        if (stop) {
            if (log.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);
                if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
                    sb.append(" Marked as rollback only.");
                }
                if (exchange.getException() != null) {
                    sb.append(" Exception: ").append(exchange.getException());
                }
                if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {
                    sb.append(" Handled by the error handler.");
                }
                log.debug(sb.toString());
            }
 
            return false;
        }
        if (ee.isRouteStop()) {
            if (log.isDebugEnabled()) {
                log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
            }
            return false;
        }
 
        return true;
    }
}

可以看出,当执行过程发生异常并且被异常处理器捕获时,continueProcessing()方法将返回false。

再回到我们的案例,第一个到达聚合器SimpleFlowMergeAggregator的子路径("direct:A"),会作为后续聚合的基础,其它子路径("direct:B")会在此基础上追加各自的body数据。实际上,很多Camel用户都会采用这种方式来实现自定义聚合策略。但这样做存在一个问题:在异常处理时,子路径"direct:A"的exchange对象会被设置一个状态标识,而此状态标识会被传递到下游,用于判断是否继续执行后续步骤。由于作为聚合基础的"direct:A"子路径的exchange对象状态为“异常”,最终continueProcessing()方法将返回false,后续的步骤也就不会再执行。

5 解决方案

对于上述问题,用户可以使用多种方式来设置异常处理时exchange对象的状态。本文采用如下解决方案:如果第一个子路径执行正常,则继续执行后续步骤;如果第一个子路径执行异常,则将其与其它执行成功的子路径交换,然后继续执行后续步骤。

更新后的自定义聚合器SimpleFlowMergeAggregator如下:

public class SimpleFlowMergeAggregator implements AggregationStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        if(oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }
 
        if(hadException(oldExchange)) {
            if(!hadException(newExchange)) {
 // aggregate and swap the base
                LOGGER.info("Found new exchange with success. swapping the base exchange");
                List<String> oldData = oldExchange.getIn().getBody(List.class);
                oldData.add(newExchange.getIn().getBody(String.class));
            // swapped the base here
                newExchange.getIn().setBody(oldData);                 
                return newExchange;
            }
        }
 
        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);
 
        return oldExchange;
    }
 
 
    private boolean hadException(Exchange exchange) {
 
        if(exchange.isFailed()) {
            return true;
        }
 
        if(exchange.isRollbackOnly()) {
            return true;
        }
 
        if(exchange.isRollbackOnlyLast()) {
            return true;
        }
 
        if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()
                && ((ExtendedExchange)exchange).isErrorHandlerHandled()) {
            return true;
        }
 
        return false;
    }
}

再次运行上述案例,日志信息如下:

2021-05-06 12:46:19.122 INFO 2576 --- [qtp174245837-45] route1 : received request
2021-05-06 12:46:19.123 INFO 2576 --- [qtp174245837-45] route1 : Entering multicast
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Starting exception throw
2021-05-06 12:46:19.134 INFO 2576 --- [ #3 - Multicast] route2 : Exception handler invoked
2021-05-06 12:46:19.135 INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:46:20.130 INFO 2576 --- [ #4 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] route3 : PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Found new exchange with success. swapping the base exchange
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Aggregated results {"data" : "err"},DATA_FROM_PATH_2
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Another log

可以看出,使用新的自定义聚合策略后,后续的日志(log)和转换(transform)步骤都成功执行。

6 结语

本文通过案例,发现了一个Camel Multicast组件聚合策略相关的问题。通过查看Camel源代码,找到了问题原因并给出了解决方案。

希望本文可以帮助到遇到同样问题的Camel用户。

 

点击关注,第一时间了解华为云新鲜技术~

Apache camel 聚合多个 REST 服务响应

【中文标题】Apache camel 聚合多个 REST 服务响应【英文标题】:Apache camel to aggregate multiple REST service responses 【发布时间】:2012-05-23 13:26:30 【问题描述】:

我是 Camel 的新手,想知道如何使用 Camel 实现下面提到的用例,

我们有一个 REST Web 服务,假设它有两个服务操作 callA 和 callB。 现在我们在前面有 ESB 层,它在访问这个实际的 Web 服务 URL 之前拦截客户端请求。

现在我正在尝试做这样的事情 - 在 ESB 中公开客户端将实际调用的 URL。在 ESB 中,我们使用 Camel 的 Jetty 组件,它只是代理此服务调用。所以假设这个 URL 是 /my-service/scan/

现在在收到@ESB 请求时,我想调用这两个 REST 端点(callA 和 callB)-> 获取它们的响应 - resA 和 resB -> 将其聚合到单个响应对象 resScan -> 返回到客户端。

我现在只有-

<route id="MyServiceScanRoute">
<from uri="jetty:http://host.port./my-service/scan/?matchOnUriPrefix=true&amp;bridgeEndpoint=true"/>
<!-- Set service specific headers, monitoring etc. -->  
<!-- Call performScan -->
<to uri="direct:performScan"/>
</route>

<route id="SubRoute_performScan">
<from uri="direct:performScan"/>
<!--  HOW DO I??
Make callA, callB service calls. 
Get their responses resA, resB.
Aggregate these responses to resScan
 -->
</route>

【问题讨论】:

【参考方案1】:

我认为您不必要地使解决方案复杂化了一点。 :) 在我看来,调用两个独立远程 Web 服务并连接结果的最佳方法是:

使用multicast并行调用服务 使用GroupedExchangeAggregationStrategy 聚合结果

上述解决方案的路由可能如下所示:

from("direct:serviceFacade")
  .multicast(new GroupedExchangeAggregationStrategy()).parallelProcessing()
    .enrich("http://google.com?q=Foo").enrich("http://google.com?q=Bar")
  .end();

传递给direct:serviceFacadeResponse 的Exchange 将包含属性Exchange.GROUPED_EXCHANGE 设置为调用您的服务的结果列表(在我的示例中为Google 搜索)。

这就是你如何将direct:serviceFacade 连接到 Jetty 端点:

from("jetty:http://0.0.0.0:8080/myapp/myComplexService").enrich("direct:serviceFacade").setBody(property(Exchange.GROUPED_EXCHANGE));

现在,您使用 Jetty 组件在 ESB 上公开的对服务 URL 的所有 HTTP 请求都将生成来自对子服务的两次调用的串联响应。

关于消息和端点的动态部分的进一步考虑

在许多情况下,在端点中使用静态 URL 不足以满足您的需求。在将负载传递给每个 Web 服务之前,您可能还需要准备负载。

一般来说 - 用于实现动态端点或有效负载参数的路由类型高度依赖于您用于使用 Web 服务的组件(HTTP、CXFRS、Restlet、RSS 等)。每个组件的程度和动态配置方式各不相同。

如果您的端点/有效负载应该受到动态影响,您还可以考虑以下选项:

使用多播端点的onPrepareRef 选项预处理传递到每个端点的交换副本。您可以使用它来引用自定义处理器,该处理器将在将有效负载传递到多播端点之前对其进行修改。这可能是使用 HTTP 组件的 Exchange.HTTP_URI 标头组合 onPrepareRef 的好方法。

使用Recipient List(它也像多播一样提供parallelProcessing)来动态创建 REST 端点 URL。

使用Splitter 模式(启用parallelProcessing)将请求拆分为专用于每个服务的较小消息。再次,此选项可以很好地与 HTTP 组件的 Exchange.HTTP_URI 标头一起使用。这只有在两个子服务都可以使用相同的端点类型定义时才有效。

如您所见,Camel 非常灵活,可让您通过多种方式实现目标。考虑您的问题的背景,然后选择最适合您的解决方案。

如果您向我展示您希望针对聚合服务的每个请求调用的 REST URL 的更具体示例,我可以建议您选择哪种解决方案以及如何实现它。特别重要的是要知道请求的哪一部分是动态的。我还需要知道您要使用哪个服务使用者(这取决于您将从服务接收到的数据类型)。

【讨论】:

感谢您接受我的回答。也欢迎投票:P。 多播指令的精彩演示,Henryk。但是,似乎需要准备有效负载,而不仅仅是对每个服务的请求的 URL。你也可以用多播来做到这一点吗?不是将同一消息多播到多个端点吗? @Petter 当我到达 REST 端点时,它几乎只是 URL(GET 请求)。但是我在这里有一个问题,我需要将参数传递给各个服务调用。例如:0.0.0.0:8080/myapp/myComplexService?keyword=abc 所以这个“关键字”参数必须在单独的服务调用中传递 -> myserver.com/serviceA?keyword=abc & myserver.com/serviceB?keyword=abc @Rishi,Henryk 在上面证明了这一点。查看 URI,q=Foo 和 q=Bar。这将代表您的关键字=abc。 @Petter 其实想说的是——当我调用服务时,客户端传递的所有 url 参数都应该传递(结转)。我在示例中看到的是一个硬编码参数,仅用于简化演示(两个服务调用将始终使用 q=foo 和 q=bar)。阅读 Camel 文档后,我发现 http 组件在这方面很有用。这样我会将客户端发送的 URL 参数转发到不同的端点。这听起来正确吗?【参考方案2】:

这看起来是一个很好的例子,应该使用 Content Enricher 模式。 Described here

<from uri="direct:performScan"/>
   <enrich uri="ServiceA_Uri_Here" strategyRef="aggregateRequestAndA"/>
   <enrich uri="ServiceA_Uri_Here" strategyRef="aggregateAandB"/>
</route>

聚合策略必须用 Java 编写(或者可能是某种脚本语言,Scala/groovy? - 但我没有尝试过)。

聚合策略只需要一个实现 org.apache.camel.processor.aggregate.AggregationStrategy 的 bean,这反过来又需要你实现一个方法:

Exchange aggregate(Exchange oldExchange, Exchange newExchange);

所以,现在由您将请求与来自丰富服务调用的响应合并。你必须做两次,因为你有 callA 和 callB。有两种预定义的聚合策略可能有用也可能没用,UseLatestAggregationStrategy 和 UseOriginalAggregationStrategy。这些名称很容易解释。

祝你好运

【讨论】:

感谢指点!真的很感激。让我开始阅读内容丰富模式等。 在与团队进行一轮讨论后,我们发现 Enricher 选项的成本很高,因为它是连续的(我们可能有超过 2 个服务调用)。现在我正在评估使用的选项 - 1. 组合消息处理器 EIP/拆分器-聚合器 2. 或分散-收集 EIP 3. 或具有聚合器策略的收件人列表 任何输入?哪一个最适合用例? 问题是将请求和聚合响应保持在同一路由中,因为您要回复 HTTP 调用。最吸引人的选择似乎是指定了聚合策略且“parallellProcessing”选项设置为 true 的接收者列表。棘手的部分是编写聚合策略,因为它可能以任何顺序接收答案。我对有效载荷一无所知,但它应该相当直截了当。 camel.apache.org/aggregator.html 有一些示例如何聚合多个响应,例如响应序列中的最高数字。 gl 看起来在多播和收件人列表模式中,“相同”消息被路由到多个消费者。但在这种情况下,两个不同的 Web 服务调用( callA 和 callB )是完全不同的。所以我要尝试这样的事情 -

以上是关于一个Camel Multicast组件聚合策略问题的解决过程的主要内容,如果未能解决你的问题,请参考以下文章

Apache camel 聚合多个 REST 服务响应

Apache Camel 遍历列表

找不到端点:测试,请检查您的类路径是不是包含所需的 Camel 组件 jar

Camel如何动态添加和启动路由?

为啥 ActiveMQ 5.14.x 无法开始嵌入 camel-jms 组件 2.18.3

Apache Camel http组件计时器dockerization