使用 Kafka Connect Elasticsearch 连接器的消息顺序

Posted

技术标签:

【中文标题】使用 Kafka Connect Elasticsearch 连接器的消息顺序【英文标题】:Message order with Kafka Connect Elasticsearch Connector 【发布时间】:2019-06-11 12:22:43 【问题描述】:

我们在执行使用 Kafka Connect Elasticsearch 连接器将来自 Kafka 主题的消息发送到 Elasticsearch 的顺序时遇到问题。在主题中,消息的顺序正确且偏移量正确,但如果连续快速创建两条具有相同 ID 的消息,它们会以错误的顺序间歇性地发送到 Elasticsearch。这会导致 Elasticsearch 获得来自倒数第二条消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间添加一两秒的人为延迟,问题就会消失。

文档here 指出:

使用分区级别确保文档级别的更新顺序 Kafka offset作为文档版本,使用version_mode=external

但是,我在任何地方都找不到有关此 version_mode 设置的任何文档,以及我们是否需要将其设置在某个地方。

在来自 Kafka Connect 系统的日志文件中,我们可以看到两条消息(对于相同的 ID)以错误的顺序处理,相隔几毫秒。看起来这些是在不同的线程中处理的,这可能很重要。另请注意,该主题只有一个分区,因此所有消息都在同一个分区中。

以下是日志 sn-p,为清楚起见稍作编辑。 Kafka 主题中的消息由 Debezium 填充,我认为这与问题无关,但恰好包含时间戳值。这表明消息的​​处理顺序错误(尽管它们在由 Debezium 填充的 Kafka 主题中的顺序正确):

[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "

  "op": "u",
  "before": 
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE SECOND UPDATE >> ...
  ,
  "after": 
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER SECOND UPDATE >> ...
  ,
  "source":  ... ,
  "ts_ms": 1547716205205

" (org.apache.http.wire)

...

[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "

  "op": "u",
  "before": 
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE FIRST UPDATE >> ...
  ,
  "after": 
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER FIRST UPDATE >> ...
  ,
  "source":  ... ,
  "ts_ms": 1547716204190

" (org.apache.http.wire)

有谁知道在将消息发送到 Elasticsearch 时如何强制此连接器维护给定文档 ID 的消息顺序?

【问题讨论】:

您的主题有多少个分区?你的分区键是什么? 有一个分区,键是单个 UUID 值的 JSON 表示(由 Debezium 生成),它是 Postgres 数据库中一行的主键,例如 "id": "ac025cbe-1a37-11e9-9c89-7945a1bd7dd1" 你为连接器配置了多少tasks.max 对于 Elasticsearch 连接器和 Debezium 连接器,tasks.max1。 (Debezium 连接器从 Postgres 读取数据并将其放入 Kafka 主题;然后由 Elasticsearch 连接器发送到 Elasticsearch。) 【参考方案1】:

问题在于我们的 Elasticsearch 连接器将 key.ignore 配置设置为 true

我们在连接器的 Github 源代码中发现了这一行(DataConverter.java):

final Long version = ignoreKey ? null : record.kafkaOffset();

这意味着,使用key.ignore=true,正在生成并发送到 Elasticsearch 的索引操作实际上是“无版本的”......基本上,Elasticsearch 收到的文档的最后一组数据将替换任何以前的数据,即使它是“旧数据”。

从日志文件看,连接器似乎有几个消费者线程读取源主题,然后将转换后的消息传递给 Elasticsearch,但它们传递给 Elasticsearch 的顺序不一定与主题顺序相同。

使用key.ignore=false,现在每个 Elasticsearch 消息都包含一个等于 Kafka 记录偏移量的版本值,如果 Elasticsearch 已经接收到更高“版本”的数据,它会拒绝更新文档的索引数据。

这不是解决此问题的唯一项。我们仍然需要对来自 Kafka 主题的 Debezium 消息进行转换,以将密钥转换为 Elasticsearch 满意的纯文本格式:

"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"

【讨论】:

以上是关于使用 Kafka Connect Elasticsearch 连接器的消息顺序的主要内容,如果未能解决你的问题,请参考以下文章

无法在 kafka connect docker 映像中运行 kafka connect datagen

Kafka-connect 是不是必须使用模式注册表?

kafka connect 使用说明

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

MySql 查询在 Kafka-connect 中失败

Kafka-connect,Bootstrap 代理断开连接