Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入

Posted

技术标签:

【中文标题】Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入【英文标题】:Kafka Connect Transformation: Extract a Long value from json field and insert as key 【发布时间】:2019-03-15 20:13:45 【问题描述】:

我有关于 JDBC 连接器发布到的主题的以下 json

"APP_SETTING_ID":9,"APP_SETTING_NAME":"my_name","SETTING_KEY":"my_setting_key"

这是我的连接器文件

name=data.app_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT APP_SETTING_ID, APP_SETTING_NAME, SETTING_KEY,FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=data.app_setting

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=APP_SETTING_ID

这确实添加了一个密钥,但它也是一个 json 格式,例如

"APP_SETTING_ID":9

虽然我只希望 9 成为键而不是地图。在数据库中,它存储为 Long 值。

【问题讨论】:

【参考方案1】:

ValueToKey 获取值中的字段列表,并返回这些字段与其值的映射。

我认为您需要进行第二次转换才能仅提取其中一个字段。

transforms=ReplaceKey,ExtractKey

# Replaces the key with fields in the value. Creates a map for all listed fields
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ReplaceKey.fields=APP_SETTING_ID

# Extracts a specfic field from the key, assuming it's a map/struct type
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractKey.field=APP_SETTING_ID

【讨论】:

如果可能的话,您能否也解释一下这里发生了什么。特别是在 ExtractField$Key 部分 美元符号只是用于访问内部类的 Java 类语法 那么你提取密钥了吗?凭直觉,它应该是值 9 不?另外,在这种情况下 key.converter 应该是什么。目前它的 org.apache.kafka.connect.json.JsonConverter 即使它现在不是 JSON。它仍然有效。只是不确定转换器在做什么 是的,你的例子是 9。如果您的数据不是 JSON,那么转换器将无法序列化。不过,纯字符串或整数仍然是有效的 JSON 最后一个问题:) - 我尝试了 org.apache.kafka.connect.converters.LongConverter 但我的 kafka 无法识别它。我正在使用 kafka 1.0 和 Confluent 3.0 是需要安装的东西还是带有不同的版本号

以上是关于Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis

需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话

Kafka Connect JDBC 接收器连接器