Apache Spark join 操作扩展性差

Posted

技术标签:

【中文标题】Apache Spark join 操作扩展性差【英文标题】:Poor weak scaling of Apache Spark join operation 【发布时间】:2017-10-04 14:34:25 【问题描述】:

我在 Apache Spark 上运行“join”操作,发现没有弱可扩展性。如果有人能解释一下,将不胜感激。

我创建了两个数据框(“a”、“b”)和(“a”、“c”),并通过第一列连接数据框。我为“一对一”连接生成数据框值。另外,我使用相同的分区器来避免随机播放。

数据帧中的行数 - 1024 * 1024 * 16 * cores_total(cores_total - 启动程序的内核总数)。 “a”列由随机 Int 值组成,“b”列的所有值等于 1,“c”列的所有值等于 2。

理论上,随着数据大小和核心数增加 64 倍,执行时间应该保持不变,但执行时间略有增长。我获得以下执行时间:

Apache Spark 版本 - 2.1.0。我们使用 8 个集群节点,配备 1 Gbit 以太网,每个节点有 2x Intel Xeon E5-2630,64 GB RAM。

/* join perf */
import scala.io.Source
import scala.math._
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.control.Breaks._
import scala.collection.mutable._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import scala.util.Random
import org.apache.spark.util.SizeEstimator
import org.apache.spark.HashPartitioner

object joinPerf 

    def get_array(n: Int): Array[Int] = 
        var res = Array[Int]()
        for (x <- 1 to n) 
            res :+= Random.nextInt
        

        return res
    

    def main(args: Array[String]) 
        val start_time = System.nanoTime
        val conf = new SparkConf().setAppName("joinPerf")
        val sc = new SparkContext(conf)
        val cores_total = sc.getConf.get("spark.cores.max").toInt
        val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext._
        import sqlContext.implicits._
        println("start")
        val elems_total = 1024 * 1024 * 16 * cores_total
        val start_cnt = 1024 * 1024
        Random.setSeed(785354)

        var vals = Vector[Int]()
        for (x <- 1 to start_cnt) 
            vals :+= Random.nextInt
        

        var test_rdd = sc.parallelize(vals)
        println(test_rdd.count)
        test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct

        println("test_rdd count = " + test_rdd.count)
        println("partitions count = " + test_rdd.getNumPartitions)

        var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
        var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache

        println("test_rdd1 count = " + test_rdd1.count)
        println("test_rdd2 count = " + test_rdd2.count)

        var start_test_time = System.nanoTime
        var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
        println(test_res.count)
        print("join time = ")
        println((System.nanoTime - start_test_time) / 1e9d + " sec. ")

        print("all time = ")
        println((System.nanoTime - start_time) / 1e9d + " sec. ")
        sc.stop()
    

配置参数:

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  1024
spark.kryo.unsafe                true
spark.kryo.referenceTracking     false
spark.driver.memory              22g
spark.executor.memory            22g
spark.driver.maxResultSize       22g
spark.rpc.message.maxSize        2047
spark.memory.fraction            0.8
spark.memory.storageFraction     0.5
spark.executor.extraJavaOptions  "-XX:+UseParallelGC"

每个核心的分区数 - 4。

启动程序示例:

./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar

【问题讨论】:

【参考方案1】:

理论上,随着数据大小和核数增加 64 倍,执行时间应该保持不变,但执行时间略有增长

不应该。虽然可以预期线性可伸缩性,假设没有 IO 瓶颈,但在对均匀分布的数据执行严格的本地操作时,情况不再是这样,当转换需要数据交换时(RDD shuffles,DatasetExchange)。在广泛的转换中,joins 属于最昂贵的类别(下一个类似groupByKey 的操作),因为它们具有非还原性,并且使用了大型的本地支持集合。

Shuffle 不仅具有高于线性复杂度(对于基于排序的方法至少 O(N log N)),而且会导致数据分布不均匀,并且需要大量磁盘和网络 IO。

这在您的代码的情况下更加严重,它会洗牌两次数据 - 一次重新分区 RDDs 和一次重新分区 join DatasetsHashPartitioner for RDDsDataset 分区不兼容)。

最后增加集群大小,有其自身的性能影响,与增加的通信和同步开销以及降低的数据局部性有关。

总体而言,您很少会看到真正的线性可扩展性,即使看到了,您也可以预期斜率

附带说明,在使用 Datasets 时,我不会依赖 cache - count 成语。 It is likely to be unreliable.

另见Spark: Inconsistent performance number in scaling number of cores

【讨论】:

以上是关于Apache Spark join 操作扩展性差的主要内容,如果未能解决你的问题,请参考以下文章

Spark join和cogroup算子

Apache Spark 中支持的七种 Join 类型简介

Apache Spark 中支持的七种 Join 类型简介

sparksql系列 sparksql列操作窗口函数join

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

从Apache Spark 2.3看大数据流式计算的发展趋势