如何配置 Debezium Mysql 连接器以生成原始键而不是 struct 或 json 对象?

Posted

技术标签:

【中文标题】如何配置 Debezium Mysql 连接器以生成原始键而不是 struct 或 json 对象?【英文标题】:How to configure Debezium Mysql connector to produce primitive key instead of struct or json object? 【发布时间】:2019-07-31 13:15:05 【问题描述】:

我正在使用 Debezium 来检测 mysql 源表中的更改。如何生成 Kafka 消息,使得键是数字 (Long) 值而不是 Json 对象?

我得到了什么:

key: "foo_id": 123 
value: "foo_id": 123, "bar": "blahblah", "baz": "meh......"

我想要什么:

key: 123
value: "foo_id": 123, "bar": "blahblah", "baz": "meh......"

我的 FOO 表如下所示:

foo_id: INT
bar: VARCHAR 
baz: VARCHAR

请注意,我没有使用 avro,并且我尝试了以下几种组合(带和不带密钥转换器),但未能获得 Long 密钥。

"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",        
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false", 
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

我不确定 ValueToKey 或 ExtractField 是否适用于 (MySQL) 源,但我低于 NPE。

Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) 

【问题讨论】:

【参考方案1】:

根据这个https://issues.jboss.org/browse/DBZ-689找到了解决方案


...
    "config": 
    "transforms": "unwrap,insertKey,extractKey",
    "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones":"false",
    "transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.insertKey.fields":"foo_id",
    "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field":"foo_id",        
    "key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
    "key.converter.schemas.enable": "true", 
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"  <-- this was missing
    

现在,我将 foo_id 视为 Integer(没什么大不了的,它不是 Long):)

【讨论】:

以上是关于如何配置 Debezium Mysql 连接器以生成原始键而不是 struct 或 json 对象?的主要内容,如果未能解决你的问题,请参考以下文章

为数据库中的多个表配置 debezium 连接器

如何将新表添加到 Debezium MySQL 连接器?

如何为 mysql 数据库创建多个 Debezium 连接器

Debezium MySQL错误:连接密码为空

使用表白名单选项更新 Debezium MySQL 连接器

Debezium 连接器发件箱转换