通过键对元组进行归约和求和

Posted

技术标签:

【中文标题】通过键对元组进行归约和求和【英文标题】:Reduce and sum tuples by key 【发布时间】:2019-03-24 16:28:21 【问题描述】:

在我的 Spark Scala 应用程序中,我有一个格式如下的 RDD:

(05/05/2020, (name, 1))
(05/05/2020, (name, 1))
(05/05/2020, (name2, 1))
...
(06/05/2020, (name, 1))

我想做的是按日期对这些元素进行分组,并对与键具有相同“名称”的元组求和。

预期输出:

(05/05/2020, List[(name, 2), (name2, 1)]),
(06/05/2020, List[(name, 1)])
...

为了做到这一点,我目前正在使用groupByKey 操作和一些额外的转换,以便按键对元组进行分组并计算共享相同元组的总和。

出于性能原因,我想将这个groupByKey 操作替换为reduceByKeyaggregateByKey,以减少通过网络传输的数据量。

但是,我不知道该怎么做。这两种转换都将值之间的函数(在我的例子中为元组)作为参数,所以我看不到如何通过键对元组进行分组以计算它们的总和。

可行吗?

【问题讨论】:

【参考方案1】:

以下是使用 reduceByKey 合并元组的方法:

/**
File /path/to/file1:
15/04/2010  name
15/04/2010  name
15/04/2010  name2
15/04/2010  name2
15/04/2010  name3
16/04/2010  name
16/04/2010  name

File /path/to/file2:
15/04/2010  name
15/04/2010  name3
**/

import org.apache.spark.rdd.RDD

val filePaths = Array("/path/to/file1", "/path/to/file2").mkString(",")

val rdd: RDD[(String, (String, Int))] = sc.textFile(filePaths).
  map line =>
    val pair = line.split("\\t", -1)
    (pair(0), (pair(1), 1))
  

rdd.
  map case (k, (n, v)) => (k, Map(n -> v)) .
  reduceByKey (acc, m) =>
    acc ++ m.map case (n, v) => (n -> (acc.getOrElse(n, 0) + v)) 
  .
  map(x => (x._1, x._2.toList)).
  collect
// res1: Array[(String, List[(String, Int)])] = Array(
//   (15/04/2010, List((name,3), (name2,2), (name3,2))), (16/04/2010, List((name,2)))
// )

请注意,需要初始映射,因为我们要将元组合并为 Map 中的元素,而 RDD[K, V] 的 reduceByKey 在转换前后需要相同的数据类型 V

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

【讨论】:

您的解决方案,虽然最小化似乎不起作用。在您提供的那个特别简单的示例中,它可以工作。但是,我从不同的文件中读取输入数据,奇怪的是,这些计算仅适用于从第一个文件中获取的数据。 我进一步对此进行了测试,似乎当我读取 2 个不同的文件(通过 sc.textfile)时,分区数为 2。看来您的方法仅适用于第一个分区。当我将我的 rdd 重新分区为 1 时,它可以工作,但这不是最佳的。 @matrix,我无法复制上述问题。还测试了将更大的 RDD 显式加载到多个分区中的代码,并按预期跨分区聚合。如您所见,建议的解决方案仅以最简单的形式使用常见的 RDD 方法 mapreduceByKey。您能否分享导致错误结果的确切代码(以及示例数据,如果可能)的最小版本? 我创建了一个重现该问题的小应用程序。 (在 cmets 你会看到每个文件的数据)你可以在这里找到它:pastebin.com/E7s2zcw4 @matrix,感谢您提供能够证明上述问题的示例代码和数据。在某些情况下,该异常似乎是由reduceByKey 中的函数未在分区之间正确处理引起的。这个问题是如此微妙,以至于它不一定会通过拥有多个分区而浮出水面。当它确实浮出水面时,也不是只处理第一个分区。在这一点上,我倾向于认为它是一个错误。将函数重写为使用Map ++ Map 操作,而不是Map + element,似乎可以解决问题。请看我修改后的答案。【参考方案2】:

是的.aggeregateBykey()可以这样使用:

import scala.collection.mutable.HashMap

def merge(map: HashMap[String, Int], element: (String, Int)) = 
 if(map.contains(element._1)) map(element._1) += element._2 else map(element._1) = element._2
 map


val input = sc.parallelize(List(("05/05/2020",("name",1)),("05/05/2020", ("name", 1)),("05/05/2020", ("name2", 1)),("06/05/2020", ("name", 1))))

val output = input.aggregateByKey(HashMap[String, Int]())(
  //combining map & tuple   
  case (map, element) => merge(map, element) 
, 
  // combining two maps 
  case (map1, map2) => 
   val combined = (map1.keySet ++ map2.keySet).map  i=> (i,map1.getOrElse(i,0) + map2.getOrElse(i,0)) .toMap
   collection.mutable.HashMap(combined.toSeq: _*)
   
).mapValues(_.toList)

学分:Best way to merge two maps and sum the values of same key?

【讨论】:

【参考方案3】:

您可以将 RDD 转换为 DataFrame,然后将 groupBy 与 sum 一起使用,这是一种方法

import org.apache.spark.sql.types._
val schema = StructType(StructField("date", StringType, false) :: StructField("name", StringType, false) ::  StructField("value", IntegerType, false) :: Nil)

val rd = sc.parallelize(Seq(("05/05/2020", ("name", 1)),
("05/05/2020", ("name", 1)),
("05/05/2020", ("name2", 1)),
("06/05/2020", ("name", 1))))

val df = spark.createDataFrame(rd.map case (a, (b,c)) => Row(a,b,c),schema)
df.show

+----------+-----+-----+
|      date| name|value|
+----------+-----+-----+
|05/05/2020| name|    1|
|05/05/2020| name|    1|
|05/05/2020|name2|    1|
|06/05/2020| name|    1|
+----------+-----+-----+

val sumdf = df.groupBy("date","name").sum("value")
sumdf.show

+----------+-----+----------+
|      date| name|sum(value)|
+----------+-----+----------+
|06/05/2020| name|         1|
|05/05/2020| name|         2|
|05/05/2020|name2|         1|
+----------+-----+----------+

【讨论】:

以上是关于通过键对元组进行归约和求和的主要内容,如果未能解决你的问题,请参考以下文章

python 对元组列表中的每个值求和

如何对元组列表进行分组?

根据元组的值对元组列表中的重复元组进行平均

如何根据某些文本标准对元组列表进行分组/存储?

从 Sqlite 表中选择行的元组并有效地对元组进行排序

如何按两个元素对元组列表进行排序?