如何避免 Spark executor 丢失以及由于内存限制而导致纱线容器杀死它?

Posted

技术标签:

【中文标题】如何避免 Spark executor 丢失以及由于内存限制而导致纱线容器杀死它?【英文标题】:How to avoid Spark executor from getting lost and yarn container killing it due to memory limit? 【发布时间】:2015-08-05 18:48:35 【问题描述】:

我有以下代码大部分时间都会触发hiveContext.sql()。我的任务是我想创建几个表并在处理完所有配置单元表分区后将值插入。

所以我首先触发show partitions 并在for 循环中使用它的输出,我调用了一些方法来创建表(如果它不存在)并使用hiveContext.sql 插入它们。

现在,我们不能在执行器中执行hiveContext,所以我必须在驱动程序的for循环中执行它,并且应该一个一个地串行运行。当我在 YARN 集群中提交此 Spark 作业时,几乎所有时间我的执行程序都会因为未找到 shuffle 异常而丢失。

现在发生这种情况是因为 YARN 由于内存过载而杀死了我的执行程序。我不明白为什么,因为每个配置单元分区的数据集都非常小,但它仍然会导致 YARN 杀死我的执行程序。

以下代码是否会并行执行所有操作并尝试同时容纳内存中的所有 hive 分区数据?

public static void main(String[] args) throws IOException    
    SparkConf conf = new SparkConf(); 
    SparkContext sc = new SparkContext(conf); 
    HiveContext hc = new HiveContext(sc); 

    DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")"); 
  
    Row[] rowArr = partitionFrame.collect(); 
    for(Row row : rowArr)  
        String[] splitArr = row.getString(0).split("/"); 
        String server = splitArr[0].split("=")[1]; 
        String date =  splitArr[1].split("=")[1]; 
        String csvPath = "hdfs:///user/db/ext/"+server+".csv"; 
        if(fs.exists(new Path(csvPath)))  
            hiveContext.sql("ADD FILE " + csvPath); 
         
        createInsertIntoTableABC(hc,entity, date); 
        createInsertIntoTableDEF(hc,entity, date); 
        createInsertIntoTableGHI(hc,entity,date); 
        createInsertIntoTableJKL(hc,entity, date); 
        createInsertIntoTableMNO(hc,entity,date); 
    

【问题讨论】:

【参考方案1】:

通常,您应该始终深入研究日志以找出真正的异常(至少在 Spark 1.3.1 中)。

tl;dr Yarn 下 Spark 的安全配置spark.shuffle.memoryFraction=0.5 - 这将允许 shuffle 使用更多分配的内存spark.yarn.executor.memoryOverhead=1024 - 以 MB 为单位设置。当 Yarn 的内存使用量大于(executor-memory + executor.memoryOverhead)时,纱线会杀死执行者

更多信息

通过阅读您的问题,您提到您得到 shuffle not found 异常。

如果发生 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 你应该增加spark.shuffle.memoryFraction,例如增加到0.5

Yarn 杀死我的执行程序的最常见原因是内存使用超出预期。 为避免您增加 spark.yarn.executor.memoryOverhead ,我将其设置为 1024,即使我的执行程序仅使用 2-3G 内存。

【讨论】:

Hmm Barak 对数据集进行重新分区以使每个分区包含更少的数据怎么样? @gsamaras 数据驻留在不同的内存区域,在 spark 1.3.1 中它不是动态的。因此,您实际上不会“释放”执行程序上的一些内存以进行随机播放。您必须明确增加洗牌区域。也就是说,如果您减少每个分区的数据,您可能在 map 端需要更小的 shuffle 内存,所以它可能会有所帮助。请记住,重新分区对过程有其他影响,因此我不会将其用作此特定问题的解决方案。这可能是个好主意,但这是一个更大的主题:)【参考方案2】:

这是我的假设:您的集群上的执行程序必须有限,并且作业可能在共享环境中运行。

如您所说,您的文件很小,您可以设置较少数量的执行器并增加执行器核心,此处设置memoryOverhead 属性很重要。

    设置执行者数 = 5 设置执行核心数 = 4 设置内存开销 = 2G shuffle partition = 20(根据执行程序和内核使用最大并行度)

使用上述属性,我相信您会在不影响性能的情况下避免任何执行程序内存不足的问题。

【讨论】:

以上是关于如何避免 Spark executor 丢失以及由于内存限制而导致纱线容器杀死它?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 遇上 Spark Streaming

大数据:Spark Standalone 集群调度如何创建分配Executors的资源

spark核心概念以及运行架构

Spark内部执行机制

Spark ShuffleExecutor是如何fetch shuffle的数据文件

Swift 中如何避免精度丢失