kafka connect到底会不会重写/丢失数据
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka connect到底会不会重写/丢失数据相关的知识,希望对你有一定的参考价值。
参考技术A 写入hdfs的最后一条记录的offset,记录在文件名中;数据是不停的往tmp文件写,然后rename至目标文件的,详见:
http://blog.csdn.net/xianzhen376/article/details/51831448
不同kafka 分区的数据 独立进行offset 编号;
不同kafka 分区的数据 不会写往同一hdfs文件;
2.2 恢复流程:
恢复处理是基于kafka 分区的
从hdfs 中根据文件名拿到最后一条记录的offset,假设为12345678;
通知kafka 该分区的数据,connect consumer group下次从12345678开始读数据;
2.3 流程分析
这个流程基本保证了数据不会重写,但是会丢。数据丢失的情况:
刚开始读取数据,记录已经独到了100,目标路径下还没有文件生成;
kafka connect 已经commit过offset,比如commit过90了;
在上述处理过程中,第一步拿不到最后一条记录的offset。所以不会去重置kafka server端的消费offset记录。kafka connect恢复后会从91开始读取数据,0-90的数据就丢失了。
2.4 相关issue
https://github.com/confluentinc/kafka-connect-hdfs/issues/57
一种解决办法,就是不要跟kafka server端commit offset。commit 动作当前是kafka connect框架做的。要去该kafka connect的实现。
不使用 Kafka Connect 复制架构更改
【中文标题】不使用 Kafka Connect 复制架构更改【英文标题】:Schema Changes Not Replicating with Kafka Connect 【发布时间】:2021-08-19 04:45:54 【问题描述】:目标是:MySQL -> Kafka -> MySQL。接收器目标应与生产保持同步。
插入和删除记录很好,但我遇到了架构更改问题,例如删除的列。更改不会复制到接收器目标。
我的来源:
"name": "hub-connector",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "study",
"database.include.list": "companyHub",
"database.history.kafka.bootstrap.servers": "broker:29092",
"database.history.kafka.topic": "dbhistory.study",
"include.schema.changes": "true"
我的水槽
"name": "companyHub-sink",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://172.18.141.102:3306/Hub",
"connection.user": "user",
"connection.password": "passaword",
"topics": "study.companyHub.countryTaxes, study.companyHub.addresses",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"transforms": "dropPrefix, unwrap",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"study.companyHub.(.*)$",
"transforms.dropPrefix.replacement":"$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
任何帮助将不胜感激!
【问题讨论】:
【参考方案1】:自动创造和自动进化 提示
确保 JDBC 用户具有适当的 DDL 权限。
如果启用了 auto.create,如果发现目标表丢失,连接器可以创建目标表。由于连接器使用记录模式作为表定义的基础,因此创建在线进行,记录从主题中使用。主键是根据键配置设置指定的。
如果启用了 auto.evolve,则连接器可以在遇到发现缺少列的记录时通过在目标表上发出 ALTER 来执行有限的自动进化。由于数据类型的更改和列的删除可能很危险,因此连接器不会尝试对表执行此类演变。也没有尝试添加主键约束。相反,如果 auto.evolve 被禁用,则不会执行任何进化,并且连接器任务会失败并出现错误,指出缺少列。
对于自动创建和自动演化,列的可空性基于架构中相应字段的可选性,并且如果适用,还根据相应字段的默认值指定默认值。我们使用以下从 Connect 模式类型到数据库特定类型的映射:
架构类型 MySQL Oracle PostgreSQL SQLite SQL Server Vertica
此处未提及的数据库不支持自动创建或自动进化。
重要
对于向后兼容的表架构演变,记录架构中的新字段必须是可选的或具有默认值。如果您需要删除字段,则应手动更改表架构以删除相应的列、为其分配默认值或使其可为空。
【讨论】:
“由于数据类型更改和列删除可能很危险,因此连接器不会尝试在表上执行此类演变” 谢谢...你是说不可能吗? 根据 jdbc sink 连接器文档,不支持更改 drop 列,虽然我发现以下文档似乎试图解决这个问题,但请注意由您来处理ddl的变化。 debezium.io/documentation/reference/connectors/… ... 架构更改主题以上是关于kafka connect到底会不会重写/丢失数据的主要内容,如果未能解决你的问题,请参考以下文章
不看损失大了,刨根问底,Kafka消息中间件到底会不会丢消息