使用 Kafka Connect 时如何转换所有时间戳字段?
Posted
技术标签:
【中文标题】使用 Kafka Connect 时如何转换所有时间戳字段?【英文标题】:How to transform all timestamp fields when using Kafka Connect? 【发布时间】:2019-08-22 22:59:23 【问题描述】:我正在尝试将所有时间戳字段转换为格式为 yyyy-MM-dd HH:mm:ss
的字符串类型。
要转换多个字段,我必须为每个字段单独创建一个转换。
...
"transforms":"tsFormat1,tsFormat2,...,tsFormatN",
"transforms.tsFormat1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat1.target.type": "string",
"transforms.tsFormat1.field": "ts_col1",
"transforms.tsFormat1.format": "yyyy-MM-dd HH:mm:ss",
"transforms.tsFormat2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat2.target.type": "string",
"transforms.tsFormat2.field": "ts_col2",
"transforms.tsFormat2.format": "yyyy-MM-dd HH:mm:ss",
...
"transforms.tsFormatN.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormatN.target.type": "string",
"transforms.tsFormatN.field": "ts_colN",
"transforms.tsFormatN.format": "yyyy-MM-dd HH:mm:ss",
...
有没有办法对所有时间戳列应用单一转换?
我试过了,
...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1, ts_col2,..., ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...
和
...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1",
"transforms.tsFormat.field": "ts_col2",
...
"transforms.tsFormat.field": "ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...
更好的是数字类型匹配"numeric.mapping": "best_fit"
。就像numeric.mapping
如何应用于所有数字字段(无需手动指定字段名称)以尝试找到最佳数字类型一样,有没有这样的东西可以为所有时间戳字段应用转换或字符串格式?
【问题讨论】:
.field
是单数...numeric.mapping
不是 Connect API 的一部分,那么您使用的是哪个连接器?
我知道.field
是单数。这就是为什么我要问是否有其他方法可以做到这一点。我正在使用io.confluent.connect.jdbc.JdbcSourceConnector
连接器。除了为每个时间戳单独创建一个转换之外,还有其他方法可以格式化所有时间戳列吗?
没有... 选项1)最初用正确的时间戳写入数据。选项 2) 在 Connect 的消费者之前使用 Kafka Streams 或 KSQL。选项3)让数据按原样登陆数据库,然后在应用程序中实际显示数据时使用数据库函数对其进行解析
【参考方案1】:
我在原来的 TimestampConverter
的基础上做了一些小调整,效果就像你想要的一样完美:
https://github.com/howareyouo/kafka-connect-timestamp-converter
【讨论】:
【参考方案2】:有一个Github repo 提供自定义转换器将所有时间戳消息转换为字符串日期时间格式。
所以你的代码应该是这样的
"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
"timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss"
不要忘记将 Timestampconverter.jar 添加到 kafka-connect 的 plugin.path 变量中。
【讨论】:
以上是关于使用 Kafka Connect 时如何转换所有时间戳字段?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Kafka HDFS Connect 写入 HDFS 时出错
kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是不是可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?