Debezium 创建新主题时如何编辑复制因子

Posted

技术标签:

【中文标题】Debezium 创建新主题时如何编辑复制因子【英文标题】:How to edit replication factor when Debezium creates a new topic 【发布时间】:2021-08-13 22:32:30 【问题描述】:

我正在使用 kafka-connect + Debezium 来捕获我的 mysql 集群中的数据更改。目前,我们使用 Debezium 的 table.whitelist 仅摄取 MySQL 集群中非常小的表子集。每当我向table.whitelist 配置添加新表时,Debezium 都会创建一个仅使用replication_factor = 2partition=1 的新kafka 主题。我的 Kafka 集群共有 4 个代理(1 个领导者和 3 个追随者),在我的 /opt/kafka-connect/worker.properties 文件中,我设置了以下内容:

offset.storage.replication.factor=4 config.storage.replication.factor=4 status.storage.replication.factor=4

这导致我每次添加新表时都必须手动重新平衡主题,并且它会很快变旧。 最后一个条目是最近添加的新 MySQL 表,注意它在 2 个 broker 上复制,而其他主题在 3 个上复制。

我是否编辑了错误的配置文件或使用了错误的设置?请停下来!

下面是我的 Debezium 配置


  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "none",
  "max.queue.size": "300000",
  "tasks.max": "8",
  "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"<USER>\"   password=\"<PASSWORD>\";",
  "database.history.kafka.topic": "dbhistory_mysql",
  "database.history.consumer.ssl.truststore.password": "<PASSWORD>",
  "database.history.consumer.security.protocol": "SASL_SSL",
  "database.history.consumer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
  "table.whitelist": "schema.table1,schema.table2,schema.table3,schema.table4,schema.table5",
  "decimal.handling.mode": "string",
  "database.history.kafka.recovery.poll.interval.ms": "30000",
  "database.history.producer.sasl.mechanism": "PLAIN",
  "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"<USER>\"   password=\"<PASSWORD>\";",
  "database.user": "debezium",
  "database.server.id": "123",
  "database.history.producer.security.protocol": "SASL_SSL",
  "database.history.kafka.bootstrap.servers": "broker1.com:9092,broker2.com:9092,broker3.com:9092,broker4.com:9092",
  "database.history.producer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
  "database.server.name": "mysql",
  "database.port": "3306",
  "database.serverTimezone": "US/Pacific",
  "database.history.producer.ssl.truststore.password": "<PASSWORD>",
  "database.hostname": "mysql.cluster.amazonaws.com",
  "database.password": "<PASSWORD>",
  "name": "mysql-connector",
  "max.batch.size": "20480",
  "database.history.consumer.sasl.mechanism": "PLAIN"

【问题讨论】:

您可以在问题中添加您的连接器配置吗? @AchyutVyas 你的意思是 worker.properties 还是 debezium 配置? debezium 配置 @AchyutVyas 我已经编辑了我的原始帖子以包含 Debezium 配置 检查我的答案,你必须在你的 debezium 连接器配置中添加这个(旁注:你已经指定了 tasks.max =8,所以它运行 8 个任务还是只运行 1 个?) 【参考方案1】:

这三个属性是内部 Connect 主题,而不是自动创建的用户主题。

对于自动创建的主题(建议禁用),您在代理 server.properties 中设置默认分区和复制因子

num.partitions
default.replication.factor

我相信这些也是可以使用 kafka-configs 工具覆盖的动态变量...也许可以使用 --alter --entity-type brokers --entity-default... 但可能更容易推断该属性是否实际上在配置文件中

【讨论】:

【参考方案2】:

您可以在 debezium 配置中指定主题复制因子和分区计数

Ref

在您的 debezium 连接器配置中添加以下行

"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "60"

【讨论】:

以上是关于Debezium 创建新主题时如何编辑复制因子的主要内容,如果未能解决你的问题,请参考以下文章

创建 Kafka 主题时出错 - 复制因子大于可用代理

使用选定列更改表 - debezium 抛出 ArrayIndexOutOfBoundsException

Debezium PostgresConnector 实例化主题,因为表中没有数据

Postgres 使用 debezium 创建复制槽失败

分区 w.r.t 到代理和分区之间关于主题的关联

在 Debezium 中无法根据 MySQL 的表创建某些主题