是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?
Posted
技术标签:
【中文标题】是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?【英文标题】:Is it possible to have one Elasticsearch Index for one database with tables using debezium and kafka? 【发布时间】:2021-10-10 04:26:15 【问题描述】:我有这个连接器和接收器,它基本上创建了一个主题 “Test.dbo.TEST_A”并写入 ES 索引“Test”。我设置了 "key.ignore": "false" 以便在 ES 和 "transforms.unwrap.add.fields":"table" 跟踪文档属于哪个表。
"name": "Test-connector",
"config":
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "192.168.1.234",
"database.port": "1433",
"database.user": "user",
"database.password": "pass",
"database.dbname": "Test",
"database.server.name": "MyServer",
"table.include.list": "dbo.TEST_A",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testA",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields":"table"
"name": "elastic-sink-test",
"config":
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "TEST_A",
"connection.url": "http://localhost:9200/",
"string.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable": "false",
"schema.ignore": "true",
"transforms": "topicRoute,unwrap,key",
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "(.*).dbo.TEST_A", /* Use the database name */
"transforms.topicRoute.replacement": "$1",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "Id",
"key.ignore": "false",
"type.name": "TEST_A",
"behavior.on.null.values": "delete"
但是当我添加另一个连接器/接收器以包含数据库中的另一个表“TEST_B”时。 似乎每当来自 TEST_A 和 TEST_B 的 id 相同时,就会从 ES 中删除该行?
这种设置是否有可能拥有一个索引 = 一个数据库,或者是每个表拥有一个索引的唯一解决方案? 我想要一个索引=一个数据库的原因是当更多的数据库添加到ES时减少索引的数量。
【问题讨论】:
hmm,在我的“测试连接器”中,我也许可以创建一个带有 tablename+id 的字段“uniqueId”并将其用作 ES 文档 ID。我在考虑 docs.confluent.io/platform/current/connect/transforms/… 或 Ksql 还是有其他更好的选择? 【参考方案1】:您正在从不同的数据库/表中读取数据更改并将它们写入同一个 ElasticSearch 索引,并将 ES 文档 ID 设置为数据库记录 ID。如您所见,如果 DB 记录 ID 发生冲突,则索引文档 ID 也会发生冲突,从而导致旧文档被删除。
你有几个选择:
每个数据库/表名称的 ElasticSearch 索引:您可以使用不同的连接器或自定义单消息转换 (SMT) 来实现这一点 全局唯一数据库记录:如果您控制源表的架构,则可以将主键设置为 UUID。这将防止 ID 冲突。 正如您在 cmets 中提到的,将 ES 文档 ID 设置为 DB/Table/ID。您可以使用 SMT 实施此更改【讨论】:
以上是关于是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?的主要内容,如果未能解决你的问题,请参考以下文章
带有 column.include.list 的 Debezium 连接器
ClassNotFoundException 使用带有本地类和 Debezium 引擎的 QuarkusClassLoader