如何配置 Debezium 以使用特定列作为 Kafka 消息键?

Posted

技术标签:

【中文标题】如何配置 Debezium 以使用特定列作为 Kafka 消息键?【英文标题】:How to configure Debezium to use specific column as Kafka message key? 【发布时间】:2020-06-29 00:30:11 【问题描述】:

默认情况下,Debezium 使用表的主键作为消息键。例如,如果你有一张桌子

create table users
(
    id            bigint auto_increment primary key,
    department_id bigint
);

有数据

+----+----------------+
| id | department_id  |
+----+----------------+
|  5 |              1 |
|  6 |              1 |
|  7 |              2 |
+----+----------------+

Debezium 将产生以下 Kafka 消息:

Key: "id": 5 Value: "id": 5, "department_id": 1
Key: "id": 6 Value: "id": 6, "department_id": 1
Key: "id": 7 Value: "id": 7, "department_id": 2

问题是如何配置 Debezium 以使用 department_id 或任何其他列作为 Kafka 消息密钥?

【问题讨论】:

【参考方案1】:

为此有message.key.columns 参数。在连接器的配置中,您应该这样设置:


  "name": "my-connector",
  "config": 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.whitelist": "my_database",
    ...
    "message.key.columns": "my_database.users:department_id"
  

所有关系型 Debezium 连接器都支持此参数。

您可以在此处找到更多信息:

https://debezium.io/blog/2019/09/26/debezium-0-10-0-cr2-released/ https://debezium.io/documentation/reference/1.0/assemblies/cdc-mysql-connector/as_deploy-the-mysql-connector.html#mysql-connector-configuration-properties_debezium

【讨论】:

请记住,此时不应压缩您的主题,因为您不会保留每个用户的最后一个事件,而是压缩后每个部门的最后一个事件。此外,当更改用户的部门时,这些事件可能会在不同的分区中结束,因此您不会有任何保证的顺序。

以上是关于如何配置 Debezium 以使用特定列作为 Kafka 消息键?的主要内容,如果未能解决你的问题,请参考以下文章

如何设置 Kafka 连接器以在 Debezium 中使用自定义转换?

带有 kafka 的 Debezium 还是只有嵌入式 Debezium?

如何配置 Debezium 的 MongoDB 源连接器以按照 Postgres JDBC 接收器连接器的预期发送 record_value 中的 pk 字段

使用 Debezium 的 Quarkus 发件箱模式:如何将自定义列添加到发件箱表

当debezium连接器从你的sql服务器获取数据时,有没有办法限制kafka连接堆空间

如何通过 docker-compose 实现 debezium 连接器(Oracle)?