Spark Basic Programming Notes 03
Posted by Weikun Xing
Saving the aggregated student scores as a text file
Common file formats supported by Spark
Text files, JSON, CSV, SequenceFile, and object files
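Of these, object files are the only format not demonstrated below. A minimal sketch of object-file I/O, assuming a small pair RDD and a hypothetical HDFS path /user/root/obj_out:
// Object files store Java-serialized records inside a SequenceFile.
val pairs = sc.parallelize(List(("a", 1), ("b", 2)))
pairs.saveAsObjectFile("/user/root/obj_out")            // write (path is an assumption)
val restored = sc.objectFile[(String, Int)]("/user/root/obj_out")  // read back with the element type
restored.collect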
Reading and writing JSON files
Reading JSON files
sudo gedit testjson.json
"name":"jack","age":12
"name":"lili","age":22
"name":"cc","age":11
"name":"vv","age":13
"name":"lee","age":14
hdfs dfs -put testjson.json /user/root/
scala> import org.json4s._
import org.json4s._
scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._
scala> val input=sc.textFile("/user/root/testjson.json")
input: org.apache.spark.rdd.RDD[String] = /user/root/testjson.json MapPartitionsRDD[99] at textFile at <console>:30
scala> case class Person(name:String,age:Int)
defined class Person
scala> implicit val formats=DefaultFormats;
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@3053af29
scala> val in_json=input.collect.map{x=>parse(x).extract[Person]}
in_json: Array[Person] = Array(Person(jack,12), Person(lili,22), Person(cc,11), Person(vv,13), Person(lee,14))
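Note that the line above first collects the JSON strings to the driver and parses them locally. A sketch of keeping the parsing distributed instead, as an RDD transformation (same Person case class and input RDD assumed):
// Parse on the executors; the implicit Formats is created inside the closure
// so it does not have to be serialized from the driver.
val people = input.map { x =>
  implicit val formats = DefaultFormats
  parse(x).extract[Person]
}
people.collect   // same Array(Person(jack,12), ...) as above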
Writing JSON files
scala> import org.json4s.JsonDSL._
import org.json4s.JsonDSL._
scala> val json=in_json.map{x=>
| ("name" -> x.name)~("age" -> x.age)}
json: Array[org.json4s.JsonAST.JObject] = Array(JObject(List((name,JString(jack)), (age,JInt(12)))), JObject(List((name,JString(lili)), (age,JInt(22)))), JObject(List((name,JString(cc)), (age,JInt(11)))), JObject(List((name,JString(vv)), (age,JInt(13)))), JObject(List((name,JString(lee)), (age,JInt(14)))))
scala> val jsons=json.map{x=>compact(render(x))}
jsons: Array[String] = Array({"name":"jack","age":12}, {"name":"lili","age":22}, {"name":"cc","age":11}, {"name":"vv","age":13}, {"name":"lee","age":14})
scala> sc.parallelize(jsons).repartition(1).saveAsTextFile("/user/root/json_out.json")
hadoop@master:~$ hdfs dfs -cat /user/root/json_out.json/part-00000
"name":"jack","age":12
"name":"lili","age":22
"name":"cc","age":11
"name":"vv","age":13
"name":"lee","age":14
Reading and writing CSV files
Reading CSV files
hadoop@master:/home/dblab$ hdfs dfs -put testcsv.csv /user/root/
hadoop@master:/home/dblab$ hdfs dfs -cat /user/root/testcsv.csv
0,first,first line
1,second,second line
scala> import java.io.StringReader
import java.io.StringReader
scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader
scala> val input=sc.textFile("/user/root/testcsv.csv")
input: org.apache.spark.rdd.RDD[String] = /user/root/testcsv.csv MapPartitionsRDD[119] at textFile at <console>:37
scala> val result=input.map{line=>
| val reader=new CSVReader(new StringReader(line));
| reader.readNext();
| }
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[120] at map at <console>:39
scala> result.collect
res37: Array[Array[String]] = Array(Array(0, first, first line), Array(1, second, second line))
Reading a CSV file with embedded newlines
Parsing line by line with map breaks when a field contains a newline, so here the whole file is read at once with wholeTextFiles and parsed with readAll.
scala> import java.io.StringReader
import java.io.StringReader
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader
scala> case class Data(index:String,title:String,content:String)
defined class Data
scala> val input=sc.wholeTextFiles("/user/root/testcsv.csv")
input: org.apache.spark.rdd.RDD[(String, String)] = /user/root/testcsv.csv MapPartitionsRDD[1] at wholeTextFiles at <console>:29
scala> val result=input.flatMap{case(_,txt)=>val reader=new CSVReader(new StringReader(txt));reader.readAll().map(x=>Data(x(0),x(1),x(2)))}
result: org.apache.spark.rdd.RDD[Data] = MapPartitionsRDD[2] at flatMap at <console>:33
scala> result.collect
res1: Array[Data] = Array(Data(0,first,first line), Data(1,second,second line))
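scala.collection.JavaConversions, imported above to convert readAll's Java list implicitly, is deprecated in recent Scala versions. A sketch of the same read with the explicit JavaConverters API, assuming the same Data case class and input RDD:
import scala.collection.JavaConverters._

val result2 = input.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  // readAll() returns a java.util.List[Array[String]]; convert it explicitly.
  reader.readAll().asScala.map(x => Data(x(0), x(1), x(2)))
}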
Writing CSV files
scala> import java.io.{StringReader,StringWriter}
import java.io.{StringReader, StringWriter}
scala> import au.com.bytecode.opencsv.{CSVReader,CSVWriter}
import au.com.bytecode.opencsv.{CSVReader, CSVWriter}
scala> result.map(data=>List(data.index,data.title,data.content).toArray).mapPartitions{data=>
| val stringWriter=new StringWriter();
| val csvWriter=new CSVWriter(stringWriter);
| csvWriter.writeAll(data.toList)
| Iterator(stringWriter.toString)
| }.saveAsTextFile("/user/root/csv_out")
hdfs dfs -cat /user/root/csv_out/part-00000
"0","first","first line"
"1","second","second line"
Reading and writing SequenceFiles
Writing SequenceFiles
scala> import org.apache.hadoop.io.{IntWritable,Text}
import org.apache.hadoop.io.{IntWritable, Text}
scala> val rdd=sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:32
scala> rdd.saveAsSequenceFile("/user/root/outse")
Reading SequenceFiles
scala> val output=sc.sequenceFile("/user/root/outse",classOf[Text],classOf[IntWritable]).map{case (x,y)=>(x.toString,y.get())}
output: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:32
scala> output.collect.foreach(println)
(Panda,3)
(Kay,6)
(Snail,2)
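SparkContext also has a typed overload of sequenceFile that infers the Writable classes from the requested Scala element types. A hedged alternative to the read above, assuming the same output path:
val output2 = sc.sequenceFile[String, Int]("/user/root/outse")
output2.collect.foreach(println)   // same (name, count) pairs as above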
Reading and writing text files
Reading text files
scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt")
bigdata: org.apache.spark.rdd.RDD[String] = /user/root/result_bigdata.txt MapPartitionsRDD[11] at textFile at <console>:32
scala> val math=sc.textFile("/user/root/result_math.txt")
math: org.apache.spark.rdd.RDD[String] = /user/root/result_math.txt MapPartitionsRDD[13] at textFile at <console>:32
scala> bigdata.collect
res6: Array[String] = Array(1001 大数据基础 90, 1002 大数据基础 94, 1003 大数据基础 100, 1004 大数据基础 99, 1005 大数据基础 90, 1006 大数据基础 94, 1007 大数据基础 100, 1008 大数据基础 93, 1009 大数据基础 89, 1010 大数据基础 78, 1011 大数据基础 91, 1012 大数据基础 84)
Writing text files
scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt")
bigdata: org.apache.spark.rdd.RDD[String] = /user/root/result_bigdata.txt MapPartitionsRDD[15] at textFile at <console>:32
scala> val math=sc.textFile("/user/root/result_math.txt")
math: org.apache.spark.rdd.RDD[String] = /user/root/result_math.txt MapPartitionsRDD[17] at textFile at <console>:32
scala> val score=bigdata.union(math)
score: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at <console>:36
scala> score.repartition(1).saveAsTextFile("/user/root/scores")
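saveAsTextFile also accepts a Hadoop compression codec. A sketch of writing the same combined scores gzip-compressed (the output path /user/root/scores_gz is hypothetical):
import org.apache.hadoop.io.compress.GzipCodec

// Same RDD as above, written as a single gzip-compressed part file.
score.repartition(1).saveAsTextFile("/user/root/scores_gz", classOf[GzipCodec])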
Task implementation
scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
bigdata: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[91] at map at <console>:32
scala> val math=sc.textFile("/user/root/result_math.txt").map{x=>val line=x.split("\\t");(line(0),line(1),line(2).toInt)}
math: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[94] at map at <console>:32
scala> val all_score=bigdata union math
all_score: org.apache.spark.rdd.RDD[(String, String, Int)] = UnionRDD[95] at union at <console>:36
scala> val score=all_score.map(x=>(x._1,x._3)).reduceByKey((a,b)=>a+b)
score: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[97] at reduceByKey at <console>:38
scala> val bigdata=sc.textFile("/user/root/result_bigdata.txt").map{x=>val line=x.split("\\t");(line(0),line(2).toInt)}
bigdata: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[56] at map at <console>:32
scala> val math=sc.textFile("/user/root/result_math.txt").map{x=>val line=x.split("\\t");(line(0),line(2).toInt)}
math: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[59] at map at <console>:32
scala> val scores=bigdata.union(math).map(x=>(x._1,x._2))
scores: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[61] at map at <console
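The transcript is cut off at this point. Presumably the remaining step sums each student's two scores and writes the result back to HDFS as a text file, which completes the goal stated at the top of the post. A sketch of that step, with a hypothetical output path:
// Sum the big-data and math scores per student ID, then save as one text file.
val totalScores = scores.reduceByKey(_ + _)
totalScores.repartition(1).saveAsTextFile("/user/root/total_scores")   // path is an assumption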