如何反转 RDD.takeOrdered() 的排序?

Posted

技术标签:

【中文标题】如何反转 RDD.takeOrdered() 的排序?【英文标题】:How to reverse ordering for RDD.takeOrdered()? 【发布时间】:2014-10-15 16:43:41 【问题描述】:

Spark中RDD的takeOrdered()方法倒序的语法是什么?

对于奖励积分,Spark 中 RDD 的自定义排序语法是什么?

【问题讨论】:

【参考方案1】:

倒序

val seq = Seq(3,9,2,3,5,4)
val rdd = sc.parallelize(seq,2)
rdd.takeOrdered(2)(Ordering[Int].reverse)

结果将是 Array(9,5)

自定义顺序

我们会按年龄对人进行排序。

case class Person(name:String, age:Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))
val rdd = sc.parallelize(people,2)
rdd.takeOrdered(1)(Ordering[Int].reverse.on(x=>x.age))

结果将是 Array(Person(ann,32))

【讨论】:

要做升序排序,我们需要做 >>>rdd.takeOrdered(2) 或 >>>rdd.takeOrdered(2)(Ordering[Int]) 。默认情况下,它按升序排序。而是使用 top(2) 命令进行更优化的方式。 按升序订购自定义 >>>rdd.takeOrdered(1)(Ordering[Int].on(x=>x.age)).foreach(println)... 订购自定义对象的字符串字段 >>>rdd.takeOrdered(1)(Ordering[String].on(x=>x.name)).foreach(println)【参考方案2】:
val rdd1 = sc.parallelize(List(("Hadoop PIG Hive"), ("Hive PIG PIG Hadoop"), ("Hadoop Hadoop Hadoop")))

val rdd2 = rdd1.flatMap(x => x.split(" ")).map(x => (x,1))

val rdd3 = rdd2.reduceByKey((x,y) => (x+y))

//逆序(降序)

rdd3.takeOrdered(3)(Ordering[Int].reverse.on(x=>x._2))

输出:

res0: Array[(String, Int)] = Array((Hadoop,5), (PIG,3), (Hive,2))

//升序

rdd3.takeOrdered(3)(Ordering[Int].on(x=>x._2))

输出:

res1: Array[(String, Int)] = Array((Hive,2), (PIG,3), (Hadoop,5))

【讨论】:

【参考方案3】:

关于 K,V 对的字数类型问题。如果您想从订购列表中获取最后 10 个 -

SparkContext().parallelize(wordCounts.takeOrdered(10, lambda pair: -pair[1]))

【讨论】:

以上是关于如何反转 RDD.takeOrdered() 的排序?的主要内容,如果未能解决你的问题,请参考以下文章

PHP的排期软件开发

UIStackView - 三个具有相对宽度的排列子视图

为啥我所有的开放 MPI 进程的排名都是 0?

python 使用蟒蛇的排程和定时执行定时任务

python 使用蟒蛇的排程和定时执行定时任务

Mysql学习---MySQL悲观锁中的排它锁