是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?

Posted

技术标签:

【中文标题】是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?【英文标题】:Is it possible to have one Elasticsearch Index for one database with tables using debezium and kafka? 【发布时间】:2021-10-10 04:26:15 【问题描述】:

我有这个连接器和接收器,它基本上创建了一个主题 “Test.dbo.TEST_A”并写入 ES 索引“Test”。我设置了 "key.ignore": "false" 以便在 ES 和 "transforms.unwrap.add.fields":"table" 跟踪文档属于哪个表。


    "name": "Test-connector", 
    "config": 
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "tasks.max": "1",
        "database.hostname": "192.168.1.234", 
        "database.port": "1433", 
        "database.user": "user", 
        "database.password": "pass", 
        "database.dbname": "Test", 
        "database.server.name": "MyServer",
        "table.include.list": "dbo.TEST_A",
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.testA",

        "transforms": "unwrap",

        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields":"table"
    


    "name": "elastic-sink-test",
    "config": 
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "TEST_A",
        "connection.url": "http://localhost:9200/",
        "string.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.enable": "false",
        "schema.ignore": "true",

        "transforms": "topicRoute,unwrap,key",

        "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.topicRoute.regex": "(.*).dbo.TEST_A",                          /* Use the database name */
        "transforms.topicRoute.replacement": "$1",

        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",    
        "transforms.unwrap.drop.tombstones": "false",    

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "Id",       

        "key.ignore": "false",                                                        
        "type.name": "TEST_A",
        "behavior.on.null.values": "delete"                                                     
    

但是当我添加另一个连接器/接收器以包含数据库中的另一个表“TEST_B”时。 似乎每当来自 TEST_A 和 TEST_B 的 id 相同时,就会从 ES 中删除该行?

这种设置是否有可能拥有一个索引 = 一个数据库,或者是每个表拥有一个索引的唯一解决方案? 我想要一个索引=一个数据库的原因是当更多的数据库添加到ES时减少索引的数量。

【问题讨论】:

hmm,在我的“测试连接器”中,我也许可以创建一个带有 tablename+id 的字段“uniqueId”并将其用作 ES 文档 ID。我在考虑 docs.confluent.io/platform/current/connect/transforms/… 或 Ksql 还是有其他更好的选择? 【参考方案1】:

您正在从不同的数据库/表中读取数据更改并将它们写入同一个 ElasticSearch 索引,并将 ES 文档 ID 设置为数据库记录 ID。如您所见,如果 DB 记录 ID 发生冲突,则索引文档 ID 也会发生冲突,从而导致旧文档被删除。

你有几个选择:

每个数据库/表名称的 ElasticSearch 索引:您可以使用不同的连接器或自定义单消息转换 (SMT) 来实现这一点 全局唯一数据库记录:如果您控制源表的架构,则可以将主键设置为 UUID。这将防止 ID 冲突。 正如您在 cmets 中提到的,将 ES 文档 ID 设置为 DB/Table/ID。您可以使用 SMT 实施此更改

【讨论】:

以上是关于是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?的主要内容,如果未能解决你的问题,请参考以下文章

带有 debezium 嵌入式连接器的外部偏移存储

为啥我会收到很多带有 debezium 的重复项?

Debezium 是不是提供交货和订购保证?

带有 column.include.list 的 Debezium 连接器

ClassNotFoundException 使用带有本地类和 Debezium 引擎的 QuarkusClassLoader

带有正则表达式的 debezium 表白名单