Kafka Connect S3 连接器 OutOfMemory 错误与 TimeBasedPartitioner

Posted

技术标签:

【中文标题】Kafka Connect S3 连接器 OutOfMemory 错误与 TimeBasedPartitioner【英文标题】:Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner 【发布时间】:2018-11-30 22:27:19 【问题描述】:

我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1 将 Kafka 消息复制到 S3,并且在处理延迟数据时出现 OutOfMemory 错误。

我知道这看起来是一个很长的问题,但我尽力让它清晰易懂。 非常感谢您的帮助。

高级信息

连接器对 Kafka 消息进行简单的逐字节复制,并在字节数组的开头添加消息的长度(用于解压缩目的)。 这是CustomByteArrayFormat 类的作用(参见下面的配置) 根据Record时间戳对数据进行分区和分桶 CustomTimeBasedPartitioner 扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是覆盖generatePartitionedPath 方法,将主题放在路径的末尾。 Kafka Connect 进程的总堆大小为 24GB(仅一个节点) 连接器每秒处理 8,000 到 10,000 条消息 每条消息的大小接近 1 KB Kafka 主题有 32 个分区

OutOfMemory 错误的上下文

仅当连接器已关闭数小时且必须赶上数据时才会发生这些错误 重新打开连接器时,它开始赶上,但很快就因 OutOfMemory 错误而失败

可能但不完整的解释

当这些 OOM 错误发生时,连接器的 timestamp.extractor 配置设置为 Record 将此配置切换为Wallclock(即Kafka Connect进程的时间)不要抛出OOM错误,所有迟到的数据都可以处理,但迟到的数据不再正确分桶 所有迟到的数据都将在连接器重新打开时的YYYY/MM/dd/HH/mm/topic-name 分桶 所以我的猜测是,当连接器尝试根据Record 时间戳正确存储数据时,它执行过多的并行读取会导致 OOM 错误 "partition.duration.ms": "600000" 参数在每小时 6 个 10 分钟路径中生成连接器存储桶数据(2018/06/20/12/[00|10|20|30|40|50] 表示 2018 年 6 月 20 日下午 12 点) 因此,对于 24 小时的延迟数据,连接器必须在 24h * 6 = 144 不同的 S3 路径中输出数据。 每 10 分钟文件夹包含 10,000 条消息/秒 * 600 秒 = 6,000,000 条消息,大小为 6 GB 如果确实是并行读取,那么将有 864GB 的数据进入内存 我认为我必须正确配置一组给定的参数以避免那些 OOM 错误,但我觉得我没有看到全局 "flush.size": "100000" 暗示如果有更多 dans 100,000 条消息被读取,它们应该被提交到文件中(从而释放内存) 对于 1KB 的消息,这意味着每 100MB 提交一次 但即使有 144 个并行读数,总共仍只能提供 14.4 GB,小于可用的 24GB 堆大小 "flush.size" 是在提交之前每个分区要读取的记录数吗?或者可能每个连接器的任务? 我理解"rotate.schedule.interval.ms": "600000" 配置的方式是,即使flush.size 的100,000 条消息尚未到达,数据也将每10 分钟提交一次。

我的主要问题是允许我计划内存使用的数学是什么:

每秒的数量或记录数 记录的大小 我读取的主题的 Kafka 分区数 连接器任务的数量(如果相关) 每小时写入的桶数(这里是 6,因为 "partition.duration.ms": "600000" 配置) 要处理的延迟数据的最大小时数

配置

S3 接收器连接器配置


  "name": "xxxxxxx",
  "config": 
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"

工作器配置

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

编辑

我忘记添加我遇到的错误的示例:

2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

【参考方案1】:

我终于能够理解堆大小在 Kafka Connect S3 连接器中的使用方式

S3 Connector 会将每个 Kafka 分区的数据写入分区 paths 这些paths的分区方式取决于partitioner.class参数; 默认是时间戳,然后partition.duration.ms的值将决定每个分区的持续时间paths。 S3 连接器将为每个 Kafka 分区(用于读取的所有主题)和每个分区分配 s3.part.size 字节的缓冲区paths 读取 20 个分区的示例,timestamp.extractor 设置为 Recordpartition.duration.ms 设置为 1h,s3.part.size 设置为 50 MB 每小时所需的堆大小等于20 * 50 MB = 1 GB; 但是,timestamp.extractor 设置为Record,具有对应于较早时间的时间戳的消息将被缓冲在该较早时间缓冲区中。因此,实际上,连接器至少需要20 * 50 MB * 2h = 2 GB 内存,因为总是有延迟事件,如果有延迟时间超过 1 小时的事件,则需要更多内存; 请注意,如果将 timestamp.extractor 设置为 Wallclock,则情况并非如此,因为就 Kafka Connect 而言,几乎不会有延迟事件发生。 这些缓冲区在 3 个条件下被刷新(即离开内存) rotate.schedule.interval.ms时间已经过去 此刷新条件总是触发。 rotate.interval.ms 时间已经过去timestamp.extractor 时间而言 这意味着如果将timestamp.extractor 设置为Record,则Record 的10 分钟时间可以在更少或更多和10 分钟的实际时间中过去 例如,在处理延迟数据时,10 分钟的数据将在几秒钟内处理完毕,如果 rotate.interval.ms 设置为 10 分钟,则此条件将每秒触发一次(应该如此); 相反,如果事件流暂停,则此条件将不会触发,直到它看到一个事件的时间戳表明自上次触发条件以来已超过 rotate.interval.msflush.size 消息已读不到 min(rotate.schedule.interval.ms, rotate.interval.ms) 对于rotate.interval.ms,如果没有足够的消息,这种情况可能永远不会触发。 因此,您至少需要规划Kafka partitions * s3.part.size Heap Size 如果您使用Record 时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms 这是最坏的情况,您在所有分区和max lateness in milliseconds 的所有范围内都有不断延迟的事件。 当 S3 连接器从 Kafka 读取数据时,它还将为每个分区缓冲 consumer.max.partition.fetch.bytes 字节 默认设置为 2.1 MB。 最后,你不应该认为所有的 Heap Size 都可以用来缓冲 Kafka 消息,因为其中还有很多不同的对象 一个安全的考虑是确保 Kafka 消息的缓冲不超过总可用堆大小的 50%。

【讨论】:

非常感谢您的详细回答。有更多像您这样的人,互联网会是一个更好的地方。 谢谢Guido,感谢。我很高兴它有用。 @raphael 如果您的连接器正在下沉多个主题,您还会将计算乘以主题数量吗? @moku 不是按主题的倍数,而是按包含的所有主题的分区总数。假设您有一个主题有 10 个分区,而另一个主题有 24 个,那么您将乘以 34。【参考方案2】:

@raphael 完美地解释了工作原理。 粘贴我遇到过的类似问题的一小部分变化(事件太少而无法处理,但需要数小时/数天)。

在我的例子中,我有大约 150 个连接器,其中 8 个因 OOM 失败,因为它们必须处理大约 7 天的数据(我们在测试环境中的 kafka 关闭了大约 2 周)

遵循的步骤:

    将所有连接器的 s3.part.size 从 25MB 减少到 5MB。 (在我们的场景中,rotate.interval 设置为 10 分钟,flush.size 设置为 10000。我们的大多数事件应该很容易符合此限制)。 在这个设置之后,只有一个连接器还在 OOM,并且这个连接器在启动后的 5 秒内进入 OOM(基于堆分析),它从200MB-1.5GB 的堆利用率。在查看 kafka 偏移延迟时,在所有 7 天中只有 8K 事件需要处理。所以这不是因为要处理的事件太多,而是因为处理的事件太少/flush。 由于我们使用 Hourly 分区并且一个小时内几乎没有 100 个事件,这 7 天的所有缓冲区都是在没有被刷新(没有被释放到 JVM)的情况下创建的 - 7 * 24 * 5MB * 3 partitions = 2.5GB (xmx-1.5GB)

修复: 执行以下步骤之一,直到您的连接器赶上,然后恢复您的旧配置。 (推荐方法 - 1

    更新连接器配置以处理 100 或 1000 条记录 flush.size(取决于数据的结构)。 缺点:如果实际事件超过 1000 个,一小时内会创建太多小文件。 将分区更改为每日分区,以便只有每日分区。 缺点:您现在将在 S3 中混合使用 Hourly 和 Daily 分区。

【讨论】:

以上是关于Kafka Connect S3 连接器 OutOfMemory 错误与 TimeBasedPartitioner的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect S3 sink 连接器与自定义 Partitioner 奇怪行为

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

gsutil 无法验证 Kafka Connect S3 上传的文件的哈希值

如何在 Kafka Connect S3 中解析记录头?

Kafka Connect S3 Sink Flush 数据 - 奇怪的延迟

如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?