Apache Camel 遍历列表
Posted
技术标签:
【中文标题】Apache Camel 遍历列表【英文标题】:Apache Camel iterate over List 【发布时间】:2015-01-22 04:56:30 【问题描述】:我是 Apache Camel 的新手,在理解如何实现简单的集成任务时遇到问题:
-
REST 服务正在通过 Apache Camel 路由调用 Spring Bean
Spring Bean 返回一个类的集合 (ArrayList)
我需要遍历集合中的每个项目,并通过自定义转换器将其转换为另一种类型。
似乎我应该使用拆分和聚合器,但我如何约束聚合器消耗原始列表中的所有项目,不多也不少。另外,如何将一个项目转换为另一个项目?我应该使用类型转换器吗?
谁能给我一个简单的例子?
更新 1
抱歉,我不得不撤销对提供的示例的接受,因为它实际上并没有回答我的问题。这是用例限定条件:
我需要从to("bean:someBean")
调用中拆分和转换方法返回值,而不是从某个端点拆分和转换输入值。
所以用例是
-
调用某个端点;例如休息服务的 GET 请求,在我的例子中:
from("endpoint")
调用一个bean并获取它的返回值;比如List
、to("bean:someBean")
)
将返回值转换为另一个List
将转换后的List
返回给消费者
更新 2
所以,我可以确认使用end()
方法并不能解决我的问题。
代码如下:
rest("some/service").get().produces("application/json").to("bean:someBean?method=getListOfObjects").route().split(body(), (oldExchange, newExchange) ->
List<ObjectToConvert> oldList = oldExchange.getIn(List.class);
List<NewObject> convertedList = taskList.stream().map(ObjectToConvert::new).collect(Collectors.toList());
newExchange.getOut().setBody(convertedList);
return newExchange;
).end();
使用这种路由,我在应用服务器上得到以下异常:
19:30:21,126 ERROR [org.jboss.as.controller.management-operation] (XNIO-1 task-5) JBAS014613: Operation ("full-replace-deployment") failed - address: (undefined) - failure description: "JBAS014671: Failed services" => "jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel" => "org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel: Failed to start service
Caused by: java.lang.RuntimeException: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[body -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[body -> []]
Caused by: java.lang.IllegalArgumentException: Definition has no children on Split[body -> []]"
【问题讨论】:
【参考方案1】:这是一个完整的例子,拆分聚合并转换列表消息。
-
骆驼拆分器提供了一个内置的聚合器,用于聚合
原始交换中的所有拆分消息。所以分离器只有
聚合每个列表(交换)的消息在“direct:start”中发送。您必须提供自定义
聚合策略,因为默认的将使用原始的
交换,在我的示例中为 InOrder。聚合策略是
拆分定义的第二个参数。
类型转换器让我们有机会在 DSL。您也可以使用 bean、处理器或在自定义聚合策略中进行所有转换来实现这一点。
package org.mydemocamel.app;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.support.TypeConverterSupport;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.toolbox.FlexibleAggregationStrategy;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class CamelSplitAggregateConvertTest extends CamelTestSupport
@Produce
private ProducerTemplate template;
@EndpointInject(uri = "mock:out")
private MockEndpoint mock;
@Test
public void testSplitAggregateConvertOrder()
InOrder inOrder1 = new InOrder();
inOrder1.setId("1");
InOrder inOrder2 = new InOrder();
inOrder2.setId("2");
List<InOrder> inOrderList = new ArrayList<InOrder>();
inOrderList.add(inOrder1);
inOrderList.add(inOrder2);
template.sendBody("direct:start", inOrderList);
mock.expectedMessageCount(1);
Exchange outList = mock.getReceivedExchanges().get(0);
List<OutOrder> outOrderList = outList.getIn().getBody(List.class);
assertEquals(1, outOrderList.get(0).getId());
assertEquals(2, outOrderList.get(1).getId());
@Override
protected RouteBuilder createRouteBuilder() throws Exception
return new RouteBuilder()
@Override
public void configure() throws Exception
context.getTypeConverterRegistry().addTypeConverter(OutOrder.class, InOrder.class, new MyOrderTypeConverter());
from("direct:start")
.split(body(), new AggregationStrategy()
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
if (oldExchange == null)
List<OutOrder> orders = new ArrayList<OutOrder>();
OutOrder newOrder = newExchange.getIn().getBody(OutOrder.class);
orders.add(newOrder);
newExchange.getIn().setBody(orders);
return newExchange;
List<OutOrder> orders = oldExchange.getIn().getBody(List.class);
OutOrder newOutOrder = newExchange.getIn().getBody(OutOrder.class);
orders.add(newOutOrder);
oldExchange.getIn().setBody(orders);
return oldExchange;
)
.convertBodyTo(OutOrder.class)
.end() //splitter ends here and the exchange body is now List<OutOrder>
.to("mock:out");
;
private static class MyOrderTypeConverter extends TypeConverterSupport
@SuppressWarnings("unchecked")
public <T> T convertTo(Class<T> type, Exchange exchange, Object value)
// converter from inorder to outorder bean
OutOrder order = new OutOrder();
order.setId(Integer.parseInt(((InOrder) value).getId()));
return (T) order;
private static class OutOrder
private int id;
public void setId(int id)
this.id = id;
public int getId()
return id;
private static class InOrder
private String id;
public void setId(String id)
this.id = id;
public String getId()
return id;
【讨论】:
这个例子没有完全涵盖我的情况:我需要从to("bean:someBean")
invovation 拆分和转换方法返回值,而不是从端点拆分和转换输入值。所以情况是 1) 调用某个端点(在我的情况下是 from("endpoint")
) 2)调用一些 bean 并获取它的返回值(它是 List,to("bean:someBean")
3)将返回值转换为另一个 List
你可以这样做 .to("bean:someBean").split(body(),)...... 如果使用直接端点没有区别,比如 .to(bean :someBean).to(direct:start)。直接端点在连接路由时提供同步调用。
很遗憾,我不能,因为 split 不能是交换中的最后一个活动。
我似乎无法理解问题出在哪里。最后一个活动将是转换后的列表的返回。根据您的更新步骤 2) 是调用一些 bean 并获取它的返回值 (它是 List, to("bean:someBean"))。我的例子是第 3 步和第 4 步。所以在第 2 步之后将交换发送到我的(直接:开始)。路线的终点是 .end()。聚合列表将返回给其余消费者是第 1 步。在我之前的评论中,我提到您可以使用不同的路由或不使用,只需附加 .split(body()..aggregate().. transform()...end()。在同一条路线中。
在您的示例中,您只是使用普通 java 转换列表,但聚合器不能像这样工作。您有 2 个选择:1)只需在 getListOfObjects 方法或之后的其他方法中执行这种转换,并避免为此使用骆驼拆分/聚合。这将按照您现在的预期工作。 2) 根据我的示例更新您的代码,这将适用于您的案例,并仔细研究camel.apache.org/splitter.html 中的 [Split aggregate request/reply sample] 部分,以了解有关使用 Camel 拆分/聚合消息的详细信息。希望我有所帮助。
【参考方案2】:
为了将来参考,您可以迭代列表的另一种方法是使用 Java DSL 的 loop
构造。这是一个例子:
from("direct:myDirect")
.loop(header("LIST_LENGTH")) // You will set this header in the processor before with the list lenght.
.process(new Processor()
@Override
public void proocess(Exchange arg0)
MyObj currentElement = (MyObj) arg0.getIn().getBody(List.class).get(LOOP_INDEX);
// Do your processing here.
)
.end()
.end();
LOOP_INDEX
属性将包含当前迭代,从 0 到 LIST_LENGHT 标头值,因此您可以使用它从列表中获取当前元素。
注意双重end()
方法调用:一个是结束循环,另一个是结束路由。
文档:http://camel.apache.org/loop.html
【讨论】:
LOOP_INDEX 来自哪里?【参考方案3】:您对拆分器和聚合器是正确的,http://camel.apache.org/splitter.html 的“拆分聚合请求/回复示例”显示了您的需求。
在拆分器之后是否需要“转换”的对象列表? 如果是,http://camel.apache.org/aggregator2.html 中的“在 AggregationStrategy 中使用列表”这一点看起来适合您的需求。
亲切的问候, 土工
【讨论】:
谢谢,但我看不出我应该如何编写完成谓词,以便聚合器只等待inputList.size()
消息的到来以上是关于Apache Camel 遍历列表的主要内容,如果未能解决你的问题,请参考以下文章