在 Confluent Elasticsearch 连接器中为空值时避免覆盖字段
Posted
技术标签:
【中文标题】在 Confluent Elasticsearch 连接器中为空值时避免覆盖字段【英文标题】:Avoid overwriting fields when for null values in Confluent Elasticsearch Connector 【发布时间】:2021-11-26 16:56:57 【问题描述】:我有一个扩充管道,可以更新动态数量的字段,写入 Kafka,然后发送到 Elasticsearch。我们正在使用Confluent Elasticsearch Connector。
例如,如果发送到 ES 连接器的第一条记录是这样的:
id: 1, name: "Bob", age: null
丰富的记录是这样的:
id: 1, name: null, age: 34
我希望 Elasticsearch 中的结果记录为:
id: 1, name: "Bob", age: 34
丰富的记录必须有一个空值(即在我们上面的示例中name: null
)而不是根本不设置键的原因是它来自 Avro 数据,我们的架构列出了几个字段作为可选.由于扩充管道正在更新动态数量的字段,这似乎是最直接的解决方案(即,可能会更新一条记录中的 name
字段,但另一条记录中的 age
字段)。由于可选的 Avro 字段默认为 null
,这就是我们的空值的来源。
我尝试了write.method=upsert
设置as shown in this post,但这似乎仍会覆盖所有以null
作为丰富记录值的字段。 IE。根据上面的示例,ES 中的结果记录看起来像 id: 1, name: null, age: 34
。上面链接的帖子似乎通过为单个记录类型设置多个 Avro 模式解决了这个问题,这对我们不起作用,因为它增加了太多的复杂性。
我注意到 ES 连接器也有 behavior.on.null.values
的设置,但我的理解是,这是针对整个记录为 null
而不是单个字段的情况。
Confluent ES Sink 连接器中是否有类似nullToUnset
in the Datastax C* Connector 的设置?
如果没有,有没有好的方法来实现这个?
【问题讨论】:
既然为null,那么如果从记录中排除name字段会怎样呢? Avro 不关心该字段是否真的存在,但连接器呢? @OneCricketeer 我可能对 Avro 的工作原理感到困惑,但我的理解是,如果我排除 name 字段,它将作为默认值发送,即null
。因此,我不知道是否可以在不发送 null 的情况下实际排除 name 字段。你知道解决这个问题的方法吗?
连接转换器应该只将作为记录一部分的可用字段转发到接收器。我还没有调试足够多的 Avro 转换器来知道它是否首先完全反序列化记录以应用任何默认值
@OneCricketeer 我尝试过的一切似乎都表明它正在应用默认值。如果有办法解决这个问题那就太好了
【参考方案1】:
相关的代码行在这里: https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java#L170
这基本上意味着源文档按原样发送到索引 - 没有修改。
您最好的选择可能是添加一个读取源文档并删除任何具有空值的字段的 SMT。
【讨论】:
我在考虑这个问题 - 我很难找到实际删除字段的 SMT。例如,Mask Fields 似乎只删除了值,这似乎会让我和以前一样。有没有真正做到这一点的 SMT? ReplaceField 应该做你需要的,使用“黑名单”配置:docs.confluent.io/platform/current/connect/transforms/… 在这种情况下,您真正想要的是基于字段值的条件转换(如果 value == null 然后删除字段)。不过,我认为没有用于字段匹配的内置转换谓词。改用自定义 SMT 可能更有意义。 尝试从这里开始你的代码:github.com/apache/kafka/blob/trunk/connect/transforms/src/main/…。处理修改后的架构可能会很棘手,因此请仔细遵循代码。 这是最终解决方案,以防万一有人感兴趣:github.com/Anant/kafka-connect-SMT-remove-null-fields以上是关于在 Confluent Elasticsearch 连接器中为空值时避免覆盖字段的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 如何使用 SSL 连接 Elasticsearch?
如何使用 Kafka Connect 将 Protobuf 消息传递到 Elasticsearch?
在没有安装 Confluent 平台的情况下使用 Confluent Hub