Spark 中 Distinct() 函数是如何工作的?

Posted

技术标签:

【中文标题】Spark 中 Distinct() 函数是如何工作的?【英文标题】:How does Distinct() function work in Spark? 【发布时间】:2015-06-20 23:47:13 【问题描述】:

我是 Apache Spark 的新手,正在学习基本功能。 有一个小疑问。假设我有一个元组(键,值)的RDD,并想从中获得一些独特的。我使用 distinct() 函数。我想知道该函数在什么基础上认为元组是不同的..?它是基于键,还是值,或两者兼而有之?

【问题讨论】:

【参考方案1】:

.distinct() 肯定是在跨分区进行洗牌。要了解更多正在发生的事情,请在您的 RDD 上运行 .toDebugString

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

对于我拥有的 RDD 示例(myRDDPreStep 已按键进行哈希分区,由 StorageLevel.MEMORY_AND_DISK_SER 持久化并设置检查点),返回:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

请注意,可能有更有效的方法来获得不同,涉及更少的随机播放,特别是如果您的 RDD 已经以智能方式分区并且分区没有过度倾斜。

见Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? 和 Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?

【讨论】:

【参考方案2】:

RDD.distinct() 的 API 文档只提供了一句话描述:

“返回一个包含此 RDD 中不同元素的新 RDD。”

根据最近的经验,我可以告诉你,在 tuple-RDD 中,会考虑整个 tuple。

如果您想要不同的键或不同的值,那么具体取决于您想要完成的任务,您可以:

A.调用 groupByKey()(k1,v11),(k1,v12),(k2,v21),(k2,v22) 转换为 (k1,[v11,v12]), (k2,[v21,v22]) ;或

B.通过调用keys()values() 后跟distinct() 删除键或值

截至撰写本文时(2015 年 6 月)UC Berkeley + EdX 正在运行免费在线课程Introduction to Big Data and Apache Spark,该课程将提供这些功能的动手实践。

【讨论】:

嗨,保罗!假设我们有一个 RDD 元组如下: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3 ,21), (3,22)..etc.,在这里您可以观察到键和值都在各种元组中重复。所以如果我在上面的RDD上应用 distinct() ,结果会是什么..?请稍等。谢谢!而且,是的,我正在网上学习该课程! :) 我现在没有时间,但是您可以使用 myRDD = sc.parallelize([ (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22) ]); 设置您自己的 RDD,这甚至可以在 Spark 课程中以前的 Lab notebook 中使用。然后运行myRDD.distinct().collect() to test the output【参考方案3】:

贾斯汀·皮奥尼是对的。 Distinct 使用对象的 hashCode 和 equals 方法进行此确定。它返回不同的元素(对象)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

与众不同

rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)

如果你想对 key 应用 distinct。 在这种情况下,减少是更好的选择

减少

 val reduceRDD= rdd.map(tup =>
    (tup._1, tup)).reduceByKey  case (a, b) => a .map(_._2)

reduceRDD.collect().foreach(println)

输出:-

(2,20)
(1,20)
(3,21)

【讨论】:

【参考方案4】:

distinct 使用对象的hashCodeequals 方法进行此确定。元组内置了平等机制,下放了每个对象的平等和位置。因此,distinct 将作用于整个 Tuple2 对象。正如保罗指出的那样,您可以拨打keysvalues,然后拨打distinct。或者您可以通过aggregateByKey 编写您自己的不同值,这将保持密钥配对。或者,如果您想要不同的键,那么您可以使用常规的 aggregate

【讨论】:

【参考方案5】:

看起来distinct 将摆脱(键,值)重复项。

在下面的示例中,(1,20) 和 (2,20) 在 myRDD 中重复了两次,但在 distinct() 之后,重复项被删除。

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22

scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)

scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)

【讨论】:

以上是关于Spark 中 Distinct() 函数是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming 中的多重聚合和 Distinct 函数

Spark 上的 SQL:如何获取 DISTINCT 的所有值?

spark 例子count(distinct 字段)

Spark SQL .distinct()性能

在apache spark中使用distinct时出现***错误

Spark distinct算子