如何在 apache camel 中执行 gcp pubsub 消息的并行处理

Posted

技术标签:

【中文标题】如何在 apache camel 中执行 gcp pubsub 消息的并行处理【英文标题】:how to perform parallel processing of gcp pubsub messages in apache camel 【发布时间】:2020-08-13 17:15:03 【问题描述】:

我在下面有这段代码,它从 pubsub 源主题获取消息 -> 根据模板对其进行转换 -> 然后将转换后的消息发布到目标主题。

但为了提高性能,我需要并行执行此任务。即我需要轮询 500 条消息,然后将其并行转换,然后将它们发布到目标主题。

从骆驼 gcp 组件文档中,我相信 maxMessagesPerPoll 和 concurrentConsumers 参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。

我的意思是 a)如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题 b)消息的排序如何 c)我应该查看并行处理EIP 作为替代方案

等等

这个概念我不清楚

// my route
private void addRouteToContext(final PubSub pubSub) throws Exception 

    this.camelContext.addRoutes(new RouteBuilder() 
        @Override
        public void configure() throws Exception 

            errorHandler(deadLetterChannel("google-pubsub:gcp_project_id:pubsub.dead.letter.topic")
                    .useOriginalMessage().onPrepareFailure(new FailureProcessor()));




            /*
             * from topic
             */
            from("google-pubsub:gcp_project_id:" + pubSub.getFromSubscription() + "?"
                    + "maxMessagesPerPoll=consumer.maxMessagesPerPoll&"
                    + "concurrentConsumers=consumer.concurrentConsumers").
            /*
             * transform using the velocity
             */
            to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
            /*
             * attach header to the transform message
             */
            setHeader("Header ", simple("$date:now:yyyyMMdd")).routeId(pubSub.getRouteId()).
            /*
             * log the transformed event
             */
            log("$body").
            /*
             * publish the transformed event to the target topic
             */
            to("google-pubsub:gcp_project_id:" + pubSub.getToTopic());
        
    );

【问题讨论】:

【参考方案1】:

当您提到concurrentConsumers 选项(比如说concurrentConsumers=10)时,您是在要求 Camel 创建一个由 10 个线程组成的线程池,这 10 个线程中的每一个都会从 pub-sub 队列中获取不同的消息并处理它们。

这里要注意的是,当您指定 concurrentConsumers 选项时,线程池使用固定大小,这意味着始终有固定数量的活动线程在等待处理传入的消息。所以 10 个线程(因为我指定 concurrentConsumers=10)将等待处理我的消息,即使没有 10 条消息同时进入。

显然,这并不能保证传入消息将以相同的顺序进行处理。如果您希望消息以相同的顺序排列,您可以查看Resequencer EIP 来为您的消息排序。

至于您的第三个问题,我认为 google-pubsub 组件不允许并行处理选项。您可以使用Threads EIP 自己制作。这肯定会更好地控制您的并发性。

使用线程,您的代码将如下所示:

from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
// the 2 parameters are 'pool size' and 'max pool size'
.threads(5, 20)
.to("direct:out");

【讨论】:

谢谢你 Sneharghya,你已经非常清楚地解释了我正在寻找的概念。非常感谢【参考方案2】:

a) 如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题

不,Camel 在这种情况下不会创建 500 个并行线程。正如您所怀疑的,并发消费者线程的数量是用concurrentConsumers 设置的。因此,如果您定义 5 个concurrentConsumersmaxMessagesPerPoll 为 500,则每个消费者将获取多达 500 条消息并在单个线程中一个接一个地处理它们。也就是说,您有 5 条消息并行处理。

消息的顺序如何

一旦您并行处理消息,消息的顺序就会混乱。但是,当您遇到处理错误时,1 个消费者已经发生了这种情况,它们被绕道到您的 deadLetterChannel 并在稍后重新处理。

我是否应该考虑将并行处理 EIP 作为替代方案

仅当concurrentConsumers 选项不足时。

【讨论】:

谢谢 burki,你已经清楚地解释了我难以理解的问题。我现在很好。非常感谢 -

以上是关于如何在 apache camel 中执行 gcp pubsub 消息的并行处理的主要内容,如果未能解决你的问题,请参考以下文章

如何将属性与 Spring XML 中 Apache Camel 路由中的(布尔)文字进行比较?

如何在apache camel中附加速度文件内容

如何在Apache Camel Aggregator关联中加入多个标头

如何在apache camel DSL或camel Processor内部设置其他身份验证属性?

如何在 Apache Camel 中定义要通过 ref 抛出的异常

如何在 Apache Camels RouteBuilder.restConfiguration() 中添加 OpenApi/Swagger securitySchemes?