在 Confluent S3 Kafka 连接器中压缩 Avro 数据

Posted

技术标签:

【中文标题】在 Confluent S3 Kafka 连接器中压缩 Avro 数据【英文标题】:Compressing Avro data in Confluent S3 Kafka Connector 【发布时间】:2022-01-16 19:24:56 【问题描述】:

我有一个 Confluent 接收器连接器,它从 Kafka 主题中获取数据。然后将其摄取到 S3 存储桶中。

摄取工作正常,一切都很好,但是现在我需要在将 Avro 数据放入存储桶之前对其进行压缩。

我尝试了以下配置

   
  "name":"--private-v1-s3-sink",
  "connector.class":"io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "1",
  "s3.region":"eu-west-1",
  "partition.duration.ms":"3600000",
  "rotate.schedule.interval.ms": "3600000",
  "topics.dir":"svs",
  "flush.size":"2500",
  "schema.compatibility":"FULL",
  "file.delim":"_",
  "topics":"--connect.s3.format.avro.AvroFormat",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url":"--systems",
  "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "storage.class":"io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "$S3_BUCKET",
  "s3.acl.canned":"bucket-owner-full-control",
  "avro.codec": "snappy",
  "locale":"en-GB",
  "timezone": "GMT",
  "errors.tolerance": "all",
  "path.format":"'ingest_date'=yyyy-MM-dd",
  "timestamp.extractor":"Record"

我认为“avro.code”会压缩数据,但事实并非如此。取而代之的是我还尝试了“s3.compression.type”:“snappy”,但仍然没有运气!但是这确实适用于 JSON 和 GZIP。

不太确定出了什么问题?

【问题讨论】:

【参考方案1】:

这些设置仅适用于 S3 Avro 编写器,不适用于来自生产者的动态数据,这些数据必须在生产者或代理/主题级别而不是 Connect 设置进行“压缩”。

参考compression.type主题配置

【讨论】:

【参考方案2】:

对于那些将来可能会遇到这种情况的人。

我在此设置之间进行了测试,使用 BZIP2 而不是 snappy,并且未启用压缩。

结果如下:

No compression 58.2MB / 406 total objects
BZIP    19.9MB / 406 total objects
Snappy 31.1MB / 406 total objects

跑了超过 24 小时,全部从同一个主题中提取并放入自己的存储桶中。

如您所见,上面使用 snappy 的配置实际上是有效的。

BZIP 提供更高的压缩率并且看起来更快。

最终我们不得不使用 snappy,因为 Redshift 摄取只允许使用 snappy 压缩 Avro,无论如何。

【讨论】:

以上是关于在 Confluent S3 Kafka 连接器中压缩 Avro 数据的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?

kafka s3融合连接器 - 将json上传为字符串

使用带有 Helm 安装的 Kafka/Confluent 的连接器

Confluent Cloud Kafka - 审计日志集群:接收器连接器

无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON

Confluent Kafka Connect HDFS Sink 连接器延迟