如何获取来自 debezium 到 yyyy-mm-dd-ss.zzz 格式的所有日期时间值

Posted

技术标签:

【中文标题】如何获取来自 debezium 到 yyyy-mm-dd-ss.zzz 格式的所有日期时间值【英文标题】:How to get all the datetime values coming from debezium to yyyy-mm-dd-ss.zzz format 【发布时间】:2021-01-16 22:37:40 【问题描述】:

我想知道如何以 YYYY-MM-DD hh:mm:ss 格式从 Debezium sql server 连接器获取所有 DATETIME(io.debezium.time.Timestamp)。现在这是以 epoch int64 格式给出的,这将很难使用 spark 转换数据。

debezium sql server 连接器的配置如下:


    "name": "localDB-sqlserverconnector",
    "config": 
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "192.168.1.22",
        "database.port": "1433",
        "database.user": "user",
        "database.password": "user_123",
        "database.dbname": "localDB",
        "database.server.name": "DEV1",
        "table.whitelist": "dbo.testtabledebezium",
        "database.history.kafka.bootstrap.servers": "192.168.1.81:32105",
        "database.history.kafka.topic": "history_DB.DEV1",
        "include.schema.changes": false,
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": true,
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "snapshot.mode": "schema_only"
        
    

debezium 主题 DEV1.dbo.testtabledebezium 的结果

"id":9,"column1":"t6","column2":1601480866593,"column3":18535,"__deleted":"false"  

Sqlserver表实际数据

|id|    |column1|   |column2                |   |column3   | =>
|9 |    |t6     |   |2020-09-30 15:47:46.593|   |2020-09-30|

实际要求

"id":9,"column1":"t6","column2":"2020-09-30 15:47:46.593","column3":"2020-09-30","__deleted":"false"  

【问题讨论】:

【参考方案1】:

将 "time.precision.mode": "connect" 添加到您的连接器配置中。

参考https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html#sqlserver-temporal-values

【讨论】:

【参考方案2】:

您需要将以下内容添加到您的 debezium 连接器配置中


    "transforms":"unwrap,col2",  
    "transforms.col2.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.col2.target.type":"string", 
    "transforms.col2.field":"col2", 
    "transforms.col2.format":"YYYY-MM-DD hh:mm:ss" ,
    "time.precision.mode":"connect"
 

也为 col3 添加它。

【讨论】:

以上是关于如何获取来自 debezium 到 yyyy-mm-dd-ss.zzz 格式的所有日期时间值的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Debezium (cdc) 将从 mysql 获取的更改接收到另一个 mysql db

如何在从 debezium kafka connect 收到的 CDC 事件中获取表名和数据库名

如何从 Debezium 创建的 avro 消息中获取字段?

如何将 DateTime 值序列化为“yyyy-MM”? [复制]

如何使用 debezium mysql 连接器 kafka 进行初始快照加载?

干货 | Debezium实现Mysql到Elasticsearch高效实时同步