Spark DataFrame java.lang.OutOfMemoryError: GC overhead limit exceeded on long loop run
I am running a Spark application (Spark 1.6.3 cluster) that does some calculations on 2 small data sets and writes the result to an S3 Parquet file.
Here is my code:
public void doWork(JavaSparkContext sc, Date writeStartDate, Date writeEndDate, String[] extraArgs) throws Exception {
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    S3Client s3Client = new S3Client(ConfigTestingUtils.getBasicAWSCredentials());
    boolean clearOutputBeforeSaving = false;
    if (extraArgs != null && extraArgs.length > 0) {
        if (extraArgs[0].equals("clearOutput")) {
            clearOutputBeforeSaving = true;
        } else {
            logger.warn("Unknown param " + extraArgs[0]);
        }
    }
    Date currRunDate = new Date(writeStartDate.getTime());
    while (currRunDate.getTime() < writeEndDate.getTime()) {
        try {
            SparkReader<FirstData> sparkReader = new SparkReader<>(sc);
            JavaRDD<FirstData> data1 = sparkReader.readDataPoints(
                    inputDir,
                    currRunDate,
                    getMinOfEndDateAndNextDay(currRunDate, writeEndDate));
            // Normalize to 1 hours & 0.25 degrees
            JavaRDD<FirstData> distinctData1 = data1.distinct();
            // Floor all (distinct) values to 6 hour windows
            JavaRDD<FirstData> basicData1BySixHours = distinctData1.map(d1 -> new FirstData(
                    d1.getId(),
                    TimeUtils.floorTimePerSixHourWindow(d1.getTimeStamp()),
                    d1.getLatitude(),
                    d1.getLongitude()));
            // Convert Data1 to Dataframes
            DataFrame data1DF = sqlContext.createDataFrame(basicData1BySixHours, FirstData.class);
            data1DF.registerTempTable("data1");
            // Read Data2 DataFrame
            String currDateString = TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            String inputS3Path = basedirInput + "/dt=" + currDateString;
            DataFrame data2DF = sqlContext.read().parquet(inputS3Path);
            data2DF.registerTempTable("data2");
            // Join data1 and data2
            DataFrame mergedDataDF = sqlContext.sql("SELECT D1.Id,D2.beaufort,COUNT(1) AS hours " +
                    "FROM data1 as D1,data2 as D2 " +
                    "WHERE D1.latitude=D2.latitude AND D1.longitude=D2.longitude AND D1.timeStamp=D2.dataTimestamp " +
                    "GROUP BY D1.Id,D1.timeStamp,D1.longitude,D1.latitude,D2.beaufort");
            // Create histogram per ID
            JavaPairRDD<String, Iterable<Row>> mergedDataRows = mergedDataDF.toJavaRDD().groupBy(md -> md.getAs("Id"));
            JavaRDD<MergedHistogram> mergedHistogram = mergedDataRows.map(new MergedHistogramCreator());
            logger.info("Number of data1 results: " + data1DF.select("lId").distinct().count());
            logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());
            logger.info("Number of results with beaufort histograms: " + mergedDataDF.select("Id").distinct().count());
            // Save to parquet
            String outputS3Path = basedirOutput + "/dt=" + TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            if (clearOutputBeforeSaving) {
                writeWithCleanup(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext, s3Client);
            } else {
                write(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext);
            }
        } finally {
            TimeUtils.progressToNextDay(currRunDate);
        }
    }
}

public void write(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass, SQLContext sqlContext) {
    // Apply a schema to an RDD of JavaBeans and save it as Parquet.
    DataFrame fullDataDF = sqlContext.createDataFrame(outputRDD, outputClass);
    fullDataDF.write().parquet(outputS3Path);
}

public void writeWithCleanup(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass,
        SQLContext sqlContext, S3Client s3Client) {
    String fileKey = S3Utils.getS3Key(outputS3Path);
    String bucket = S3Utils.getS3Bucket(outputS3Path);
    logger.info("Deleting existing dir: " + outputS3Path);
    s3Client.deleteAll(bucket, fileKey);
    write(outputS3Path, outputRDD, outputClass, sqlContext);
}

public Date getMinOfEndDateAndNextDay(Date startTime, Date proposedEndTime) {
    long endOfDay = startTime.getTime() - startTime.getTime() % MILLIS_PER_DAY + MILLIS_PER_DAY;
    if (endOfDay < proposedEndTime.getTime()) {
        return new Date(endOfDay);
    }
    return proposedEndTime;
}
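The size of data1 is about 150,000 and data2 about 500,000.
What my code basically does is some data manipulation, merge the 2 data objects, do some more manipulation, print some statistics, and save to Parquet.
Spark has 25 GB of memory per server, and the code runs fine. Each iteration takes about 2-3 minutes.
The problem starts when I run it over a large set of dates.
After a while, I get an OutOfMemoryError: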
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
at org.json4s.JsonDSL$JsonListAssoc.$tilde(JsonDSL.scala:98)
at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:139)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:72)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:38)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:87)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:71)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:70)
On the last run, it crashed after 233 iterations.
The line it crashed on is this:
logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());
Can anyone tell me what might be the reason for the eventual crashes?
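This error occurs when GC takes up more than 98% of the process's total execution time while recovering less than 2% of the heap. You can monitor GC time in the Spark Web UI by going to the Stages tab at http://master:4040.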
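To watch the same ratio from inside the application rather than the Web UI, the JVM's management beans can be sampled once per loop iteration. The following is only a minimal sketch: the class name GcMonitor and its methods are hypothetical, and the reported figure is approximate because collector times are accumulated estimates.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Hypothetical helper: approximate share of wall-clock time spent in GC between samples. */
final class GcMonitor {
    private long lastGcMillis = totalGcMillis();
    private long lastWallNanos = System.nanoTime();

    /** Sum of accumulated collection time over all collectors, in milliseconds. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report a time
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** GC time divided by elapsed wall-clock time since the previous call (or construction). */
    double sample() {
        long gcNow = totalGcMillis();
        long wallNow = System.nanoTime();
        long gcDelta = gcNow - lastGcMillis;
        long wallDeltaMillis = Math.max(1, (wallNow - lastWallNanos) / 1_000_000);
        lastGcMillis = gcNow;
        lastWallNanos = wallNow;
        return (double) gcDelta / wallDeltaMillis;
    }

    public static void main(String[] args) {
        GcMonitor monitor = new GcMonitor();
        long checksum = 0;
        for (int i = 0; i < 3; i++) {
            // Stand-in for one loop iteration: allocate short-lived garbage.
            for (int j = 0; j < 1000; j++) {
                byte[] junk = new byte[1 << 16];
                checksum += junk.length;
            }
            System.out.printf("iteration %d: GC fraction %.3f%n", i, monitor.sample());
        }
        System.out.println("allocated bytes: " + checksum);
    }
}
```

Calling sample() at the end of each pass of the date loop and logging the result would show whether the fraction creeps upward as iterations accumulate.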
Try increasing the driver or executor memory (whichever is producing this error) by setting spark.{driver/executor}.memory with --conf when submitting the Spark application.
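The stack trace in the question is on the listener bus thread, which runs in the driver, so the driver heap is the one to check first. Note that in client mode the driver JVM has already started by the time application code runs, so the driver memory has to be given at submit time rather than set programmatically. A small sketch for confirming that the setting actually took effect, and for logging heap use per iteration, might look like this (HeapReport and its methods are hypothetical names, not part of Spark):

```java
/** Hypothetical helper: reports the heap limits and usage of the JVM it runs in. */
final class HeapReport {
    private static final long MIB = 1024L * 1024L;

    /** Maximum heap the JVM will attempt to use, in MiB. */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / MIB;
    }

    /** Heap currently occupied (allocated minus free), in MiB. */
    static long usedHeapMiB() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / MIB;
    }

    /** One log line per iteration, suitable for logger.info(...). */
    static String describe(int iteration) {
        return String.format("iteration %d: heap used %d MiB of max %d MiB",
                iteration, usedHeapMiB(), maxHeapMiB());
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(describe(i));
        }
    }
}
```

If the used figure rises steadily from one iteration to the next, something in the driver is retaining state across iterations, and more memory will only postpone the failure.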
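Another thing to try is changing the garbage collector that Java uses. Read this article: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html. It explains very clearly why GC overhead errors occur and which garbage collector is best suited to your application.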
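Collector flags are normally passed to Spark JVMs through spark.driver.extraJavaOptions and spark.executor.extraJavaOptions. Before and after changing them, it can help to confirm which collectors the JVM is really running. The sketch below is an illustration only; CollectorInfo is a hypothetical class, and the flag filter is a simple heuristic.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: shows which garbage collectors and GC flags the current JVM uses. */
final class CollectorInfo {

    /** Names of the active collectors as reported by the JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    /** JVM arguments that look like GC options (heuristic: -XX: options mentioning "GC"). */
    static List<String> gcFlags() {
        List<String> flags = new ArrayList<>();
        for (String arg : ManagementFactory.getRuntimeMXBean().getInputArguments()) {
            if (arg.startsWith("-XX:") && arg.contains("GC")) {
                flags.add(arg);
            }
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println("collectors: " + collectorNames());
        System.out.println("GC flags:   " + gcFlags());
    }
}
```

Logging this once at driver startup makes it easy to tell whether a collector change was picked up or silently ignored.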
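I'm not sure everyone will find this solution feasible, but upgrading the Spark cluster to 2.2.0 seems to have resolved the issue.
I have been running my application for several days now, and it has not crashed yet.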