将 groupByKey 转换为 reduceByKey

Posted

技术标签:

【中文标题】将 groupByKey 转换为 reduceByKey【英文标题】:Convert a groupByKey to reduceByKey 【发布时间】:2017-10-18 02:57:18 【问题描述】:

我读到reducebyKey 是大型数据集的更好选择,可以减少数据的混洗,从而提高性能。

我正在尝试转换我对groupByKey 的使用。首先它必须被转换为 rdd 为:

val linksNew = links.map(convertToRelationship)
  .flatMap(bidirRelationship)

links 是一个数据集,而数据集 api 没有 reduceByKey。当使用reduceByKey 时,.groupByKey(_._1) 的等价物是什么?

val linksfinal = linksNew.rdd.reduceByKey(???)

实际代码:

val biLinks = links
  .map(convertToRelationship)
  .flatMap(bidirRelationship)
  .groupByKey(_._1)
  .reduceGroups((left, right) => combineBidirerRelationships(left,right))
  .map(_._2._2)

数据集的架构,就在使用groupByKey(_._1)之前:

数据集中的一些实际数据:

【问题讨论】:

您的数据在groupByKey()之前的样子如何? @Shaido 像这样DataSet(String, Relationship) 然后应该跟 groupByKey 一样,KeyValueGroupedDataSet[String, (String, Relationship)] 添加了一个我相信会起作用的答案,虽然我没有测试它,告诉我它是如何工作的。 @Shaido 添加了一张图片,这两个添加了,col1: String, col2: Relationship 当我打印我们要转换的架构时,它们是struct1struct2,例如struct2 是一个数据创建的关系案例类 你能把show()的输出也加进去吗? 【参考方案1】:

不确定它是否更有效,但是应该可以转换为reduceByKey,因为您在groupByKey 之后直接执行reduceGroups。使用提供的部分代码的简短示例:

val biLinks = links
  .map(convertToRelationship)
  .flatMap(bidirRelationship)
  .rdd
  .maprow => (row.getAs[String](0), row.getAs[Relationship](1)) // See explanation below 
  .reduceByKey((left, right) => combineBidirerRelationships(left, right))
  .map(_._2._2)

根据使用 .rdd 后数据框的外观,可能需要进行额外的转换。从数据帧转换时,生成的 rdd 将是 RDD[Row]。然而,reduceByKey() 需要 RDD[(A,B)] 类型的元组 rdd,其中 AB 是类型(它们本身也可以是元组)。


rdd.map(...) 转换如何与structs 一起使用的简短示例:

case class Relationship(a: Long, b: Long)
val df = spark.createDataFrame(Seq((1, Relationship(3L, 2L)), (2, Relationship(20L, 7L)))).toDF()
val rdd = df.rdd.map row => (row.getAs[String](0), row.getAs[Relationship](1))

这给出了所需的元组 rdd 类型,此处为 RDD[(String, Relationship)]

【讨论】:

这两个添加,col1: String, col2: Relationship 当我打印我们要转换的架构时,它们是 struct1 和 striuct2,例如 struct2 是创建的关系案例类的数据?【参考方案2】:

我读到 reducebyKey 是大型数据集的更好选择,可以减少在 reduce 端的 shuffle 和/或 shuffle,并提高性能。

事实并非如此。您正在混淆 groupByKey 具有不同语义的“旧”RDD API。

Dataset API 中,groupByKey + reduceGroups 使用与旧 API 中的 reduceByKey 类似的执行模型。事实上,转换为 RDD 使用效率较低的 shuffle 机制并且成本非常高,所以你只会让它变得更糟。

【讨论】:

以上是关于将 groupByKey 转换为 reduceByKey的主要内容,如果未能解决你的问题,请参考以下文章

将 groupByKey 转换为 reduceByKey

如何改善数据流管道中的低吞吐量 groupbykey

Spark DataFrame 的通用“reduceBy”或“groupBy + aggregate”功能

Apache Spark 转换:groupByKey vs reduceByKey vs aggregateByKey

如何将 RDD[Row] 转换为 RDD[Vector]

spark 例子groupByKey分组计算