使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium
Posted
技术标签:
【中文标题】使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium【英文标题】:Debezium with AWS MSK NOT_ENOUGH_REPLICAS 【发布时间】:2020-04-12 22:40:09 【问题描述】:我在 AWS 中有一个正在运行的 debezium 集群,这没有问题。我想试试 AWS MSK。所以我启动了一个集群。然后我启动了一个 EC2 来运行我的连接器。
然后安装confluent-kafka
sudo apt-get update && sudo apt-get install confluent-platform-2.12
默认情况下,AWS MSK 没有架构注册表,所以我从连接器 EC2 配置它 架构注册表配置文件:
kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181
kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
然后/etc/kafka/connect-distributed.properties
文件
bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
安装连接器:
confluent-hub install debezium/debezium-connector-mysql:latest
启动服务
systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
现在一切都开始了。然后我创建了一个 mysql.json 文件。
"name": "mysql-connector-db01",
"config":
"name": "mysql-connector-db01",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.server.id": "1",
"tasks.max": "3",
"database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
"database.history.kafka.topic": "schema-changes.mysql",
"database.server.name": "mysql-db01",
"database.hostname": "172.31.84.129",
"database.port": "3306",
"database.user": "bhuvi",
"database.password": "my_stong_password",
"database.whitelist": "proddb,test",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
"transforms.unwrap.add.source.fields": "ts_ms",
创建 debezium 连接器
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.josn
然后声明在连接器 EC2 中给出此错误。
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,290] WARN [Producer clientId=producer-3] Got error produce response with correlation id 844 on topic-partition connect-configs-0, retrying (2147482809 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,391] WARN [Producer clientId=producer-3] Got error produce response with correlation id 845 on topic-partition connect-configs-0, retrying (2147482808 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,492] WARN [Producer clientId=producer-3] Got error produce response with correlation id 846 on topic-partition connect-configs-0, retrying (2147482807 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,593] WARN [Producer clientId=producer-3] Got error produce response with correlation id 847 on topic-partition connect-configs-0, retrying (2147482806 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
它永远不会停止此错误消息。
描述连接配置
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 2 Replicas: 2 Isr: 2
【问题讨论】:
请描述连接配置主题 我会在一段时间内发布它,但我记得它的复制因子是 3 我猜 刚刚检查过,1 是那里的默认值。我改成3了,还是一样的错误 我删除了所有的 connect-* 主题并将复制因子更改为 3,现在它可以工作了。非常感谢。但我不知道为什么它不能与 1 一起使用?如果您能帮助我理解这一点,那就太好了 Kafka Connect 中使用的生产者可能正在使用 ack 配置或 min ISR 高于 1。这是我看到该错误的仅有的两种情况 【参考方案1】:默认情况下,MSK 将所有主题的 min.in.sync.replicas
设置为 2(请参阅 https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html)
Kafka Connect 可能使用 ACKs="all" 进行生产,并且由于您只有一份主题副本,因此它永远无法达到足够的法定人数。
【讨论】:
是的,默认情况下它使用 "ACKs=all" (docs.confluent.io/5.5.0/installation/configuration/…) 和 "offsets.topic.replication.factor=3" (docs.confluent.io/5.5.0/installation/configuration/…) 所以,默认情况下,kafka 连接器需要一个最小值3 个节点(1 个领导者 + 2 个副本)。 您也可以在 Kafka-connect 级别更改它:offset.storage.replication.factor、config.storage.replication.factor、status.storage.replication.factor=1,如果运行时使用Docker:CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR,CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR CONNECT_STATUS_STORAGE_REPLICATION_FACTOR 将值设置为保存为您的 MSK“min.insync.replicas” 我把@shlomiLan提到的所有值都改成了2(MSK默认min.insync.replicas
的值),效果很好!以上是关于使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录
使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium
AWS Kafka (MSK) - 如何生成 Keystore 和 truststore 并在我的 Spring Cloud Stream 应用程序中使用它们?