TOP N

Posted playforever


When the data set is small:

scala> numrdd.sortBy(x=>x,false).take(3)

res17: Array[Int] = Array(100, 99, 98)

scala> numrdd.sortBy(x=>x,true).take(3)

res18: Array[Int] = Array(1, 2, 3)

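When the data set is very large, a single server does not have enough memory to compute TOP N. Spark reads the data from HDFS and, following the data-locality principle, loads it onto different nodes. We can therefore use mapPartitions to get the top N of each partition, then sort those per-partition results once more to get the top N of the whole data file.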

scala> val numrdd=sc.makeRDD(1 to 100000000,20) // e.g. 100 million numbers (matching the output below); real data may be far larger
numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24


scala> numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)}).collect
res2: Array[Int] = Array(5000000, 4999999, 4999998, 4999997, 4999996, 10000000, 9999999, 9999998, 9999997, 9999996, 15000000, 14999999, 14999998, 14999997, 14999996, 20000000, 19999999, 19999998, 19999997, 19999996, 25000000, 24999999, 24999998, 24999997, 24999996, 30000000, 29999999, 29999998, 29999997, 29999996, 35000000, 34999999, 34999998, 34999997, 34999996, 40000000, 39999999, 39999998, 39999997, 39999996, 45000000, 44999999, 44999998, 44999997, 44999996, 50000000, 49999999, 49999998, 49999997, 49999996, 55000000, 54999999, 54999998, 54999997, 54999996, 60000000, 59999999, 59999998, 59999997, 59999996, 65000000, 64999999, 64999998, 64999997, 64999996, 70000000, 69999999, 69999998, 69999997, 69999996, 75000000, 74999999, 74999998, 74999997, 74999996, 80000000, 79999999, 79999998, 7...

scala> val maprdd=numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)})
maprdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:26

 

scala> maprdd.sortBy(x=>x,false).take(5)

[Stage 3:==========================================>              (15 + 5) / 20]
18/08/31 18:05:20 WARN spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 166889 ms exceeds timeout 120000 ms
18/08/31 18:05:25 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 3.0 (TID 53, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
[Stage 3:===================================================>     (18 + 2) / 20]
18/08/31 18:06:19 WARN spark.HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 156368 ms exceeds timeout 120000 ms
18/08/31 18:06:23 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:23 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0 (TID 55, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 3.0 (TID 52, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
[Stage 3:===================================================>     (18 + 0) / 20]
18/08/31 18:06:32 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 3:===================================================>     (18 + 2) / 20]
18/08/31 18:06:33 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/1
[Stage 4:>                                                         (0 + 2) / 20]
18/08/31 18:06:44 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/08/31 18:06:45 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/0
[Stage 4:=========================================================(20 + 0) / 20]
18/08/31 18:09:42 WARN master.Master: Removing worker-20180831175320-192.168.53.122-55296 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-59602 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-56119 because we got no heartbeat in 60 seconds
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 4 on 192.168.53.122: worker lost
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector [email protected]
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector [email protected]
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/3
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/2
18/08/31 18:10:01 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/4
18/08/31 18:10:19 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 94, 192.168.53.122, executor 5): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

)
res3: Array[Int] = Array(100000000, 99999999, 99999998, 99999997, 99999996)

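Because this was a single-machine test with insufficient memory, the job only completed, with difficulty, after many retries.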
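
The per-partition step above copies each partition into an array and sorts it in full, which may have added to the memory pressure seen in the log. Below is a minimal sketch of a lighter variant. It assumes a helper named topN (hypothetical, not part of the original post or of Spark) that keeps only the n largest values in a min-heap while streaming through the iterator. Plain Scala iterators stand in for RDD partitions so the sketch runs without a cluster.

```scala
import scala.collection.mutable

// Hypothetical helper: the n largest values of an iterator, largest first,
// holding at most n elements in memory at any time.
def topN(it: Iterator[Int], n: Int): Iterator[Int] =
  if (n <= 0) Iterator.empty
  else {
    // Reversed ordering turns the max-heap into a min-heap:
    // head is the smallest value currently kept.
    val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
    it.foreach { x =>
      if (heap.size < n) heap.enqueue(x)
      else if (x > heap.head) { heap.dequeue(); heap.enqueue(x) }
    }
    heap.toArray.sorted(Ordering[Int].reverse).iterator
  }

// Twenty stand-in "partitions" of 50 numbers each (1 to 1000 in total).
val partitions: List[Iterator[Int]] =
  (1 to 1000).grouped(50).map(_.iterator).toList

// Step 1: top 5 of every partition. Step 2: merge the 100 candidates locally.
val candidates: List[Int] = partitions.flatMap(p => topN(p, 5))
val result: List[Int] = candidates.sorted(Ordering[Int].reverse).take(5)
```

In spark-shell the same helper could be applied as numrdd.mapPartitions(topN(_, 5)).collect(), with the final sort done on the driver. Only 20 × 5 candidates come back, so the second sortBy and its shuffle are not needed. Spark's RDD API also provides top(n) and takeOrdered(n), which return the n largest or smallest elements to the driver.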

 




