spark out of memory multiple iterations

Posted: 2016-10-31 15:52:17

Question: I have a Spark job (running on Spark 1.3.1) that has to iterate over several keys (about 42 of them) and process each of them. Here is the structure of the program:
- Get a key from the map
- Fetch the data matching that key from Hive (on Hadoop YARN) as a data frame
- Process the data
- Write the results back to Hive
Everything works fine when I run it for a single key. When I run it with all 42 keys, I hit an out-of-memory exception around the 12th iteration. Is there a way I can clean up memory between iterations? Help is appreciated.
Here is the high-level code I am using.
public abstract class SparkRunnableModel {
  public static SparkContext sc = null;
  public static JavaSparkContext jsc = null;
  public static HiveContext hiveContext = null;
  public static SQLContext sqlContext = null;

  protected SparkRunnableModel(String appName) {
    // get the system properties to set up the model
    // and build a Java Spark context using the constants
    SparkConf conf = new SparkConf().setAppName(appName);
    sc = new SparkContext(conf);
    jsc = new JavaSparkContext(sc);
    // create a Hive context from the Spark context
    hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
    // plain SQL context
    sqlContext = new SQLContext(sc);
  }

  public abstract void processModel(Properties properties) throws Exception;
}
class ModelRunnerMain(model: String) extends SparkRunnableModel(model) with Serializable {

  override def processModel(properties: Properties) = {
    val dataLoader = DataLoader.getDataLoader(properties)
    // loads a keys data frame from a keys table in Hive and converts it to a list
    val keysList = dataLoader.loadSeriesData()
    for (key <- keysList) {
      runModelForKey(key, dataLoader)
    }
  }

  def runModelForKey(key: String, dataLoader: DataLoader) = {
    // loads a data frame (~50 cols x 800 rows) using "select * from table where key='<key>'"
    val keyDataFrame = dataLoader.loadKeyData()
    // filter this data frame into two data frames
    ...
    // join them to transpose
    ...
    // convert the data frame into an RDD
    ...
    // run map on the RDD to add a bunch of new columns
    ...
  }
}
My data frame is under a megabyte in size, but I create several more data frames from it through selects, joins, and so on. I assumed all of these would be garbage collected once an iteration completes.
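In Spark 1.3, a cached data frame's blocks stay in executor storage until they are explicitly unpersisted or evicted; they are not freed just because the driver-side reference goes out of scope. If the loader or any intermediate step caches data, it can pile up across iterations. A hedged sketch of explicit cleanup at the end of each iteration (the DataLoader call is taken from the code above; whether your frames are actually cached is an assumption):

```scala
def runModelForKey(key: String, dataLoader: DataLoader) = {
  val keyDataFrame = dataLoader.loadKeyData()
  try {
    // ... filter, join, map as in the original code ...
  } finally {
    // drop this iteration's cached blocks before moving to the next key
    keyDataFrame.unpersist()
    // or, more bluntly, drop everything cached through the Hive context
    hiveContext.clearCache()
  }
}
```

Calling unpersist() on a frame that was never cached is harmless, so doing the cleanup in a finally block is cheap insurance.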
Here is the configuration I am running with.
spark.eventLog.enabled: true
spark.broadcast.port: 7086
spark.driver.memory: 12g
spark.shuffle.spill: false
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.storage.memoryFraction: 0.7
spark.executor.cores: 8
spark.io.compression.codec: lzf
spark.shuffle.consolidateFiles: true
spark.shuffle.service.enabled: true
spark.master: yarn-client
spark.executor.instances: 8
spark.shuffle.service.port: 7337
spark.rdd.compress: true
spark.executor.memory: 48g
spark.executor.id:
spark.sql.shuffle.partitions: 700
spark.cores.max: 56

Here is the exception I get:
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at com.ning.compress.lzf.ChunkEncoder.encodeAndWriteChunk(ChunkEncoder.java:264)
at com.ning.compress.lzf.LZFOutputStream.writeCompressedBlock(LZFOutputStream.java:266)
at com.ning.compress.lzf.LZFOutputStream.write(LZFOutputStream.java:124)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
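The trace itself is informative: the OOM is on the driver, in the DAG scheduler thread, while it serializes a TorrentBroadcast block, which suggests driver-side state (such as broadcast data created for each Hive read) accumulating across iterations. Spark's ContextCleaner only reclaims old broadcasts when their weak references are processed by a GC on the driver, so one commonly used (and admittedly blunt) workaround on Spark 1.x is to nudge the driver JVM between iterations. This is a sketch under that assumption, not a guaranteed fix:

```scala
for (key <- keysList) {
  runModelForKey(key, dataLoader)
  // give the ContextCleaner's weak references a chance to be enqueued so
  // stale broadcast blocks are actually removed (assumption: cleanup in
  // Spark 1.x is GC-driven; later releases added a periodic-GC setting)
  System.gc()
}
```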
Comments:

- This is going to be hard to answer without seeing code that reproduces the problem. Generally speaking, Spark should let the GC collect data that is no longer needed, but the devil is in the details...
- I completely agree with @TzachZohar, so I voted to close this as too broad and lacking a minimal, complete, verifiable example.
- Thank you both. I will add the code. The problem is that the stack trace is so generic that I didn't know which part to post. I will extract the important parts and add them to my question.
- I have updated the post with the code. Please take a look.
- Could it be the static hiveContext holding references to all the objects? I have a Java class that extends this SparkRunnableModel and performs similar steps, and that seems to work fine. The OOM happens in the Scala class. I must be doing something wrong.

Answer 1: Using checkpoint() or localCheckpoint() can cut the Spark lineage and improve the application's performance across iterations.
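On Spark 1.3, checkpointing is only available at the RDD level (DataFrame.checkpoint and localCheckpoint arrived in later releases), so for this job the cut would go after the map step the question describes. A sketch, with the RDD name and checkpoint directory as assumptions:

```scala
// set once, before the loop; must be a reliable filesystem such as HDFS
sc.setCheckpointDir("hdfs:///tmp/model-checkpoints")

val enriched = keyRdd.map(addColumns) // hypothetical map step from the question
enriched.checkpoint()                 // marks the RDD: lineage is truncated here
enriched.count()                      // an action is needed to materialize the checkpoint
```

Checkpointing writes the data out and reads it back, so each iteration pays some I/O in exchange for not carrying the whole lineage forward.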