Spark RDD groupByKey + join vs join 性能

Posted

技术标签:

【中文标题】Spark RDD groupByKey + join vs join 性能【英文标题】:Spark RDD groupByKey + join vs join performance 【发布时间】:2015-10-24 21:13:29 【问题描述】:

我在与其他用户共享的集群上使用 Spark。因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,从而使我的代码执行更长时间。

所以我可以在这里问两个问题:

    我正在使用join 函数加入2 RDDs,并且我在使用join 之前尝试使用groupByKey(),如下所示:

    rdd1.groupByKey().join(rdd2)
    

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 使我的查询运行得更快。由于 Spark 使用惰性评估,我想知道 groupByKeyjoin 之前是否会使事情变得更快

    我注意到Spark有一个SQL模块,到现在还真没时间去尝试,请问SQL模块和RDD SQL类函数有什么区别?

【问题讨论】:

以后请不要同时发两个问题。 我认为现在编辑第二个问题还为时不晚。 @Cherry Wu,你能接受吗? 您希望 rdd1 和 rdd2 中都有重复的键吗? @DanielDarabos,我不介意您是否添加第二个问题。我发现 Spark SQL 描述 spark.apache.org/docs/latest/sql-programming-guide.html, Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. 在这里,我猜“更多信息”意味着数据像关系数据表一样组织 【参考方案1】:

    没有充分的理由让groupByKey 后跟join 比单独的join 更快。如果rdd1rdd2 没有分区器或分区器不同,那么限制因素就是HashPartitioning 所需的简单改组。

    通过使用groupByKey,您不仅会通过保留分组所需的可变缓冲区来增加总成本,而且更重要的是,您使用了额外的转换,这会导致更复杂的 DAG。 groupByKey + join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    对比join一个人:

    rdd1.join(rdd2)
    

    最后,这两个计划甚至不相等,要获得相同的结果,您必须在第一个计划中添加一个额外的 flatMap

    这是一个相当广泛的问题,但要强调主要区别:

    PairwiseRDDs 是任意Tuple2 元素的同类集合。对于默认操作,您希望 key 以有意义的方式可散列,否则对类型没有严格要求。相比之下,DataFrame 表现出更多的动态类型,但每列只能包含来自supported set of defined types 的值。可以定义UDT,但仍然必须使用基本的来表示。

    DataFrames 使用 Catalyst Optimizer 生成逻辑和物理执行计划,并且可以生成高度优化的查询,而无需应用手动低级优化。基于 RDD 的操作只是遵循依赖 DAG。这意味着在没有自定义优化的情况下性能更差,但对执行的控制要好得多,并且有一些微调的潜力。

其他需要阅读的内容:

Difference between DataFrame and RDD in Spark Why spark.ml don't implement any of spark.mllib algorithms?

【讨论】:

【参考方案2】:

我基本同意 zero323 的回答,但我认为 有理由期望 joingroupByKey 之后会更快。 groupByKey 减少数据量,按key对数据进行分区。这两个都有助于后续join 的性能。

我认为前者(减少数据大小)并不重要。为了获得后者(分区)的好处,您需要以相同的方式对其他 RDD 进行分区。

例如:

val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect

【讨论】:

看起来在 PySpark 中联合后分区(join 单独)与分区后联合(groupByKey + join 分区 b)之间的区别。我不确定是否有收获。 啊,你是对的。但可能是您的其他 RDD 已经以正确的方式分区,因此您不必计算该工作的成本。至少这是我们经常发生的事情。

以上是关于Spark RDD groupByKey + join vs join 性能的主要内容,如果未能解决你的问题,请参考以下文章

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和区别

spark-groupByKey

Spark:使用 reduceByKey 而不是 groupByKey 和 mapByValues