通过键对元组进行归约和求和
Posted
技术标签:
【中文标题】通过键对元组进行归约和求和【英文标题】:Reduce and sum tuples by key 【发布时间】:2019-03-24 16:28:21 【问题描述】:在我的 Spark Scala 应用程序中,我有一个格式如下的 RDD:
(05/05/2020, (name, 1))
(05/05/2020, (name, 1))
(05/05/2020, (name2, 1))
...
(06/05/2020, (name, 1))
我想做的是按日期对这些元素进行分组,并对与键具有相同“名称”的元组求和。
预期输出:
(05/05/2020, List[(name, 2), (name2, 1)]),
(06/05/2020, List[(name, 1)])
...
为了做到这一点,我目前正在使用groupByKey
操作和一些额外的转换,以便按键对元组进行分组并计算共享相同元组的总和。
出于性能原因,我想将这个groupByKey
操作替换为reduceByKey
或aggregateByKey
,以减少通过网络传输的数据量。
但是,我不知道该怎么做。这两种转换都将值之间的函数(在我的例子中为元组)作为参数,所以我看不到如何通过键对元组进行分组以计算它们的总和。
可行吗?
【问题讨论】:
【参考方案1】:以下是使用 reduceByKey
合并元组的方法:
/**
File /path/to/file1:
15/04/2010 name
15/04/2010 name
15/04/2010 name2
15/04/2010 name2
15/04/2010 name3
16/04/2010 name
16/04/2010 name
File /path/to/file2:
15/04/2010 name
15/04/2010 name3
**/
import org.apache.spark.rdd.RDD
val filePaths = Array("/path/to/file1", "/path/to/file2").mkString(",")
val rdd: RDD[(String, (String, Int))] = sc.textFile(filePaths).
map line =>
val pair = line.split("\\t", -1)
(pair(0), (pair(1), 1))
rdd.
map case (k, (n, v)) => (k, Map(n -> v)) .
reduceByKey (acc, m) =>
acc ++ m.map case (n, v) => (n -> (acc.getOrElse(n, 0) + v))
.
map(x => (x._1, x._2.toList)).
collect
// res1: Array[(String, List[(String, Int)])] = Array(
// (15/04/2010, List((name,3), (name2,2), (name3,2))), (16/04/2010, List((name,2)))
// )
请注意,需要初始映射,因为我们要将元组合并为 Map
中的元素,而 RDD[K, V] 的 reduceByKey 在转换前后需要相同的数据类型 V
:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
【讨论】:
您的解决方案,虽然最小化似乎不起作用。在您提供的那个特别简单的示例中,它可以工作。但是,我从不同的文件中读取输入数据,奇怪的是,这些计算仅适用于从第一个文件中获取的数据。 我进一步对此进行了测试,似乎当我读取 2 个不同的文件(通过 sc.textfile)时,分区数为 2。看来您的方法仅适用于第一个分区。当我将我的 rdd 重新分区为 1 时,它可以工作,但这不是最佳的。 @matrix,我无法复制上述问题。还测试了将更大的 RDD 显式加载到多个分区中的代码,并按预期跨分区聚合。如您所见,建议的解决方案仅以最简单的形式使用常见的 RDD 方法map
和 reduceByKey
。您能否分享导致错误结果的确切代码(以及示例数据,如果可能)的最小版本?
我创建了一个重现该问题的小应用程序。 (在 cmets 你会看到每个文件的数据)你可以在这里找到它:pastebin.com/E7s2zcw4
@matrix,感谢您提供能够证明上述问题的示例代码和数据。在某些情况下,该异常似乎是由reduceByKey
中的函数未在分区之间正确处理引起的。这个问题是如此微妙,以至于它不一定会通过拥有多个分区而浮出水面。当它确实浮出水面时,也不是只处理第一个分区。在这一点上,我倾向于认为它是一个错误。将函数重写为使用Map ++ Map
操作,而不是Map + element
,似乎可以解决问题。请看我修改后的答案。【参考方案2】:
是的.aggeregateBykey()
可以这样使用:
import scala.collection.mutable.HashMap
def merge(map: HashMap[String, Int], element: (String, Int)) =
if(map.contains(element._1)) map(element._1) += element._2 else map(element._1) = element._2
map
val input = sc.parallelize(List(("05/05/2020",("name",1)),("05/05/2020", ("name", 1)),("05/05/2020", ("name2", 1)),("06/05/2020", ("name", 1))))
val output = input.aggregateByKey(HashMap[String, Int]())(
//combining map & tuple
case (map, element) => merge(map, element)
,
// combining two maps
case (map1, map2) =>
val combined = (map1.keySet ++ map2.keySet).map i=> (i,map1.getOrElse(i,0) + map2.getOrElse(i,0)) .toMap
collection.mutable.HashMap(combined.toSeq: _*)
).mapValues(_.toList)
学分:Best way to merge two maps and sum the values of same key?
【讨论】:
【参考方案3】:您可以将 RDD 转换为 DataFrame,然后将 groupBy 与 sum 一起使用,这是一种方法
import org.apache.spark.sql.types._
val schema = StructType(StructField("date", StringType, false) :: StructField("name", StringType, false) :: StructField("value", IntegerType, false) :: Nil)
val rd = sc.parallelize(Seq(("05/05/2020", ("name", 1)),
("05/05/2020", ("name", 1)),
("05/05/2020", ("name2", 1)),
("06/05/2020", ("name", 1))))
val df = spark.createDataFrame(rd.map case (a, (b,c)) => Row(a,b,c),schema)
df.show
+----------+-----+-----+
| date| name|value|
+----------+-----+-----+
|05/05/2020| name| 1|
|05/05/2020| name| 1|
|05/05/2020|name2| 1|
|06/05/2020| name| 1|
+----------+-----+-----+
val sumdf = df.groupBy("date","name").sum("value")
sumdf.show
+----------+-----+----------+
| date| name|sum(value)|
+----------+-----+----------+
|06/05/2020| name| 1|
|05/05/2020| name| 2|
|05/05/2020|name2| 1|
+----------+-----+----------+
【讨论】:
以上是关于通过键对元组进行归约和求和的主要内容,如果未能解决你的问题,请参考以下文章