Kafka Connect:将消息从字节转换为 Json
Posted
技术标签:
【中文标题】Kafka Connect:将消息从字节转换为 Json【英文标题】:Kafka Connect: Convert message from bytes to Json 【发布时间】:2020-12-15 03:29:12 【问题描述】:我正在尝试使用 Google PubSub 源连接器从我的谷歌云获取数据到 kafka。我确实得到了数据,但消息以字节的形式出现。我提到了here,如上所述,我使用了 JSON 转换器来更改它。
这是我的连接器代码部分:
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha
这就是我在我的 kafka 中得到的:
"schema":
"type":"struct",
"fields":[
"type":"bytes",
"optional":false,
"field":"message"
,
"type":"string",
"optional":false,
"field":"subFolder"
,
"type":"string",
"optional":false,
"field":"deviceId"
,
"type":"string",
"optional":false,
"field":"deviceRegistryLocation"
,
"type":"string",
"optional":false,
"field":"projectId"
,
"type":"string",
"optional":false,
"field":"deviceNumId"
,
"type":"string",
"optional":false,
"field":"deviceRegistryId"
],
"optional":false
,
"payload":
"message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
"subFolder":"",
"deviceId":"deviceid",
"deviceRegistryLocation":"region_value",
"projectId":"projectid",
"deviceNumId":"device_num_value",
"deviceRegistryId":"registryid"
即使在提供连接器之后,我也会以字节形式收到详细信息。我应该做些什么来将其转换为 json 格式?
【问题讨论】:
【参考方案1】:Cloud Pub/Sub Kafka 连接器不会检查或转换它接收到的消息中的数据;它只是将数据字段作为字节传递,这是 PubsubMessage 中字段的类型。目前没有办法让连接器本身读取消息的内容并将其转换为 JSON。
【讨论】:
不,连接器不以任何方式检查消息中的数据,因此不提供任何转换。它总是使用字节,无论是传递only the message 还是传递the message with additional fields。以上是关于Kafka Connect:将消息从字节转换为 Json的主要内容,如果未能解决你的问题,请参考以下文章
需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话
如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]
Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?