用于 S3 中 PARQUET 格式的 Kafka S3 源连接器
Posted
技术标签:
【中文标题】用于 S3 中 PARQUET 格式的 Kafka S3 源连接器【英文标题】:Kafka S3 Source Connector for PARQUET format in S3 【发布时间】:2021-10-06 04:45:42 【问题描述】:我正在使用 Protobuf 生成主题事件。我可以使用 Parquet 格式的 S3 Sink 连接器成功地将我的主题事件接收到 S3 存储桶中。现在我的 S3 存储桶中有 .parquet
和 .key.parquet
类型的对象。使用以下配置,所有这些都按预期工作:
"name": "s3-sink",
"config":
"name": "s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "https://my-schema-registry",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
"store.kafka.keys": true,
"parquet.codec": "none",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en-US",
"s3.bucket.name": "my-bucket-123",
"s3.region": "eu-west-1",
"time.interval": "HOURLY",
"flush.size": 1000,
"tasks.max": 1,
"topics.regex": "test-topic.*",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https"
现在我想使用 Protobuf 将 my-bucket-123
(parquet
格式)的键和值放回 Kafka 主题中。为此,我使用以下配置通过 Confluent 设置了一个新的 S3 源连接器 (confluentinc/kafka-connect-s3-source:1.4.5
):
"name": "s3-source",
"config":
"name": "s3-source",
"dest.kafka.bootstrap.servers": "my-bootstrap-server",
"dest.topic.replication.factor": 1,
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"tasks.max": 1,
"connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": ".*",
"transforms.AddPrefix.replacement": "copy_of_$0",
"s3.region": "eu-west-1",
"s3.bucket.name": "my-bucket-123"
通过使用上述配置,我无法启动我的 S3 源连接器。如果我使用上述配置和命令验证配置:
curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate
我在format.class
属性中收到以下错误:
"errors":[
"Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
"Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
]
我开始认为这个 S3 源连接器不支持 Parquet
格式。我尝试针对 JSON、AVRO 和 BYTE 格式对其进行验证,所有这些都可以。
深入研究 S3 源连接器 jar 文件 (1.4.5
),我没有找到 Parquet 格式的文件:
Formats in Jar files
有人知道这里发生了什么吗?有没有其他方法可以将数据从 S3 - Parquet 格式放回我的 Kafka 集群?
谢谢!
【问题讨论】:
S3 连接器主要用于分析,而不是备份/恢复。话虽如此,Parquet 是一种列格式,因此无法轻松读取单个记录,例如基于行的 Avro/JSON/text-lines 可以 【参考方案1】:来自 Confluent 的 S3 源连接器文档:
开箱即用,连接器支持以 Avro 和 JSON 格式从 S3 读取数据。除了带有模式的记录外,连接器还支持在文本文件中导入不带模式的纯 JSON 记录,每行一条记录。通常,连接器可以接受任何提供 Format 接口实现的格式。
所以这意味着您应该能够添加/插件实现parquet
格式,但它不是开箱即用的内置
格式的源代码:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java
【讨论】:
有一些工具可以将 Parquet 带到 Avro。不确定这是否真的有助于将数据返回到 Protobuf,就像 OP 似乎想要的那样 OP 显示错误,即在未找到类上读取镶木地板失败...读取格式是可插入的,他可能还需要为值转换器设置一个设置才能将其转换为 PROTOBUFF 当然,但是您链接到 Sink 的Format
接口,而不是源所需的 io.confluent.connect.cloud.storage.source.StorageObjectFormat
. In general, the connector may accept any format that provides an implementation of the Format interface.
假设sink和source是同一个Connect集群,那为什么不行呢?以上是关于用于 S3 中 PARQUET 格式的 Kafka S3 源连接器的主要内容,如果未能解决你的问题,请参考以下文章
S3 Select 会加速 Parquet 文件的 Spark 分析吗?
使用 Spectrify 以 Parquet 格式将数据从 Redshift 卸载到 S3
使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3