Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库

Posted

技术标签:

【中文标题】Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库【英文标题】:Is there a way for Kafka Connect to ignore 1 field in the schema file and read the other fields into the db 【发布时间】:2021-08-02 08:42:18 【问题描述】:

我正在尝试将此架构文件放入数据库中,其中一种字段类型是布尔值映射

reservation table

create table "table-reservation" (
  "sessionId" varchar,
  "enteredReservation" boolean,
  latitude double,
  longitude double

架构文件


  "name":"sessionId",
  "type":"string
,
 
 "name":"enteredReservation",
 "type": "map",
 "values":"boolean"
,

 "name":"latitude",
 "type":"double"
,

 "name":"longitude",
 "type":"double"

接收器连接器的配置

"tasks.max":"1",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"URL",
"topics":"table-reservation",
"quote.sql.identifiers":"always",
"errors.tolerance":"all",
"errors.log.enable":"true",
"errors.log.include.messages":"true"

有没有办法只忽略enteredReservation 字段并将其设置为null。我读到如果我们使用 SMT DROP 它可以将其设置为 null DROP SMT 但是,它如何知道要删除哪个字段,而其他字段保持不变。或者 JDBC 接收器如何转换布尔值映射。有没有办法做到这一点?

【问题讨论】:

1.什么是“主题”? 2. 什么是“场”?这些不是 SQL 术语。 @OneCricketeer 这里的主题是餐桌预订和餐桌会话。这些字段是 db 模式中的列名。这次为了避免 SMT,我决定将模式中的表名和字段名双引号以匹配模式文件。但是我没有看到连接器出现并且消息读入数据库。知道为什么吗?我添加了 quote.sql.identifier 来双引号。我最初添加了 SMT 来重命名主题的字段,但是当我添加更多主题时它遇到了问题 @OneCricketeer 我更新了我的帖子,你可以看看 drop SMT 使整个键或值无效,而不是字段。我觉得我已经多次说过 Kafka Streams 或 KSQL 真的应该是你的解决方案。 SMT 不适用于非常简单的用例之外 - confluent.io/kafka-summit-nyc17/… 或者,使用文档数据库而不是平面关系数据库表 @OneCricketeer 在生产者端有一个 Kafka Streams 来产生消息。但是,在读取数据库的消费者方面,日志说“enteredReservation”是一种类型 MAP 而不是布尔型,在这种情况下是布尔型映射。有没有办法让消费者映射它,因为如果我在生产者端删除它会完全影响服务 【参考方案1】:

下面的配置会忽略你想要的字段:

“变换”:“掩码”,

“transforms.mask.type”:“org.apache.kafka.connect.transforms.MaskField$Value”,

"transforms.mask.fields": "enteredReservation"

但它可能不适用于字符串和数字以外的类型。所以,你也可以试试黑名单:

“变换”:“替换字段”,

“transforms.ReplaceField.type”: "org.apache.kafka.connect.transforms.ReplaceField$Value",

"transforms.ReplaceField.blacklist": "enteredReservation"

【讨论】:

以上是关于Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect HDFS 在 Confluent v4.0 中忽略了 flush.size

有没有办法检测 FAILED kafka connect 任务

有没有办法配置要使用 jmx_exporter/prometheus 捕获的 kafka-connect jmx 指标?

没有模式注册表的 Kafka-connect

Kafka-connect 是不是必须使用模式注册表?

有没有办法将融合模式注册表与 kafka-node 模块一起使用?