在 Confluent S3 Kafka 连接器中压缩 Avro 数据
Posted
技术标签:
【中文标题】在 Confluent S3 Kafka 连接器中压缩 Avro 数据【英文标题】:Compressing Avro data in Confluent S3 Kafka Connector 【发布时间】:2022-01-16 19:24:56 【问题描述】:我有一个 Confluent 接收器连接器,它从 Kafka 主题中获取数据。然后将其摄取到 S3 存储桶中。
摄取工作正常,一切都很好,但是现在我需要在将 Avro 数据放入存储桶之前对其进行压缩。
我尝试了以下配置
"name":"--private-v1-s3-sink",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"s3.region":"eu-west-1",
"partition.duration.ms":"3600000",
"rotate.schedule.interval.ms": "3600000",
"topics.dir":"svs",
"flush.size":"2500",
"schema.compatibility":"FULL",
"file.delim":"_",
"topics":"--connect.s3.format.avro.AvroFormat",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"--systems",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "$S3_BUCKET",
"s3.acl.canned":"bucket-owner-full-control",
"avro.codec": "snappy",
"locale":"en-GB",
"timezone": "GMT",
"errors.tolerance": "all",
"path.format":"'ingest_date'=yyyy-MM-dd",
"timestamp.extractor":"Record"
我认为“avro.code”会压缩数据,但事实并非如此。取而代之的是我还尝试了“s3.compression.type”:“snappy”,但仍然没有运气!但是这确实适用于 JSON 和 GZIP。
不太确定出了什么问题?
【问题讨论】:
【参考方案1】:这些设置仅适用于 S3 Avro 编写器,不适用于来自生产者的动态数据,这些数据必须在生产者或代理/主题级别而不是 Connect 设置进行“压缩”。
参考compression.type
主题配置
【讨论】:
【参考方案2】:对于那些将来可能会遇到这种情况的人。
我在此设置之间进行了测试,使用 BZIP2 而不是 snappy,并且未启用压缩。
结果如下:
No compression 58.2MB / 406 total objects
BZIP 19.9MB / 406 total objects
Snappy 31.1MB / 406 total objects
跑了超过 24 小时,全部从同一个主题中提取并放入自己的存储桶中。
如您所见,上面使用 snappy 的配置实际上是有效的。
BZIP 提供更高的压缩率并且看起来更快。
最终我们不得不使用 snappy,因为 Redshift 摄取只允许使用 snappy 压缩 Avro,无论如何。
【讨论】:
以上是关于在 Confluent S3 Kafka 连接器中压缩 Avro 数据的主要内容,如果未能解决你的问题,请参考以下文章
如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?
使用带有 Helm 安装的 Kafka/Confluent 的连接器
Confluent Cloud Kafka - 审计日志集群:接收器连接器