Kafka Connect 对 debezium 生成事件的日期处理
Posted
技术标签:
【中文标题】Kafka Connect 对 debezium 生成事件的日期处理【英文标题】:Kafka Connect date handling of debezium generated events 【发布时间】:2019-12-07 19:44:55 【问题描述】:我正在使用 debezium SQL Server 来跟踪生产基地的变化。 主题已创建,CDC 工作正常,但尝试使用 jdbcSinkConnector 将数据转储到另一个 Sql Server DB 时,遇到以下错误。
com.microsoft.sqlserver.jdbc.SQLServerException: One or more values is out of range of values for the datetime2 SQL Server data type
在源数据库上,sql 数据类型是timestamp2(7)
。
卡夫卡事件是1549461754650000000。
架构类型是 INT64。
架构名称 io.debezium.time.NanoTimestamp。
我找不到一种方法来告诉 TimestampConverter 值不是以毫秒或微秒表示,而是以纳秒表示(无论如何,微秒不适用)。
这是我的连接器配置
"name": "cdc.swip.bi.ods.sink.contract",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "swip.swip_core.contract",
"connection.url": "jdbc:sqlserver://someip:1234;database=DB",
"connection.user": "loloolololo",
"connection.password": "muahahahahaha",
"dialect.name": "SqlServerDatabaseDialect",
"auto.create": "false",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "unwrap,created_date,modified_date",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.created_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.created_date.target.type": "Timestamp",
"transforms.created_date.field": "created_date",
"transforms.modified_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.modified_date.target.type": "Timestamp",
"transforms.modified_date.field": "modified_date",
"insert.mode": "insert",
"delete.enabled": "false",
"pk.fields": "id",
"pk.mode": "record_value",
"schema.registry.url": "http://localhost:8081",
"table.name.format": "ODS.swip.contract"
【问题讨论】:
【参考方案1】:我忘记发布答案了。
"time.precision.mode":"connect"
属性可以解决问题
https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-property-time-precision-mode
"name":"debezium-connector-sqlserver",
"config":
"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"true",
"database.hostname":"someHost",
"database.port":"somePort",
"database.user":"someUser",
"database.password":"somePassword",
"database.dbname":"someDb",
"database.server.name":"xxx.xxx",
"database.history.kafka.topic":"xxx.xxx.history",
"time.precision.mode":"connect",
"database.history.kafka.bootstrap.servers":"example.com:9092"
【讨论】:
【参考方案2】:SQL Server 连接器中缺少一个功能 - DBZ-1419。
您可以通过编写自己的 SMT 来解决该问题,该 SMT 将在接收端进行字段转换,然后再由 JDBC 连接器处理。
【讨论】:
我看了一下连接转换。我即将编写一个 DebeziumTemporalConverter 来应用所需的转换。 link 无论如何,datetimeoffset 在 a** 中也是一个真正的痛点,因为 SQL Server debezium 连接器更改偏移量以传递带有 GMT 参考的 iso 8601 字符串。即使时间相应更改,时区偏移现在也会丢失。以上是关于Kafka Connect 对 debezium 生成事件的日期处理的主要内容,如果未能解决你的问题,请参考以下文章
如何将 kafka-connect-jdbc-5.5.0.jar 添加到 Debezium/connect
无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器
Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题
如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?