snowflake kafka connector limits?
Posted: 2020-06-20 12:20:39

Question: I am trying to push roughly 2,797 partitions across 467 topics to Snowflake through the Kafka connector (2 tasks), and the connector bombs out with:
[2020-03-08 10:13:10,884] ERROR WorkerSinkTaskid=snowflake-1 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:803)
at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
at java.base/java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1583)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:346)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:632)
at net.snowflake.ingest.connection.SecurityManager.<init>(SecurityManager.java:105)
at net.snowflake.ingest.connection.SecurityManager.<init>(SecurityManager.java:120)
at net.snowflake.ingest.connection.RequestBuilder.<init>(RequestBuilder.java:216)
at net.snowflake.ingest.SimpleIngestManager.<init>(SimpleIngestManager.java:353)
at com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1.<init>(SnowflakeIngestionServiceV1.java:41)
at com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceFactory$SnowflakeIngestionServiceBuilder.<init>(SnowflakeIngestionServiceFactory.java:35)
at com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceFactory$SnowflakeIngestionServiceBuilder.<init>(SnowflakeIngestionServiceFactory.java:25)
at com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceFactory.builder(SnowflakeIngestionServiceFactory.java:18)
at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.buildIngestService(SnowflakeConnectionServiceV1.java:681)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.<init>(SnowflakeSinkServiceV1.java:271)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.<init>(SnowflakeSinkServiceV1.java:232)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.startTask(SnowflakeSinkServiceV1.java:70)
at com.snowflake.kafka.connector.SnowflakeSinkTask.lambda$open$2(SnowflakeSinkTask.java:168)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at com.snowflake.kafka.connector.SnowflakeSinkTask.open(SnowflakeSinkTask.java:168)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:651)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:285)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:443)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-03-08 10:13:10,885] ERROR WorkerSinkTaskid=snowflake-1 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
My VM looks fine on resources, so I suspect an internal Snowpipe limit is being thrown back at me; any other ideas?
Comments:
Welcome to Stack Overflow! Are you sure your VM is fine on resources? The java.lang.OutOfMemoryError message indicates the machine ran out of memory.
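As a side note, this particular OutOfMemoryError ("unable to create native thread") usually points at a thread or process limit rather than heap exhaustion. A hedged sketch of quick checks on the worker host (the PID placeholder is an assumption; the /proc path is Linux-specific):

```shell
# Per-user process/thread limit for the current shell session.
ulimit -u
# System-wide thread cap (Linux only; silently skipped elsewhere).
[ -r /proc/sys/kernel/threads-max ] && cat /proc/sys/kernel/threads-max || true
# Thread count of a running Connect worker (PID is a placeholder):
# ps -o nlwp= -p <connect-worker-pid>
```

If `ulimit -u` or the systemd task cap is low relative to topics × partitions, thread creation fails even with plenty of free heap.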
Answer 1:
Kafka Connect defaults to 2 GB of heap space.
Trying to serve that many topics and partitions from a single VM seems like a bad idea.
Create a Connect cluster and distribute the workload.
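For reference, the worker heap can be raised through `KAFKA_HEAP_OPTS`, the environment variable the stock Kafka launch scripts read; the paths and sizes below are assumptions for illustration, not a recommendation:

```shell
# Raise the Connect worker heap before launching it; kafka-run-class.sh
# picks up KAFKA_HEAP_OPTS when it is set in the environment.
export KAFKA_HEAP_OPTS="-Xms4G -Xmx8G"
# Hypothetical launch path:
# /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
echo "$KAFKA_HEAP_OPTS"
```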
Discussion:
Yes; got a cluster and put -Xms8G -Xmx16G as JVM options on 2 workers, but I am seeing very high RAM usage on the connector through VisualVM.
Well, yes. You only have 2 tasks and a whole lot of topics/partitions.
The problem was the thread count; putting TasksMax=MAX_TASKS|infinity into systemd avoids the problem.
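The TasksMax fix mentioned above would look roughly like this as a systemd drop-in (the unit name and file path are assumptions; note that systemd counts threads as "tasks", so a low default caps thread creation for the whole service):

```ini
# /etc/systemd/system/kafka-connect.service.d/override.conf  (hypothetical path)
[Service]
# Lift systemd's task (thread) cap for the Connect worker.
TasksMax=infinity
```

followed by `systemctl daemon-reload` and a restart of the service.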