Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟
Posted
技术标签:
【中文标题】Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟【英文标题】:Kafka Connect S3 Sink Flush data - Strange lag 【发布时间】:2020-12-23 15:04:38 【问题描述】:我有一个由 KSQL 查询和 inut 流创建的表,它由 Kafka 主题支持。 本主题是使用 Kafka Connect 下沉到 s3。 在这个主题中,我有大约 1k msgs/sec。 该主题有 6 个分区和 3 个副本。
我有一个奇怪的输出比率。下沉似乎很奇怪。 这是我的监控: monitoring 您可以看到第一个图表显示了输入比率 B/s,第二个图表显示了输出比率,第三个图表显示了使用 Burrow 计算的滞后。
这是我的 s3-sink 属性文件:
"name": "sink-feature-static",
"config":
"topics": "FEATURE_APP_STATIC",
"topics.dir": "users-features-stream",
"tasks.max": "6",
"consumer.override.auto.offset.reset": "latest",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
"partition.duration.ms": "3600000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
"flush.size": 1000000,
"s3.part.size": 5242880,
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "600000",
"locale": "fr-FR",
"timezone": "UTC",
"timestamp.extractor": "Record",
"schema.compatibility": "NONE",
"aws.secret.access.key": "secretkey",
"aws.access.key.id": "accesskey",
"s3.bucket.name": "feature-store-prod-kafka-test",
"s3.region": "eu-west-1"
这是我在 s3 存储桶中观察到的内容:s3 bucket 在这些文件中,parquet.snappy 中有少量消息。 (有时只有 1 有时更多,...)。每个分区每秒大约 2 个文件。 (当我使用记录时间戳时,这是因为它正在赶上我认为的滞后)。
我所期待的是: 每 1000000 条消息 (flush.size) 或每 10 分钟 (rotate.schedule.interval.ms) 提交一次文件。 所以我期待(作为 1M 条消息 > 10 分钟 * 1Kmsg/s): 每小时 1/ 6(每 10 分钟)* 6(nb 个分区)parquet 文件 2/ 或者如果我错了,至少里面有 1M 条消息的文件...
但是没有观察到 1/ 或 2/ ... 而且我每小时都会在 s3 文件中出现巨大的延迟和刷新/提交(请参阅监控)。 “partition.duration.ms”:“3600000”会导致这种观察吗?
我哪里错了? 为什么我没有看到数据的连续输出刷新,而是出现这样的峰值?
谢谢! 雷米
【问题讨论】:
【参考方案1】:如果您希望每 10 分钟有一个 s3 对象,请先将 partition.duration.ms
设置为 10 分钟。其次,如果您真的不希望将小文件设置为 rotate.interval.ms=-1
和 rotate.schedule.interval.ms
为 10 分钟(但是您可以保证只发送一次)。
当使用rotate.interval.ms
时,每次你收到一个早于文件偏移量的时间戳时,kafka-connect 刷新会导致每个小时的开始和结束时文件非常小,它确实确保完全一次交付失败案例。
【讨论】:
以上是关于Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect S3 sink 连接器与自定义 Partitioner 奇怪行为
Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中