Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库
Posted
技术标签:
【中文标题】Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库【英文标题】:Is there a way for Kafka Connect to ignore 1 field in the schema file and read the other fields into the db 【发布时间】:2021-08-02 08:42:18 【问题描述】:我正在尝试将此架构文件放入数据库中,其中一种字段类型是布尔值映射
reservation table
create table "table-reservation" (
"sessionId" varchar,
"enteredReservation" boolean,
latitude double,
longitude double
架构文件
"name":"sessionId",
"type":"string
,
"name":"enteredReservation",
"type": "map",
"values":"boolean"
,
"name":"latitude",
"type":"double"
,
"name":"longitude",
"type":"double"
接收器连接器的配置
"tasks.max":"1",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"URL",
"topics":"table-reservation",
"quote.sql.identifiers":"always",
"errors.tolerance":"all",
"errors.log.enable":"true",
"errors.log.include.messages":"true"
有没有办法只忽略enteredReservation
字段并将其设置为null。我读到如果我们使用 SMT DROP 它可以将其设置为 null DROP SMT
但是,它如何知道要删除哪个字段,而其他字段保持不变。或者 JDBC 接收器如何转换布尔值映射。有没有办法做到这一点?
【问题讨论】:
1.什么是“主题”? 2. 什么是“场”?这些不是 SQL 术语。 @OneCricketeer 这里的主题是餐桌预订和餐桌会话。这些字段是 db 模式中的列名。这次为了避免 SMT,我决定将模式中的表名和字段名双引号以匹配模式文件。但是我没有看到连接器出现并且消息读入数据库。知道为什么吗?我添加了 quote.sql.identifier 来双引号。我最初添加了 SMT 来重命名主题的字段,但是当我添加更多主题时它遇到了问题 @OneCricketeer 我更新了我的帖子,你可以看看 drop SMT 使整个键或值无效,而不是字段。我觉得我已经多次说过 Kafka Streams 或 KSQL 真的应该是你的解决方案。 SMT 不适用于非常简单的用例之外 - confluent.io/kafka-summit-nyc17/… 或者,使用文档数据库而不是平面关系数据库表 @OneCricketeer 在生产者端有一个 Kafka Streams 来产生消息。但是,在读取数据库的消费者方面,日志说“enteredReservation”是一种类型 MAP 而不是布尔型,在这种情况下是布尔型映射。有没有办法让消费者映射它,因为如果我在生产者端删除它会完全影响服务 【参考方案1】:下面的配置会忽略你想要的字段:
“变换”:“掩码”,
“transforms.mask.type”:“org.apache.kafka.connect.transforms.MaskField$Value”,
"transforms.mask.fields": "enteredReservation"
但它可能不适用于字符串和数字以外的类型。所以,你也可以试试黑名单:
“变换”:“替换字段”,
“transforms.ReplaceField.type”: "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "enteredReservation"
【讨论】:
以上是关于Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect HDFS 在 Confluent v4.0 中忽略了 flush.size
有没有办法检测 FAILED kafka connect 任务
有没有办法配置要使用 jmx_exporter/prometheus 捕获的 kafka-connect jmx 指标?