重启时 Kafka 和 Flink 重复消息

Posted

技术标签:

【中文标题】重启时 Kafka 和 Flink 重复消息【英文标题】:Kafka & Flink duplicate messages on restart 【发布时间】:2017-01-20 10:08:34 【问题描述】:

首先,这与Kafka consuming the latest message again when I rerun the Flink consumer 非常相似,但又不一样。该问题的答案似乎无法解决我的问题。如果我错过了该答案中的某些内容,请改写答案,因为我显然错过了某些内容。

但问题完全相同——Flink(kafka 连接器)重新运行它在关闭之前看到的最后 3-9 条消息。

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的代码

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner 
  def main(args: Array[String]): Unit = 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  

我的 SBT 依赖项

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的过程

(3 个终端)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统中没有错误时,我希望能够打开和关闭 flink,而无需重新处理在先前运行中成功完成流的消息。

我的修复尝试

我已将调用添加到setStateBackend,认为可能是默认内存后端没有正确记住。这似乎没有帮助。

我已经删除了对enableCheckpointing 的调用,希望在 Flink 和 Zookeeper 中可能有一个单独的机制来跟踪状态。这似乎没有帮助。

我使用了不同的接收器,RollingFileSink,print();希望这个错误可能在kafka中。这似乎没有帮助。

我已经回滚到 flink(和所有连接器)v1.1.0 和 v1.1.1,希望这个 bug 可能在最新版本中。这似乎没有帮助。

我已将zookeeper.connect 配置添加到属性对象中,希望关于它仅在 0.8 中有用的评论是错误的。这似乎没有帮助。

我已将检查点模式明确设置为 EXACTLY_ONCE(好主意 drfloob)。这似乎没有帮助。

我的请求

救命!

【问题讨论】:

只是为了好玩,尝试明确设置 EXACTLY_ONCE。 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 我遇到了完全相同的问题,在启动 flink 流作业后再次消费相同的事件。也许保存检查点时偏移量没有正确增加? 显式检查点模式不起作用。我已经更新了帖子。不过是个好主意。 【参考方案1】:

(我已经在 J​​IRA 中发布了相同的回复,只是在这里交叉发布了相同的回复)

根据您的描述,我假设您是手动关闭作业,然后重新提交,对吗?

除非您使用保存点 (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则 Flink 不会在手动作业重新启动时保留完全一次。 一次性保证是指作业失败,然后自动从以前的检查点恢复(启用检查点时,就像您对 env.enableCheckpointing(500) 所做的那样)

实际发生的情况是,当您手动重新提交作业时,Kafka 使用者只是从 ZK / Kafka 中提交的现有偏移量开始读取。这些偏移量在您第一次执行作业时已提交给 ZK / Kafka。然而,它们并没有用于 Flink 的完全一次性语义; Flink 为此使用内部检查点的 Kafka 偏移量。 Kafka 消费者将这些偏移量提交给 ZK,只是为了向外部世界(wrt Flink)公开工作消耗进度的度量。

【讨论】:

嗨,戈登,如果我更新/更改工作并重新提交会怎样?为什么不能使用 ZK 的正确偏移量?通过将偏移量增加一来修复似乎很容易,还是我错过了一点?我不想在我的下游应用程序中使用命令行或添加额外的重复数据删除。还是我使用 flink 错了? 就像我说的,ZK 中的偏移量只被提交回来以暴露进度。 Flink 通过其内部的 checkpointing 机制实现了完全一次;状态后端是保存正确偏移快照的地方。提交给 ZK 的“外部”偏移量不一定是正确的偏移量,因为它们并不真正配合 Flink 的检查点。 如果要更新/更改作业,通常这是流作业的托管停机时间,Flink 为此提供了保存点。重新提交作业时,您可以提供以前的保存点作为起点。 Flink 也正在进行支持在单个命令中“保存并重新提交”作业的工作。 还有一点值得指出,以免我们在这里混淆:“Exactly-once”是指所有记录在 Flink 的用户/窗口状态中只计算一次,而不是只处理记录一次。如果发生故障,并且某个作业需要重放一些数据,Flink 的检查点确保即使是操作员的内部状态也被重放,因此它们似乎没有包含对要重放数据的任何更改。重要的是要知道 Flink 的exactly-once 是重放数据的组合,同时将内部状态回滚 我同意文档可以提供更多关于此的信息。让我们为此开张票;)【参考方案2】:

更新 2:我修复了偏移处理的错误,它被合并到当前的 MASTER 中。

更新:没问题,在取消作业之前使用手动保存点(感谢 Gordon)

我检查了日志,这似乎是偏移处理中的一个错误。我在https://issues.apache.org/jira/browse/FLINK-4618 下提交了一份报告。 当我收到反馈时,我会更新这个答案。

【讨论】:

以上是关于重启时 Kafka 和 Flink 重复消息的主要内容,如果未能解决你的问题,请参考以下文章

如何保证消息不被重复消费啊(如何保证消息消费时的幂等性)?

[转载]kafka入门笔记

Kafka消息保证不丢失和重复消费问题

kafka重复消费的原因

Kafka重复消费和丢失数据研究

Kafka二十三Kafka优化之防止消息丢失和重复消费