kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?
Posted
技术标签:
【中文标题】kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?【英文标题】:kafka-connect-elasticsearch: When using "write.method" as upsert, is it possible to use same AVRO object on kafka topic to send partial document?kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是否可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档? 【发布时间】:2020-08-16 15:07:10 【问题描述】:我正在尝试将“write.method”upsert 用于 Elasticsearch (ES) kafka 连接器。从我的 kafka 流应用程序中,我正在编写我想要 upsert 的文档,关于 ES 连接器配置为从中读取的 kafka 主题。我在这个主题上使用 avro 对象作为 kafka 值。我的文档的 AVRO 定义如下所示:
"type": "record",
"name": "Document",
"fields": [
"name": "id",
"type": ["null", "string"],
,
"name": "name",
"type": ["null", "string"]
,
"name": "address",
"type": ["null", "string"]
]
文档有时仅包含 id 和 name,有时仅包含地址。当我只发送地址时,id 和 name 会被覆盖,反之亦然。我已将 behavior.on.null.values
设置为 ignore
,希望 ES 连接器会忽略空 id 和 name 值,但这不能按预期工作。
虽然当我在我的 kafka 主题上使用两个不同的 AVRO 对象时,第一个只包含 id 和 name,另一个只包含地址,但 upsert 模式的行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。
解决手头问题的正确方法是什么?
【问题讨论】:
【参考方案1】:设置behavior.on.null.values = ignore
只是告诉连接器,如果它收到一条整个消息为空的消息,则忽略该消息(其他选项是失败,或者删除目标文档在 Elasticsearch 中,将消息的键与空值匹配,即墓碑消息)。
连接器不支持您描述的部分更新行为。它可以插入/更新/删除,但只能插入整个文档
如果您想要部分 upsert 行为,那么您需要自己实现这一点,无论是在自定义连接器中还是通过在您的 Kafka Streams 应用程序中存储状态,以便能够在每次出现增量时发出完整的记录。
write.method=upsert
可以进行部分更新
【讨论】:
谢谢@Robin!因此,如果我使用两个不同的 kafka 主题从 kafka 流中写入我的部分文档,并且在 ES sink 连接器配置“topic.index.map”属性中,将两个主题路由到同一个索引,这是解决这个问题的好方法吗? 我不确定它会不会。连接器从 Kafka 主题读取的点,文档(消息)需要完整。我更多地考虑的是一个流应用程序,它将获取部分文档并确定它们是否需要对现有文档进行更新。 我在本地测试了两个 kafka 主题,通过带有 write.method 'upsert' 的 ES sink 连接器写入相同的 ES 索引,并且我能够成功地将 ES 文档的一部分更新为 kafka 主题的兼容模式作为 BACKWARD .另外我想知道当 write.method 是 upsert 时,为什么 ES sink 连接器需要完整的文档? upsert 意味着您应该能够更新现有文档的一个或多个字段。 你是对的,我错了。我今天学到了一些东西:)以上是关于kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?的主要内容,如果未能解决你的问题,请参考以下文章