Confluent Kafka Sink Connector 未将数据加载到 Postgres 表

Posted

技术标签:

【中文标题】Confluent Kafka Sink Connector 未将数据加载到 Postgres 表【英文标题】:Confluent Kafka Sink Connector is not loading data to Postgres table 【发布时间】:2018-11-02 19:53:28 【问题描述】:

我正在尝试通过 Kafka Sink 连接器将数据加载到 Postgres 表,但出现以下错误:

原因:org.apache.kafka.connect.errors.ConnectException:无法更改以添加缺少的字段 SinkRecordFieldschema=SchemaSTRING, name='A_ABBREV', isPrimaryKey=false,因为它不是可选的并且没有默认值

Postgres DB 中的表已经包含字段 A_ABBREV,但现在可以确定为什么我会收到缺少字段错误。

有人遇到过类似的问题吗?

以下是我的接收器连接器配置:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=AGENCY
connection.password=passcode
topics=AGENCIES
tasks.max=1
batch.size=10000
fields.whitelist=A_ID, A_NAME, A_ABBREV
connection.user=pmmdevuser
name=partner5-jdbcSinkConnector
connection.url=jdbc:postgresql://aws-db.sdfdgfdrwwisc.us-east- 1.rds.amazonaws.com:3306/pmmdevdb?currentSchema=ams
insert.mode=upsert
pk.mode=record_value
pk.fields=A_ID
auto.create=false

我正在使用 Liquibase 脚本创建表,下面是通过 Liquibase 脚本创建的 postgres DB 中的创建查询:

"CREATE TABLE gds.agency
(
    a_id integer NOT NULL,
    a_name character varying(100) COLLATE pg_catalog."default" NOT NULL,
    a_abbrev character varying(8) COLLATE pg_catalog."default" NOT NULL,
    source character varying(255) COLLATE pg_catalog."default" NOT NULL DEFAULT 'AMS'::character varying,
    CONSTRAINT pk_agency PRIMARY KEY (a_id),
    CONSTRAINT a_abbrev_uk1 UNIQUE (a_abbrev)
)"

【问题讨论】:

听起来您的表或配置在您第一次启动后发生了变化 【参考方案1】:

根据我的经验,这意味着接收器的字段定义与源表/数据库的字段定义不匹配。确保字段定义匹配。检查接收器连接器尝试写入目标数据库的单个记录。您应该能够在堆栈跟踪中以调试模式看到此插入语句。获取该查询并手动运行它,以便更清楚地了解数据库中的错误。

【讨论】:

以上是关于Confluent Kafka Sink Connector 未将数据加载到 Postgres 表的主要内容,如果未能解决你的问题,请参考以下文章

Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?

Confluent Kafka Sink Connector 未将数据加载到 Postgres 表

Confluent Kafka Connect HDFS Sink 连接器延迟

Confluent Cloud Kafka - 审计日志集群:接收器连接器

在 Confluent S3 Kafka 连接器中压缩 Avro 数据

如何使用在 docker 上运行的 debezium 和 confluent-sink-connector 将所有更改从源数据库复制到目标数据库