Debezium - 自定义负载 - MySQL 连接器

Posted

技术标签:

【中文标题】Debezium - 自定义负载 - MySQL 连接器【英文标题】:Debezium - Custom Payload - MySQL Connector 【发布时间】:2020-04-10 07:37:09 【问题描述】:

我正在使用 Debezium 将数据从 mysql 同步到 S3。现在我想做一些改变。

示例插入:

create table new (id int);
insert into new (1);

1。自定义负载


    "schema": 
        "type": "struct",
        bla bla bla
        "optional": false,
        "name": "_72.31.84.129.test.new.Envelope"
    ,
    "payload": 
        "before": null,
        "after": 
            "id": 10
        ,
        "source": 
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "11.11.84.129",
            "ts_ms": 1576605998000,
            "snapshot": "false",
            "db": "test",
            "table": "new",
            "server_id": 1,
            "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
            "file": "mysql-bin.000003",
            "pos": 12770,
            "row": 0,
            "thread": 47,
            "query": null
        ,
        "op": "c",
        "ts_ms": 1576605998231
    

我只想推送带有一些自定义更改的有效负载选项。我需要在payload.after 中包含source,op,ts_ms

预期输出:


            "id": 10, 
            "source": 
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "11.11.84.129",
            "ts_ms": 1576605998000,
            "snapshot": "false",
            "db": "test",
            "table": "new",
            "server_id": 1,
            "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
            "file": "mysql-bin.000003",
            "pos": 12770,
            "row": 0,
            "thread": 47,
            "query": null
        ,
        "op": "c",
        "ts_ms": 1576605998231
        

我不想要架构,payload.before。我不确定如何获得此输出。

【问题讨论】:

【参考方案1】:

查看SMT 以提取新记录状态。它只会传播after 中的内容。或者,您也可以让它从 source 添加选择的字段。

...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...

您不能在 atm. 中插入 opts_ms 字段,但它们可以作为消息头传播。

【讨论】:

以上是关于Debezium - 自定义负载 - MySQL 连接器的主要内容,如果未能解决你的问题,请参考以下文章

使用 Debezium 的 Quarkus 发件箱模式:如何将自定义列添加到发件箱表

FlinkCDC自定义反序列化器

干货 | Debezium实现Mysql到Elasticsearch高效实时同步

基于 Kafka 与 Debezium 构建实时数据同步

Debezium - MySQL 连接器 - Kinesis - 服务未启动

Debezium的基本使用(以MySQL为例)