Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败
Posted
技术标签:
【中文标题】Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败【英文标题】:Kafka Connector Record Writer gets stuck in S3OutputStream for lack of memory to allocate but does not fail staying idle for hours 【发布时间】:2021-02-06 11:38:06 【问题描述】:我的行为我不知道如何修改。 我正在测试 s3 Kafka 同步连接器,但我的主题中的数据很少。
目前我可以通过使用 Kafka 管理器看到主题中有数据,但我的连接器读取数据并且从不移动偏移量,也从不将其推送到 Kafka。 在其他主题中这有效,但在此特定主题中则无效。 我认为与超时有关,但我找不到要设置的正确配置属性,以便刷新更快一些。
这是我的配置:
curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
\
-d '
""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
""s3.region"": ""us-east-1"",
""s3.bucket.name"": ""confluent-pipeline"",
""topics.dir"": ""topics"",
""topics"": ""com.acp.bde.doc_cmg"",
""flush.size"": ""25"",
""rotate.interval.ms"": ""5000"",
""auto.register.schemas"": ""false"",
""tasks.max"": ""1"",
""s3.part.size"": ""5242880"",
""timezone"": ""UTC"",
""parquet.codec"": ""snappy"",
""offset.flush.interval.ms"": ""5000"",
""offset.flush.timeout.ms"": ""1000"",
""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
""locale"": ""de-CH"",
""timezone"": ""Europe/Zurich"",
""store.url"": ""http://minio-server-svc:9000/""
'"
这就是我在日志中看到的:
[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
所以它们已经开放了将近 1 小时,但实际上什么也没发生,我想知道我的配置是否完全错误,或者我需要一些属性和配置,因此这种数据推送速度会更快一些。
更新: 我仍然没有正确解决这个问题,但实际上它似乎是内存不足的问题。
程序卡在这一行 this.buffer = ByteBuffer.allocate(this.partSize);
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L85
困扰我的部分是它根本不抱怨,只是停留在那里。它不应该因内存不足而崩溃吗? 还是不应该更快地释放内存? 它几乎可以在没有任何反馈的情况下在该调用中停留超过 3 或 4 小时。
我仍然认为我的配置可能有问题,但我不知道应该查看什么或在哪里查看。
【问题讨论】:
【参考方案1】:您的分区器是基于时间的。因此,当 rotate.schedule.interval.ms 参数不存在时,这可能会发生。看看下面的话题https://***.com/a/51160834/1551246
【讨论】:
以上是关于Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败的主要内容,如果未能解决你的问题,请参考以下文章
Flink记录一次Flink消费kafka写入Elastic在kerberos认证的情况下不消费问题