用于 S3 中 PARQUET 格式的 Kafka S3 源连接器

Posted

技术标签:

【中文标题】用于 S3 中 PARQUET 格式的 Kafka S3 源连接器【英文标题】:Kafka S3 Source Connector for PARQUET format in S3 【发布时间】:2021-10-06 04:45:42 【问题描述】:

我正在使用 Protobuf 生成主题事件。我可以使用 Parquet 格式的 S3 Sink 连接器成功地将我的主题事件接收到 S3 存储桶中。现在我的 S3 存储桶中有 .parquet.key.parquet 类型的对象。使用以下配置,所有这些都按预期工作:


    "name": "s3-sink",
    "config": 
      "name": "s3-sink",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.schema.registry.url": "https://my-schema-registry",
      "value.converter.basic.auth.credentials.source": "USER_INFO",
      "value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
      "store.kafka.keys": true,
      "parquet.codec": "none",
      "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
      "locale": "en-US",
      "s3.bucket.name": "my-bucket-123",
      "s3.region": "eu-west-1",
      "time.interval": "HOURLY",
      "flush.size": 1000,
      "tasks.max": 1,
      "topics.regex": "test-topic.*",
      "confluent.license": "",
      "confluent.topic.bootstrap.servers": "my-bootstrap-server",
      "confluent.topic.replication.factor": 3,
      "confluent.license.topic.replication.factor": 1,
      "confluent.topic.security.protocol": "SASL_SSL",
      "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "confluent.topic.sasl.mechanism": "PLAIN",
      "confluent.topic.ssl.endpoint.identification.algorithm": "https"
    
  

现在我想使用 Protobuf 将 my-bucket-123parquet 格式)的键和值放回 Kafka 主题中。为此,我使用以下配置通过 Confluent 设置了一个新的 S3 源连接器 (confluentinc/kafka-connect-s3-source:1.4.5):


    "name": "s3-source",
    "config": 
      "name": "s3-source",
      "dest.kafka.bootstrap.servers": "my-bootstrap-server",
      "dest.topic.replication.factor": 1,
      "dest.kafka.security.protocol": "SASL_SSL",
      "dest.kafka.sasl.mechanism": "PLAIN",
      "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "tasks.max": 1,
      "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "confluent.license": "",
      "confluent.topic.bootstrap.servers": "my-bootstrap-server",
      "confluent.topic.replication.factor": 3,
      "confluent.license.topic.replication.factor": 1,
      "confluent.topic.security.protocol": "SASL_SSL",
      "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "confluent.topic.sasl.mechanism": "PLAIN",
      "confluent.topic.ssl.endpoint.identification.algorithm": "https",
      "transforms": "AddPrefix",
      "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.AddPrefix.regex": ".*",
      "transforms.AddPrefix.replacement": "copy_of_$0",
      "s3.region": "eu-west-1",
      "s3.bucket.name": "my-bucket-123"
    

通过使用上述配置,我无法启动我的 S3 源连接器。如果我使用上述配置和命令验证配置:

curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate

我在format.class 属性中收到以下错误:

"errors":[
               "Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
               "Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
            ]

我开始认为这个 S3 源连接器不支持 Parquet 格式。我尝试针对 JSON、AVRO 和 BYTE 格式对其进行验证,所有这些都可以。

深入研究 S3 源连接器 jar 文件 (1.4.5),我没有找到 Parquet 格式的文件:

Formats in Jar files

有人知道这里发生了什么吗?有没有其他方法可以将数据从 S3 - Parquet 格式放回我的 Kafka 集群?

谢谢!

【问题讨论】:

S3 连接器主要用于分析,而不是备份/恢复。话虽如此,Parquet 是一种列格式,因此无法轻松读取单个记录,例如基于行的 Avro/JSON/text-lines 可以 【参考方案1】:

来自 Confluent 的 S3 源连接器文档:

开箱即用,连接器支持以 Avro 和 JSON 格式从 S3 读取数据。除了带有模式的记录外,连接器还支持在文本文件中导入不带模式的纯 JSON 记录,每行一条记录。通常,连接器可以接受任何提供 Format 接口实现的格式。

所以这意味着您应该能够添加/插件实现parquet 格式,但它不是开箱即用的内置

格式的源代码:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java

【讨论】:

有一些工具可以将 Parquet 带到 Avro。不确定这是否真的有助于将数据返回到 Protobuf,就像 OP 似乎想要的那样 OP 显示错误,即在未找到类上读取镶木地板失败...读取格式是可插入的,他可能还需要为值转换器设置一个设置才能将其转换为 PROTOBUFF 当然,但是您链接到 Sink 的 Format 接口,而不是源所需的 io.confluent.connect.cloud.storage.source.StorageObjectFormat . In general, the connector may accept any format that provides an implementation of the Format interface. 假设sink和source是同一个Connect集群,那为什么不行呢?

以上是关于用于 S3 中 PARQUET 格式的 Kafka S3 源连接器的主要内容,如果未能解决你的问题,请参考以下文章

S3 Select 会加速 Parquet 文件的 Spark 分析吗?

使用 Spectrify 以 Parquet 格式将数据从 Redshift 卸载到 S3

使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3

AWS Glue 作业以 Parquet 格式写入 s3 并出现 Not Found 错误

parquet 如何在 S3 中存储时间戳数据?

使用存储在 s3 中的 parquet 文件在 Greenplum 中创建外部表