Field does not exist on transformations to extract key with Debezium

Posted: 2020-09-07 04:18:48

[Question]:

I am trying to create a Debezium MySQL connector with a transformation that extracts the key.

Before the key transformation:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');

The resulting record on the topic is:

> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: "id": "P195910", "price": "1511.64"

When key.converter is set to JSON, the key becomes "id": "P195910".

So I want to extract id from the key and make it a plain string key.

Expected result:

rowtime: 2020/05/20 16:47:23.354 Z, 
key: 'P195910', 
value: "id": "P195910", "price": "1511.64"   

When I try transformations with ExtractField or ValueToKey, I get:

DataException: Field does not exist: id

I tried the following statement, which includes ValueToKey:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );

This results in the following error in my Kafka Connect log:

Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)

[Comments]:

Why not specify in the Debezium configuration, rather than with an SMT, that you want the 'id' column as the key? "message.key.columns": ".:"

[Answer 1]:

Changing the transform type from UnwrapFromEnvelope to ExtractNewRecordState solved the problem on Debezium MySQL CDC Connector version 1.1.0.

"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState'
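
Applied to the statement from the question, the change could look like the sketch below. Every line except "transforms.unwrap.type" is copied from the question, and it assumes a Debezium version that ships ExtractNewRecordState (the answer reports 1.1.0).

```sql
-- Sketch only: the question's second statement with the unwrap SMT class swapped.
-- All hosts, credentials and topic names are the question's own values.
create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );
```

The order in "transforms" matters here: ValueToKey looks up id in the record value, so the unwrap step has to run first.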

[Comments]:

[Answer 2]:

Since you are using ksqlDB here, you need to set up the source connector to write the key as a string:

key.converter=org.apache.kafka.connect.storage.StringConverter
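
To end up with the plain string key from the expected result (key: 'P195910'), one possible arrangement is to follow ValueToKey with Kafka Connect's ExtractField$Key transform, so that the key is the bare id value rather than a struct, and then serialize it with StringConverter. This is a sketch, not a tested configuration: the extractkey alias and the ExtractField$Key step are assumptions that do not appear in the question or the answers, while the remaining values come from the question.

```sql
-- Sketch only. Assumed additions (not in the original thread):
--   * the 'extractkey' alias using ExtractField$Key to pull id out of the key struct
--   * StringConverter as key.converter, as suggested in this answer
create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey,extractkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id',
    "transforms.extractkey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractkey.field" = 'id'
    );
```

Without the extractkey step, the key would still be a struct containing id, and StringConverter would write that struct's string form rather than the bare value.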

[Comments]:

Using 'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState' instead of the UnwrapFromEnvelope transform solved this problem. However, I don't know why! UnwrapFromEnvelope was deprecated and then removed in Debezium 1.2.0.Alpha1.
