提取和转换 jdbc sink 连接器的 kafka 消息特定字段
Posted
技术标签:
【中文标题】提取和转换 jdbc sink 连接器的 kafka 消息特定字段【英文标题】:extract and transform kafka message specific fields for jdbc sink connector 【发布时间】:2020-07-31 21:50:59 【问题描述】:我有一个 kafka 主题,它使用 Debezium mysql 源连接器从 mysql 数据库中获取数据,以下是其中一条消息的格式:
"Message":
"schema":
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
,
"payload":
"op": "u",
"ts_ms": 1465491411815,
"before":
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "annek@noanswer.org"
,
"after":
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "annek@noanswer.org"
,
"source":
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
我想使用 jdbc sink 连接器将 ts_ms, before, after
和 id
(从对象/行)列推送到另一个数据库,表架构为 (id,before(text),after(text),timestamp)
,对于 kafka 的新手无法弄清楚:
如何仅从消息中提取这些字段以推送并忽略其他字段?
如何将字段前后转换为字符串/序列化格式?
如何从对象中提取id
? (插入操作,before为null,delete,after为null)
对于上面的消息,sink目标表最后应该有如下数据:
id: 1004
before: '"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"'
after: '"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"'
timestamp: 1465491411815
【问题讨论】:
【参考方案1】:您可以使用Kafka Connect Transformations 的链,例如solution。
【讨论】:
感谢@Iskuskov Alexander,您能否设计一些具体的东西来满足这两个需求,转换AND Json 序列化? 你需要before
和after
作为字符串吗?
@@Iskuskov Alexander 是的
我认为您可以执行以下步骤: 1. 使用 JDBC Sink Connector 的 record_key
属性从消息键中使用 id
; 2、after
和before
字段使用Cast SMT; 3、使用Replace SMT重命名ts_ms; 4. 使用 Replace SMT 过滤字段【参考方案2】:
您可以创建一个 DTO(您从 kafka 主题中获得的 json 有效负载的 Java 对象)利用此在线转换器帮助您将 json 转换为 Java 对象。 [http://pojo.sodhanalibrary.com/][1]
一旦您收到来自您的 kafka 主题的消息,您就可以使用 objectmapper 转换该 json 并将其映射到您适当的 DTO 对象。一旦您准备好对象。您可以使用该对象通过调用 getId()、getBefore() 等来提取所需的字段,
这里有一些参考代码可以帮助你理解:
@KafkaListener(topics = "test")
public void listen(String payload)
logger.info("Message Received from Kafka topic: ", payload);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);
logger.info("After Convertion: ", objectMapper.writeValueAsString(dtoObject));
logger.info("Get Before:", dtoObject.getId());
【讨论】:
这是和 SMT 还是什么,正如我提到的,我正在为目标数据库使用 jdbc 接收器连接器,我怎样才能使用你提到的这段代码 sn-p 以及在哪里?谢谢以上是关于提取和转换 jdbc sink 连接器的 kafka 消息特定字段的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 FME 处理 Kafka JDBC Sink 连接器
Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON
Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]