Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入
Posted
技术标签:
【中文标题】Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入【英文标题】:Kafka Connect Transformation: Extract a Long value from json field and insert as key 【发布时间】:2019-03-15 20:13:45 【问题描述】:我有关于 JDBC 连接器发布到的主题的以下 json
"APP_SETTING_ID":9,"APP_SETTING_NAME":"my_name","SETTING_KEY":"my_setting_key"
这是我的连接器文件
name=data.app_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT APP_SETTING_ID, APP_SETTING_NAME, SETTING_KEY,FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=data.app_setting
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=APP_SETTING_ID
这确实添加了一个密钥,但它也是一个 json 格式,例如
"APP_SETTING_ID":9
虽然我只希望 9 成为键而不是地图。在数据库中,它存储为 Long 值。
【问题讨论】:
【参考方案1】:ValueToKey
获取值中的字段列表,并返回这些字段与其值的映射。
我认为您需要进行第二次转换才能仅提取其中一个字段。
transforms=ReplaceKey,ExtractKey
# Replaces the key with fields in the value. Creates a map for all listed fields
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ReplaceKey.fields=APP_SETTING_ID
# Extracts a specfic field from the key, assuming it's a map/struct type
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractKey.field=APP_SETTING_ID
【讨论】:
如果可能的话,您能否也解释一下这里发生了什么。特别是在 ExtractField$Key 部分 美元符号只是用于访问内部类的 Java 类语法 那么你提取密钥了吗?凭直觉,它应该是值 9 不?另外,在这种情况下 key.converter 应该是什么。目前它的 org.apache.kafka.connect.json.JsonConverter 即使它现在不是 JSON。它仍然有效。只是不确定转换器在做什么 是的,你的例子是 9。如果您的数据不是 JSON,那么转换器将无法序列化。不过,纯字符串或整数仍然是有效的 JSON 最后一个问题:) - 我尝试了 org.apache.kafka.connect.converters.LongConverter 但我的 kafka 无法识别它。我正在使用 kafka 1.0 和 Confluent 3.0 是需要安装的东西还是带有不同的版本号以上是关于Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中
Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON
如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]
Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis
需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话