Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Posted

技术标签:

【中文标题】Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON【英文标题】:Kafka Connect - JSON Converter - JDBC Sink Connector - Column Type JSON 【发布时间】:2019-06-15 14:35:56 【问题描述】:

用例是将整个消息(即 JSON)和键存储为表中的记录,该表具有两列“id”和“data”。

数据库为 Postgres,支持 JSON 列类型。

根据本文,JSONConverter 中支持的类型是字符串、int64 等 https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter

是否可以将数据字段类型为 JSON,然后可以将其存储在 Postgres DB 中,列类型为 JSON。

schema = `
"type":"struct",
"fields":[
    "type":"string", "optional": false, "field":"id",
    "type":"string", "optional": false, "field":"data"
]`

样本数据有效载荷是

"payload":  "id": 10000, "data": "hello":"world" 

Above 将数据存储为文本,并期望列在 Postgres 中为文本类型。 如果 Postgres 上的列是 JSON 类型,那么 JDBC Sink 连接器将抛出错误。

在 Postgres 上使用 JSON 类型将有助于在 JSON 字段等上创建索引。是否可以适当地使用 JSONConverter 和 JDBC Sink Converter 来存储列类型为 JSON 的记录。

【问题讨论】:

JDBC Sink Connector 抛出什么错误?你能添加一些日志吗? 【参考方案1】:

使用value.converter.schema.enable=true,并像这样发送 JSON 数据(将架构作为每条消息的一部分,并使用实际消息数据更新payload 部分),它应该与 JDBC Sink 一起使用。


    "schema": 
        "type": "struct",
        "fields": [
            "type": "int32",
            "optional": false,
            "field": "id"
        , 
            "type": "struct",
            "name": "data",
            "optional": false,
            "fields": [
               "type": "string",
               "name": "hello",
               "optional":false
            ]
        ],
        "optional": false,
        "name": "foobar"
    ,
    "payload": 
        "id": 10000,
        "data": "hello":"world"
    

或者您可以考虑将您的客户端转换为使用 Avro,并为自己节省一些网络带宽。

【讨论】:

感谢您的来信。抱歉,如果我解释得不好,我的意思是,这里的数据值不是字符串 'bar' 它本身是一个 JSON 对象,例如 "payload": "id": 10000, "data": “你好”:“世界” 我认为这没有问题。 data 将只有一个 struct 架构。 如果我有一个 pg 模式名 TESTDB 和表名作为测试,那么模式下的类型和名称是什么。 “字段”我相信列名对吗? @SuvenduGhosh 我并没有真正使用 JDBC 连接器,所以我建议创建一个新帖子【参考方案2】:

JDBC Sink Connector 不支持 PostgreSQL json、jsonb 类型。它支持原始类型的数量,日期时间。

在以下页面,您可以找到将架构类型映射到数据库类型 (PostgreSQL) https://docs.confluent.io/5.1.0/connect/kafka-connect-jdbc/sink-connector/index.html

虽然,JDBC Source 连接器在某些部分支持 json、jsonb 类型 - 这种类型的列不会映射到 STRUCT,但会映射到 STRING 类型。

【讨论】:

以上是关于Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

Kafka Connect 转换:从 json 字段中提取 Long 值并作为键插入

Kafka Connect JDBC 接收器连接器

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

Kafka Connect:如何将String解析为Map