Spark RDD Action 简单用例

Posted alian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD Action 简单用例相关的知识,希望对你有一定的参考价值。

foreach(f: T => Unit)

对RDD的所有元素应用f函数进行处理,f无返回值。
/**

* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit
scala> val rdd = sc.parallelize(1 to 9, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.foreach(x=>{println(x)})
[Stage 0:>                                                          (0 + 0) / 2]
1 2 3 4 5 6 7 8 9

foreachPartition(f: Iterator[T] => Unit)

遍历所有的分区进行f函数操作
/**

* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: Iterator[T] => Unit): Unit

scala> val rdd = sc.parallelize(1 to 9, 2)
scala> rdd.foreachPartition(x=>{
     | while(x.hasNext){
     | println(x.next)
     | }
     | println("===========")
     | }
     | )
1
2
3
4
===========
5
6
7
8
9
===========

getCheckpointFile

获取RDD checkpoint的目录.
/**
* Gets the name of the directory to which this RDD was checkpointed.
* This is not defined if the RDD is checkpointed locally.
*/
def getCheckpointFile: Option[String]

scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.checkpoint

/*
checkpoint操作后直接查询得到None,说明checkpoint是lazy的
*/
scala> rdd.getCheckpointFile
res6: Option[String] = None

scala> rdd.count
res7: Long = 9                                                                  

scala> rdd.getCheckpointFile
res8: Option[String] = Some(file:/home/check/ca771099-b1bf-46c8-9404-68b4ace7feeb/rdd-1)

getNumPartitions

获取分区数量
/**
* Returns the number of partitions of this RDD.
*/
@Since("1.6.0")
final def getNumPartitions: Int = partitions.length
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getNumPartitions
res9: Int = 2

getStorageLevel

获取当前RDD的存储级别
/** Get the RDD‘s current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel: StorageLevel = storageLevel

scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> rdd.cache
res11: rdd.type = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res12: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

isCheckpointed

获取该RDD是否已checkpoint处理
/**

* Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.isCheckpointed
res13: Boolean = false

scala> rdd.checkpoint

scala> rdd.isCheckpointed
res15: Boolean = false

scala> rdd.count
res16: Long = 9

scala> rdd.isCheckpointed
res17: Boolean = true

isEmpty()

获取RDD是否为空,如果RDD为Nothing或Null,则抛出异常
/**
* @note due to complications in the internal implementation, this method will raise an
* exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
* because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
* (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean

scala> val rdd = sc.parallelize(Seq())
rdd: org.apache.spark.rdd.RDD[Nothing] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.isEmpty
org.apache.spark.SparkDriverExecutionException: Execution error
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1187)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1413)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1412)
  ... 48 elided
Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
  at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

scala> val rdd = sc.parallelize(Seq(1 to 9))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.isEmpty
res19: Boolean = false

 

max()

/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
* @return the maximum element of the RDD
* */
def max()(implicit ord: Ordering[T]): T

min()

/**
* Returns the min of this RDD as defined by the implicit Ordering[T].
* @return the minimum element of the RDD
* */
def min()(implicit ord: Ordering[T]): T

scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.max
res21: Int = 9

scala> rdd.min
res22: Int = 1

reduce(f: (T, T) => T)

对RDD所有元素进行聚合运算
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T

scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> def func(x:Int, y:Int):Int={
     | if(x >= y){
     | x
     | }else{
     | y}
     | }
func: (x: Int, y: Int)Int

scala> rdd.reduce(func(_,_))
res23: Int = 9


scala> rdd.reduce((x,y)=>{
     | if(x>=y){
     | x
     | }else{
     | y
     | }
     | }
     | )
res24: Int = 9

saveAsObjectFile(path: String)

将RDD保存指定目录下文件中
/**

* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String): Unit

scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.saveAsObjectFile("/home/check/object")


[[email protected] ~]# ls /home/check/object/
part-00000  _SUCCESS

saveAsTextFile(path: String)

将RDD保存至文本文件
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit

scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.saveAsTextFile("/home/check/text")
[[email protected] ~]# ls /home/check/text/part-00000 
/home/check/text/part-00000
[[email protected] ~]# more /home/check/text/part-00000 
1
2
3
4
5
6
7
8
9

take(num: Int)

返回前num个元素。
/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver‘s memory.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T]

scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> rdd.take(5)
res28: Array[Int] = Array(1, 2, 3, 4, 5)


takeOrdered(num: Int)

排序后返回前num个元素
scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd.takeOrdered(3)
res30: Array[Int] = Array(1, 2, 3)

 

def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]

scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.takeSample(true,6,8)
res34: Array[Int] = Array(5, 2, 2, 5, 3, 2)

scala> rdd.takeSample(false,6,8)
res35: Array[Int] = Array(9, 3, 2, 6, 1, 5)

top(num: Int)

降序排列后返回top n
/*
* @param num k, the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T]

scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> rdd.top(3)
res37: Array[Int] = Array(9, 6, 5)

 




以上是关于Spark RDD Action 简单用例的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD Transformation 简单用例

Spark RDD Transformation 简单用例

Spark RDD Transformation 简单用例

Spark RDD Action操作

Spark 中Transformation Action操作 以及RDD的持久化

Spark 中Transformation Action操作 以及RDD的持久化