spark中partitionBy和groupBy有啥区别

Posted

技术标签:

【中文标题】spark中partitionBy和groupBy有啥区别【英文标题】:What's the difference between partitionBy and groupBy in sparkspark中partitionBy和groupBy有什么区别 【发布时间】:2020-06-17 09:14:31 【问题描述】:

我有一个 pyspark rdd,它可以收集为如下的元组列表:

rdds = self.sc.parallelize([(("good", "spark"), 1), (("sood", "hpark"), 1), (("god", "spak"), 1),
                                (("food", "spark"), 1), (("fggood", "ssspark"), 1), (("xd", "hk"), 1),
                                (("good", "spark"), 7), (("good", "spark"), 3), (("good", "spark"), 4),
                                (("sood", "hpark"), 5), (("sood", "hpark"), 7), (("xd", "hk"), 2),
                                (("xd", "hk"), 1), (("fggood", "ssspark"), 2), (("fggood", "ssspark"), 1)], 6)
rdds.glom().collect()

def inner_map_1(p):
    d = defaultdict(int)
    for row in p:
        d[row[0]] += row[1]
    for item in d.items():
        yield item

rdd2 = rdds.partitionBy(4, partitionFunc=lambda x: hash(x)).mapPartitions(inner_map_1)
print(rdd2.glom().collect())

def inner_map_2(p):
    for row in p:
        item = row[0]
        sums = sum([num for _, num in row[1]])
        yield item, sums
rdd3 = rdds.groupBy(lambda x: x[0]).mapPartitions(inner_map_2)
print(rdd3.glom().collect())

rdd2 和 rdd3 以不同的形式计算,我得到相同的结果,但我不确定 rdd2 和 rdd3 得到相同的结果并且元素在同一个分区中是不是真的。

【问题讨论】:

这是一个典型的application of mapPartitions,并简要说明了它的工作原理。 【参考方案1】:

partitionBy 通常意味着你要散列分区键并将它们发送到 RDD 的特定分区。这会将具有匹配键的任何内容并置到同一个分区中,这在您需要所有匹配键在同一个位置的连接时很有用。 partitionBy 不会丢弃任何记录,它只会将匹配的键放在一起。

df.partitionBy("vendorId") // 保留的所有行现在位于同一个 rdd 分区中

groupBy 是一个 SQL 概念。如果找到键的所有唯一键组合。您还可以对具有相同键的所有记录执行聚合函数。例如,如果你想用相同的键计算所有记录,你可以这样做......

df.groupBy("vendorId").count.show

这将计算具有相同供应商 ID 的所有记录。与 partitionBy 不同,groupBy 往往会大大减少记录数。 (见基数)

我建议运行 df.groupBy("vendorId").explain(true)。这将打印出逻辑计划(认为 SQL 等效)和物理计划(火花将执行的确切操作集)。一般来说,spark 将 groupBy 转换为部分哈希聚合 -> shuffle(partition by key) -> 最终哈希聚合 -> 结果

【讨论】:

如果我把上面的元素相加,哪个更有效率? shuffle 不会自行对数据求和。它需要 HashAggregate 运算符来创建键及其聚合值的哈希表,在本例中为 sum。【参考方案2】:

我会说“groupBy”是一种更合乎逻辑的数据分组方式。它看起来像 SQL 中的“groupBy”。

“PartitionBy”更具物理性。您确实在集群中对数据进行了物理分区。

【讨论】:

当有人使用group by __ 子句时,后台会发生什么?即使数据可能驻留在不同的分区中,Spark 如何提供统一的逻辑视图? 触发随机播放。这意味着 Spark 将数据分布在集群中以将相同的键组合在一起。 Spark 需要它来进行计算。但你几乎无法控制。 我建议运行 df.groupBy("vendorId").explain(true)。这将打印出逻辑计划(认为 SQL 等效)和物理计划(火花将执行的确切操作集)。一般来说,spark 将 groupBy 转换为部分散列聚合 -> shuffle(按键分区) -> 最终散列聚合 是的,我也愿意进行实际的深入分析,而不是将某人的话视为理所当然。这两个选项的文档如何?

以上是关于spark中partitionBy和groupBy有啥区别的主要内容,如果未能解决你的问题,请参考以下文章

Spark 算子

如何在窗口 scala/spark 中使用 partitionBy 函数

为啥在 Spark 中重新分区比 partitionBy 快?

spark:区分大小写的 partitionBy 列

在 Spark 中使用 partitionBy 保存 CSV 文件 [重复]

在 spark DF 中使用 partitionBy 后是不是可以重新分区?