Storm 中的延迟队列实现——Kafka、Cassandra、Redis 还是 Beanstalk?
Posted
技术标签:
【中文标题】Storm 中的延迟队列实现——Kafka、Cassandra、Redis 还是 Beanstalk?【英文标题】:Delayed Queue implementation in Storm – Kafka, Cassandra, Redis or Beanstalk? 【发布时间】:2016-05-24 22:25:34 【问题描述】:我有一个风暴拓扑来处理来自 Kafka 的消息,并根据手头的任务在 Cassandra 中进行 HTTP 调用/保存。我一收到消息就处理它们。由于响应来自外部源(例如 HTTP),很少有消息没有被完全处理。我想为重试实现指数退避机制,以防 HTTP 服务器不响应/返回错误消息以在一段时间后重试。我可以想出一些可以用来实现它们的想法。如果有任何其他我可以使用的容错解决方案,我想知道它们中的哪一个将是一个更好的解决方案。由于这用于实现指数退避,因此每条消息都会有不同的延迟时间。
在 Kafka 中发送另一个主题,稍后使用。 我的首选解决方案。我知道我们可以使用 Kafka 偏移量,以便在后期使用消息。我怎么找不到文档/示例代码来做同样的事情。如果有人能帮我解决这个问题,那将非常有帮助。 编写消息 Cassandra / Redis 并编写调度程序以获取未处理且准备好使用的消息并将其发送到 Kafka,以便我的风暴拓扑可以使用它。 (其他遗留项目中的现有解决方案(非 Storm)) 延迟发送到 Beanstalk(其他遗留项目中的现有解决方案(非风暴)。我想避免使用此解决方案,仅在我没有选择的情况下使用它)。虽然这几乎是我想做的。如Kafka - Delayed Queue implementation using high level consumer 中所述,我无法找到实施 delayProcessingUntil 的文档
我过去曾使用 Beanstalk 从 Data-store 完成预定工作,但我更喜欢使用 Kafka。
【问题讨论】:
【参考方案1】:Kafka spout 内置了指数退避消息重试。您可以通过 spout 配置来配置初始延迟、延迟乘数和最大延迟。如果bolt有错误,可以调用collector.fail(input)。之后,您只需将其留给 spout 进行重试。
https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
【讨论】:
抱歉回复晚了。正如建议的那样,我通过 spout 配置配置了初始延迟、延迟乘数和最大延迟,并调用了 collector.fail(input)。我的设置是 Spout -> Bolt 1 -> Bolt 2-> Bolt 3。如果在任何一个 Bolt 中执行失败,我想重新启动该过程。当从 Bolt 1 调用错误 collector.fail(input) 并且 Back-off 工作完美时,建议的解决方案工作正常。但是,当我从 Bolt 2 或 Bolt 3 调用 collector.fail(input) 时,重试不起作用。此外,如果我不从 Bolt 1 调用 collector.ack(input),它会在一段时间后重试。 我正在使用以下代码将数据从一个粗体发送到另一个粗体。collector.emit(streamId,new Values(data));
declarer.declareStream(streamId,new Fields(field_name));
如果你能在这里帮助我,这将非常有帮助@abhishek-agarwal
@AnkitGupta - 您必须将发送的元组从一个螺栓固定到另一个螺栓。将 Bolt 中的代码更改为 collector.emit(streamId, 我认为您的用例描述了对数据库而不是队列的需求。您希望将记录暂时存储到它们的时间,然后将它们删除,这样它们就不会出现在以后的搜索中。正如您的分析所示,在队列中尝试这样做充其量是尴尬的。
我建议您在 Cassandra 中创建另一个列族来保存这些延迟的请求。您将存储请求本身以及重试时间。您是否还希望有一系列失败的 HTTP 尝试和相关数据取决于您。随着延迟的请求最终得到满足,您将从 CF 中删除相应的行。延迟请求的搜索也很简单。
当然,任何数据库,甚至是本地驱动器或 HDFS 中的文件也可以工作。
【讨论】:
【参考方案3】:您可能对 Kafka Retry 项目 https://github.com/IBM/kafka-retry 感兴趣。它使用单个重试主题提供延迟重试队列。
【讨论】:
以上是关于Storm 中的延迟队列实现——Kafka、Cassandra、Redis 还是 Beanstalk?的主要内容,如果未能解决你的问题,请参考以下文章