spark out of memory multiple iterations

Posted 2016-10-31 15:52:17

Question:

I have a Spark job (running on Spark 1.3.1) that has to iterate over several keys (about 42 of them) and process each of them. Here is the structure of the program:

    1. Get a key from the map
    2. Fetch the data matching that key from Hive (Hadoop/YARN underneath) as a data frame
    3. Process the data
    4. Write the results back to Hive

When I run this for a single key, everything works fine. When I run it with all 42 keys, I hit an out-of-memory exception around the 12th iteration. Is there a way to clean up memory between iterations? Any help is appreciated.

Here is the high-level code I am using.

import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public abstract class SparkRunnableModel {

    public static SparkContext sc = null;
    public static JavaSparkContext jsc = null;
    public static HiveContext hiveContext = null;
    public static SQLContext sqlContext = null;

    protected SparkRunnableModel(String appName) {
        // get the system properties to set up the model
        // getting a Spark context object by using the constants
        SparkConf conf = new SparkConf().setAppName(appName);
        sc = new SparkContext(conf);
        jsc = new JavaSparkContext(sc);

        // creating a Hive context object from the Spark context
        hiveContext = new HiveContext(sc);

        // SQL context
        sqlContext = new SQLContext(sc);
    }

    public abstract void processModel(Properties properties) throws Exception;
}

class ModelRunnerMain(model: String) extends SparkRunnableModel(model) with Serializable {

  override def processModel(properties: Properties) = {
    val dataLoader = DataLoader.getDataLoader(properties)

    // loads a keys data frame from a keys table in Hive and converts it to a list
    val keysList = dataLoader.loadSeriesData()

    for (key <- keysList) {
      runModelForKey(key, dataLoader)
    }
  }

  def runModelForKey(key: String, dataLoader: DataLoader) = {

    // loads a data frame from a table (~50 cols x 800 rows) using "select * from table where key='<key>'"
    val keyDataFrame = dataLoader.loadKeyData()

    // filter this data frame into two data frames
    ...

    // join them to transpose
    ...

    // convert the data frame into an RDD
    ...

    // run map on the RDD to add a bunch of new columns
    ...
  }
}

My data frames are under a megabyte in size. But I create several data frames from them by selecting, joining, and so on. I assumed all of those would be garbage collected once an iteration completes.
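As a sketch (not the original code) of what explicit cleanup at the end of each iteration could look like: persist/unpersist and HiveContext.clearCache() are standard Spark 1.3 calls, but the try/finally placement and the assumption that the intermediate frames are cached are mine, not the question's:

def runModelForKey(key: String, dataLoader: DataLoader) = {
  val keyDataFrame = dataLoader.loadKeyData()
  keyDataFrame.persist()      // only worthwhile if the frame is reused several times below

  try {
    // ... filter, join, convert to an RDD, run the map step as above ...
  } finally {
    keyDataFrame.unpersist()  // release this iteration's cached blocks before the next key
    hiveContext.clearCache()  // drop anything cached through the Hive/SQL layer
  }
}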

Here is the configuration I am running with.

spark.eventLog.enabled: true
spark.broadcast.port: 7086
spark.driver.memory: 12g
spark.shuffle.spill: false
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.storage.memoryFraction: 0.7
spark.executor.cores: 8
spark.io.compression.codec: lzf
spark.shuffle.consolidateFiles: true
spark.shuffle.service.enabled: true
spark.master: yarn-client
spark.executor.instances: 8
spark.shuffle.service.port: 7337
spark.rdd.compress: true
spark.executor.memory: 48g
spark.executor.id:
spark.sql.shuffle.partitions: 700
spark.cores.max: 56

Here is the exception I am getting.

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at com.ning.compress.lzf.ChunkEncoder.encodeAndWriteChunk(ChunkEncoder.java:264)
at com.ning.compress.lzf.LZFOutputStream.writeCompressedBlock(LZFOutputStream.java:266)
at com.ning.compress.lzf.LZFOutputStream.write(LZFOutputStream.java:124)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

Comments:

Without seeing code that reproduces the problem this is going to be hard to answer. In general, Spark should let data that is no longer needed be garbage collected, but the devil is in the details...

I completely agree with @TzachZohar, so I am voting to close this as too broad and lacking a minimal complete verifiable example.

Thanks, both of you. I will add the code. The trouble is that the stack trace is so generic I don't know which part to show; I will extract the important parts and add them to my question.

I have updated the post with the code. Please take a look.

Could this be the static hiveContext holding references to all the objects? I have a Java class that extends this SparkRunnableModel and runs similar steps, and that seems to work fine. The OOM happens in the Scala class. I must be doing something wrong.

Answer 1:

Using checkpoint() or localCheckpoint() can cut the Spark lineage and improve the application's performance across iterations.
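For reference, a minimal sketch of the checkpointing idea on Spark 1.3.1 (where RDD.checkpoint() exists but localCheckpoint() does not yet): the checkpoint directory path and the names keyRdd and addDerivedColumns are illustrative stand-ins for the question's per-key RDD and map step:

sc.setCheckpointDir("hdfs:///tmp/model-checkpoints")       // illustrative path

val enriched = keyRdd.map(row => addDerivedColumns(row))   // the per-iteration map step
enriched.checkpoint()  // truncate the lineage so it does not keep growing across keys
enriched.count()       // an action is needed to actually materialize the checkpoint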

Comments:
