kafka 连接器 jdbc-sink 最后出现语法错误

Posted

技术标签:

【中文标题】kafka 连接器 jdbc-sink 最后出现语法错误【英文标题】:kafka connector jdbc-sink syntax error at the end 【发布时间】:2019-09-18 18:48:55 【问题描述】:

我有一个关于 jdbc-sink 的问题。

postgres1 ---> kafka ---> postgres2

生产者工作正常,但消费者有错误:

连接_1 | org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException:java.sql.BatchUpdateException:批处理条目 0 插入“客户”(“id”)值(1)ON CONFLICT(“id”)做更新 SET 已中止:错误:输入 connect_1 末尾的语法错误 | 位置:77 调用getNextException查看批处理中的其他错误。

这是我的 source.json


"name": "src-table",
"config": 
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres1_container",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.whitelist": "postgres",
    "database.server.name": "postgres1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"

这是我的 jdbc-sink.json


    "name": "jdbc-sink",
    "config": 
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres2_container:5432/postgres?user=postgres&password=postgres",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    

debezium/动物园管理员:0.9

debezium/kafka:0.9

debezium/postgres:9.6

debezium/connect:0.9

PostgreSQL JDBC 驱动程序 42.2.5

Kafka Connect JDBC 5.2.1

我尝试降级 jdbc 驱动程序和 confluent kafka connect 但仍然有同样的错误

【问题讨论】:

我发了一个和你类似的问题,你能看看吗?***.com/questions/68575974/… 【参考方案1】:

解决问题,因为当我在 postgres1 中创建表时,我没有将 id 设置为 PK 值

【讨论】:

您能解释一下您是如何在 postgres1 中创建表并将 id 设置为 PK 值的吗?谢谢! 对不起,我已经忘记了。 @大卫 没问题,我认为PK部分指的是这样的CREATE TABLE test(id BIGSERIAL PRIMARY KEY, value VARCHAR(40) NOT NULL)(postgresqltutorial.com/postgresql-serial)。 实际上我的问题有所不同,有点愚蠢,并且与无法自动创建的 Kafka 连接器配置中缺少主题 config/offset/status.storage.topic 相关 (docs.confluent.io/5.5.0/connect/…)。该错误消息对我来说非常具有误导性。希望这对某人有帮助!

以上是关于kafka 连接器 jdbc-sink 最后出现语法错误的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 无法与 Zookeeper 连接,出现错误“处于状态时等待连接超时:CONNECTING”

Kafka Mongodb 接收器连接器 - 更新文档

Kafka 代理关闭,因为日志目录失败

在 Kafka Connect 分布式模式下为多个主题配置连接器

flink sql client 连接kafka解析avro数据 (avro ArrayIndexOutOfBoundsException 解决办法)

Flume连接Kafka的broker出错