Apache Camel 遍历列表

Posted

技术标签:

【中文标题】Apache Camel 遍历列表【英文标题】:Apache Camel iterate over List 【发布时间】:2015-01-22 04:56:30 【问题描述】:

我是 Apache Camel 的新手,在理解如何实现简单的集成任务时遇到问题:

    REST 服务正在通过 Apache Camel 路由调用 Spring Bean Spring Bean 返回一个类的集合 (ArrayList)

我需要遍历集合中的每个项目,并通过自定义转换器将其转换为另一种类型。

似乎我应该使用拆分和聚合器,但我如何约束聚合器消耗原始列表中的所有项目,不多也不少。另外,如何将一个项目转换为另一个项目?我应该使用类型转换器吗?

谁能给我一个简单的例子?

更新 1

抱歉,我不得不撤销对提供的示例的接受,因为它实际上并没有回答我的问题。这是用例限定条件: 我需要从to("bean:someBean") 调用中拆分和转换方法返回值,而不是从某个端点拆分和转换输入值。

所以用例是

    调用某个端点;例如休息服务的 GET 请求,在我的例子中:from("endpoint") 调用一个bean并获取它的返回值;比如Listto("bean:someBean")) 将返回值转换为另一个List 将转换后的List 返回给消费者

更新 2

所以,我可以确认使用end() 方法并不能解决我的问题。

代码如下:

rest("some/service").get().produces("application/json").to("bean:someBean?method=getListOfObjects").route().split(body(), (oldExchange, newExchange) -> 
                List<ObjectToConvert> oldList = oldExchange.getIn(List.class);
                List<NewObject> convertedList = taskList.stream().map(ObjectToConvert::new).collect(Collectors.toList());
                newExchange.getOut().setBody(convertedList);

                return newExchange;
            ).end();

使用这种路由,我在应用服务器上得到以下异常:

19:30:21,126 ERROR [org.jboss.as.controller.management-operation] (XNIO-1 task-5) JBAS014613: Operation ("full-replace-deployment") failed - address: (undefined) - failure description: "JBAS014671: Failed services" => "jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel" => "org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel: Failed to start service
    Caused by: java.lang.RuntimeException: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
    Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
    Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
    Caused by: java.lang.IllegalArgumentException: Definition has no children on Split[body -> []]"

【问题讨论】:

【参考方案1】:

这是一个完整的例子,拆分聚合并转换列表消息。

    骆驼拆分器提供了一个内置的聚合器,用于聚合 原始交换中的所有拆分消息。所以分离器只有 聚合每个列表(交换)的消息在“direct:start”中发送。您必须提供自定义 聚合策略,因为默认的将使用原始的 交换,在我的示例中为 InOrder。聚合策略是 拆分定义的第二个参数。

    类型转换器让我们有机会在 DSL。您也可以使用 bean、处理器或在自定义聚合策略中进行所有转换来实现这一点。

    package org.mydemocamel.app;
    import org.apache.camel.*;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.mock.MockEndpoint;
    import org.apache.camel.processor.aggregate.AggregationStrategy;
    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
    import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
    import org.apache.camel.support.TypeConverterSupport;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.apache.camel.util.toolbox.FlexibleAggregationStrategy;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class CamelSplitAggregateConvertTest extends CamelTestSupport 
    
        @Produce
        private ProducerTemplate template;
    
        @EndpointInject(uri = "mock:out")
        private MockEndpoint mock;
    
        @Test
        public void testSplitAggregateConvertOrder()
            InOrder inOrder1 = new InOrder();
            inOrder1.setId("1");
    
            InOrder inOrder2 = new InOrder();
            inOrder2.setId("2");
    
            List<InOrder> inOrderList = new ArrayList<InOrder>();
            inOrderList.add(inOrder1);
            inOrderList.add(inOrder2);
    
            template.sendBody("direct:start", inOrderList);
    
            mock.expectedMessageCount(1);
            Exchange outList = mock.getReceivedExchanges().get(0);
            List<OutOrder> outOrderList = outList.getIn().getBody(List.class);
    
            assertEquals(1, outOrderList.get(0).getId());
            assertEquals(2, outOrderList.get(1).getId());
    
    
        
    
        @Override
        protected RouteBuilder createRouteBuilder() throws Exception 
            return new RouteBuilder() 
                @Override
                public void configure() throws Exception 
    
                    context.getTypeConverterRegistry().addTypeConverter(OutOrder.class, InOrder.class, new MyOrderTypeConverter());
    
                    from("direct:start")
                    .split(body(), new AggregationStrategy() 
                        @Override
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
                            if (oldExchange == null) 
                                List<OutOrder> orders = new ArrayList<OutOrder>();
                                OutOrder newOrder = newExchange.getIn().getBody(OutOrder.class);
                                orders.add(newOrder);
                                newExchange.getIn().setBody(orders);
                                return newExchange;
                            
                            List<OutOrder> orders = oldExchange.getIn().getBody(List.class);
                            OutOrder newOutOrder = newExchange.getIn().getBody(OutOrder.class);
                            orders.add(newOutOrder);
                            oldExchange.getIn().setBody(orders);
                            return oldExchange;
                        
                    )
                    .convertBodyTo(OutOrder.class)
                    .end()  //splitter ends here and the exchange body  is now List<OutOrder> 
                    .to("mock:out");
    
                
            ;
        
    
        private static class MyOrderTypeConverter extends TypeConverterSupport 
    
            @SuppressWarnings("unchecked")
            public <T> T convertTo(Class<T> type, Exchange exchange, Object value) 
                // converter from inorder to outorder bean
                OutOrder order = new OutOrder();
                order.setId(Integer.parseInt(((InOrder) value).getId()));
                return (T) order;
            
        
    
        private static class OutOrder 
            private int id;
    
            public void setId(int id) 
                this.id = id;
            
    
            public int getId() 
                return id;
            
        
    
        private static class InOrder 
            private String id;
    
            public void setId(String id) 
                this.id = id;
            
    
            public String getId() 
                return id;
            
        
    
    

【讨论】:

这个例子没有完全涵盖我的情况:我需要从to("bean:someBean") invovation 拆分和转换方法返回值,而不是从端点拆分和转换输入值。所以情况是 1) 调用某个端点(在我的情况下是 from("endpoint")) 2)调用一些 bean 并获取它的返回值(它是 List,to("bean:someBean") 3)将返回值转换为另一个 List 你可以这样做 .to("bean:someBean").split(body(),)...... 如果使用直接端点没有区别,比如 .to(bean :som​​eBean).to(direct:start)。直接端点在连接路由时提供同步调用。 很遗憾,我不能,因为 split 不能是交换中的最后一个活动。 我似乎无法理解问题出在哪里。最后一个活动将是转换后的列表的返回。根据您的更新步骤 2) 是调用一些 bean 并获取它的返回值 (它是 List, to("bean:someBean"))。我的例子是第 3 步和第 4 步。所以在第 2 步之后将交换发送到我的(直接:开始)。路线的终点是 .end()。聚合列表将返回给其余消费者是第 1 步。在我之前的评论中,我提到您可以使用不同的路由或不使用,只需附加 .split(body()..aggregate().. transform()...end()。在同一条路线中。 在您的示例中,您只是使用普通 java 转换列表,但聚合器不能像这样工作。您有 2 个选择:1)只需在 getListOfObjects 方法或之后的其他方法中执行这种转换,并避免为此使用骆驼拆分/聚合。这将按照您现在的预期工作。 2) 根据我的示例更新您的代码,这将适用于您的案例,并仔细研究camel.apache.org/splitter.html 中的 [Split aggregate request/reply sample] 部分,以了解有关使用 Camel 拆分/聚合消息的详细信息。希望我有所帮助。 【参考方案2】:

为了将来参考,您可以迭代列表的另一种方法是使用 Java DSL 的 loop 构造。这是一个例子:

from("direct:myDirect")
.loop(header("LIST_LENGTH")) // You will set this header in the processor before with the list lenght.
.process(new Processor()

      @Override
      public void proocess(Exchange arg0)
          MyObj currentElement = (MyObj) arg0.getIn().getBody(List.class).get(LOOP_INDEX);
          // Do your processing here.
      
)
.end()
.end();

LOOP_INDEX 属性将包含当前迭代,从 0 到 LIST_LENGHT 标头值,因此您可以使用它从列表中获取当前元素。

注意双重end()方法调用:一个是结束循环,另一个是结束路由。

文档:http://camel.apache.org/loop.html

【讨论】:

LOOP_INDEX 来自哪里?【参考方案3】:

您对拆分器和聚合器是正确的,http://camel.apache.org/splitter.html 的“拆分聚合请求/回复示例”显示了您的需求。

在拆分器之后是否需要“转换”的对象列表? 如果是,http://camel.apache.org/aggregator2.html 中的“在 AggregationStrategy 中使用列表”这一点看起来适合您的需求。

亲切的问候, 土工

【讨论】:

谢谢,但我看不出我应该如何编写完成谓词,以便聚合器只等待inputList.size() 消息的到来

以上是关于Apache Camel 遍历列表的主要内容,如果未能解决你的问题,请参考以下文章

从列表中获取匹配字符串并创建新列表

Apache Camel

[每日一学]apache camel|BDD方式开发apache camel|Groovy|Spock

apache camel 条件路由

Apache Camel 压缩包

如何在apache camel中附加速度文件内容