Kafka Connect - 如何发送和处理嵌套的 JSON?
Posted
技术标签:
【中文标题】Kafka Connect - 如何发送和处理嵌套的 JSON?【英文标题】:Kafka Connect - how to send and handle nested JSON? 【发布时间】:2021-04-06 02:27:31 【问题描述】:我有一条消息,它适用于我的 Redis Sink 连接器(连接器将价值赋予 Redis):
"schema":
"type": "struct",
"fields": [
"type": "int64",
"optional": false,
"field": "registertime"
,
"type": "string",
"optional": false,
"field": "userid"
,
"type": "string",
"optional": false,
"field": "regionid"
,
"type": "string",
"optional": false,
"field": "after"
],
"optional": false,
"name": "ksql.users"
,
"payload":
"registertime": 1493819497170,
"userid": "User_1",
"regionid": "Region_5",
"after": "MALE"
但我希望将字段“之后”作为嵌套对象:
"after":
"one": null,
"two": "one"
并在此基础上处理数据(即,如果“one”为空,则跳过)。
所以我有一个连接器:
"name": "connector1",
"config":
"topics": "topic1",
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"tasks.max": "1",
"connect.redis.error.policy": "NOOP",
"connect.redis.host": "localhost",
"connect.redis.port": "6379",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.redis.kcql": "INSERT INTO prod- SELECT * FROM topic1 PK after;"
我想从消息中的嵌套 json 中提取数据。 我的命令引以为豪:
topc=topic1
message=message.json
echo "key:$(jq -rc . $message)" | $kafka_dir/bin/kafka-console-producer.sh --topic $topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
我如何发送一个嵌套的 json 对象,我如何通过 Transforms 从中提取一个字段,并且基于它的值处理与否?
【问题讨论】:
【参考方案1】:发送嵌套数据的工作方式与发送常规消息相同。
您应该能够更新消息以包含类似此架构信息的内容,用于 after 字段
"type": "struct",
"fields": ["field": "one", "optional": false, "type":"string",... ],
"optional": false,
"field": "after"
然后相应地更新有效负载。
我个人从未使用过 JSONConverter 架构/有效负载类型,因为 Avro 更适合这种情况
据我所知,Kafka Connect 不能跳过消息;它会处理所有这些。也没有用于提取深度超过 1 的任意嵌套值的内置转换,因此获取 after
结构中的字段 内 可能是一个问题。但是,您可以通过修改此特定连接器的 SELECT * FROM topic1
KCQL 语句来获得它
一般来说,如果您需要这样的逻辑,您会使用流处理器(例如 KSQL 或 Kafka Streams)在转储到数据库之前过滤/修改主题
【讨论】:
谢谢,但 AFAIK Kafka Connect 可以跳过消息 - 通过转换,例如 ExtractField($Key 或 $Value),然后是 Tombstone 或过滤器($Key 或 $Value)。我的问题是如何发送有效负载嵌套对象 - 而不是字符串(正如我试图展示的那样)。我想要获得的是不是在字符串上而是在对象“one”:“two”,“three”:“four”上使用Transforms。如何发送它,在 ExtractField$Value 之后拥有另一个对象,然后通过下一个转换(例如 Tombstone 或 Filter)处理对象,而不是字符串 对于 kafka-console-producer 来说,生成嵌套数据与单行文本没有任何不同。如果您希望嵌套 after 字段,它需要是 Struct 模式类型,而不是"type": "string"
,并且根据您的生产者控制的有效负载更新,而不是 Connect 或转换
是的,我在问如何生成这样的有效负载,因为如果我在架构中将string
更改为object
,连接器会显示错误,它不理解这种类型。所以我的问题是,这样的消息(有效负载)应该是什么样子
你尝试过这样的事情吗? "type": "struct", "fields": ["field": "one", "type":"string",... ], "field": "after"
?否则,我建议您在不需要架构和有效负载的情况下使用 Avro 而不是 JSON
哦,我有!我使用的是连接器 com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
insted of com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
所以这是问题的原因以上是关于Kafka Connect - 如何发送和处理嵌套的 JSON?的主要内容,如果未能解决你的问题,请参考以下文章
如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?
如何隐藏 Kafka connect api 未处理异常的堆栈跟踪
kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?