Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中
Posted
技术标签:
【中文标题】Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中【英文标题】:Kafka Connect: Read JSON serialized Kafka message, convert to Parquet format and persist in S3 【发布时间】:2021-08-09 22:41:08 【问题描述】:我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保存在 S3 中。
背景
官方S3-Sink-Connector支持Parquet输出格式但是:
您必须将 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 与 ParquetFormat 一起用于此连接器。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 ***Exception。
还有JsonSchemaConverter throws out an error if the message was not written using JSON Schema serialization。
问题陈述
所以,我正在寻找一种方法来从最初以 JSON 格式编写的 Kafka 主题读取消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入 S3 连接器,该连接器将以 Parquet 格式写入 S3 .
或者,鉴于主要要求(获取 Kafka 消息,将它在 S3 中作为 Parquet 文件)。谢谢!
PS:不幸的是,目前我无法更改这些 Kafka 消息最初的编写方式(例如使用 JSON Schema serialization 和 Schema Discovery)。
【问题讨论】:
不涉及编写 JAVA 代码 - github.com/pinterest/secor 或 Apache Nifi ... 【参考方案1】:一般来说,您的数据需要有一个模式,因为 Parquet 需要它(S3 Parquet 编写器作为中间步骤转换为 Avro)
您可以考虑使用接受架构的this Connect transform,并尝试应用 JSON 架构 - see tests。由于这会返回一个Struct
对象,因此您可以尝试将JsonSchemaConverter
用作接收器的一部分。
但是,如果您只是将随机 JSON 数据放入一个没有任何一致字段或值的主题中,那么您将很难应用任何架构
【讨论】:
以上是关于Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中的主要内容,如果未能解决你的问题,请参考以下文章
在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector
使用JsonConverter的Kafka Connect HDFS Sink for JSON格式
如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?
如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]