kafka jdbc sink connector throws org.apache.kafka.connect.errors.DataException (Struct schema's field name not specified properly) when inserting into a PG table
Posted: 2021-11-06 09:42:39

I am using the kafka-jdbc-sink-connector in my project. I need to publish some JSON to a Kafka topic (kafka_subs) and then, using the JDBC sink connector, insert that record into a Postgres table (subs) under schema TESTDB. But I got the exception below.
The Kafka Connect version (Docker image) is confluentinc/cp-kafka-connect:latest.
I am running Kafka Connect from Docker Compose. Below is the JDBC sink connector configuration:
curl -X POST http://localhost:8082/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_sink_postgres_022",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topics": "kafka_subs",
    "auto.create": "true",
    "insert.mode": "insert",
    "table.name.format": "TESTDB.subs",
    "mode": "bulk",
    "pk.mode": "none",
    "poll.interval.ms": 60000,
    "pk.mofr": "bulk",
    "value.converter.schema.enable": "true",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "schemas.enable": "true"
  }
}'
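For reference, the connector registration and task state can be checked through the Connect REST API on the same port, which is a quick way to surface the stack trace shown further below. This is a minimal sketch, assuming Connect is reachable at localhost:8082 as in the curl above:

# List registered connectors (should include jdbc_sink_postgres_022)
curl -s http://localhost:8082/connectors

# Show connector and task state; a failed task reports its stack trace here
curl -s http://localhost:8082/connectors/jdbc_sink_postgres_022/status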
Here is the record I published to the Kafka topic using ./bin/kafka-console-producer.sh:
"schema":
"type": "struct",
"name": "TESTDB",
"optional": false,
"fields": [
"name": "sub_id",
"optional": false,
"type": "string"
,
"name": "sub_name",
"optional": false,
"type": "string"
]
,
"payload":
"sub_id": "10000",
"sub_name": "Sssss"
Below is the exception:
[2021-09-09 16:18:36,705] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Error converting message value in topic 'kafka_subs' partition 0 at offset 0 and timestamp 1631204315678: Struct schema's field name not specified properly (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:534)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:382)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-09-09 16:18:36,707] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
Am I doing something wrong? How can I insert this simple JSON into the Postgres table?

Thanks,
【Answer 1】: This is now resolved for me: I changed the "name" key under each entry of "fields" to "field", and with that change it works fine.

The previous JSON was:
{
  "schema": {
    "type": "struct",
    "name": "TESTDB",
    "optional": false,
    "fields": [
      { "name": "sub_id", "optional": false, "type": "string" },
      { "name": "sub_name", "optional": false, "type": "string" }
    ]
  },
  "payload": { "sub_id": "10000", "sub_name": "Sssss" }
}
The changed JSON is:
{
  "schema": {
    "type": "struct",
    "name": "TESTDB",
    "optional": false,
    "fields": [
      { "field": "sub_id", "optional": false, "type": "string" },
      { "field": "sub_name", "optional": false, "type": "string" }
    ]
  },
  "payload": { "sub_id": "10001", "sub_name": "Ggggg" }
}
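For reference, a minimal end-to-end check with the corrected envelope, assuming a broker at localhost:9092 and the Postgres credentials from the connector config above. Note that depending on how the sink quotes table.name.format, the auto-created table may end up literally named TESTDB.subs rather than subs inside a TESTDB schema:

# Send the corrected single-line record to the topic
echo '{"schema":{"type":"struct","name":"TESTDB","optional":false,"fields":[{"field":"sub_id","optional":false,"type":"string"},{"field":"sub_name","optional":false,"type":"string"}]},"payload":{"sub_id":"10001","sub_name":"Ggggg"}}' \
  | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka_subs

# Verify the row landed (adjust the table reference if the sink created "TESTDB.subs" as one literal name)
psql -h localhost -U postgres -d postgres -c 'SELECT * FROM "TESTDB".subs;'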