Debezium - 自定义负载 - MySQL 连接器
Posted
技术标签:
【中文标题】Debezium - 自定义负载 - MySQL 连接器【英文标题】:Debezium - Custom Payload - MySQL Connector 【发布时间】:2020-04-10 07:37:09 【问题描述】:我正在使用 Debezium 将数据从 mysql 同步到 S3。现在我想做一些改变。
示例插入:
create table new (id int);
insert into new (1);
1。自定义负载
"schema":
"type": "struct",
bla bla bla
"optional": false,
"name": "_72.31.84.129.test.new.Envelope"
,
"payload":
"before": null,
"after":
"id": 10
,
"source":
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
,
"op": "c",
"ts_ms": 1576605998231
我只想推送带有一些自定义更改的有效负载选项。我需要在payload.after
中包含source,op,ts_ms
。
预期输出:
"id": 10,
"source":
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
,
"op": "c",
"ts_ms": 1576605998231
我不想要架构,payload.before。我不确定如何获得此输出。
【问题讨论】:
【参考方案1】:查看SMT 以提取新记录状态。它只会传播after
中的内容。或者,您也可以让它从 source
添加选择的字段。
...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...
您不能在 atm. 中插入 op
和 ts_ms
字段,但它们可以作为消息头传播。
【讨论】:
以上是关于Debezium - 自定义负载 - MySQL 连接器的主要内容,如果未能解决你的问题,请参考以下文章
使用 Debezium 的 Quarkus 发件箱模式:如何将自定义列添加到发件箱表
干货 | Debezium实现Mysql到Elasticsearch高效实时同步