Apache Camel:轮询消费者

Posted

技术标签:

【中文标题】Apache Camel:轮询消费者【英文标题】:Apache Camel: Polling consumer 【发布时间】:2014-02-26 15:40:58 【问题描述】:

我是 Apache Camel 的新手,我正在尝试在一个简单的项目中理解和使用轮询消费者 EIP,但我感到有点失落。 有人可以帮我解释一下,甚至是一个小的工作示例。

任何帮助将不胜感激 提前致谢

【问题讨论】:

欢迎来到 ***!你已经读过manual了吗?你有什么问题?你试过什么? 您可以查看File Component 以了解其工作原理:git-wip-us.apache.org/repos/asf?p=camel.git;a=tree;f=camel-core/… 【参考方案1】:

对于大多数用例,您可以通过在路由中的 from() 子句中定义它们来创建消费者...

from("activemq:inbox").to(new MyProcessor());

但是,您也可以编写自己的 POJO 轮询消费者逻辑,以更好地控制消费者逻辑...只需使用计时器定期启动它并调用 receive() 方法,如下所示:

from("timer://foo?period=5000").bean(MyBean, "processQueue");

public void processQueue() 
    while (true) 
        // receive the message from the queue, wait at most 3 sec
        String msg = consumer.receiveBody("activemq:inbox", 3000, String.class);
        if (msg == null) 
            // no more messages in queue
            break;
        

        // do something with body
    

有关详细信息,请参阅文档:http://camel.apache.org/polling-consumer

【讨论】:

非常感谢 :) 我现在明白了。问题是我对 spring 和 Camel 真的很陌生,只是找不到足够的资源。我正在尝试使用 Camel In Action 这本书,但有时我觉得它有点复杂。无论如何,谢谢你的回答,这对我真的很有帮助:)【参考方案2】:

Camel 支持使用 PollingConsumer 接口从 EIP 模式实现轮询消费者,该接口可以通过 Endpoint.createPollingConsumer() 方法创建。

这也称为同步接收器,因为接收器线程阻塞,直到收到消息。我们称它为轮询消费者,因为接收者轮询消息,处理它,然后轮询另一个消息。为方便起见,消息传递 API 通常提供 receive() 方法,该方法会阻塞直到消息被传递,此外还有 receiveNoWait()receive(0 ) 如果没有可用消息,则立即返回。

例如

ActiveMq 消费者

<from uri="activemq:someQueue"/>
<to uri="direct:somepath"/>

定期消费者

<from uri="timer://foo?period=5000"/>
 <to uri="direct:somepath"/>

更多关于Polling Consumer的信息

【讨论】:

以上是关于Apache Camel:轮询消费者的主要内容,如果未能解决你的问题,请参考以下文章

Apache Camel 生产者和消费者

使用 Apache Camel Source 从 S3 到 Kafka

发生错误时如何暂停或减少 Camel Hazelcast SEDA 消费者的吞吐量

Camel 和 JMS 以正确的顺序从高级队列中消费消息

RabbitMQ - Apache Camel 读取消息如何处理失败的消息

由于消息发送到多个并发消费者,骆驼拆分和聚合失败