Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象

Posted

技术标签:

【中文标题】Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象【英文标题】:Apache Camel : File to BeanIO and merge beanIO objects based on id 【发布时间】:2018-04-02 07:12:16 【问题描述】:

我有一个用例来并行读取员工、地址和联系人文件并将其转换为 beanIO 对象并合并 beanIO 对象以生成完整的employeeDetails 对象。

Emp 文件:

1 Foo Engineer
2 Bar AssistantEngineer

Emp 联系文件:

1 8912345678  foo@org.com
2 7812345678    bar@org.com

Emp 地址文件:

 1 city1 1234
 2 city2 2345

Exchange 中 EmployeeDetailsBeanIODataFormat 对象的预期输出:

1 Foo Engineer foo@org.com city1 1234
2 Bar AssistantEngineer bar@org.com city2 2345

我有以下路线

from("file://C:/cameltest/employee.txt").to("seda:beanIO");
from("file://C:/cameltest/employeeContact.txt").to("seda:beanIOContact");
from("file://C:/cameltest/employeeAddress.txt").to("seda:beanIOAddress");

每个文件都转换为beanio对象

BeanIODataFormat empFormat = new BeanIODataFormat("beanIO.xml","emp");
BeanIODataFormat empContactFormat = new BeanIODataFormat("beanIO.xml", "empContact");
BeanIODataFormat empAddressFormat = new BeanIODataFormat("beanIO.xml", "empAddress");

from("seda:beanIO").unmarshal(empFormat).log("body - $body");        
from("seda:beanIOContact").unmarshal(empContactFormat).log("Contact body $body");
from("seda:beanIO").unmarshal(empAddressFormat).log("Address body - $body");     

输出正确记录了 bean 对象。

现在我需要合并对象以形成 EmployeeDetails 对象。有人可以让我知道该怎么做吗?我读过,似乎聚合器可以用来完成这项工作,但不确定这种方法。

有关此示例的任何想法都会有所帮助。 欢迎提出建议,是否建议先根据员工 ID 合并文件并从中创建对象?在这种情况下,我不想将合并的文件写入磁盘,因为 IO 会降低性能。

提前致谢。

【问题讨论】:

【参考方案1】:

在解组后使用拆分器拆分每条消息

from("seda:beanIO").unmarshal(empFormat).split(body()).to("seda:aggregate");
from("seda:beanIOContact").unmarshal(empContactFormat).split(body()).to("seda:aggregate");
from("seda:beanIOAddress").unmarshal(empAddressFormat).split(body()).to("seda:aggregate");

这就是聚合器的样子。 详细信息对象作为标头存储在 olddExchange 中。 最重要的参数如下

    correlationExpression: simple("$body.id") 关联具有相同 id(1 或 2)的所有消息 completionSize=3。每个文件一个。

from("seda:aggregate").aggregate(simple("$body.id"), (oldExchange,newExchange) -> 
        if (oldExchange == null) 
            EmployeeDetails details = buildDetails(new EmployeeDetails(), newExchange);
            newExchange.getIn().setHeader("details", details);
            return newExchange;
        
        EmployeeDetails details = oldExchange.getIn().getHeader("details", EmployeeDetails.class);
        buildDetails(details, newExchange);
        oldExchange.getIn().setHeader("details", details);
        return oldExchange;
    ).completionSize(3).log("Details - $header.details")

还有

private EmployeeDetails buildDetails(EmployeeDetails details, Exchange newExchange) 
        Object newBody = newExchange.getIn().getBody();
        if (newBody instanceof Employee) 
            details.setId(((Employee) newBody).getId());
            details.setName(((Employee) newBody).getName());
            details.setJob(((Employee) newBody).getJob());
         else if (newBody instanceof EmployeeContact) 
            details.setEmail(((EmployeeContact) newBody).getEmail());
         else if (newBody instanceof EmployeeAddress) 
            details.setCity(((EmployeeAddress) newBody).getCity());
            details.setCode(((EmployeeAddress) newBody).getCode());
        
        return details;
    

那么结果将是 2 个细节对象

Details - EmployeeDetails(id=1, name=Foo, job=Engineer, email=foo@org.com, city=city1, code=1234)
Details - EmployeeDetails(id=2, name=Bar, job=AssistantEnginee, email=bar@org.com, city=city2, code=2345)

【讨论】:

它工作正常,但是 EmployeeDetails 被一一打印。但我需要 Exchange 正文中的最终输出 EmployeeDetails[] 以便我可以在下一条路线上传递它来处理最终列表并将其转换为 Avro 格式。是否需要再次聚合? ......completionSize(3).to("seda"formList") ? 这样的工作是否可行,或者是否有任何其他选项可以对所有消息进行分组。 你可以再次聚合(虽然这是一个不同的问题).setBody(header("details")).aggregate(new GroupedExchangeAggregationStrategy()).constant(true).complet‌​ionSize(2)其中 2 是 id 的数量 @Itsallas,尝试了 GroupedExchangeAggregationStrategy 并在对交换进行分组时出错,为该问题创建了一个新问题。 ***.com/questions/46870073/…

以上是关于Apache Camel:文件到 BeanIO 并基于 id 合并 beanIO 对象的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Camel Source 从 S3 到 Kafka

Apache camel - SQL到CSV

文件目录无法从 apache camel 上传 azure blob 上的文件

Apache Camel:是啥推动消息?

Apache Camel读取MongoDB Collection - 不处理任何行

Apache Camel 与 Spring Boot 集成,通过FTP定时采集处理文件