MongoDB as sink connector not capturing data as expected - kafka?
【Posted】2021-09-05 11:19:43
【Question】I'm currently using a MySQL database as the source connector with the configuration below. I want to monitor changes to the database and send them to MongoDB.
Here is my source connector configuration:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "source_mysql_connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "host.docker.internal",
    "database.port": "3306",
    "database.user": "test",
    "database.password": "$apr1$o7RbW.GvrPIY1",
    "database.server.id": "8111999",
    "database.server.name": "db_source",
    "database.include.list": "example",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "schema-changes.example",
    "database.allowPublicKeyRetrieval": "true",
    "include.schema.changes": "true"
  }
}'
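To confirm the connector actually started, the standard Kafka Connect REST status endpoint can be queried; a healthy connector reports "state":"RUNNING" for both the connector and its task:

curl -s localhost:8083/connectors/source_mysql_connector/status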
Here is my sink connector (MongoDB) configuration:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "db_source.example.employees",
    "connection.uri": "mongodb://172.17.0.1:27017/example?w=1&journal=true",
    "database": "example",
    "collection": "employees",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'
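Before looking at Mongo, the raw change events on the topic can also be inspected with the Avro console consumer (assuming the standard Confluent tooling is available):

kafka-avro-console-consumer --bootstrap-server broker:29092 \
  --topic db_source.example.employees \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081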
With this I'm able to establish the connection, capture data changes, and store them into a MongoDB collection for the table named employees. But the problem is that when I check the collection in MongoDB, the documents are saved like this:
"_id" : ObjectId("60d0e6939e00e22f274ccac1"), "before" : null, "after" : "id" : NumberLong(11), "name" : "Steve Shining", "team" : "DevOps", "birthday" : 11477 , "source" : "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 2, "thread" : null, "query" : null , "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null
"_id" : ObjectId("60d0e6939e00e22f274ccac2"), "before" : null, "after" : "id" : NumberLong(12), "name" : "John", "team" : "Support", "birthday" : 6270 , "source" : "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 3, "thread" : null, "query" : null , "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null
But my MySQL database looks like this:
mysql> select * from employees;
+----+---------------+---------+------------+
| id | name          | team    | birthday   |
+----+---------------+---------+------------+
|  1 | Peter Smith   | DevOps  | 2003-07-21 |
| 11 | Steve Shining | DevOps  | 2001-06-04 |
| 12 | John          | Support | 1987-03-03 |
+----+---------------+---------+------------+
I expect my collection to look like this:
"_id" : ObjectId("60d0e6939e00e22f274ccac2"), "name" : "John", "team" : "Support", "birthday" : "1987-03-03 "
What am I doing wrong here? Even delete messages are stored in the collection like this; it can't identify the message or anything. How do I fix it? Even the dates are not stored correctly.
UPDATE:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "db_source.example.employees",
    "connection.uri": "mongodb://172.17.0.1:27017/example?w=1&journal=true",
    "database": "example",
    "collection": "employees",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'
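With this unwrap transform in place, each insert should be stored as just the after state; and because delete.handling.mode is set to rewrite, records additionally carry a __deleted marker field. A sketch of the expected document for John (the _id being whatever Mongo generates, and birthday still the raw day count until the date issue discussed below is addressed):

{ "_id" : ObjectId("..."), "id" : NumberLong(12), "name" : "John", "team" : "Support", "birthday" : 6270, "__deleted" : "false" }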
【Comments】:
【Answer 1】: The problem is not related to Mongo, but rather to the default Debezium format.
What you are seeing is the before, after, and other CDC event metadata.

"it can't identify the message"

It does, though: "after" : { "id" : NumberLong(12), "name" : "John", "team" : "Support", "birthday" : 6270 }

You need to extract/flatten the event so that you get only the "after" field:
https://debezium.io/documentation/reference/configuration/event-flattening.html
Regarding the birthday/date values, that seems to be a separate issue.
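Those birthday values are Debezium's default date encoding: days since the Unix epoch, so 6270 is 1987-03-03 and 11477 is 2001-06-04. One possible fix, untested here: set "time.precision.mode": "connect" on the source connector so dates use Kafka Connect's logical Date type, then format them as strings in the sink with the standard TimestampConverter SMT. A sketch, where convertBirthday is just an assumed alias name:

"transforms": "unwrap,convertBirthday",
"transforms.convertBirthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertBirthday.field": "birthday",
"transforms.convertBirthday.target.type": "string",
"transforms.convertBirthday.format": "yyyy-MM-dd"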
【Discussion】:
So if I only get the after field, will it still work for delete commands on the source? And should I do the flattening in the source config or in the sink config?

The after field of a row-deletion event will be null. You need a secondary transform to drop those null events.

Where to do the transform depends on how much of the data you need in the topic.

In other words, do you want all the data you currently have (id, before, after, table info, etc.) in the topic, or only the after field? You may want the other information for other consumers, so it can make sense to flatten the data only at the sink. Otherwise, do it at the source.
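(One concrete option for the "secondary transform" mentioned above is Kafka Connect's built-in Filter SMT paired with the RecordIsTombstone predicate, available since Kafka 2.6. A sketch of the extra properties, with dropNull and isTombstone as assumed alias names:

"transforms": "unwrap,dropNull",
"transforms.dropNull.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropNull.predicate": "isTombstone",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone")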
Thanks for the information. When I added the flattening to the source config, it now works as expected. The only problem is that the dates are not parsed correctly.