使用 Kafka Connect 时如何转换所有时间戳字段?

Posted

技术标签:

【中文标题】使用 Kafka Connect 时如何转换所有时间戳字段?【英文标题】:How to transform all timestamp fields when using Kafka Connect? 【发布时间】:2019-08-22 22:59:23 【问题描述】:

我正在尝试将所有时间戳字段转换为格式为 yyyy-MM-dd HH:mm:ss 的字符串类型。

要转换多个字段,我必须为每个字段单独创建一个转换。

...
"transforms":"tsFormat1,tsFormat2,...,tsFormatN",
"transforms.tsFormat1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat1.target.type": "string",
"transforms.tsFormat1.field": "ts_col1",
"transforms.tsFormat1.format": "yyyy-MM-dd HH:mm:ss",
"transforms.tsFormat2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat2.target.type": "string",
"transforms.tsFormat2.field": "ts_col2",
"transforms.tsFormat2.format": "yyyy-MM-dd HH:mm:ss",
...
"transforms.tsFormatN.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormatN.target.type": "string",
"transforms.tsFormatN.field": "ts_colN",
"transforms.tsFormatN.format": "yyyy-MM-dd HH:mm:ss",
...

 

有没有办法对所有时间戳列应用单一转换?

我试过了,

...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1, ts_col2,..., ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...

...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1",
"transforms.tsFormat.field": "ts_col2",
...
"transforms.tsFormat.field": "ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...

 


更好的是数字类型匹配"numeric.mapping": "best_fit"。就像numeric.mapping 如何应用于所有数字字段(无需手动指定字段名称)以尝试找到最佳数字类型一样,有没有这样的东西可以为所有时间戳字段应用转换或字符串格式?

【问题讨论】:

.field 是单数...numeric.mapping 不是 Connect API 的一部分,那么您使用的是哪个连接器? 我知道.field 是单数。这就是为什么我要问是否有其他方法可以做到这一点。我正在使用io.confluent.connect.jdbc.JdbcSourceConnector 连接器。除了为每个时间戳单独创建一个转换之外,还有其他方法可以格式化所有时间戳列吗? 没有... 选项1)最初用正确的时间戳写入数据。选项 2) 在 Connect 的消费者之前使用 Kafka Streams 或 KSQL。选项3)让数据按原样登陆数据库,然后在应用程序中实际显示数据时使用数据库函数对其进行解析 【参考方案1】:

我在原来的 TimestampConverter 的基础上做了一些小调整,效果就像你想要的一样完美: https://github.com/howareyouo/kafka-connect-timestamp-converter

【讨论】:

【参考方案2】:

有一个Github repo 提供自定义转换器将所有时间戳消息转换为字符串日期时间格式。

所以你的代码应该是这样的

"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
"timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss"

不要忘记将 Timestampconverter.jar 添加到 kafka-connect 的 plugin.path 变量中。

【讨论】:

以上是关于使用 Kafka Connect 时如何转换所有时间戳字段?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:如何将String解析为Map

深入理解Kafka Connect:转换器和序列化

使用 Kafka HDFS Connect 写入 HDFS 时出错

kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?

Kafka Connect 使用数组字段展平 postgres 记录的转换

Kafka Connect - 如何删除连接器