How can I configure Debezium's MongoDB source connector to send the pk fields in the record_value as expected by the Postgres JDBC sink connector

Posted: 2021-07-16 13:02:03

I am trying to link MongoDB (v4.2.0) to PostgreSQL (v10) through Kafka Connect, using Debezium's MongoDB source connector (v1.5.0) and Confluent's JDBC sink connector (v10.1.1).

I have the following configuration for the source connector:

name=mongodb-debezium-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
tasks.max=1
mongodb.hosts=rs0/127.0.0.1:27019
mongodb.name=gtmhub
tombstones.on.delete=true
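
With this configuration, Debezium keys each Kafka record by the MongoDB document id: the record key is a struct with a single id field holding the serialized _id. A rough sketch of such a key (hand-written for illustration, assuming the default JSON converter with schemas enabled; the exact key schema name and id serialization may differ):

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"gtmhub.gtmhub.goals.Key"},"payload":{"id":"{\"$oid\": \"607ff0a569460208cb3aa3f4\"}"}}

This matters later: the document id always travels in the record key, whether or not it also appears in the record value.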

For the sink connector I have:

name=postgres-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=gtmhub.gtmhub.goals
connection.url=jdbc:postgresql://localhost:5432/gtmhub
connection.user=user
connection.password=password
auto.create=true
insert.mode=upsert
pk.fields=_id
pk.mode=record_value
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.operation.header=true

I run Kafka Connect in standalone mode. The source connector published the following messages to the gtmhub.gtmhub.goals topic:

"schema":"type":"struct","fields":["type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after","type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch","type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter","type":"struct","fields":["type":"string","optional":false,"field":"version","type":"string","optional":false,"field":"connector","type":"string","optional":false,"field":"name","type":"int64","optional":false,"field":"ts_ms","type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":"allowed":"true,last,false","default":"false","field":"snapshot","type":"string","optional":false,"field":"db","type":"string","optional":true,"field":"sequence","type":"string","optional":false,"field":"rs","type":"string","optional":false,"field":"collection","type":"int32","optional":false,"field":"ord","type":"int64","optional":true,"field":"h","type":"int64","optional":true,"field":"tord","type":"string","optional":true,"field":"stxnid"],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source","type":"string","optional":true,"field":"op","type":"int64","optional":true,"field":"ts_ms","type":"struct","fields":["type":"string","optional":false,"field":"id","type":"int64","optional":false,"field":"total_order","type":"int64","optional":false,"field":"data_collection_order"],"optional":true,"field":"transaction"],"optional":false,"name":"gtmhub.gtmhub.goals.Envelope","payload":"after":"\"_id\": \"$oid\": \"607ff0a569460208cb3aa3f4\",\"accountId\": \"$oid\": \"604f3dda3935ce0001ce97ef\",\"sessionId\": \"$oid\": \"605b66dccc3b499a4a0d0afa\",\"name\": \"alabala1\",\"description\": \"\",\"ownerId\": \"$oid\": \"604f3dda3935ce0001ce97f0\",\"dateCreated\": \"$date\": 1616603048895,\"dateFrom\": \"$date\": 1616450400000,\"dateTo\": \"$date\": 1625086799999,\"createdById\": \"$oid\": \"604f3dda3935ce0001ce97f0\",\"attainment\": 0.0,\"aggregatedAttainment\": 0.0,\"fullAggregatedAttainment\": 0.0,\"childrenAggregatedAttainment\": 0.0,\"metricsAttainment\": 0.0,\"fullSubTreeCount\": 0.0,\"private\": false,\"isDeleted\": false","patch":null,"filter":null,"source":"version":"1.5.0.Final","connector":"mongodb","name":"gtmhub","ts_ms":1619002912000,"snapshot":"true","db":"gtmhub","sequence":null,"rs":"rs0","collection":"goals","ord":1,"h":0,"tord":null,"stxnid":null,"op":"r","ts_ms":1619002912791,"transaction":null
"schema":"type":"struct","fields":["type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after","type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch","type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter","type":"struct","fields":["type":"string","optional":false,"field":"version","type":"string","optional":false,"field":"connector","type":"string","optional":false,"field":"name","type":"int64","optional":false,"field":"ts_ms","type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":"allowed":"true,last,false","default":"false","field":"snapshot","type":"string","optional":false,"field":"db","type":"string","optional":true,"field":"sequence","type":"string","optional":false,"field":"rs","type":"string","optional":false,"field":"collection","type":"int32","optional":false,"field":"ord","type":"int64","optional":true,"field":"h","type":"int64","optional":true,"field":"tord","type":"string","optional":true,"field":"stxnid"],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source","type":"string","optional":true,"field":"op","type":"int64","optional":true,"field":"ts_ms","type":"struct","fields":["type":"string","optional":false,"field":"id","type":"int64","optional":false,"field":"total_order","type":"int64","optional":false,"field":"data_collection_order"],"optional":true,"field":"transaction"],"optional":false,"name":"gtmhub.gtmhub.goals.Envelope","payload":"after":null,"patch":"\"$v\": 1,\"$set\": \"name\": \"alabala\"","filter":"\"_id\": \"$oid\": \"607ff0a569460208cb3aa3f4\"","source":"version":"1.5.0.Final","connector":"mongodb","name":"gtmhub","ts_ms":1619006391000,"snapshot":"false","db":"gtmhub","sequence":null,"rs":"rs0","collection":"goals","ord":1,"h":0,"tord":null,"stxnid":"ec07e1d8-299a-3074-a47e-de21c3b8348c:1","op":"u","ts_ms":1619006392023,"transaction":null

These messages cause the sink connector to fail with the following error:

INFO Attempting to open connection #1 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2021-04-21 14:59:52,077] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:56)
[2021-04-21 14:59:52,089] ERROR WorkerSinkTask{id=postgres-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'goals' is RECORD_VALUE with configured PK fields [_id], but record value schema does not contain field: _id (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'goals' is RECORD_VALUE with configured PK fields [_id], but record value schema does not contain field: _id
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordValuePk(FieldsMetadata.java:280)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:105)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:116)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
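
One way to see where the id actually lives is to inspect both the key and the value of the records on the topic (standard console-consumer flags; the bootstrap server address is an assumption):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic gtmhub.gtmhub.goals --from-beginning --property print.key=true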

The whole setup works fine at inserting data into the Postgres database when the sink connector is in insert mode. The only problem is that update operations show up as new rows in Postgres, which is of course undesirable. So, following the documentation, I added insert.mode=upsert together with pk.fields and pk.mode to the sink connector's configuration. Unfortunately, I ended up with the error above: the sink connector cannot extract the pk field from the record_value. Being new to Kafka, I don't know how to configure the source connector so that the messages it produces carry the required pk.field in the record_value.

Any help and/or suggestions would be welcome!

Answer 1:

Try the following sink connector configuration:

pk.fields=id
pk.mode=record_key
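
Spelled out against the configuration in the question, that suggestion changes only the two PK settings: since Debezium's MongoDB connector puts the document id in the record key under the field id, the JDBC sink can take the primary key from the key struct instead of the value. A sketch of the adjusted sink configuration (untested here; everything except the two PK lines is unchanged from the question):

name=postgres-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=gtmhub.gtmhub.goals
connection.url=jdbc:postgresql://localhost:5432/gtmhub
connection.user=user
connection.password=password
auto.create=true
insert.mode=upsert
pk.mode=record_key
pk.fields=id
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.operation.header=true

With pk.mode=record_key the record value no longer needs to carry _id at all; note that with auto.create=true the resulting Postgres primary-key column will be named id (after pk.fields), not _id.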
