提取和转换 jdbc sink 连接器的 kafka 消息特定字段

Posted

技术标签:

【中文标题】提取和转换 jdbc sink 连接器的 kafka 消息特定字段【英文标题】:extract and transform kafka message specific fields for jdbc sink connector 【发布时间】:2020-07-31 21:50:59 【问题描述】:

我有一个 kafka 主题,它使用 Debezium mysql 源连接器从 mysql 数据库中获取数据,以下是其中一条消息的格式:


    "Message": 
        "schema": 
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        ,
        "payload": 
            "op": "u",
            "ts_ms": 1465491411815,
            "before": 
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            ,
            "after": 
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            ,
            "source": 
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            
        
    

我想使用 jdbc sink 连接器将 ts_ms, before, afterid(从对象/行)列推送到另一个数据库,表架构为 (id,before(text),after(text),timestamp),对于 kafka 的新手无法弄清楚:

如何仅从消息中提取这些字段以推送并忽略其他字段?

如何将字段前后转换为字符串/序列化格式?

如何从对象中提取id? (插入操作,before为null,delete,after为null)

对于上面的消息,sink目标表最后应该有如下数据:

id:     1004
before: '"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"'
after:  '"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"'
timestamp: 1465491411815

【问题讨论】:

【参考方案1】:

您可以使用Kafka Connect Transformations 的链,例如solution。

【讨论】:

感谢@Iskuskov Alexander,您能否设计一些具体的东西来满足这两个需求,转换AND Json 序列化? 你需要beforeafter作为字符串吗? @@Iskuskov Alexander 是的 我认为您可以执行以下步骤: 1. 使用 JDBC Sink Connector 的 record_key 属性从消息键中使用 id; 2、afterbefore字段使用Cast SMT; 3、使用Replace SMT重命名ts_ms; 4. 使用 Replace SMT 过滤字段【参考方案2】:

您可以创建一个 DTO(您从 kafka 主题中获得的 json 有效负载的 Java 对象)利用此在线转换器帮助您将 json 转换为 Java 对象。 [http://pojo.sodhanalibrary.com/][1]

一旦您收到来自您的 kafka 主题的消息,您就可以使用 objectmapper 转换该 json 并将其映射到您适当的 DTO 对象。一旦您准备好对象。您可以使用该对象通过调用 getId()、getBefore() 等来提取所需的字段,

这里有一些参考代码可以帮助你理解:

    @KafkaListener(topics = "test")
        public void listen(String payload)  

            logger.info("Message Received from Kafka topic: ", payload);

            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);

                logger.info("After Convertion: ", objectMapper.writeValueAsString(dtoObject));

                logger.info("Get Before:", dtoObject.getId());



        

【讨论】:

这是和 SMT 还是什么,正如我提到的,我正在为目标数据库使用 jdbc 接收器连接器,我怎样才能使用你提到的这段代码 sn-p 以及在哪里?谢谢

以上是关于提取和转换 jdbc sink 连接器的 kafka 消息特定字段的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 FME 处理 Kafka JDBC Sink 连接器

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Flink JDBC Sink 和连接池

Kafka HDFS Sink 连接器错误。[***类型必须是 STRUCT ..]

如何使用 kafka 连接 JDBC sink 和 source 使用 python

kafka 连接器 jdbc-sink 最后出现语法错误