如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本
Posted
技术标签:
【中文标题】如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本【英文标题】:How to run memory intensive shell script from PySpark rdd.mapPartitions 【发布时间】:2021-08-09 20:43:37 【问题描述】:假设我有一个带有 32gb RAM 节点的 Spark 集群。 1G 的执行器内存足以处理我的任何数据。
我需要为每个分区运行一个 Linux shell 程序(程序)。如果它是简单的 Linux 管道脚本,这听起来很容易,但该程序每次运行都需要 10GB 内存。我最初的假设是我可以将 executor 内存增加到 11GB,Spark 将为每个分区使用一个 executor 1G,另一个 10G 将分配给将在 executor 上下文中运行的程序。但事实并非如此。它使用 11GB 存储 1G 的 Spark 数据,然后在可用节点内存中运行 10GB 程序。
所以,我已将执行程序内存改回 1GB,并决定使用核心、实例和纱线。我试过使用: --executor_memory 1G --driver_memory 1G --executor_cores 1 --num_executors 1
对于 YARN 32GB -(10GB * 每个节点 2 个运行程序)= 12G - 4G 对于 OS = 8G * 1024M = : “yarn.nodemanager.resource.memory-mb”:“8192”, "yarn.scheduler.maximum-allocation-mb": "8192"
因为我每个执行器使用 1G,Spark 启动 8192 /(1024 * 1.18 开销)〜每个节点 6 个执行器。如果每个执行器都启动 10GB 的程序,那么肯定没有 RAM 来执行此操作。我增加了执行器内存以将每个节点的执行器数量减少到 2 个,其中执行器内存 = 3GB
现在每个节点运行 2 个执行器,但程序仍然因内存不足异常而失败。
我在启动程序之前添加了一个代码来检查可用内存
total_memory, used_memory, free_memory, shared_memory, cache, available_memory = map(
int, os.popen('free -t -m | grep Mem:').readlines()[0].split()[1:])
但是即使available_memory > 10G,程序也会启动,但它会在中间耗尽内存(运行大约4分钟)。
有没有办法为执行器节点上的外部脚本分配内存?或者也许有解决方法?
如果有任何帮助,我将不胜感激!!! 提前致谢, 奥卡
【问题讨论】:
【参考方案1】:答案很简单。我的资源计算是正确的。我改变的只是
spark.dynamicAllocation.enabled=false
默认为true
,Spark 尝试在每个节点上启动尽可能多的执行器。
【讨论】:
以上是关于如何从 PySpark rdd.mapPartitions 运行内存密集型 shell 脚本的主要内容,如果未能解决你的问题,请参考以下文章
如何从 PySpark 中的 JavaSparkContext 获取 SparkContext?