Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库

Posted

技术标签:

【中文标题】Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库【英文标题】:Kafka Sink how to map fields to db with different topic and table schema name 【发布时间】:2021-04-15 06:41:21 【问题描述】:

我目前正在使用主题名称 waiting-room 设置 Kafka Sink 连接器,而我的数据库架构称为 waiting_room。所以我试图将主题消息映射到数据库模式,但我没有看到任何数据进入数据库。 所以我尝试了以下场景:

    因此,由于表架构是 waiting_room,我尝试添加 quote.sql.identifier=ALWAYS,因为它引用表名并允许 Kafka 接收器引用它,以便它可以映射到表,但我在 Kafka 中没有看到 quote.sql.identifier=ALWAYS下沉。 table.schema 和 Kafka 接收器是否都需要引用才能映射它,或者我如何将表模式映射为下划线并让 kafka 映射它 然后,如果我更改了 table.name.format=waiting-room 并让 db schema = gt.namespace."waiting-room" 我看不到我的 kafka 接收器得到更新,但我的 table.name.format 将 = waiting_room 并具有状态连接器的 404 未找到。

当主题和数据库名称不同时,有没有办法映射并将数据输入数据库

【问题讨论】:

【参考方案1】:

尝试使用Kafka Connect SMTRegexRouter


    "task.max": "1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "'"$URL"'",
    "topics": "waiting-room",

    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "waiting-room",
    "transforms.route.replacement": "gt.namespace.waiting_room",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": true

【讨论】:

1)这是否意味着 db.schema 将保留为 gt.namespace.topic_name 2) 如果是这样,那么我们是否仍然需要 quote.sql.identifier:始终将其映射到引用 db 3 ) 是否会影响消息 4) 如果数据库名称是 topic_name 5) 如果我的 avro 文件包含以下字段:计数、WaitingType、时间戳,并且数据库模式将其设置为“计数”,我们是否还需要 table.name.format = topic_name ", "WaitingType" "timestamp", kafka sink 是否也可以转换例如 waiting_type 并让数据进入数据库而不影响消息 你知道如果我使用转换 SMT 为什么我的数据在 db 中翻倍 > 为什么我的数据在数据库中翻了一番。仅当您同时运行两个连接器时。 两个不同的数据库怎么样,例如 gt.table1 和 gt.table2 相同的凭据?我只是将它与逗号分开添加到列表中,还是单独制作 transforms.RenameSessionField 与 transforms.RenameReservationField ***.com/questions/66838248/…

以上是关于Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库的主要内容,如果未能解决你的问题,请参考以下文章

无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表

Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis

如何在 Kafka Connect Sink 中指定 Kafka 主题的分区

无法使用 JDBC kafka-sink-connector 将 kafka 主题数据写入 postgres DB

提取和转换 jdbc sink 连接器的 kafka 消息特定字段