将大型数据集缓存到 spark 内存中时“超出 GC 开销限制”(通过 sparklyr 和 RStudio)

Posted

技术标签:

【中文标题】将大型数据集缓存到 spark 内存中时“超出 GC 开销限制”(通过 sparklyr 和 RStudio)【英文标题】:"GC overhead limit exceeded" on cache of large dataset into spark memory (via sparklyr & RStudio) 【发布时间】:2017-07-26 07:21:23 【问题描述】:

我对尝试使用的大数据技术非常陌生,但到目前为止,我已经设法在 RStudio 中设置 sparklyr 以连接到独立的 Spark 集群。数据存储在 Cassandra 中,我可以成功地将大型数据集带入 Spark 内存(缓存)以对其进行进一步分析。

但是,最近我在将一个特别大的数据集导入 Spark 内存时遇到了很多麻烦,即使集群应该有足够的资源(60 个内核,200GB RAM)来处理其大小的数据集。

我认为通过将缓存的数据限制在几个感兴趣的选定列中,我可以解决这个问题(使用我之前查询 here 中的答案代码),但事实并非如此。发生的情况是我的本地机器上的 jar 进程加速以占用所有本地 RAM 和 CPU 资源,整个进程冻结,并且集群上的执行器不断被删除和重新添加。奇怪的是,即使我只选择 1 行进行缓存也会发生这种情况(这应该使这个数据集比其他我没有问题缓存到 Spark 内存中的数据集小得多)。

我查看了日志,这些似乎是流程早期唯一的信息性错误/警告:

17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running

然后在 20 分钟左右后,整个工作崩溃:

java.lang.OutOfMemoryError: GC overhead limit exceeded

我更改了连接配置以增加心跳间隔 (spark.executor.heartbeatInterval: '180s'),并了解了如何通过更改 yarn 集群上的设置来增加 memoryOverhead(使用 spark.yarn.executor.memoryOverhead),但不是在独立集群上。

在我的配置文件中,我通过一次添加以下每个设置进行了实验(均无效):

spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'

更新:我当前的完整 yml 配置文件如下:

default:
# local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"

# remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'

# other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar

# cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000

# spark packages to load
  sparklyr.defaultPackages: 
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"

所以我的问题是:

    有人对在这种情况下该怎么做有任何想法吗? 是 是否有我可以更改的配置设置来帮助解决这个问题? 或者,有没有办法将 cassandra 数据导入 以 RStudio/sparklyr 作为驱动的批次? 或者同样,有没有办法在数据被带入缓存时对数据进行处理/过滤/编辑,以使结果表更小(类似于使用 SQL 查询,但使用更复杂的 dplyr 语法)?

【问题讨论】:

您是否尝试过增加 spark.executor.memory ?也尝试增加执行者的数量 是的 - 我将执行程序内存增加到每个节点 64gb(总共 384GB RAM)并且发生了同样的事情。还尝试将执行器加倍(到 6 个节点上的 12 个执行器)并且遇到了同样的问题。 “奇怪的是,即使我只选择 1 行进行缓存也会发生这种情况”这表明下推谓词可能未正确应用。您的数据集有多大,大约有多少(cassandra)分区?你能发布你的整个配置文件吗? 我现在更新了原始帖子以显示完整的配置文件。磁盘上的完整数据集约为 70GB,尽管我只尝试提取其中的一半。不确定如何获取 cassandra 分区的数量?使用nodetool cfstats 表示键的数量为 4156,除以总大小/分区平均字节数得到 ~1000。数据分布在 6 个节点上。 你的集群内存是如何在执行者之间分配的?看起来你有很大的堆大小。您是否尝试为 Spark 调整 GC?最简单的选择是打开 G1GC。查看详情here 【参考方案1】:

好的,我终于成功了!

我最初尝试了 @user6910411 的建议来减少 cassandra 输入拆分大小,但这以同样的方式失败了。在玩了很多其他东西之后,今天我尝试在相反的方向更改该设置:

spark.cassandra.input.split.size_in_mb: 254 

通过增加拆分大小,Spark 任务更少,因此开销也更少,对 GC 的调用也更少。成功了!

【讨论】:

以上是关于将大型数据集缓存到 spark 内存中时“超出 GC 开销限制”(通过 sparklyr 和 RStudio)的主要内容,如果未能解决你的问题,请参考以下文章

spark配置

解析spark RDD

spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足

深入探究Spark -- 基本组成

Spark核心RDD什么是RDDRDD的属性创建RDDRDD的依赖以及缓存

在 Java Spark 中迭代大型数据集的最快且有效的方法