Spark Executor数量设置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Executor数量设置相关的知识,希望对你有一定的参考价值。

参考技术A 我们知道Spark application的运行单元是task,资源分配单元是executor。task数的多少是和RDD的分区数相关的,整个application的并行度是 Executor数 * Task。这里整理一下executor的个数是如何设置的。

我们通常都使用spark-submit 来提交任务,对于不同的部署模式,需要使用不同的参数来指定executor数,实际生产环境中最常使用的部署模式就是 ON YARN 和 Standalone两种模式。

ON YARN模式下可以使用选项 –num-executors 来直接设置application的executor数,该选项默认值是2.。 该选项对应的配置参数是 spark.executor.instances

如下,我们可以在启动spark-shell时指定executor数

通过web监控页面可以看到有5个executor

在standalone模式下,并没有参数直接指定使用多少个executor数。而是按照如下方式计算而来

spark.cores.max 默认没有设置,这时它使用的是 spark.deploy.defaultCores,而这个的默认值是Int.max,也就是不限制,这样应用就会获取所有可用的CPU资源,所以为了限制应用程序使用CPU资源,用户需要设置spark.core.max配置项,约束每个应用程序所能申请的最大CPU核数。
spark.executor.cores 该参数用以设置每个executor使用的CPU资源,在 ON YARN模式下,默认是1,而standalone模式下是worker节点上所有可用的CPU的资源,显然在实际生产环境中,这样做也不合理,因此需要设置spark.executor.cores配置项,用于设置在standalone模式下每个Executor使用的CPU核数。

最后补充说明一下单个executor的资源配置,也就是一个executor使用的CPU和内存资源

选项 –executor-memory 可以指定每个executor的内存,默认是 1G,对应的配置项是 spark.executor.memory,该配置项默认单位是MB,也可以显示指定单位 (如2g,8g)
选项 –executor-cores (适用于ON YARN和standalone模式) 可以指定每个executor的内存,对应的配置项是 spark.executor.cores

工作常用之Spark调优[二资源调优

2 章 资源调优 2.1 资源规划 2.1.1 资源设定考虑 1 、总体原则 以单台服务器 128G 内存, 32 线程为例。 先设定单个 Executor 核数,根据 Yarn 配置得出每个节点最多的 Executor 数量,每个节 点的 yarn 内存 / 每个节点数量 = 单个节点的数量 总的 executor = 单节点数量 * 节点数。 2 、具体提交参数 1 executor-cores 每个 executor 的最大核数。根据经验实践,设定在 3~6 之间比较合理。 2 num-executors 该参数值 = 每个节点的 executor * work 节点数 每个 node executor = 单节点 yarn 总核数 / 每个 executor 的最大 cpu 核数 考虑到系统基础服务和 HDFS 等组件的余量, yarn.nodemanager.resource.cpu-vcores 置为: 28 ,参数 executor-cores 的值为: 4 ,那么每个 node executor = 28/4 = 7, 假设集 群节点为 10 ,那么 num-executors = 7 * 10 = 70 3 executor-memory 该参数值 =yarn-nodemanager.resource.memory-mb / 每个节点的 executor 数量 如果 yarn 的参数配置为 100G ,那么每个 Executor 大概就是 100G/7 14G, 同时要注意 yarn 配置中每个容器允许的最大内存是否匹配。 2.1.2 内存估算

 

估算 Other 内存 = 自定义数据结构 * 每个 Executor 核数 估算 Storage 内存 = 广播变量 + cache/Executor 数量 估算 Executor 内存 = 每个 Executor 核数 * (数据集大小 / 并行度) 2.1.3 调整内存配置项 一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分 配,可以按照如下思路: spark.memory.fraction= (估算 storage 内存 + 估算 Execution 内存) / (估算 storage 内存 + 估算 Execution 内存 + 估算 Other 内存)得到 spark.memory.storageFraction = (估算 storage 内存) / (估算 storage 内存 + 估算 Execution 内存) 代入公式计算: Storage 堆内内存 =(spark.executor.memory 300MB)*spark.memory.fraction*spark.memory.storageFraction Execution 堆内内存 = (spark.executor.memory 300MB)*spark.memory.fraction*(1-spark.memory.storageFraction) 2.1 持久化和序列化 2.1.1 RDD 1 cache

 

打成 jar ,提交 yarn 任务 , 并在 yarn 界面查看 spark ui spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.cache. RddCacheDemo spark-tuning-1.0-SNAPSHOT-jar with-dependencies.jar

 

通过 spark ui 看到, rdd 使用默认 cache 缓存级别,占用内存 2.5GB, 并且 storage 内存 还不够,只缓存了 29% 2 kryo+ 序列化缓存 使用 kryo 序列化并且使用 rdd 序列化缓存级别。使用 kryo 序列化需要修改 spark 的序 列化模式,并且需要进程注册类操作。 打成 jar 包在 yarn 上运行。 spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.cache. RddCacheKryoDemo spark-tuning-1.0-SNAPSHOT jar-with-dependencies.jar 查看 storage 所占内存,内存占用减少了 1083.6mb 并且缓存了 100% 。使用序列化缓 存配合 kryo 序列化,可以优化存储内存占用。

 

 

 

根据官网的描述,那么可以推断出,如果 yarn 内存资源充足情况下,使用默认级别 MEMORY_ONLY 是对 CPU 的支持最好的。但是序列化缓存可以让体积更小,那么当 yarn 存资源不充足情况下可以考虑使用 MEMORY_ONLY_SER 配合 kryo 使用序列化缓存。 2.1.2 DataFrame DataSet 1 cache 提交任务,在 yarn 上查看 spark ui ,查看 storage 内存占用。内存使用 612.3mb spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.cache. DatasetCacheDemo spark-tuning-1.0-SNAPSHOT jar-with-dependencies.jar 

 

DataSet cache 默认缓存级别与 RDD 不一样,是 MEMORY_AND_DISK 源码: Dataset.cache() -> Dataset.persist() -> CacheManager.cacheQuery()

 

2 、序列化缓存 DataSet 类似 RDD ,但是并不使用 JAVA 序列化也不使用 Kryo 序列化,而是使用一种特 有的编码器进行序列化对象。

 

打成 jar 包,提交 yarn 。查看 spark ui,storage 占用内存 646.2mb spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.cache. DatasetCacheSerDemo spark-tuning-1.0- SNAPSHOT-jar-with-dependencies.jar

 

和默认 cache 缓存级别差别不大。所以 DataSet 可以直接使用 cache 从性能上来讲, DataSet,DataFrame 大于 RDD ,建议开发中使用 DataSet DataFrame 2.2 CPU 优化 2.2.1 CPU 低效原因 1 、概念理解 1 )并行度 spark.default.parallelism

 

设置 RDD 的默认并行度,没有设置时,由 join reduceByKey parallelize 等转换决定。 spark.sql.shuffle.partitions 适用 SparkSQL 时, Shuffle Reduce 阶段默认的并行度,默认 200 。此参数只能控制 Spark sql DataFrame DataSet 分区个数。不能控制 RDD 分区个数

 

2 )并发度:同时执行的 task 2 CPU 低效原因 1 )并行度较低、数据分片较大容易导致 CPU 线程挂起 2 )并行度过高、数据过于分散会让调度开销更多 Executor 接收到 TaskDescription 之后,首先需要对 TaskDescription 反序列化才能读取任 务信息,然后将任务代码再反序列化得到可执行代码,最后再结合其他任务信息创建 TaskRunner 。当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据 量却少之又少,就 CPU 消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与 之分庭抗礼。显然,在这种情况下, CPU 的有效利用率也是极低的。 2.2.2 合理利用 CPU 资源 每个并行度的数据量(总数据量 / 并行度) 在( Executor 内存 /core /2, Executor 内存 /core 数)区间 提交执行: spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 4 --executor-memory 6g --class com.atguigu.sparktuning.partition.PartitionDemo spark-tuning-1.0- SNAPSHOT-jar-with-dependencies.jar 去向 yarn 申请的 executor vcore 资源个数为 12 个( num-executors*executor-cores , 果不修改 spark sql 分区个数,那么就会像上图所展示存在 cpu 空转的情况。这个时候需要 合理控制 shuffle 分区个数。如果想要让任务运行的最快当然是一个 task 对应一个 vcore, 是一般不会这样设置,为了合理利用资源,一般会将并行度( task 数)设置成并发度 vcore 数)的 2 倍到 3 倍。 修改参数 spark.sql.shuffle.partitions (默认 200 , 根据我们当前任务的提交参数有 12 vcore ,将此参数设置为 24 36 为最优效果: spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 4 --executor-memory 6g --class com.atguigu.sparktuning.partition.PartitionTuning spark-tuning-1.0- SNAPSHOT-jar-with-dependencies.jar

以上是关于Spark Executor数量设置的主要内容,如果未能解决你的问题,请参考以下文章

spark 并行度

spark executor 数量

Spark面试题——Spark的内存管理机制

生产常用Spark累加器剖析之二

Spark:如何指定持有 RDD 的 executor 数量?

Yarn 是不是根据我们在 spark-submit 命令中传递的 executor 数量为 application master 分配一个容器