如何调查 Apache Camel Route 上的数据?

Posted

技术标签:

【中文标题】如何调查 Apache Camel Route 上的数据?【英文标题】:How to investigate data on Apache Camel Route? 【发布时间】:2021-11-27 20:36:04 【问题描述】:

我的项目正在将数据从一个系统传输到另一个系统。我们使用 Apache Camel Routes 在 JBoss EAP v7 服务器之间发送数据。我的问题是,有没有办法调查包裹在不同路线时的内容?

我们已尝试增加日志记录,但我们的文件/控制台刚刚被淹没。我们还尝试在服务器上使用 Hawtio 来查看经过路由的消息,但未能成功识别我们的消息“卡住”的位置。

感谢任何帮助!

【问题讨论】:

您的路由使用什么消费者端点,它接收什么类型的消息? 我们的端点是一个 RHEL 7 服务器,然后从这里进一步处理。它接收的消息基本上只是一个字符串消息 消费者端点我的意思是你的骆驼路线的from(endpointURI),它是交换的入口点。 【参考方案1】:

您可以使用单元测试在本地测试您的路由,然后使用adviceWith 和weave 方法在特定点记录交换的内容。

借助单元测试,即使您在 Karaf 或 Red Hat fuse 之类的东西中运行骆驼,您也可以轻松地在您最喜欢的 IDE 中调试您的路线。

package com.example;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class ExampleRouteTests extends CamelTestSupport 
    
    @Test
    public void exampleTest() throws Exception 
    
        ContractDetails testDetails = new ContractDetails(1512, 1215);
        mockJDBCEndpoints();

        context.getRouteDefinition("exampleRoute")
            .adviceWith(context, new AdviceWithRouteBuilder()

            @Override
            public void configure() throws Exception 
                
                replaceFromWith("direct:start");

                weaveByToUri("direct:getDetailsFromAPI")
                    .replace()
                    .to("log:testLogger?showAll=true")
                    .to("mock:api")
                    .setBody(constant(testDetails));

                weaveByToUri("direct:saveToDatabase")
                    .replace()
                    .to("log:testLogger?showAll=true")
                    .to("mock:db");
            
        );

        MockEndpoint apiMockEndpoint = getMockEndpoint("mock:api");
        apiMockEndpoint.expectedMessageCount(1);

        MockEndpoint dbMockEndpoint = getMockEndpoint("mock:db");
        dbMockEndpoint.expectedMessageCount(1);
        
        context.start();

        String body = "\"name\":\"Bob\",\"age\":10";
        template.sendBody("direct:start", body);

        apiMockEndpoint.assertIsSatisfied();
        dbMockEndpoint.assertIsSatisfied();
      

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception 
        return new RouteBuilder()

            @Override
            public void configure() throws Exception 
                
                from("amqp:queue:example")
                    .routeId("exampleRoute")
                    .unmarshal().json(JsonLibrary.Jackson, 
                        Person.class)
                    .to("direct:getDetailsFromAPI")
                    .process(new SomeProcessor())
                    .to("direct:saveToDatabase");
                    
                from("direct:saveToDatabase")
                    .routeId("saveToDatabaseRoute")
                    .to("velocity:sql/insertQueryTemplate.vt")
                    .to("jdbc:exampleDatabase");

                from("direct:getDetailsFromAPI")
                    .removeHeaders("*")
                    .toD("http4:someAPI?name=$body.getName()")
                    .unmarshal().json(JsonLibrary.Jackson, 
                        ContractDetails.class);
            
        ;
    

    void mockJDBCEndpoints() throws Exception 

        context.getRouteDefinition("saveToDatabaseRoute")
            .adviceWith(context, new AdviceWithRouteBuilder()

                @Override
                public void configure() throws Exception 
                    weaveByToUri("jdbc:*")
                        .replace()
                        .to("mock:db");
                
        );
    

    @Override
    public boolean isUseAdviceWith() 
        return true;
    

现在,对于单元测试未发生的问题进行故障排除,您可以使用onException 配置通用或路由特定异常处理,并使用Dead letter channel 来处理和存储有关失败交换的信息。或者,您可以只使用stream 或文件组件将有关异常和交换失败的信息保存到单独的文件中,以避免日志泛滥。

【讨论】:

以上是关于如何调查 Apache Camel Route 上的数据?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Camel作为HTTP代理:如何使用参数路由到URL

请让我知道Apache Camel中的民意调查工作机制(CRON JOB)

Mark:Spring MVC实现对Camel Route的管理

Mark:Camel SQL Route

如何在 Apache Camel 中结合 Redelivery policy 和 Hystrix 断路器?

如何在apache camel 2.23.1版本的处理器交换对象中获取RouteId?