风暴批处理后向kafka提交偏移量

Posted

技术标签:

【中文标题】风暴批处理后向kafka提交偏移量【英文标题】:Submitting offsets to kafka after storm batch 【发布时间】:2017-03-17 05:50:26 【问题描述】:

当批处理螺栓完成处理批处理时,仅提交每个分区的最高偏移量的正确方法是什么?我主要担心的是机器在处理批处理时死机,因为整个 shebang 将要运行在 AWS 现场实例中。

我是 Storm 开发的新手,我似乎找不到 IMO 的答案是非常直接地使用 kafka 和storm。

场景:

基于Guaranteeing Message Processing guide,假设我有一个("word",count) 元组的蒸汽(kafka 主题),处理 X 元组的批处理螺栓,进行一些聚合并创建 CSV 文件,将文件上传到 hdfs/db 和确认。

在非 strom“天真”实现中,我会读取 X msgs(或读取 Y 秒),聚合,写入 hdfs,一旦上传完成,将每个分区的最新(最高)偏移量提交给 kafka。如果机器或进程在 db 提交之前死亡 - 下一次迭代将从上一个位置开始。

在storm中,我可以创建批处理bolt,它将锚定所有批处理元组并立即确认它们,但是我找不到将每个分区的最高偏移量提交给kafka的方法,因为spout不知道批处理,所以一旦批处理螺栓确认元组,每个喷口实例都会一个接一个地确认他的元组,所以我可以按照我的看法:

    在 spout 的每个 ack 上提交 acked 消息的偏移量。这将导致许多提交(每批可能是几 K 的元组),可能是乱序的,如果在提交偏移量时 spout 工作失败,我最终将部分替换一些事件。 与 1 相同。但我可以在提交的最高偏移量中添加一些本地偏移量管理(修复无序偏移量提交)并提交每隔几秒看到的 highets 偏移量(减少大量提交)但我仍然可以结束如果 spout 死了,则增加部分提交的偏移量 将偏移提交逻辑移动到螺栓 - 我可以将每条消息的分区和偏移添加到发送到批处理螺栓的数据中,并将每个分区的最高处理偏移作为批处理的一部分提交(发送到“偏移提交者”在批次结束时使用螺栓)。这将解决偏移跟踪、多次提交和局部重播问题,但这会为螺栓添加特定于 kafka 的逻辑,从而将螺栓代码与 kafka 相结合,一般来说,在我看来,这似乎是在重新发明***。 在***改造方面走得更远,在 ZK 中手动管理最高处理的 patition-offset 组合,并在我初始化 spout 时读取此值。

【问题讨论】:

你找到解决方案了吗? 【参考方案1】:

你的问题有很多,所以不确定这是否完全解决了这个问题,但如果你担心发送到 kafka 的确认数量(例如,在每条消息之后),你应该能够为消耗,例如 1000 可以减少很多。

【讨论】:

以上是关于风暴批处理后向kafka提交偏移量的主要内容,如果未能解决你的问题,请参考以下文章

Kafka手动提交偏移量的作用到底是什么???

使用 commitAsync 提交偏移量时出现 Kafka 异常

Kafka consumerGroup 丢失了所有分区中提交的偏移量信息,并从头开始消费偏移量

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

Kafka消费者手动提交消息偏移

SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)