如何在使用 REST API 创建 Kafka 连接器时定义模式
Posted
技术标签:
【中文标题】如何在使用 REST API 创建 Kafka 连接器时定义模式【英文标题】:How to define schema while creating a Kafka connector using REST API 【发布时间】:2020-01-30 10:14:42 【问题描述】:我已将 Kafka 连接工作器配置为在集群中运行并能够获取数据库数据。我还以 JSON 格式将数据库数据存储在 Kafka 主题中。这里我使用 JSON 转换器来序列化数据
在 Kafka 消费者控制台中查看数据库数据时,我可以看到 UserCreatedon 列值显示为整数。 DB中UserCreatedon列值的数据类型是int64(unix epoch time),这就是为什么Kafka消费者将时间戳值显示为int
有没有办法在连接器创建期间发送架构。因为我希望 UserCreatedon 应该以时间戳格式而不是 int 格式显示
样本输出
"schema":"type":"struct","fields":["type":"string","optional":false,"field":"NAME","type" :"int64","可选":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"UserCreatedON"],"可选":false ,"payload":"NAME":"UserProvision","UserCreatedon":1567688965261
请在这里多多支持。
【问题讨论】:
【参考方案1】:您还没有提到您使用哪种类型的连接器将数据从数据库传输到 Kafka。 Kafka connect 支持转换器。
单消息转换 (SMT) 应用于消息,因为它们 流过连接。 SMT 在源之后转换入站消息 连接器已经生成了它们,但在它们被写入 Kafka 之前
见here
具体来说,您可以使用TimestampConvertor
【讨论】:
以上是关于如何在使用 REST API 创建 Kafka 连接器时定义模式的主要内容,如果未能解决你的问题,请参考以下文章
如何将 SPARK/Flink 流数据处理创建为微服务(REST API)
保护对 Kafka Connect 的 REST API 的访问
将 Flask Restless API 连接到 Admin-on-rest (React)
如何使延迟加载 Apache Spark Dataframe 连接到 REST API