Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库
Posted
技术标签:
【中文标题】Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库【英文标题】:Kafka Sink how to map fields to db with different topic and table schema name 【发布时间】:2021-04-15 06:41:21 【问题描述】:我目前正在使用主题名称 waiting-room
设置 Kafka Sink 连接器,而我的数据库架构称为 waiting_room
。所以我试图将主题消息映射到数据库模式,但我没有看到任何数据进入数据库。
所以我尝试了以下场景:
-
因此,由于表架构是
waiting_room
,我尝试添加 quote.sql.identifier=ALWAYS
,因为它引用表名并允许 Kafka 接收器引用它,以便它可以映射到表,但我在 Kafka 中没有看到 quote.sql.identifier=ALWAYS
下沉。 table.schema 和 Kafka 接收器是否都需要引用才能映射它,或者我如何将表模式映射为下划线并让 kafka 映射它
然后,如果我更改了 table.name.format=waiting-room
并让 db schema = gt.namespace."waiting-room" 我看不到我的 kafka 接收器得到更新,但我的 table.name.format 将 = waiting_room 并具有状态连接器的 404 未找到。
当主题和数据库名称不同时,有没有办法映射并将数据输入数据库
【问题讨论】:
【参考方案1】:尝试使用Kafka Connect SMTRegexRouter
:
"task.max": "1",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "'"$URL"'",
"topics": "waiting-room",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "waiting-room",
"transforms.route.replacement": "gt.namespace.waiting_room",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": true
【讨论】:
1)这是否意味着 db.schema 将保留为 gt.namespace.topic_name 2) 如果是这样,那么我们是否仍然需要 quote.sql.identifier:始终将其映射到引用 db 3 ) 是否会影响消息 4) 如果数据库名称是 topic_name 5) 如果我的 avro 文件包含以下字段:计数、WaitingType、时间戳,并且数据库模式将其设置为“计数”,我们是否还需要 table.name.format = topic_name ", "WaitingType" "timestamp", kafka sink 是否也可以转换例如 waiting_type 并让数据进入数据库而不影响消息 你知道如果我使用转换 SMT 为什么我的数据在 db 中翻倍 > 为什么我的数据在数据库中翻了一番。仅当您同时运行两个连接器时。 两个不同的数据库怎么样,例如 gt.table1 和 gt.table2 相同的凭据?我只是将它与逗号分开添加到列表中,还是单独制作 transforms.RenameSessionField 与 transforms.RenameReservationField ***.com/questions/66838248/…以上是关于Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库的主要内容,如果未能解决你的问题,请参考以下文章
无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表
Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields
Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis
如何在 Kafka Connect Sink 中指定 Kafka 主题的分区