如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本

Posted

技术标签:

【中文标题】如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本【英文标题】:How to run memory intensive shell script from PySpark rdd.mapPartitions 【发布时间】:2021-08-09 20:43:37 【问题描述】:

假设我有一个带有 32gb RAM 节点的 Spark 集群。 1G 的执行器内存足以处理我的任何数据。

我需要为每个分区运行一个 Linux shell 程序(程序)。如果它是简单的 Linux 管道脚本,这听起来很容易,但该程序每次运行都需要 10GB 内存。我最初的假设是我可以将 executor 内存增加到 11GB,Spark 将为每个分区使用一个 executor 1G,另一个 10G 将分配给将在 executor 上下文中运行的程序。但事实并非如此。它使用 11GB 存储 1G 的 Spark 数据,然后在可用节点内存中运行 10GB 程序。

所以,我已将执行程序内存改回 1GB,并决定使用核心、实例和纱线。我试过使用: --executor_memory 1G --driver_memory 1G --executor_cores 1 --num_executors 1

对于 YARN 32GB -(10GB * 每个节点 2 个运行程序)= 12G - 4G 对于 OS = 8G * 1024M = : “yarn.nodemanager.resource.memory-mb”:“8192”, "yarn.scheduler.maximum-allocation-mb": "8192"

因为我每个执行器使用 1G,Spark 启动 8192 /(1024 * 1.18 开销)〜每个节点 6 个执行器。如果每个执行器都启动 10GB 的程序,那么肯定没有 RAM 来执行此操作。我增加了执行器内存以将每个节点的执行器数量减少到 2 个,其中执行器内存 = 3GB

现在每个节点运行 2 个执行器,但程序仍然因内存不足异常而失败。

我在启动程序之前添加了一个代码来检查可用内存

total_memory, used_memory, free_memory, shared_memory, cache, available_memory = map(
int, os.popen('free -t -m | grep Mem:').readlines()[0].split()[1:])

但是即使available_memory > 10G,程序也会启动,但它会在中间耗尽内存(运行大约4分钟)。

有没有办法为执行器节点上的外部脚本分配内存?或者也许有解决方法?

如果有任何帮助,我将不胜感激!!! 提前致谢, 奥卡

【问题讨论】:

【参考方案1】:

答案很简单。我的资源计算是正确的。我改变的只是

spark.dynamicAllocation.enabled=false

默认为true,Spark 尝试在每个节点上启动尽可能多的执行器。

【讨论】:

以上是关于如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本的主要内容,如果未能解决你的问题,请参考以下文章

如何从 pyspark 数据框中更快地保存 csv 文件?

如何使用其模式从 Pyspark 数据框创建配置单元表?

如何从 pyspark 中的本地 jar 导入包?

如何从 PySpark 中的 JavaSparkContext 获取 SparkContext?

如何从 PySpark 中的 spark.ml 中提取模型超参数?

如何从 pyspark.sql.function 中提取值?