Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟

Posted

技术标签:

【中文标题】Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟【英文标题】:Kafka Connect S3 Sink Flush data - Strange lag 【发布时间】:2020-12-23 15:04:38 【问题描述】:

我有一个由 KSQL 查询和 inut 流创建的表,它由 Kafka 主题支持。 本主题是使用 Kafka Connect 下沉到 s3。 在这个主题中,我有大约 1k msgs/sec。 该主题有 6 个分区和 3 个副本。

我有一个奇怪的输出比率。下沉似乎很奇怪。 这是我的监控: monitoring 您可以看到第一个图表显示了输入比率 B/s,第二个图表显示了输出比率,第三个图表显示了使用 Burrow 计算的滞后。

这是我的 s3-sink 属性文件:


"name": "sink-feature-static",
"config": 
  "topics": "FEATURE_APP_STATIC",
  "topics.dir": "users-features-stream",
  "tasks.max": "6",
  "consumer.override.auto.offset.reset": "latest",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
  "partition.duration.ms": "3600000",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
  "flush.size": 1000000,
  "s3.part.size": 5242880,
  "rotate.interval.ms": "600000",
  "rotate.schedule.interval.ms": "600000",
  "locale": "fr-FR",
  "timezone": "UTC",
  "timestamp.extractor": "Record",
  "schema.compatibility": "NONE",
  "aws.secret.access.key": "secretkey",
  "aws.access.key.id": "accesskey",
  "s3.bucket.name": "feature-store-prod-kafka-test",
  "s3.region": "eu-west-1"
  

这是我在 s3 存储桶中观察到的内容:s3 bucket 在这些文件中,parquet.snappy 中有少量消息。 (有时只有 1 有时更多,...)。每个分区每秒大约 2 个文件。 (当我使用记录时间戳时,这是因为它正在赶上我认为的滞后)。

我所期待的是: 每 1000000 条消息 (flush.size) 或每 10 分钟 (rotate.schedule.interval.ms) 提交一次文件。 所以我期待(作为 1M 条消息 > 10 分钟 * 1Kmsg/s): 每小时 1/ 6(每 10 分钟)* 6(nb 个分区)parquet 文件 2/ 或者如果我错了,至少里面有 1M 条消息的文件...

但是没有观察到 1/ 或 2/ ... 而且我每小时都会在 s3 文件中出现巨大的延迟和刷新/提交(请参阅监控)。 “partition.duration.ms”:“3600000”会导致这种观察吗?

我哪里错了? 为什么我没有看到数据的连续输出刷新,而是出现这样的峰值?

谢谢! 雷米

【问题讨论】:

【参考方案1】:

如果您希望每 10 分钟有一个 s3 对象,请先将 partition.duration.ms 设置为 10 分钟。其次,如果您真的不希望将小文件设置为 rotate.interval.ms=-1rotate.schedule.interval.ms 为 10 分钟(但是您可以保证只发送一次)。

当使用rotate.interval.ms 时,每次你收到一个早于文件偏移量的时间戳时,kafka-connect 刷新会导致每个小时的开始和结束时文件非常小,它确实确保完全一次交付失败案例。

【讨论】:

以上是关于Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect S3 sink 连接器与自定义 Partitioner 奇怪行为

如何在 Kafka Connect S3 中解析记录头?

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

Kafka连接s3 sink多个分区

如何在 Kafka Sink 中为不同环境定义 s3bucket 的名称

kafka s3 sink连接器在获取NULL数据时崩溃