Spark Basic Programming Study 02

Posted by Weikun Xing



Output the IDs of students who scored 100 in a single subject

Merging multiple RDDs with union()

scala> val rdd1=sc.parallelize(List(('a',1),('b',2),('c',3)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(List(('a',1),('d',4),('e',5)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.union(rdd2).collect
res0: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (a,1), (d,4), (e,5))  

Filtering with filter()

scala> val rdd1=sc.parallelize(List(('a',1),('b',2),('c',3)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.filter(_._2>1).collect
res1: Array[(Char, Int)] = Array((b,2), (c,3))

scala> rdd1.filter(x=>x._2>1).collect
res2: Array[(Char, Int)] = Array((b,2), (c,3))

Deduplicating with distinct()

scala> val rdd=sc.makeRDD(List(('a',1),('b',1),('a',1),('c',1)))
rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd.distinct().collect
res3: Array[(Char, Int)] = Array((b,1), (a,1), (c,1))

Simple set operations

intersection()

Finds the intersection of two RDDs.

scala> val c_rdd1=sc.parallelize(List(('a',1),('b',1),('a',1),('c',1)))
c_rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val c_rdd2=sc.parallelize(List(('a',1),('b',1),('d',1)))
c_rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> c_rdd1.intersection(c_rdd2).collect
res4: Array[(Char, Int)] = Array((b,1), (a,1))

subtract()

Removes from the first RDD the elements that also appear in the second RDD.

scala> val rdd1=sc.parallelize(List(('a',1),('b',1),('c',1)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(List(('d',1),('e',1),('c',1)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd1.subtract(rdd2).collect
res5: Array[(Char, Int)] = Array((b,1), (a,1))

scala> rdd2.subtract(rdd1).collect
res6: Array[(Char, Int)] = Array((d,1), (e,1))      

cartesian()

Computes the Cartesian product of two RDDs.

scala> val rdd1=sc.makeRDD(List(1,3,5,7))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at makeRDD at <console>:24

scala> val rdd2=sc.makeRDD(List(2,4,6,8))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24

scala> rdd1.cartesian(rdd2).collect
res7: Array[(Int, Int)] = Array((1,2), (1,4), (3,2), (3,4), (1,6), (1,8), (3,6), (3,8), (5,2), (5,4), (7,2), (7,4), (5,6), (5,8), (7,6), (7,8))

scala> rdd2.cartesian(rdd1).collect
res8: Array[(Int, Int)] = Array((2,1), (2,3), (4,1), (4,3), (2,5), (2,7), (4,5), (4,7), (6,1), (6,3), (8,1), (8,3), (6,5), (6,7), (8,5), (8,7))

Task implementation

To find the IDs of students who scored 100 in a single subject, first filter each of the two RDDs for records with a score of 100 and extract the student IDs. Merge the IDs obtained from the two tables into one RDD and deduplicate them, which yields the IDs of all students who scored 100 in at least one subject. The concrete steps are as follows.

Create the data RDDs

scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
bigdata: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[36] at map at <console>:24

scala> val math=sc.textFile("/user/root/result_math.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
math: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[39] at map at <console>:24

Use filter to keep the records with a score of 100, then use map to extract the student IDs

scala> val bigdata_ID=bigdata.filter(x=>x._3==100).map(x=>x._1)
bigdata_ID: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at map at <console>:26

scala> val math_ID=math.filter(x=>x._3==100).map(x=>x._1)
math_ID: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at map at <console>:26

Merge all the IDs with union and deduplicate them with distinct

scala> val id=bigdata_ID.union(math_ID).distinct()
id: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[47] at distinct at <console>:32

scala> id.collect
res9: Array[String] = Array(1003, 1007, 1004)        

As the output shows, the returned student IDs are 1003, 1007, and 1004.
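
For reference, the same result can also be obtained with one chained expression instead of separate shell steps. This is a minimal sketch assuming the same input paths and tab-separated (ID, subject, score) format as above; the helper idsWith100 is purely illustrative and not part of the original code.

// Illustrative helper: read one score file, parse each line into
// (ID, subject, score), keep rows with a score of 100, and project the ID
def idsWith100(path: String) =
  sc.textFile(path)
    .map{x => val line = x.split("\\t"); (line(0), line(1), line(2).toInt)}
    .filter(_._3 == 100)
    .map(_._1)

// Union the IDs from both subjects and remove duplicates
val id = idsWith100("/user/root/result_bigdata.txt")
  .union(idsWith100("/user/root/result_math.txt"))
  .distinct()
id.collect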

Output each student's total score across all subjects

Creating a pair (key/value) RDD

Given lines of text made up of English words, take the first word of each line as the key and the whole sentence as the value to build a pair RDD.

scala> val rdd=sc.parallelize(List("this is a test","how are you","I am fine","can you tell me"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> val words=rdd.map(x=>(x.split(" ")(0),x));
words: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[49] at map at <console>:26

scala> words.collect
res10: Array[(String, String)] = Array((this,this is a test), (how,how are you), (I,I am fine), (can,can you tell me))

Transformations: keys and values

scala> val key=words.keys
key: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at keys at <console>:28

scala> key.collect
res11: Array[String] = Array(this, how, I, can)

scala> val value=words.values
value: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[51] at values at <console>:28

scala> value.collect
res12: Array[String] = Array(this is a test, how are you, I am fine, can you tell me)

Transformation: reduceByKey()

reduceByKey() combines the values that share the same key using the given function; here the values are summed.

scala> val r_rdd=sc.parallelize(List(('a',1),('a',2),('b',1),('c',1),('c',1))).map(x=>(x._1,x._2))
r_rdd: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[53] at map at <console>:24

scala> val re_rdd=r_rdd.reduceByKey((a,b)=>a+b)
re_rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ShuffledRDD[54] at reduceByKey at <console>:26

scala> re_rdd.collect
res13: Array[(Char, Int)] = Array((b,1), (a,3), (c,2))

Transformation: groupByKey()

Groups the values that share the same key.

scala> val g_rdd=r_rdd.groupByKey()
g_rdd: org.apache.spark.rdd.RDD[(Char, Iterable[Int])] = ShuffledRDD[55] at groupByKey at <console>:26

scala> g_rdd.collect
res14: Array[(Char, Iterable[Int])] = Array((b,CompactBuffer(1)), (a,CompactBuffer(1, 2)), (c,CompactBuffer(1, 1)))

scala> g_rdd.map(x=>(x._1,x._2.size))
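
This last map turns each group into a pair of the key and the number of values in that group. For the task named at the start of this part, the total score per student can be computed by keying each score RDD on the student ID and summing with reduceByKey. A minimal sketch, assuming the bigdata and math RDDs of (ID, subject, score) triples built in the previous task:

// Keep only (studentID, score) from each subject RDD built earlier
val bigdata_score = bigdata.map(x => (x._1, x._3))
val math_score = math.map(x => (x._1, x._3))

// Merge the two subjects and sum each student's scores
val total_score = bigdata_score.union(math_score).reduceByKey((a, b) => a + b)
total_score.collect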
