Debezium 创建新主题时如何编辑复制因子
Posted
技术标签:
【中文标题】Debezium 创建新主题时如何编辑复制因子【英文标题】:How to edit replication factor when Debezium creates a new topic 【发布时间】:2021-08-13 22:32:30 【问题描述】:我正在使用 kafka-connect + Debezium 来捕获我的 mysql 集群中的数据更改。目前,我们使用 Debezium 的 table.whitelist
仅摄取 MySQL 集群中非常小的表子集。每当我向table.whitelist
配置添加新表时,Debezium 都会创建一个仅使用replication_factor = 2
和partition=1
的新kafka 主题。我的 Kafka 集群共有 4 个代理(1 个领导者和 3 个追随者),在我的 /opt/kafka-connect/worker.properties
文件中,我设置了以下内容:
offset.storage.replication.factor=4
config.storage.replication.factor=4
status.storage.replication.factor=4
这导致我每次添加新表时都必须手动重新平衡主题,并且它会很快变旧。 最后一个条目是最近添加的新 MySQL 表,注意它在 2 个 broker 上复制,而其他主题在 3 个上复制。
我是否编辑了错误的配置文件或使用了错误的设置?请停下来!
下面是我的 Debezium 配置
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "none",
"max.queue.size": "300000",
"tasks.max": "8",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USER>\" password=\"<PASSWORD>\";",
"database.history.kafka.topic": "dbhistory_mysql",
"database.history.consumer.ssl.truststore.password": "<PASSWORD>",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
"table.whitelist": "schema.table1,schema.table2,schema.table3,schema.table4,schema.table5",
"decimal.handling.mode": "string",
"database.history.kafka.recovery.poll.interval.ms": "30000",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USER>\" password=\"<PASSWORD>\";",
"database.user": "debezium",
"database.server.id": "123",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.kafka.bootstrap.servers": "broker1.com:9092,broker2.com:9092,broker3.com:9092,broker4.com:9092",
"database.history.producer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
"database.server.name": "mysql",
"database.port": "3306",
"database.serverTimezone": "US/Pacific",
"database.history.producer.ssl.truststore.password": "<PASSWORD>",
"database.hostname": "mysql.cluster.amazonaws.com",
"database.password": "<PASSWORD>",
"name": "mysql-connector",
"max.batch.size": "20480",
"database.history.consumer.sasl.mechanism": "PLAIN"
【问题讨论】:
您可以在问题中添加您的连接器配置吗? @AchyutVyas 你的意思是worker.properties
还是 debezium 配置?
debezium 配置
@AchyutVyas 我已经编辑了我的原始帖子以包含 Debezium 配置
检查我的答案,你必须在你的 debezium 连接器配置中添加这个(旁注:你已经指定了 tasks.max =8,所以它运行 8 个任务还是只运行 1 个?)
【参考方案1】:
这三个属性是内部 Connect 主题,而不是自动创建的用户主题。
对于自动创建的主题(建议禁用),您在代理 server.properties 中设置默认分区和复制因子
num.partitions
default.replication.factor
我相信这些也是可以使用 kafka-configs
工具覆盖的动态变量...也许可以使用 --alter --entity-type brokers --entity-default
... 但可能更容易推断该属性是否实际上在配置文件中
【讨论】:
【参考方案2】:您可以在 debezium 配置中指定主题复制因子和分区计数
Ref
前
在您的 debezium 连接器配置中添加以下行
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "60"
【讨论】:
以上是关于Debezium 创建新主题时如何编辑复制因子的主要内容,如果未能解决你的问题,请参考以下文章
使用选定列更改表 - debezium 抛出 ArrayIndexOutOfBoundsException