Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败

Posted

技术标签:

【中文标题】Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败【英文标题】:Kafka Connector Record Writer gets stuck in S3OutputStream for lack of memory to allocate but does not fail staying idle for hours 【发布时间】:2021-02-06 11:38:06 【问题描述】:

我的行为我不知道如何修改。 我正在测试 s3 Kafka 同步连接器,但我的主题中的数据很少。

目前我可以通过使用 Kafka 管理器看到主题中有数据,但我的连接器读取数据并且从不移动偏移量,也从不将其推送到 Kafka。 在其他主题中这有效,但在此特定主题中则无效。 我认为与超时有关,但我找不到要设置的正确配置属性,以便刷新更快一些。

这是我的配置:

      curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
      http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
      \
        -d '
          ""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
          ""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
          ""s3.region"": ""us-east-1"",
          ""s3.bucket.name"": ""confluent-pipeline"",
          ""topics.dir"": ""topics"",
          ""topics"": ""com.acp.bde.doc_cmg"",
          ""flush.size"": ""25"",
          ""rotate.interval.ms"": ""5000"",
          ""auto.register.schemas"": ""false"",
          ""tasks.max"": ""1"",
          ""s3.part.size"": ""5242880"",
          ""timezone"": ""UTC"",
          ""parquet.codec"": ""snappy"",
          ""offset.flush.interval.ms"": ""5000"",
          ""offset.flush.timeout.ms"": ""1000"",
          ""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
          ""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
          ""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
          ""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
          ""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
          ""locale"": ""de-CH"",
          ""timezone"": ""Europe/Zurich"",
          ""store.url"": ""http://minio-server-svc:9000/""
        '"

这就是我在日志中看到的:

[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

所以它们已经开放了将近 1 小时,但实际上什么也没发生,我想知道我的配置是否完全错误,或者我需要一些属性和配置,因此这种数据推送速度会更快一些。

更新: 我仍然没有正确解决这个问题,但实际上它似乎是内存不足的问题。

程序卡在这一行 this.buffer = ByteBuffer.allocate(this.partSize);

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L85

困扰我的部分是它根本不抱怨,只是停留在那里。它不应该因内存不足而崩溃吗? 还是不应该更快地释放内存? 它几乎可以在没有任何反馈的情况下在该调用中停留超过 3 或 4 小时。

我仍然认为我的配置可能有问题,但我不知道应该查看什么或在哪里查看。

【问题讨论】:

【参考方案1】:

您的分区器是基于时间的。因此,当 rotate.schedule.interval.ms 参数不存在时,这可能会发生。看看下面的话题https://***.com/a/51160834/1551246

【讨论】:

以上是关于Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败的主要内容,如果未能解决你的问题,请参考以下文章

Flink记录一次Flink消费kafka写入Elastic在kerberos认证的情况下不消费问题

数据写入kafka的分区策略

Kafka 连接 API 客户端

将 Spark DataFrame 写入 Hive 表时的内存分配问题

获取所有写入特定主题的 Kafka 源连接器

Apache Camel Kafka 连接器:以 Avro 格式写入 GCS