kafka jdbc sink connector throws org.apache.kafka.connect.errors.DataException (Struct schema's field name not specified properly) when inserting into a PG table

Posted: 2021-11-06 09:42:39

I am using the kafka-jdbc-sink-connector in my project. I need to publish some JSON to a kafka topic (kafka_subs) and then, using the jdbc-sink-connector, insert that record into a postgres table (subs) under the schema (TESTDB). But I get the exception below.

The Kafka Connect version is confluentinc/cp-kafka-connect:latest.

I am running Kafka Connect from docker compose.
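(The compose file itself is not shown in the question; as a rough sketch, the Connect service might look like the following. The service name, the broker address kafka:9092, and the port mapping to 8082 are assumptions chosen to match the REST calls below.)

connect:
  image: confluentinc/cp-kafka-connect:latest
  ports:
    - "8082:8083"    # container REST port 8083 exposed as 8082 on the host
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    CONNECT_GROUP_ID: connect-cluster
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    # single-broker setup, so the internal topics need replication factor 1
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components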

Below is the jdbc sink connector config:

curl -X POST http://localhost:8082/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_sink_postgres_022",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "connection.url": "jdbc:postgresql://localhost:5432/postgres",
                "connection.user": "postgres",
                "connection.password": "postgres",
                "topics": "kafka_subs",
                "auto.create": "true",
                "insert.mode": "insert",
                "table.name.format": "TESTDB.subs",
                "mode": "bulk",
                "pk.mode": "none",
                "poll.interval.ms": 60000,
                "pk.mofr": "bulk",
                "value.converter.schema.enable": "true",
                "key.converter.schemas.enable": "true",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "schemas.enable": "true"
        }
}'
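(As a quick sanity check, not shown in the original post, the connector's state and any task-level error can be inspected through the same Connect REST API used to create it; the connector name matches the config above.)

curl http://localhost:8082/connectors/jdbc_sink_postgres_022/status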

Here is the record I published on the kafka topic, using ./bin/kafka-console-producer.sh:


    "schema": 
        "type": "struct",
        "name": "TESTDB",
        "optional": false,
        "fields": [
                "name": "sub_id",
                "optional": false,
                "type": "string"
            , 
                "name": "sub_name",
                "optional": false,
                "type": "string"
            
        ]
    ,
    "payload": 
        "sub_id": "10000",
        "sub_name": "Sssss"
    

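(For completeness, the full console-producer invocation would look something like the line below; the broker address localhost:9092 is an assumption, the JSON above has to be pasted as a single line, and on older Kafka versions the flag is --broker-list instead of --bootstrap-server.)

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka_subs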
Below is the exception:

[2021-09-09 16:18:36,705] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Error converting message value in topic 'kafka_subs' partition 0 at offset 0 and timestamp 1631204315678: Struct schema's field name not specified properly (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
        at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:534)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:382)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
[2021-09-09 16:18:36,707] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Am I doing something wrong? Or how can I insert this simple json into a postgres table?

Thanks,


Answer 1:

This issue is now resolved for me: I changed "name" under "fields" to "field", and with this change it works fine. (The JsonConverter envelope format expects each entry in the schema's "fields" array to carry the field name under the key "field", not "name", which is why asConnectSchema threw "Struct schema's field name not specified properly".)

The previous json was:

{
    "schema": {
        "type": "struct",
        "name": "TESTDB",
        "optional": false,
        "fields": [
            {
                "name": "sub_id",
                "optional": false,
                "type": "string"
            },
            {
                "name": "sub_name",
                "optional": false,
                "type": "string"
            }
        ]
    },
    "payload": {
        "sub_id": "10000",
        "sub_name": "Sssss"
    }
}

The changed json is:

{
    "schema": {
        "type": "struct",
        "name": "TESTDB",
        "optional": false,
        "fields": [
            {
                "field": "sub_id",
                "optional": false,
                "type": "string"
            },
            {
                "field": "sub_name",
                "optional": false,
                "type": "string"
            }
        ]
    },
    "payload": {
        "sub_id": "10001",
        "sub_name": "Ggggg"
    }
}
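(Not part of the original answer: once a record goes through, the result can be checked from psql. Note that with auto.create enabled, the JDBC sink may create a table literally named "TESTDB.subs" rather than a subs table inside a TESTDB schema, depending on the connector version's handling of table.name.format, so the quoted name below is an assumption.)

psql -h localhost -U postgres -d postgres -c 'SELECT * FROM "TESTDB.subs";'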

