Apache Spark join 操作扩展性差
Posted
技术标签:
【中文标题】Apache Spark join 操作扩展性差【英文标题】:Poor weak scaling of Apache Spark join operation 【发布时间】:2017-10-04 14:34:25 【问题描述】:我在 Apache Spark 上运行“join”操作,发现没有弱可扩展性。如果有人能解释一下,将不胜感激。
我创建了两个数据框(“a”、“b”)和(“a”、“c”),并通过第一列连接数据框。我为“一对一”连接生成数据框值。另外,我使用相同的分区器来避免随机播放。
数据帧中的行数 - 1024 * 1024 * 16 * cores_total(cores_total - 启动程序的内核总数)。 “a”列由随机 Int 值组成,“b”列的所有值等于 1,“c”列的所有值等于 2。
理论上,随着数据大小和核心数增加 64 倍,执行时间应该保持不变,但执行时间略有增长。我获得以下执行时间:
Apache Spark 版本 - 2.1.0。我们使用 8 个集群节点,配备 1 Gbit 以太网,每个节点有 2x Intel Xeon E5-2630,64 GB RAM。
/* join perf */
import scala.io.Source
import scala.math._
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.control.Breaks._
import scala.collection.mutable._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import scala.util.Random
import org.apache.spark.util.SizeEstimator
import org.apache.spark.HashPartitioner
object joinPerf
def get_array(n: Int): Array[Int] =
var res = Array[Int]()
for (x <- 1 to n)
res :+= Random.nextInt
return res
def main(args: Array[String])
val start_time = System.nanoTime
val conf = new SparkConf().setAppName("joinPerf")
val sc = new SparkContext(conf)
val cores_total = sc.getConf.get("spark.cores.max").toInt
val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import sqlContext.implicits._
println("start")
val elems_total = 1024 * 1024 * 16 * cores_total
val start_cnt = 1024 * 1024
Random.setSeed(785354)
var vals = Vector[Int]()
for (x <- 1 to start_cnt)
vals :+= Random.nextInt
var test_rdd = sc.parallelize(vals)
println(test_rdd.count)
test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct
println("test_rdd count = " + test_rdd.count)
println("partitions count = " + test_rdd.getNumPartitions)
var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache
println("test_rdd1 count = " + test_rdd1.count)
println("test_rdd2 count = " + test_rdd2.count)
var start_test_time = System.nanoTime
var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
println(test_res.count)
print("join time = ")
println((System.nanoTime - start_test_time) / 1e9d + " sec. ")
print("all time = ")
println((System.nanoTime - start_time) / 1e9d + " sec. ")
sc.stop()
配置参数:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1024
spark.kryo.unsafe true
spark.kryo.referenceTracking false
spark.driver.memory 22g
spark.executor.memory 22g
spark.driver.maxResultSize 22g
spark.rpc.message.maxSize 2047
spark.memory.fraction 0.8
spark.memory.storageFraction 0.5
spark.executor.extraJavaOptions "-XX:+UseParallelGC"
每个核心的分区数 - 4。
启动程序示例:
./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar
【问题讨论】:
【参考方案1】:理论上,随着数据大小和核数增加 64 倍,执行时间应该保持不变,但执行时间略有增长
不应该。虽然可以预期线性可伸缩性,假设没有 IO 瓶颈,但在对均匀分布的数据执行严格的本地操作时,情况不再是这样,当转换需要数据交换时(RDD
shuffles,Dataset
Exchange
)。在广泛的转换中,joins
属于最昂贵的类别(下一个类似groupByKey
的操作),因为它们具有非还原性,并且使用了大型的本地支持集合。
Shuffle 不仅具有高于线性复杂度(对于基于排序的方法至少 O(N log N)),而且会导致数据分布不均匀,并且需要大量磁盘和网络 IO。
这在您的代码的情况下更加严重,它会洗牌两次数据 - 一次重新分区 RDDs
和一次重新分区 join
Datasets
(HashPartitioner
for RDDs
与 Dataset
分区不兼容)。
最后增加集群大小,有其自身的性能影响,与增加的通信和同步开销以及降低的数据局部性有关。
总体而言,您很少会看到真正的线性可扩展性,即使看到了,您也可以预期斜率
附带说明,在使用 Datasets
时,我不会依赖 cache
- count
成语。 It is likely to be unreliable.
另见Spark: Inconsistent performance number in scaling number of cores
【讨论】:
以上是关于Apache Spark join 操作扩展性差的主要内容,如果未能解决你的问题,请参考以下文章
sparksql系列 sparksql列操作窗口函数join
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段