Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构

Posted

技术标签:

【中文标题】Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构【英文标题】:Spark/Scala : Creating Nested Structure using ReduceByKey using RDD only 【发布时间】:2017-08-29 11:15:38 【问题描述】:

我只想使用 RDD 创建嵌套结构。我可以使用 groupBy 函数来做到这一点,该函数对于海量数据表现不佳。所以我想用reduceByKey来做,但我无法得到我想要的。任何帮助将不胜感激。

输入数据:

val sales=sc.parallelize(List(
  ("West",  "Apple",  2.0, 10),
  ("West",  "Apple",  3.0, 15),
  ("West",  "Orange", 5.0, 15),
  ("South", "Orange", 3.0, 9),
  ("South", "Orange", 6.0, 18),
  ("East",  "Milk",   5.0, 5)))

必需的输出是结构列表。我可以使用groupByKey 来做到这一点,如下所示:

sales.map(value => (value._1 ,(value._2,value._3,value._4  )) )
  .groupBy(_._1)
  .map  case(k,v) => (k, v.map(_._2)) 
  .collect()
  .foreach(println)

// (South,List((Orange,3.0,9), (Orange,6.0,18)))
// (East,List((Milk,5.0,5)))
// (West,List((Apple,2.0,10), (Apple,3.0,15), (Orange,5.0,15)))

但我想使用reduceByKey 实现同样的目的。我无法获得 List[Struct]。相反,我可以得到 List[List]。有没有办法获取 List[Struct]?

sales.map(value => (value._1 ,List(value._2,value._3,value._4)))
  .reduceByKey((a,b) => (a ++ b))
  .collect()
  .foreach(println)

// (South,List(Orange, 3.0, 9, Orange, 6.0, 18))
// (East,List(Milk, 5.0, 5))
// (West,List(Apple, 2.0, 10, Apple, 3.0, 15, Orange, 5.0, 15))

sales.map(value => (value._1 ,List(value._2,value._3,value._4)))
  .reduceByKey((a,b) =>(List(a) ++ List(b)))
  .collect()
  .foreach(println)

// (South,List(List(Orange, 3.0, 9), List(Orange, 6.0, 18)))
// (East,List(Milk, 5.0, 5))
// (West,List(List(List(Apple, 2.0, 10), List(Apple, 3.0, 15)), List(Orange, 5.0, 15)))

【问题讨论】:

【参考方案1】: 你不能 - reduceByKey 需要一个函数 (V, V) ⇒ V 因此它不能更改类型。参见例如Can reduceBykey be used to change type and combine values - Scala Spark? 您可以使用aggregateByKeycombineByKey,但它不会提高性能,因为您的流程不会减少数据量。参见例如Spark groupByKey alternative。

您可以获得一点(不需要临时对象):

sales.map(value => (value._1 ,(value._2,value._3,value._4)) ).groupByKey

【讨论】:

以上是关于Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构的主要内容,如果未能解决你的问题,请参考以下文章

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

Spark / Scala - RDD填充最后一个非空值

如何在 spark scala 中加入 2 rdd

来自 RDD 映射的 Spark Scala 序列化错误

在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]