Apache Spark:不同的不起作用?
Posted
技术标签:
【中文标题】Apache Spark:不同的不起作用?【英文标题】:Apache Spark: distinct doesnt work? 【发布时间】:2014-07-22 11:33:50 【问题描述】:这是我的代码示例:
case class Person(name:String,tel:String)
def equals(that:Person):Boolean = that.name == this.name && this.tel == that.tel
val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
sc.parallelize(persons).distinct.collect
返回
res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139))
为什么 distinct 不起作用?我希望结果为 Person("john",111),Person("peter",139)
【问题讨论】:
不知道是不是因为“peter”和“perter”不一样? 标记为关闭,因为这似乎是由一个简单的印刷错误引起的。 在写这个问题时,打字错误很不幸。我在 Spark 上试过这个,确实是一个问题。我恢复了我之前的 -1 @kviiri 这是一个真实的(而且相当令人费解的)问题。你能恢复投票吗? 仅供参考 - 我针对 Spark 1.0.0 创建了一个错误 - issues.apache.org/jira/browse/SPARK-2620 【参考方案1】:从@aaronman 的观察进一步扩展,有一个解决这个问题的方法。
在 RDD 上,distinct
有两个定义:
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = distinct(partitions.size)
从第一个 distinct
的签名中可以明显看出,元素的顺序必须是隐式的,如果不存在则假定为 null,这就是短版本 .distinct()
所做的。
案例类没有默认的隐式排序,但很容易实现:
case class Person(name:String,tel:String) extends Ordered[Person]
def compare(that: Person): Int = this.name compare that.name
现在,尝试相同的示例会产生预期的结果(注意我是在比较名称):
val ps5 = Array(Person("peter","138"),Person("peter","55"),Person("john","138"))
sc.parallelize(ps5).distinct.collect
res: Array[P5] = Array(P5(john,111), P5(peter,139))
请注意,案例类已经实现了equals
和hashCode
,因此提供的示例中的 impl 是不必要的,也是不正确的。 equals
的正确签名是:equals(arg0: Any): Boolean
- 顺便说一句,我首先认为问题与不正确的 equals 签名有关,这让我走错了路。
【讨论】:
在 spark-shell 1.0 中,我遵循您指导的每一步,但返回res2: Array[Person] = Array(Person(john,138), Person(peter,138), Person(peter,55))
@MrQuestion 你使用什么版本的 Spark?不管你用什么,我都会再试一次。
我已经用 spark 1.0.0 和 1.0.1 试过了,它们都不起作用【参考方案2】:
对我来说,问题与对象相等有关,正如 Martin Odersky 在 Scala 编程(第 30 章)中所提到的,尽管我有一个普通类(不是案例类)。对于正确的相等测试,如果您有自定义的 equals(),则必须重新定义(覆盖)hashCode()。你还需要一个 canEqual() 方法来保证 100% 的正确性。我没有查看 RDD 的实现细节,但由于它是一个集合,它可能使用 HashSet 或其他基于散列的数据结构的一些复杂/并行变体来比较对象并确保区别。
声明 hashSet()、equals()、canEqual() 和 compare() 方法解决了我的问题:
override def hashCode(): Int =
41 * (41 + name.hashCode) + tel.hashCode
override def equals(other: Any) = other match
case other: Person =>
(other canEqual this) &&
(this.name == other.name) && (this.tel == other.tel)
case _ =>
false
def canEqual(other: Any) = other.isInstanceOf[Person]
def compare(that: Person): Int =
this.name compare that.name
【讨论】:
【参考方案3】:正如其他人指出的那样,这是 spark 1.0.0 中的一个错误。我关于它来自哪里的理论是,如果您查看 1.0.0 到 9.0 的差异,您会看到
- def repartition(numPartitions: Int): RDD[T] =
+ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
如果你跑了
case class A(i:Int)
implicitly[Ordering[A]]
你得到一个错误
<console>:13: error: No implicit Ordering defined for A.
implicitly[Ordering[A]]
所以我认为解决方法是为案例类定义一个隐式排序,不幸的是我不是 scala 专家,但这个 answer seems to do it correctly
【讨论】:
@MrQuestion 老实说这只是一个猜测,我不完全确定隐式解析在 scala 中是如何工作的以上是关于Apache Spark:不同的不起作用?的主要内容,如果未能解决你的问题,请参考以下文章
Apache spark - 窗口函数,FIRST_VALUE 不起作用
UICollectionViewCell 上的不需要的 UIView(TapGesture 不起作用)