Spark算子介绍

Posted 啊啊啊西吧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子介绍相关的知识,希望对你有一定的参考价值。

Spark对RDD的操作可以整体分为两类: Transformation和Action

  • 转换操作(Transformation) (如:map,filter,groupBy,sortBy,join等),转换操作也叫懒操作,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到转换操作时只会记录需要这样的操作,并不会去执行,需要等到有执行操作的时候才会真正启动计算过程进行计算。 Transformation算子根据输入参数,又可细分为处理Value型和处理Key-Value型的。

    • Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
    • Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
  • 执行操作(Action) (如:count,collect,save,reduce等),执行操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子,例如:map 算子、reduce算子

Transformatio算子:

Action算子:

wordCount代码示例

// 第一步:创建SparkContext
val conf = new SparkConf() 
conf.setAppName("WordCountScala") // 设置任务名称
	.setMaster("local") // local表示在本地执行 
// 第二步:加载数据
var path = "~/hello.txt" 
if(args.length==1){
  path = args(0)
}
// 这里通过textFile()方法,针对外部文件创建了一个RDD,linesRDD,实际上,程序执行到这只是创建了一个指向文件的引用,并没有执行
val linesRDD = sc.textFile(path)
// 第三步:对数据进行切割,把一行数据切分成一个一个的单词 
// 这里通过flatMap算子对linesRDD进行了转换操作,把每一行数据中的单词切开,获取了一个逻辑上的wordsRDD
val wordsRDD = linesRDD.flatMap(_.split(" "))
// 第四步:迭代words,将每个word转化为(word,1)这种形式 
// 这个操作和前面分析的flatMap的操作是一样的,最终获取了一个逻辑上的pairRDD,此时里面没有任何数据
val pairRDD = wordsRDD.map((_,1))
// 第五步:根据key(其实就是word)进行分组聚合统计 
// 这个操作也是和前面分析的flatMap操作是一样的,最终获取了一个逻辑上的wordCountRDD,
val wordCountRDD = pairRDD.reduceByKey(_ + _)
// 第六步:将结果打印到控制台 
// 这行代码执行了一个action操作,foreach,此时会触发之前所有transformation算子的执行 
// 注意:只有当任务执行到这一行代码的时候任务才会真正开始执行计算,如果任务中没有这一行代码,前面的所有算子不会执行
wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))
// 第七步:停止SparkContext 
sc.stop()

Transformation代码示例

object TransformationOpScala {
	def main(args: Array[String]): Unit = { 
		val sc = getSparkContext 
		// map:对集合中每个元素乘以2 
		mapOp(sc) 
		// filter:过滤出集合中的偶数 
		filterOp(sc) 
		// flatMap:将行拆分为单词
		flatMapOp(sc) 
		// groupByKey:对每个大区的主播进行分组
		groupByKeyOp(sc) 
		groupByKeyOp2(sc) 
		// reduceByKey:统计每个大区的主播数量 
		reduceByKeyOp(sc) 
		// sortByKey:对主播的音浪收入排序
		sortByKeyOp(sc) 
		// join:打印每个主播的大区信息和音浪收入 
		joinOp(sc) 
		// distinct:统计当天开播的大区信息
		distinctOp(sc)
		sc.stop() 
	}
	
	def distinctOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004, "IN")))
		// 由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息 
		dataRDD.map(_._2).distinct().foreach(println(_))
	}
	
	def joinOp(sc: SparkContext): Unit = {
	    val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004, "IN")))
	    val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004, 100)))
	    
	    val joinRDD = dataRDD1.join(dataRDD2)
		joinRDD.foreach(tup=>{ 
			// 用户id
			val uid = tup._1
			val area_gold = tup._2
			// 大区
			val area = area_gold._1 //音浪收入
			val gold = area_gold._2 
			println(uid+"\\t"+area+"\\t"+gold)
		}) 
	}

	def sortByKeyOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004, 100)))  
		// 由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置互换 	
		dataRDD.map(tup=>(tup._2,tup._1))
			.sortByKey(false) // 默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false
			.foreach(println(_))
		// sortBy的使用:可以动态指定排序的字段,比较灵活 
		dataRDD.sortBy(_._2,false).foreach(println(_))
	}

	def reduceByKeyOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004, "IN"))) 
		// 由于这个需求只需要使用到大区信息,所以在map操作的时候只保留大区信息即可 
		// 为了计算大区的数量,所以在大区后面拼上了1,组装成了tuple2这种形式,这样就可以 
		dataRDD.map(tup=>(tup._2,1)).reduceByKey(_ + _).foreach(println(_))
	}


	def groupByKeyOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004, "IN")))
		// 需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作 		
		// 此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value 
		// 注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2的形式 
		// 此时map算子之后生成的新的数据格式是这样的:("US",150001) 
		// 如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2 
		dataRDD.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{
			// 获取大区信息
			val area = tup._1 
			print(area+":") 
			// 获取同一个大区对应的所有用户id val it = tup._2
			for(uid <- it){
		      print(uid+" ")
		    }
			println() 
		})
	}


	def groupByKeyOp2(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array((150001,"US","male"),(150002,"CN","female"),(150003,"CN","male"),(150004, "IN","male")))
		// 如果tuple中的数据列数超过了2列怎么办? 
		// 把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下 
		// 此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male")) 
		// 注意:如果你的数据结构比较复杂,你可以在执行每一个算子之后都调用foreach打印一 
		dataRDD.map(tup=>(tup._2,(tup._1,tup._3))).groupByKey().foreach(tup=>{
			//获取大区信息
			val area = tup._1
			print(area+":") 
			// 获取同一个大区对应的所有用户id和性别信息 val it = tup._2
			for((uid,sex) <- it){
		    	print("<"+uid+","+sex+"> ")
		    }
			println() 
		})
	}

	def flatMapOp(sc: SparkContext): Unit = {
	 	val dataRDD = sc.parallelize(Array("good good study","day day up"))
		dataRDD.flatMap(_.split(" ")).foreach(println(_))
	}
	
	def filterOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array(1,2,3,4,5))
		dataRDD.filter(_ % 2 ==0).foreach(println(_))
	}
	
	def mapOp(sc: SparkContext): Unit ={
		val dataRDD = sc.parallelize(Array(1,2,3,4,5))
		dataRDD.map(_ * 2).foreach(println(_))
	}
	
	/**
	* 获取SparkContext
	*/
	private def getSparkContext = {
	    val conf = new SparkConf()
	    conf.setAppName("TransformationOpScala")
	      .setMaster("local")
	    new SparkContext(conf)
	}
}

Action代码示例

import org.apache.spark.{SparkConf, SparkContext}


object ActionOpScala {
	def main(args: Array[String]): Unit = {
    	val sc = getSparkContext
	    // reduce:聚合计算
	    reduceOp(sc)
	    // collect:获取元素集合
	    collectOp(sc)
	    // take(n):获取前n个元素
	    takeOp(sc)
	    // count:获取元素总数
	    countOp(sc)
	    // saveAsTextFile:保存文件
	    saveAsTextFileOp(sc)
	    // countByKey:统计相同的key出现多少次
	    countByKeyOp(sc)
	    // foreach:迭代遍历元素
	    foreachOp(sc)
	
	    sc.stop()
	}

	def foreachOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array(1,2,3,4,5))
		//注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的
		//实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach
		//在里面实现具体向外部输出数据的代码
		dataRDD.foreach(println(_))
	}

  	def countByKeyOp(sc: SparkContext): Unit = {
	    val daraRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))
	    //返回的是一个map类型的数据
	    val res = daraRDD.countByKey()
	    for((k,v) <- res){
	    	println(k+","+v)
	    }
  	}

  	def saveAsTextFileOp(sc: SparkContext): Unit = {
	    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
	    //指定HDFS的路径信息即可,需要指定一个不存在的目录
	    dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out001")
  	}

	def countOp(sc: SparkContext): Unit = {
		val dataRDD = sc.parallelize(Array(1,2,3,4,5))
	    val res = dataRDD.count()
	    println(res)
    }

	def takeOp(sc: SparkContext): Unit = {	
		val dataRDD = sc.parallelize(Array(1,2,3,4,5))
  		//从RDD中获取前2个元素
  		val res = dataRDD.take(2)
  		for(item <- res){
    		println(item)
 		}
	}

	def collectOp(sc: SparkContext): Unit = {
	    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
	    //collect返回的是一个Array数组
	    //注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点
	    //如果想要获取几条数据,查看一下数据格式,可以使用take(n)
	    val res = dataRDD.collect()
   		for(item <- res){
     			println(item)
   		}
  	}

	def reduceOp(sc: SparkContext): Unit = {
	    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
	    val num = dataRDD.reduce(_ + _)
	    println(num)
	}

  	def getSparkContext = {
	    val conf = new SparkConf()
	    conf.setAppName("ActionOpScala")
	      .setMaster("local")
	    new SparkContext(conf)
  	}
}

以上是关于Spark算子介绍的主要内容,如果未能解决你的问题,请参考以下文章

Spark——RDD算子

Spark算子介绍

Spark算子介绍

Spark算子介绍

Spark算子介绍

Spark从入门到精通18:RDD常用高级算子介绍