Common Spark RDD Operators

Posted by lishisan


Spark RDD operators

Key RDD properties: partitioned / read-only / lineage dependencies / cache / checkpoint

Transformation

map(func)

Returns a new RDD formed by passing each element of the source RDD through the function func.

scala> var source  = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

filter(func)

Returns a new RDD formed by selecting the elements of the source RDD for which func returns true.

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

flatMap(func)

Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).

scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

mapPartitions(func)

Similar to map, but runs independently on each partition of the RDD, so when running on an RDD of type T, func must have type Iterator[T] => Iterator[U]. With N elements in M partitions, the map function is invoked N times, while mapPartitions is invoked only M times: one call processes an entire partition at a time.
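
For example, a minimal spark-shell sketch (the data and the doubling function are made up for illustration):

scala> val rdd = sc.parallelize(1 to 10, 2)
scala> rdd.mapPartitions(iter => iter.map(_ * 2)).collect()
// expected: Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)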

mapPartitionsWithIndex(func)

Similar to mapPartitions, but func also takes an integer parameter giving the index of the partition, so when running on an RDD of type T, func must have type (Int, Iterator[T]) => Iterator[U].
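
A quick sketch that tags each element with the index of its partition (illustrative data; the exact split depends on the partitioning):

scala> val rdd = sc.parallelize(1 to 8, 4)
scala> rdd.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x))).collect()
// typically: Array((0,1), (0,2), (1,3), (1,4), (2,5), (2,6), (3,7), (3,8))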

sample(withReplacement, fraction, seed)

Randomly samples a fraction of the data using the specified random seed. withReplacement controls whether sampled elements are put back: true means sampling with replacement, false means without. seed specifies the seed of the random number generator. The example below samples roughly 50% of the data with replacement, using a seed value of 3.
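
A minimal sketch (the numbers are illustrative; the sampled contents depend on the seed):

scala> val rdd = sc.parallelize(1 to 10)
scala> rdd.sample(true, 0.5, 3).collect()
// a random subset roughly half the size of the RDD; with replacement, elements may repeat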

takeSample

Unlike sample, takeSample is an action: it returns the final sampled collection directly to the driver.

union(otherDataset)

Returns a new RDD that is the union of the source RDD and the argument RDD.

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28

scala> rdd3.collect()
res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

intersection(otherDataset)

Returns a new RDD that is the intersection of the source RDD and the argument RDD.

scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28

scala> rdd3.collect()
res19: Array[Int] = Array(5, 6, 7)

distinct(numTasks)

Returns a new RDD with the duplicate elements of the source RDD removed. By default only 8 parallel tasks are used, but this can be changed by passing the optional numTasks argument.

scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24

scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26

scala> unionRDD.collect()
res20: Array[Int] = Array(1, 9, 5, 6, 2)

scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26

scala> unionRDD.collect()
res21: Array[Int] = Array(6, 2, 1, 9, 5)

partitionBy

Repartitions the RDD with the given partitioner. If the RDD's existing partitioner is the same as the requested one, no repartitioning is done; otherwise a ShuffledRDD is produced.
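
A minimal sketch using a HashPartitioner with two partitions (the data and partition counts are chosen for illustration):

scala> import org.apache.spark.HashPartitioner
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")), 4)
scala> rdd.partitions.size
// 4
scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2))
scala> rdd2.partitions.size
// 2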

reduceByKey(func, [numTasks])

Called on an RDD of (K, V) pairs; returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. The number of reduce tasks can be set via the optional second argument.
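
A minimal sketch (the key-value data is made up for illustration):

scala> val rdd = sc.parallelize(Array(("female",1),("male",5),("female",5),("male",2)))
scala> rdd.reduceByKey(_ + _).collect()
// expected (order may vary): Array((female,6), (male,7))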

groupByKey

groupByKey also operates per key, but produces a single sequence of values for each key.
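
For example (the words are invented for illustration):

scala> val words = sc.parallelize(Array("one","two","two","three","three","three")).map((_, 1))
scala> words.groupByKey().collect()
// expected (order may vary): Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))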

combineByKey[C](createCombiner, mergeValue, mergeCombiners)

For each key K, merges the values V into a combined result of type C.
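
A sketch that computes a per-key average, a typical use of combineByKey (the scores are made up for illustration):

scala> val scores = sc.parallelize(Array(("a",88),("b",95),("a",91),("b",93)))
scala> val avg = scores.combineByKey(
     |   (v: Int) => (v, 1),
     |   (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
     |   (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
     | ).mapValues(t => t._1.toDouble / t._2)
scala> avg.collect()
// expected (order may vary): Array((a,89.5), (b,94.0))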

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

On an RDD of key-value pairs, groups the values by key and merges them. Within each partition, each value is combined with the zero value using the seqOp function, producing a new key-value result; the per-partition results are then merged by key, with each group's values passed to the combOp function (the first two values are combined, the result is combined with the next value, and so on), and the key together with the final result is emitted as a new key-value pair.

seqOp is used within each partition to iteratively fold values into the zero value; combOp merges the results of the different partitions.
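
A sketch that takes the maximum value per key within each partition and then sums those maxima across partitions (data and partition count are made up for illustration):

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)), 3)
scala> rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect()
// with this partitioning, expected (order may vary): Array((3,8), (1,7), (2,3))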

foldByKey(zeroValue)(func)

A simplified form of aggregateByKey in which seqOp and combOp are the same function.
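
A short example (illustrative data):

scala> val rdd = sc.parallelize(Array(("a",1),("a",2),("b",3)))
scala> rdd.foldByKey(0)(_ + _).collect()
// expected (order may vary): Array((a,3), (b,3))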

sortByKey([ascending],[numTasks])

Called on an RDD of (K, V) pairs where K implements the Ordered trait; returns an RDD of (K, V) pairs sorted by key.

scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd.sortByKey(true).collect()
res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

scala> rdd.sortByKey(false).collect()
res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

sortBy(func,[ascending],[numTasks])

Similar to sortByKey but more flexible: func is first applied to each element, and the results of func are used for the comparison.

scala> val rdd = sc.parallelize(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

join(otherDataset,[numTasks])

Called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) pairs containing all pairs of elements that share a key.

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

cogroup(otherDataset,[numTasks])

Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).
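
A minimal sketch (illustrative data; keys present in only one RDD get an empty Iterable on the other side):

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(4,6)))
scala> rdd.cogroup(rdd1).collect()
// expected (order may vary): Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer())), (4,(CompactBuffer(),CompactBuffer(6))))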

cartesian(otherDataset)

Returns the Cartesian product of the two RDDs.
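
For example (illustrative data):

scala> val rdd1 = sc.parallelize(1 to 3)
scala> val rdd2 = sc.parallelize(Array("a","b"))
scala> rdd1.cartesian(rdd2).collect()
// expected (order may vary): Array((1,a), (1,b), (2,a), (2,b), (3,a), (3,b))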

pipe(command,[envVars])

For each partition, runs a Perl or shell script and returns an RDD of the script's output.

The shell script (pipe.sh):
#!/bin/sh
echo "AA"
while read LINE; do
   echo ">>>"$LINE
done
Note: the shell script must be accessible on every node of the cluster.

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

coalesce(numPartitions)

Reduces the number of partitions; useful after filtering a large dataset down to a small one, to improve execution efficiency.

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24

scala> rdd.partitions.size
res20: Int = 4

scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26

scala> coalesceRDD.partitions.size
res21: Int = 3

repartition(numPartitions)

Reshuffles the data randomly across the network according to the new number of partitions.
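
A minimal sketch (partition counts chosen for illustration):

scala> val rdd = sc.parallelize(1 to 16, 4)
scala> val rerdd = rdd.repartition(2)
scala> rerdd.partitions.size
// 2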

repartitionAndSortWithinPartitions(partitioner)

repartitionAndSortWithinPartitions is a variant of repartition. Unlike repartition, it sorts the records by key within each partition of the given partitioner, which performs better than calling repartition and then sorting.
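
A sketch using a HashPartitioner with two partitions (data chosen for illustration):

scala> import org.apache.spark.HashPartitioner
scala> val rdd = sc.parallelize(Array((3,"c"),(1,"a"),(2,"b"),(4,"d")))
scala> rdd.repartitionAndSortWithinPartitions(new HashPartitioner(2)).collect()
// each output partition is sorted by key, e.g. Array((2,b), (4,d), (1,a), (3,c))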

glom

Gathers each partition into an array, producing a new RDD of type RDD[Array[T]].

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

mapValues

For an RDD of (K, V) pairs, applies the function to the values only.

scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24

scala> rdd3.mapValues(_+"|||").collect()
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

subtract

A set-difference operation: removes from the source RDD the elements that also appear in the other RDD, keeping only the elements unique to the source RDD.

scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24

scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)

Action

reduce(func)

Aggregates all elements of the RDD with the function func, which must be commutative and associative so the computation can be done in parallel.
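
For example (illustrative data):

scala> sc.parallelize(1 to 10).reduce(_ + _)
// 55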

collect

Returns all elements of the dataset as an array to the driver program.

count()

Returns the number of elements in the RDD.

first

Returns the first element of the RDD (similar to take(1)).

take(n)

Returns an array with the first n elements of the dataset.

takeSample(withReplacement, num, [seed])

Returns an array of num elements randomly sampled from the dataset. withReplacement controls whether sampling is done with or without replacement, and the optional seed specifies the random number generator seed.
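
A quick sketch (data and sample size made up for illustration):

scala> sc.parallelize(1 to 10).takeSample(false, 5, 3)
// an Array of 5 elements sampled without replacement and returned to the driver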

takeOrdered(n)

Returns the first n elements of the RDD according to their natural ordering (or a custom Ordering).
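
For example (illustrative data):

scala> sc.parallelize(Array(10, 4, 2, 12, 3)).takeOrdered(3)
// expected: Array(2, 3, 4)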

aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)

aggregate folds the elements within each partition together with the zero value using seqOp, and then combines the per-partition results with the zero value using combOp. The final result type does not have to match the element type of the RDD.
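
A minimal sketch summing an RDD with a zero value of 0 (data and partition count made up for illustration):

scala> val rdd = sc.parallelize(1 to 10, 2)
scala> rdd.aggregate(0)(_ + _, _ + _)
// 55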

fold(zeroValue)(func)

A fold operation; a simplified form of aggregate in which seqOp and combOp are the same.
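
For example (illustrative data):

scala> sc.parallelize(1 to 10, 2).fold(0)(_ + _)
// 55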

saveAsTextFile(path)

Saves the elements of the dataset as a text file (or set of text files) to HDFS or another supported file system. For each element, Spark calls toString to convert it to a line of text in the file.

saveAsSequenceFile(path)

Saves the elements of the dataset as a Hadoop SequenceFile in the given directory, on HDFS or any other Hadoop-supported file system.

saveAsObjectFile(path)

Serializes the elements of the RDD as objects and stores them in a file.

countByKey()

For an RDD of type (K, V), returns a Map of (K, Int) giving the number of elements for each key.
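
A minimal sketch (illustrative data):

scala> val rdd = sc.parallelize(Array((1,"a"),(1,"b"),(2,"c"),(3,"d")))
scala> rdd.countByKey()
// expected: Map(1 -> 2, 2 -> 1, 3 -> 1)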

foreach(func)

Runs the function func on each element of the dataset.

Statistical operations on numeric RDDs

Method             Meaning
count()            number of elements in the RDD
mean()             mean of the elements
sum()              sum of the elements
max()              maximum value
min()              minimum value
variance()         variance of the elements
sampleVariance()   sample variance
stdev()            standard deviation
sampleStdev()      sample standard deviation
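
For example, in the spark-shell (the numbers are made up for illustration):

scala> val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
scala> nums.mean()
// 2.5
scala> nums.sum()
// 10.0
scala> nums.stdev()
// approximately 1.118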

Main ways of reading and saving data

Text file input and output

scala> sc.textFile("./README.md")
res6: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[7] at textFile at <console>:25

scala> val readme = sc.textFile("./README.md")
readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[9] at textFile at <console>:24

scala> readme.collect()
res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster...
scala> readme.saveAsTextFile("hdfs://node01:8020/test")

JSON file input and output

scala> import org.json4s._  
import org.json4s._

scala> import org.json4s.jackson.JsonMethods._  
import org.json4s.jackson.JsonMethods._

scala> import org.json4s.jackson.Serialization  
import org.json4s.jackson.Serialization

scala> var result = sc.textFile("examples/src/main/resources/people.json")
result: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.json MapPartitionsRDD[7] at textFile at <console>:47

scala> implicit val formats = Serialization.formats(ShortTypeHints(List())) 
formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = org.json4s.Serialization$$anon$1@61f2c1da

scala>  result.collect()
res3: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})

CSV file input and output
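
Spark core has no dedicated CSV input format at the RDD level; CSV files are usually read as text and parsed line by line. A minimal sketch (the path and column layout are assumptions for illustration):

scala> val lines = sc.textFile("hdfs://node01:8020/people.csv")
scala> val people = lines.map(_.split(",")).map(f => (f(0), f(1).toInt))
scala> people.collect()

Writing works the same way in reverse: format each record as a comma-separated string and call saveAsTextFile.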

SequenceFile input and output

scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala>  data.saveAsSequenceFile("hdfs://node01:8020/sequdata")
scala> val sdata = sc.sequenceFile[Int,String]("hdfs://node01:8020/sequdata/*")
sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24

scala> sdata.collect()
res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

Object file input and output

scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> data.saveAsObjectFile("hdfs://node01:8020/objfile")
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val objrdd =sc.objectFile[(Int,String)]("hdfs://node01:8020/objfile/p*")
objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25

scala> objrdd.collect()
res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

Hadoop input and output formats

scala> import org.apache.hadoop.io._
import org.apache.hadoop.io._
scala> val data = sc.parallelize(Array((30,"hadoop"), (71,"hive"), (11,"cat")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at parallelize at <console>:35

scala> data.saveAsNewAPIHadoopFile("hdfs://node01:8020/output4/",classOf[LongWritable] ,classOf[Text] ,classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]])

File system input and output

Spark can read from and write to many file systems, such as the local file system, Amazon S3, and HDFS.

Database input and output

Reading from MySQL

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
  val sc = new SparkContext(sparkConf)

  val rdd = new org.apache.spark.rdd.JdbcRDD(
    sc,
    () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "hive")
    },
    "select * from rddtable where id >= ? and id <= ?;",
    1,   // lower bound bound to the first ? placeholder
    10,  // upper bound bound to the second ? placeholder
    1,   // number of partitions
    r => (r.getInt(1), r.getString(2)))

  println(rdd.count())
  rdd.foreach(println(_))
  sc.stop()
}

Writing to MySQL

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
  val sc = new SparkContext(sparkConf)
  val data = sc.parallelize(List("Female", "Male", "Female"))

  // open one JDBC connection per partition instead of one per record
  data.foreachPartition(insertData)
}

def insertData(iterator: Iterator[String]): Unit = {
  Class.forName("com.mysql.jdbc.Driver").newInstance()
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "admin")
  iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data)
    ps.executeUpdate()
  })
}

Reading from HBase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  // name of the HBase table to read
  conf.set(TableInputFormat.INPUT_TABLE, "fruit")

  val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])

  val count = hBaseRDD.count()
  println("hBaseRDD RDD Count:" + count)
  hBaseRDD.cache()
  hBaseRDD.foreach {
    case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes))
      println("Row key:" + key + " Name:" + name + " Color:" + color)
  }
  sc.stop()
}

Writing to HBase

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  val jobConf = new JobConf(conf)
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

  val fruitTable = TableName.valueOf("fruit_spark")
  val tableDescr = new HTableDescriptor(fruitTable)
  tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

  // drop and recreate the target table if it already exists
  val admin = new HBaseAdmin(conf)
  if (admin.tableExists(fruitTable)) {
    admin.disableTable(fruitTable)
    admin.deleteTable(fruitTable)
  }
  admin.createTable(tableDescr)

  // convert each (id, name, price) triple into an HBase Put
  def convert(triple: (Int, String, Int)) = {
    val put = new Put(Bytes.toBytes(triple._1))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, put)
  }

  val initialRDD = sc.parallelize(List((1, "apple", 11), (2, "banana", 12), (3, "pear", 13)))
  val localData = initialRDD.map(convert)

  localData.saveAsHadoopDataset(jobConf)
}
