Spark Basic Programming, Lesson 03

Posted by Weikun Xing




Storing the Aggregated Student Scores as a Text File

Common File Formats Supported by Spark

Text files, JSON, CSV, SequenceFile, and object files (the last is sketched briefly below)
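Object files are the only format in this list not demonstrated later in the post. A minimal sketch of writing and reading them back with saveAsObjectFile/objectFile, assuming a hypothetical output path /user/root/obj_out:

scala> val pairs=sc.parallelize(List(("Panda",3),("Kay",6)))
scala> pairs.saveAsObjectFile("/user/root/obj_out")
scala> val restored=sc.objectFile[(String,Int)]("/user/root/obj_out")
scala> restored.collect

Object files use Java serialization, so the element type given to objectFile must match what was written.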

Reading and Writing JSON Files

Reading JSON Files

sudo gedit testjson.json

"name":"jack","age":12
"name":"lili","age":22
"name":"cc","age":11
"name":"vv","age":13
"name":"lee","age":14
hdfs dfs -put testjson.json /user/root/
scala> import org.json4s._
import org.json4s._

scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._

scala> val input=sc.textFile("/user/root/testjson.json")
input: org.apache.spark.rdd.RDD[String] = /user/root/testjson.json MapPartitionsRDD[99] at textFile at <console>:30

scala> case class Person(name:String,age:Int)
defined class Person

scala> implicit val formats=DefaultFormats;
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@3053af29

scala> val in_json=input.collect.map(x=>parse(x).extract[Person])
in_json: Array[Person] = Array(Person(jack,12), Person(lili,22), Person(cc,11), Person(vv,13), Person(lee,14))
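A caveat: collect first pulls every line to the driver, so the parsing above is not distributed. A sketch of parsing inside the RDD instead, creating DefaultFormats within the closure so that nothing unserializable is captured:

scala> val in_json_rdd=input.map{x=>
     | implicit val formats=DefaultFormats
     | parse(x).extract[Person]}
scala> in_json_rdd.collect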

Writing JSON Files

scala> import org.json4s.JsonDSL._
import org.json4s.JsonDSL._

scala> val json=in_json.map{x=>
     | ("name" -> x.name)~("age" -> x.age)}
json: Array[org.json4s.JsonAST.JObject] = Array(JObject(List((name,JString(jack)), (age,JInt(12)))), JObject(List((name,JString(lili)), (age,JInt(22)))), JObject(List((name,JString(cc)), (age,JInt(11)))), JObject(List((name,JString(vv)), (age,JInt(13)))), JObject(List((name,JString(lee)), (age,JInt(14)))))

scala> val jsons=json.map(x=>compact(render(x)))
jsons: Array[String] = Array({"name":"jack","age":12}, {"name":"lili","age":22}, {"name":"cc","age":11}, {"name":"vv","age":13}, {"name":"lee","age":14})

scala> sc.parallelize(jsons).repartition(1).saveAsTextFile("/user/root/json_out.json")
hadoop@master:~$ hdfs dfs -cat /user/root/json_out.json/part-00000
"name":"jack","age":12
"name":"lili","age":22
"name":"cc","age":11
"name":"vv","age":13
"name":"lee","age":14

Reading and Writing CSV Files

Reading CSV Files

hadoop@master:/home/dblab$ hdfs dfs -put testcsv.csv /user/root/
hadoop@master:/home/dblab$ hdfs dfs -cat /user/root/testcsv.csv
0,first,first line
1,second,second line

scala> import java.io.StringReader
import java.io.StringReader

scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader

scala> val input=sc.textFile("/user/root/testcsv.csv")
input: org.apache.spark.rdd.RDD[String] = /user/root/testcsv.csv MapPartitionsRDD[119] at textFile at <console>:37

scala> val result=input.map{line=>
     | val reader=new CSVReader(new StringReader(line));
     | reader.readNext()}
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[120] at map at <console>:39

scala> result.collect
res37: Array[Array[String]] = Array(Array(0, first, first line), Array(1, second, second line))

Reading CSV Files with Embedded Newlines

scala> import java.io.StringReader
import java.io.StringReader

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader

scala> case class Data(index:String,title:String,content:String)
defined class Data

scala> val input=sc.wholeTextFiles("/user/root/testcsv.csv")
input: org.apache.spark.rdd.RDD[(String, String)] = /user/root/testcsv.csv MapPartitionsRDD[1] at wholeTextFiles at <console>:29

scala> val result=input.flatMap{case(_,txt)=>val reader=new CSVReader(new StringReader(txt));reader.readAll().map(x=>Data(x(0),x(1),x(2)))}
result: org.apache.spark.rdd.RDD[Data] = MapPartitionsRDD[2] at flatMap at <console>:33

scala> result.collect
res1: Array[Data] = Array(Data(0,first,first line), Data(1,second,second line))

Writing CSV Files

scala> import java.io.{StringReader,StringWriter}
import java.io.{StringReader, StringWriter}

scala> import au.com.bytecode.opencsv.{CSVReader,CSVWriter}
import au.com.bytecode.opencsv.{CSVReader, CSVWriter}

scala> result.map(data=>List(data.index,data.title,data.content).toArray).mapPartitions{data=>
     | val stringWriter=new StringWriter();
     | val csvWriter=new CSVWriter(stringWriter);
     | csvWriter.writeAll(data.toList)
     | Iterator(stringWriter.toString)
     | }.saveAsTextFile("/user/root/csv_out")

hdfs dfs -cat /user/root/csv_out/part-00000
"0","first","first line"
"1","second","second line"

Reading and Writing SequenceFiles

Writing SequenceFiles

scala> import org.apache.hadoop.io.{IntWritable,Text}
import org.apache.hadoop.io.{IntWritable, Text}

scala> val rdd=sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:32

scala> rdd.saveAsSequenceFile("/user/root/outse")

Reading SequenceFiles

scala> val output=sc.sequenceFile("/user/root/outse",classOf[Text],classOf[IntWritable]).map{case (x,y)=>(x.toString,y.get())}
output: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:32

scala> output.collect.foreach(println)
(Panda,3)
(Kay,6)
(Snail,2)
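saveAsSequenceFile works above because Spark implicitly converts String and Int to Text and IntWritable. The same write can be expressed explicitly through the old Hadoop API, which makes the Writable types visible; a sketch, with /user/root/outse2 as a placeholder path:

scala> import org.apache.hadoop.mapred.SequenceFileOutputFormat
scala> rdd.map{case (k,v)=>(new Text(k),new IntWritable(v))}.saveAsHadoopFile("/user/root/outse2",classOf[Text],classOf[IntWritable],classOf[SequenceFileOutputFormat[Text,IntWritable]])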

Reading and Writing Text Files

Reading Text Files

scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt")
bigdata: org.apache.spark.rdd.RDD[String] = /user/root/result_bigdata.txt MapPartitionsRDD[11] at textFile at <console>:32

scala> val math=sc.textFile("/user/root/result_math.txt")
math: org.apache.spark.rdd.RDD[String] = /user/root/result_math.txt MapPartitionsRDD[13] at textFile at <console>:32

scala> bigdata.collect
res6: Array[String] = Array(1001	大数据基础	90, 1002	大数据基础	94, 1003	大数据基础	100, 1004	大数据基础	99, 1005	大数据基础	90, 1006	大数据基础	94, 1007	大数据基础	100, 1008	大数据基础	93, 1009	大数据基础	89, 1010	大数据基础	78, 1011	大数据基础	91, 1012	大数据基础	84)

Writing Text Files

scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt")
bigdata: org.apache.spark.rdd.RDD[String] = /user/root/result_bigdata.txt MapPartitionsRDD[15] at textFile at <console>:32

scala> val math=sc.textFile("/user/root/result_math.txt")
math: org.apache.spark.rdd.RDD[String] = /user/root/result_math.txt MapPartitionsRDD[17] at textFile at <console>:32

scala> val score=bigdata.union(math)
score: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at <console>:36

scala> score.repartition(1).saveAsTextFile("/user/root/scores")
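saveAsTextFile also has an overload that takes a compression codec, useful when the output should be compressed; a sketch, with a placeholder path:

scala> import org.apache.hadoop.io.compress.GzipCodec
scala> score.repartition(1).saveAsTextFile("/user/root/scores_gz",classOf[GzipCodec])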

Task Implementation

scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
bigdata: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[91] at map at <console>:32

scala> val math=sc.textFile("/user/root/result_math.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
math: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[94] at map at <console>:32

scala> val all_score=bigdata union math
all_score: org.apache.spark.rdd.RDD[(String, String, Int)] = UnionRDD[95] at union at <console>:36

scala> val score=all_score.map(x=>(x._1,x._3)).reduceByKey((a,b)=>a+b)
score: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[97] at reduceByKey at <console>:38

scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt").map{x=>val line=x.split("\\t");(line(0),line(2).toInt)}
bigdata: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[56] at map at <console>:32

scala> val math=sc.textFile("/user/root/result_math.txt").map{x=>val line=x.split("\\t");(line(0),line(2).toInt)}
math: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[59] at map at <console>:32

scala> val scores=bigdata.union(math).map(x=>(x._1,x._2))
scores: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[61] at map at <console>:36
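The original transcript breaks off here. Judging from the section title, the remaining step would be to write the summed scores out as a text file, mirroring the earlier saveAsTextFile call; a sketch, with /user/root/score_out as a placeholder path:

scala> score.map(x=>x._1+"\t"+x._2).repartition(1).saveAsTextFile("/user/root/score_out")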
