如何在 apache camel 中执行 gcp pubsub 消息的并行处理
Posted
技术标签:
【中文标题】如何在 apache camel 中执行 gcp pubsub 消息的并行处理【英文标题】:how to perform parallel processing of gcp pubsub messages in apache camel 【发布时间】:2020-08-13 17:15:03 【问题描述】:我在下面有这段代码,它从 pubsub 源主题获取消息 -> 根据模板对其进行转换 -> 然后将转换后的消息发布到目标主题。
但为了提高性能,我需要并行执行此任务。即我需要轮询 500 条消息,然后将其并行转换,然后将它们发布到目标主题。
从骆驼 gcp 组件文档中,我相信 maxMessagesPerPoll 和 concurrentConsumers 参数可以完成这项工作。由于缺乏文档,我不确定它在内部是如何工作的。
我的意思是 a)如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题 b)消息的排序如何 c)我应该查看并行处理EIP 作为替代方案
等等
这个概念我不清楚
去
// my route
private void addRouteToContext(final PubSub pubSub) throws Exception
this.camelContext.addRoutes(new RouteBuilder()
@Override
public void configure() throws Exception
errorHandler(deadLetterChannel("google-pubsub:gcp_project_id:pubsub.dead.letter.topic")
.useOriginalMessage().onPrepareFailure(new FailureProcessor()));
/*
* from topic
*/
from("google-pubsub:gcp_project_id:" + pubSub.getFromSubscription() + "?"
+ "maxMessagesPerPoll=consumer.maxMessagesPerPoll&"
+ "concurrentConsumers=consumer.concurrentConsumers").
/*
* transform using the velocity
*/
to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
/*
* attach header to the transform message
*/
setHeader("Header ", simple("$date:now:yyyyMMdd")).routeId(pubSub.getRouteId()).
/*
* log the transformed event
*/
log("$body").
/*
* publish the transformed event to the target topic
*/
to("google-pubsub:gcp_project_id:" + pubSub.getToTopic());
);
【问题讨论】:
【参考方案1】:当您提到concurrentConsumers
选项(比如说concurrentConsumers=10
)时,您是在要求 Camel 创建一个由 10 个线程组成的线程池,这 10 个线程中的每一个都会从 pub-sub 队列中获取不同的消息并处理它们。
这里要注意的是,当您指定 concurrentConsumers 选项时,线程池使用固定大小,这意味着始终有固定数量的活动线程在等待处理传入的消息。所以 10 个线程(因为我指定 concurrentConsumers=10)将等待处理我的消息,即使没有 10 条消息同时进入。
显然,这并不能保证传入消息将以相同的顺序进行处理。如果您希望消息以相同的顺序排列,您可以查看Resequencer EIP 来为您的消息排序。
至于您的第三个问题,我认为 google-pubsub 组件不允许并行处理选项。您可以使用Threads EIP 自己制作。这肯定会更好地控制您的并发性。
使用线程,您的代码将如下所示:
from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
// the 2 parameters are 'pool size' and 'max pool size'
.threads(5, 20)
.to("direct:out");
【讨论】:
谢谢你 Sneharghya,你已经非常清楚地解释了我正在寻找的概念。非常感谢【参考方案2】:a) 如果我轮询说 500 条消息,那么它会创建 500 条并行路由来处理消息并将其发布到目标主题
不,Camel 在这种情况下不会创建 500 个并行线程。正如您所怀疑的,并发消费者线程的数量是用concurrentConsumers
设置的。因此,如果您定义 5 个concurrentConsumers
,maxMessagesPerPoll
为 500,则每个消费者将获取多达 500 条消息并在单个线程中一个接一个地处理它们。也就是说,您有 5 条消息并行处理。
消息的顺序如何
一旦您并行处理消息,消息的顺序就会混乱。但是,当您遇到处理错误时,1 个消费者已经发生了这种情况,它们被绕道到您的 deadLetterChannel
并在稍后重新处理。
我是否应该考虑将并行处理 EIP 作为替代方案
仅当concurrentConsumers
选项不足时。
【讨论】:
谢谢 burki,你已经清楚地解释了我难以理解的问题。我现在很好。非常感谢 -以上是关于如何在 apache camel 中执行 gcp pubsub 消息的并行处理的主要内容,如果未能解决你的问题,请参考以下文章
如何将属性与 Spring XML 中 Apache Camel 路由中的(布尔)文字进行比较?
如何在Apache Camel Aggregator关联中加入多个标头
如何在apache camel DSL或camel Processor内部设置其他身份验证属性?
如何在 Apache Camel 中定义要通过 ref 抛出的异常
如何在 Apache Camels RouteBuilder.restConfiguration() 中添加 OpenApi/Swagger securitySchemes?