如何在从 debezium kafka connect 收到的 CDC 事件中获取表名和数据库名

Posted

技术标签:

【中文标题】如何在从 debezium kafka connect 收到的 CDC 事件中获取表名和数据库名【英文标题】:How to get the table-name and database-name in the CDC event received from debezium kafka connect 【发布时间】:2019-09-20 11:06:24 【问题描述】:

设置:我在 MS SQL Server 上启用了 CDC,并且使用 debezium kafka connect(source) 将 CDC 事件馈送到 Kafka。此外,多个表 CDC 事件被路由到 Kafka 中的单个主题。

问题:由于我在kafka topic里有多个表数据,所以想在CDC数据中有表名和数据库名。

我在 mysql CDC 中获取表名和数据库名,但在 MS SQL CDC 中没有。

下面是 SQL Server 的 Debezium 源连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
  "name": "cdc-user_profile-connector",
  "config": 
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "<<hostname>>",
    "database.port": "<<port>>",
    "database.user": "<<username>>",
    "database.password": "<<password>>",
    "database.server.name": "test",
    "database.dbname": "testDb",
    "table.whitelist": "schema01.table1,schema01.table2",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "digital.user_profile.schema.audit",
    "database.history.store.only.monitored.tables.ddl": true,
    "include.schema.changes": false,
    "event.deserialization.failure.handling.mode": "fail",
    "snapshot.mode": "initial_schema_only",
    "snapshot.locking.mode": "none",
    "transforms":"addStaticField,topicRoute",
    "transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addStaticField.static.field":"source_system",
    "transforms.addStaticField.static.value":"source_system_1",
    "transforms.topicRoute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRoute.regex":"(.*)",
    "transforms.topicRoute.replacement":"digital.user_profile",
    "errors.tolerance": "none",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  
'

我得到以下输出(演示数据)


  "before": 
    "profile_id": 147,
    "email_address": "test@gmail.com"
  ,
  "after": 
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  ,
  "source": 
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false
  ,
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"

我的要求是得到如下


  "before": 
    "profile_id": 147,
    "email_address": "test@gmail.com"
  ,
  "after": 
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  ,
  "source": 
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false,
    "db": "testDb",
    "table": "table1/table2"
  ,
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"

【问题讨论】:

您可以为每个表创建一个连接器,然后添加static.field 以添加到表中。 感谢@cricket_007,这将是在推出issues.jboss.org/browse/DBZ-875 之前最好的解决方法。 【参考方案1】:

这是https://issues.jboss.org/browse/DBZ-875问题的一部分

【讨论】:

谢谢@jiri,这个功能什么时候推出。 在下一个版本中 - 我估计在 3 周内。【参考方案2】:

Debezium Kafka-Connect 通常将每个表中的数据放在单独的主题中,主题名称的格式为 hostname.database.table。我们一般使用主题名来区分源表和数据库名。

如果您要将所有表中的数据手动放入一个主题中,那么您可能还必须手动添加表和数据库名称。

【讨论】:

我如何在 CDC 事件中手动添加它?。向 kafka 主题添加多个表数据的原因是因为这是一个企业数据管道,可能有 100 个表。如果我们继续增加这样的主题,那么 kafka 集群的性能将会下降。此外,kafka connect 通过设置这两个属性 CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" 来提供此功能。

以上是关于如何在从 debezium kafka connect 收到的 CDC 事件中获取表名和数据库名的主要内容,如果未能解决你的问题,请参考以下文章

如何配置 Debezium 以使用特定列作为 Kafka 消息键?

如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?

如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?

如何使用Debezium从MS SQL中将250张表导入Kafka

Kafka乱码——中文单词在debezium中变成乱码

如何使用 debezium mysql 连接器 kafka 进行初始快照加载?