有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?

Posted

技术标签:

【中文标题】有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?【英文标题】:Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? 【发布时间】:2015-06-26 21:00:05 【问题描述】:

我有一个太大的 RDD,无法在没有虚假错误的情况下始终如一地执行不同的语句(例如,SparkException 阶段失败 4 次、ExecutorLostFailure、HDFS 文件系统关闭、达到最大执行器故障数、由于 SparkContext 关闭而取消阶段等.)

我正在尝试计算特定列中的不同 ID,例如:

print(myRDD.map(a => a._2._1._2).distinct.count())

是否有一种简单、一致、较少洗牌密集的方式来执行上述命令,可能使用 mapPartitions、reduceByKey、flatMap 或其他使用较少洗牌而不是 distinct 的命令?

另见What are the Spark transformations that causes a Shuffle?

【问题讨论】:

rdd.mapPartitions(xs => xs.toSet.toIterator) 之类的开头可能会有所改进。 这是一个类似的问题吗? ***.com/questions/31081563/… 另见***.com/questions/30959955/… 【参考方案1】:

最好弄清楚是否还有其他潜在问题,但下面会做你想做的事......而不是绕过方法,但听起来它适合你的账单:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2) 
  .keys
  .count

甚至这似乎可行,但它不是关联和可交换的。它的工作原理取决于 Spark 的内部工作原理……但我可能会遗漏一个案例……所以虽然更简单,但我不确定我是否信任它:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
  .keys.count

【讨论】:

我相信根本问题只是数据大小......我们有一个非常大的集群,有很多节点和一个 Yarn 调度程序,并且 distinct 命令在 99% 的时间都可以正常工作,但在非常大的情况下数据集我的程序崩溃并带有非常模糊的错误消息(有关示例,请参见上面的问题帖子),所以我认为崩溃是由于过大的随机播放造成的。我会尝试这些解决方案,看看它们是否能以更低的随机播放时间缩短运行时间——谢谢! 您的第二个示例似乎不起作用,因为 aggregateByKey 仅适用于 PairRDD 而不是一般 RDD。我尝试将您的地图功能更改为 (a => (a._2._1._2, 1L)),但现在该行的其余部分不起作用。 @GlennStrycker 糟糕,已修复 我正在使用以下内容进行测试:myRDD.map(a => (a._2._1._2, a._2._1._2)).aggregateByKey(0L)(( x,y)=>y.toLong, (x,y)=>x.toLong).keys.count 这是另一个想法:我已经使用哈希分区器将某些键放在某些节点上。如果我对这个新键进行重新分区,aggregateByKey 或 reduceByKey 是否使我能够在分区内执行“不同”调用,但忽略任何 shuffle?【参考方案2】:

在我看来,这个问题有两种可能的解决方案:

    使用 reduceByKey 带有 ma​​pPartitions

让我们用一个例子来看看它们。

我有一个包含 100.000 个电影评分的数据集,格式为 (idUser, (idMovie, rating))。假设我们想知道有多少不同的用户为一部电影评分:

让我们先看看使用 distinct

val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is $numUsers.count()")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)

我们会得到以下结果:

numUsers is 943

*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 |  ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 +-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

使用 toDebugString 函数,我们可以更好地分析我们的 RDD 发生了什么。

现在,我们以 reduceByKey 为例,计算每个用户投票的次数,同时获取不同用户的数量:

val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(case (a, b) => a )
println(s"numUsers is $numUsers2.count()")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)

我们现在将得到这些结果:

numUsers is 943

*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
 +-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

分析产生的RDD,我们可以看到reduceByKey比之前的distinct更高效地执行相同的动作。

最后,让我们使用 ma​​pPartitions。主要目标是首先尝试区分我们数据集的每个分区中的用户,然后获得最终的不同用户。

val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: $a1.count")
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)
println(s"Number of elements in a2: $a2.count")
val a3 = a2.distinct()
println("There are "+ a3.count()+" different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)

我们将得到以下信息:

Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users

*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 |  ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 +-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
    |  MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
    |  MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

我们现在可以看到,mapPartition 首先在数据集的每个分区中获取不同的用户数,将实例数从 100,000 缩短到 1,709 而不执行任何 shuffle。然后,有了这么少的数据量,就可以对整个 RDD 进行distinct,而无需担心 shuffle 并更快地获得结果。

我建议将最后一个提议与 mapPartitions 一起使用,而不是 reduceByKey,因为它管理的数据量较少。另一种解决方案可能是同时使用这两个函数,首先是前面提到的 mapPartitions,然后不是 distinct,使用 reduceByKey 的方式与前面提到的相同之前。

【讨论】:

以上是关于有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Spark 中的 Slaves 内存中创建 RDD?

对RDD进行Spark重复数据删除以获得更大的RDD

在 Spark Streaming 中刷新 RDD

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

在 python Spark 中组合 2 个 RDD

如何通过 pyspark 以 gzip 格式保存 spark RDD