Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象
Posted
技术标签:
【中文标题】Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象【英文标题】:Apache Camel : File to BeanIO and merge beanIO objects based on id 【发布时间】:2018-04-02 07:12:16 【问题描述】:我有一个用例来并行读取员工、地址和联系人文件并将其转换为 beanIO 对象并合并 beanIO 对象以生成完整的employeeDetails 对象。
Emp 文件:
1 Foo Engineer
2 Bar AssistantEngineer
Emp 联系文件:
1 8912345678 foo@org.com
2 7812345678 bar@org.com
Emp 地址文件:
1 city1 1234
2 city2 2345
Exchange 中 EmployeeDetailsBeanIODataFormat 对象的预期输出:
1 Foo Engineer foo@org.com city1 1234
2 Bar AssistantEngineer bar@org.com city2 2345
我有以下路线
from("file://C:/cameltest/employee.txt").to("seda:beanIO");
from("file://C:/cameltest/employeeContact.txt").to("seda:beanIOContact");
from("file://C:/cameltest/employeeAddress.txt").to("seda:beanIOAddress");
每个文件都转换为beanio对象
BeanIODataFormat empFormat = new BeanIODataFormat("beanIO.xml","emp");
BeanIODataFormat empContactFormat = new BeanIODataFormat("beanIO.xml", "empContact");
BeanIODataFormat empAddressFormat = new BeanIODataFormat("beanIO.xml", "empAddress");
from("seda:beanIO").unmarshal(empFormat).log("body - $body");
from("seda:beanIOContact").unmarshal(empContactFormat).log("Contact body $body");
from("seda:beanIO").unmarshal(empAddressFormat).log("Address body - $body");
输出正确记录了 bean 对象。
现在我需要合并对象以形成 EmployeeDetails 对象。有人可以让我知道该怎么做吗?我读过,似乎聚合器可以用来完成这项工作,但不确定这种方法。
有关此示例的任何想法都会有所帮助。 欢迎提出建议,是否建议先根据员工 ID 合并文件并从中创建对象?在这种情况下,我不想将合并的文件写入磁盘,因为 IO 会降低性能。
提前致谢。
【问题讨论】:
【参考方案1】:在解组后使用拆分器拆分每条消息
from("seda:beanIO").unmarshal(empFormat).split(body()).to("seda:aggregate");
from("seda:beanIOContact").unmarshal(empContactFormat).split(body()).to("seda:aggregate");
from("seda:beanIOAddress").unmarshal(empAddressFormat).split(body()).to("seda:aggregate");
这就是聚合器的样子。 详细信息对象作为标头存储在 olddExchange 中。 最重要的参数如下
-
correlationExpression: simple("$body.id") 关联具有相同 id(1 或 2)的所有消息
completionSize=3。每个文件一个。
from("seda:aggregate").aggregate(simple("$body.id"), (oldExchange,newExchange) ->
if (oldExchange == null)
EmployeeDetails details = buildDetails(new EmployeeDetails(), newExchange);
newExchange.getIn().setHeader("details", details);
return newExchange;
EmployeeDetails details = oldExchange.getIn().getHeader("details", EmployeeDetails.class);
buildDetails(details, newExchange);
oldExchange.getIn().setHeader("details", details);
return oldExchange;
).completionSize(3).log("Details - $header.details")
还有
private EmployeeDetails buildDetails(EmployeeDetails details, Exchange newExchange)
Object newBody = newExchange.getIn().getBody();
if (newBody instanceof Employee)
details.setId(((Employee) newBody).getId());
details.setName(((Employee) newBody).getName());
details.setJob(((Employee) newBody).getJob());
else if (newBody instanceof EmployeeContact)
details.setEmail(((EmployeeContact) newBody).getEmail());
else if (newBody instanceof EmployeeAddress)
details.setCity(((EmployeeAddress) newBody).getCity());
details.setCode(((EmployeeAddress) newBody).getCode());
return details;
那么结果将是 2 个细节对象
Details - EmployeeDetails(id=1, name=Foo, job=Engineer, email=foo@org.com, city=city1, code=1234)
Details - EmployeeDetails(id=2, name=Bar, job=AssistantEnginee, email=bar@org.com, city=city2, code=2345)
【讨论】:
它工作正常,但是 EmployeeDetails 被一一打印。但我需要 Exchange 正文中的最终输出 EmployeeDetails[] 以便我可以在下一条路线上传递它来处理最终列表并将其转换为 Avro 格式。是否需要再次聚合? ......completionSize(3).to("seda"formList") ? 这样的工作是否可行,或者是否有任何其他选项可以对所有消息进行分组。 你可以再次聚合(虽然这是一个不同的问题).setBody(header("details")).aggregate(new GroupedExchangeAggregationStrategy()).constant(true).completionSize(2)其中 2 是 id 的数量 @Itsallas,尝试了 GroupedExchangeAggregationStrategy 并在对交换进行分组时出错,为该问题创建了一个新问题。 ***.com/questions/46870073/…以上是关于Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Camel Source 从 S3 到 Kafka
文件目录无法从 apache camel 上传 azure blob 上的文件