Apache Camel:轮询消费者
Posted
技术标签:
【中文标题】Apache Camel:轮询消费者【英文标题】:Apache Camel: Polling consumer 【发布时间】:2014-02-26 15:40:58 【问题描述】:我是 Apache Camel 的新手,我正在尝试在一个简单的项目中理解和使用轮询消费者 EIP,但我感到有点失落。 有人可以帮我解释一下,甚至是一个小的工作示例。
任何帮助将不胜感激 提前致谢
【问题讨论】:
欢迎来到 ***!你已经读过manual了吗?你有什么问题?你试过什么? 您可以查看File Component
以了解其工作原理:git-wip-us.apache.org/repos/asf?p=camel.git;a=tree;f=camel-core/…
【参考方案1】:
对于大多数用例,您可以通过在路由中的 from()
子句中定义它们来创建消费者...
from("activemq:inbox").to(new MyProcessor());
但是,您也可以编写自己的 POJO 轮询消费者逻辑,以更好地控制消费者逻辑...只需使用计时器定期启动它并调用 receive()
方法,如下所示:
from("timer://foo?period=5000").bean(MyBean, "processQueue");
public void processQueue()
while (true)
// receive the message from the queue, wait at most 3 sec
String msg = consumer.receiveBody("activemq:inbox", 3000, String.class);
if (msg == null)
// no more messages in queue
break;
// do something with body
有关详细信息,请参阅文档:http://camel.apache.org/polling-consumer
【讨论】:
非常感谢 :) 我现在明白了。问题是我对 spring 和 Camel 真的很陌生,只是找不到足够的资源。我正在尝试使用 Camel In Action 这本书,但有时我觉得它有点复杂。无论如何,谢谢你的回答,这对我真的很有帮助:)【参考方案2】:Camel 支持使用 PollingConsumer 接口从 EIP 模式实现轮询消费者,该接口可以通过 Endpoint.createPollingConsumer() 方法创建。
这也称为同步接收器,因为接收器线程阻塞,直到收到消息。我们称它为轮询消费者,因为接收者轮询消息,处理它,然后轮询另一个消息。为方便起见,消息传递 API 通常提供 receive() 方法,该方法会阻塞直到消息被传递,此外还有 receiveNoWait() 和 receive(0 ) 如果没有可用消息,则立即返回。
例如
ActiveMq 消费者
<from uri="activemq:someQueue"/>
<to uri="direct:somepath"/>
定期消费者
<from uri="timer://foo?period=5000"/>
<to uri="direct:somepath"/>
更多关于Polling Consumer的信息
【讨论】:
以上是关于Apache Camel:轮询消费者的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Camel Source 从 S3 到 Kafka
发生错误时如何暂停或减少 Camel Hazelcast SEDA 消费者的吞吐量