为jdbc sink连接器提取和转换kafka消息的特定字段。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为jdbc sink连接器提取和转换kafka消息的特定字段。相关的知识,希望对你有一定的参考价值。
我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。
{
"Message": {
"schema": {
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "annek@noanswer.org"
},
"source": {
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
}
}
}
}
我想推送 ts_ms, before, after
和 id
(从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)
我是一个新的kafka新手,不知道该怎么做。
我怎样才能从消息中只提取这些字段来推送而忽略其他字段?
我怎么能把之前,之后的字段转换成字符串序列化格式?
我怎么能提取
id
从对象? (如果是插入操作,前面为空,如果是删除,后面为空)
对于上面的消息,sink目的表的最后应该有下面这样的数据。
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815
你可以创建一个DTO(Java对象为你的json有效载荷,你从你的kafka主题)利用这个在线转换器帮助你转换你的json到Java对象。[http:/pojo.sodhanalibrary.com][1]。
一旦你从你的kafka主题中接收到你的消息,你就可以使用objectmapper来转换json,并将其映射到你相应的DTO对象中。你可以利用这个对象来提取你想要的字段,只需调用getId(),getBefore()等。
下面是一些参考代码,有助于你理解。
@KafkaListener(topics = "test")
public void listen(String payload) {
logger.info("Message Received from Kafka topic: {}", payload);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);
logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));
logger.info("Get Before:{}", dtoObject.getId());
}
以上是关于为jdbc sink连接器提取和转换kafka消息的特定字段。的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON
如何使用 kafka 连接 JDBC sink 和 source 使用 python