为jdbc sink连接器提取和转换kafka消息的特定字段。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为jdbc sink连接器提取和转换kafka消息的特定字段。相关的知识,希望对你有一定的参考价值。

我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想推送 ts_ms, before, afterid (从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)我是一个新的kafka新手,不知道该怎么做。

  • 我怎样才能从消息中只提取这些字段来推送而忽略其他字段?

  • 我怎么能把之前,之后的字段转换成字符串序列化格式?

  • 我怎么能提取 id 从对象? (如果是插入操作,前面为空,如果是删除,后面为空)

对于上面的消息,sink目的表的最后应该有下面这样的数据。

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815
答案

你可以使用连锁的 Kafka连接转换,像这样 解决办法.

另一答案

你可以创建一个DTO(Java对象为你的json有效载荷,你从你的kafka主题)利用这个在线转换器帮助你转换你的json到Java对象。[http:/pojo.sodhanalibrary.com][1]。

一旦你从你的kafka主题中接收到你的消息,你就可以使用objectmapper来转换json,并将其映射到你相应的DTO对象中。你可以利用这个对象来提取你想要的字段,只需调用getId(),getBefore()等。

下面是一些参考代码,有助于你理解。

    @KafkaListener(topics = "test")
        public void listen(String payload)  {

            logger.info("Message Received from Kafka topic: {}", payload);

            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);

                logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));

                logger.info("Get Before:{}", dtoObject.getId());



        }

以上是关于为jdbc sink连接器提取和转换kafka消息的特定字段。的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Kafka JDBC Sink 句柄数组数据类型

kafka 连接器 jdbc-sink 最后出现语法错误

如何使用 kafka 连接 JDBC sink 和 source 使用 python

使用 Kafka Connect API JDBC Sink 连接器示例到 Oracle 数据库的 Kafka 主题

如何使用 FME 处理 Kafka JDBC Sink 连接器