Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

Posted

技术标签:

【中文标题】Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中【英文标题】:Kafka Connect: Read JSON serialized Kafka message, convert to Parquet format and persist in S3 【发布时间】:2021-08-09 22:41:08 【问题描述】:

我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保存在 S3 中。

背景

官方S3-Sink-Connector支持Parquet输出格式但是:

您必须将 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 与 ParquetFormat 一起用于此连接器。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 ***Exception。

还有JsonSchemaConverter throws out an error if the message was not written using JSON Schema serialization。

问题陈述

所以,我正在寻找一种方法来从最初以 JSON 格式编写的 Kafka 主题读取消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入 S3 连接器,该连接器将以 Parquet 格式写入 S3 .

或者,鉴于主要要求(获取 Kafka 消息,将它在 S3 中作为 Parquet 文件)。谢谢!

PS:不幸的是,目前我无法更改这些 Kafka 消息最初的编写方式(例如使用 JSON Schema serialization 和 Schema Discovery)。

【问题讨论】:

不涉及编写 JAVA 代码 - github.com/pinterest/secor 或 Apache Nifi ... 【参考方案1】:

一般来说,您的数据需要有一个模式,因为 Parquet 需要它(S3 Parquet 编写器作为中间步骤转换为 Avro)

您可以考虑使用接受架构的this Connect transform,并尝试应用 JSON 架构 - see tests。由于这会返回一个Struct 对象,因此您可以尝试将JsonSchemaConverter 用作接收器的一部分。

但是,如果您只是将随机 JSON 数据放入一个没有任何一致字段或值的主题中,那么您将很难应用任何架构

【讨论】:

以上是关于Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中的主要内容,如果未能解决你的问题,请参考以下文章

在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

Kafka Connect:如何将String解析为Map

如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

深入理解Kafka Connect:转换器和序列化