Improving Spark execution efficiency by setting the spark.default.parallelism parameter appropriately
Posted by 奔跑-起点
Spark has the concept of a partition (the same concept as a slice; the official documentation clarified this in Spark 1.2), and in general each partition corresponds to one task. In my tests, when spark.default.parallelism was not set, the number of partitions Spark computed was huge and badly mismatched with my number of cores. On two machines (2 × 8 cores, 2 × 6 GB of memory), the partition count Spark came up with reached about 28,000, which meant roughly 29,000 tasks; each task finished in a few milliseconds or even a fraction of a millisecond, and the whole job ran very slowly. After I set spark.default.parallelism, the number of tasks dropped to 10 and a single computation went from minutes down to about 20 seconds.
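For reference, here is a minimal Scala sketch of setting the parameter programmatically on the SparkConf before the SparkContext is created; the master URL, app name, and the value 10 are illustrative placeholders rather than recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    // Set spark.default.parallelism before the SparkContext is created;
    // the value 10 mirrors the experiment described above.
    val conf = new SparkConf()
      .setAppName("parallelism-demo")
      .setMaster("spark://master:7077")
      .set("spark.default.parallelism", "10")

    val sc = new SparkContext(conf)
    // Shuffle operations such as reduceByKey now default to 10 partitions.
    println(s"defaultParallelism = ${sc.defaultParallelism}")
    sc.stop()
  }
}
```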
The parameter can also be set in the $SPARK_HOME/conf/spark-defaults.conf configuration file.
e.g.

```
spark.master                    spark://master:7077
spark.default.parallelism       10
spark.driver.memory             2g
spark.serializer                org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions    50
```
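To confirm that these file-based settings actually reached an application, one can read back the effective configuration at runtime. This assumes a spark-shell or job launched against that configuration (sc is the predefined SparkContext, and the property names simply mirror the example above):

```scala
// sc.getConf returns a copy of the SparkConf the context was created with;
// the second argument to get() is a fallback when the key is not set.
val effective = sc.getConf
println(effective.get("spark.default.parallelism", "unset"))
println(effective.get("spark.sql.shuffle.partitions", "unset"))
println(effective.get("spark.serializer", "unset"))
```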
Below is the relevant description from the official documentation:
From: http://spark.apache.org/docs/latest/configuration.html
| Property Name | Default | Meaning |
|---|---|---|
| spark.default.parallelism | For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager: local mode: number of cores on the local machine; Mesos fine grained mode: 8; others: total number of cores on all executor nodes or 2, whichever is larger | Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user. |
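To make the table concrete, here is a small sketch that could be run in spark-shell (where sc is predefined); the data and key count are illustrative:

```scala
// parallelize has no parent RDD, so its partition count falls back to
// sc.defaultParallelism, which reflects spark.default.parallelism when set.
val data = sc.parallelize(1 to 1000)
println(data.getNumPartitions)

// reduceByKey uses spark.default.parallelism when it is set; otherwise it
// inherits the largest parent RDD's number of partitions.
val sums = data.map(x => (x % 10, x)).reduceByKey(_ + _)
println(sums.getNumPartitions)
```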
From: http://spark.apache.org/docs/latest/tuning.html
Level of Parallelism
Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
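As a sketch of that guideline, assuming a spark-shell session (sc predefined) on a cluster like the one above (2 machines × 8 cores) and a hypothetical HDFS input path, the level of parallelism can be passed per operation as the second argument, or left to spark.default.parallelism:

```scala
// "2-3 tasks per CPU core": with 16 cores in total, target roughly 32-48 partitions.
val totalCores = 16                 // illustrative: 2 machines x 8 cores
val numPartitions = totalCores * 3  // 48

val pairs = sc.textFile("hdfs:///path/to/input")   // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))

// Explicit level of parallelism for this shuffle, passed as the second argument...
val counts = pairs.reduceByKey(_ + _, numPartitions)
println(counts.getNumPartitions)    // 48

// ...or omit the argument and let spark.default.parallelism supply the default.
```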
See also: [Spark] What's the difference between spark.sql.shuffle.partitions and spark.default.parallelism?