Spark SQL: Structured Data File Processing 02
Posted by Weikun Xing
DataFrame Query Operations
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlContext=new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@30417948
scala> val people=sc.textFile("hdfs://master/user/root/sparksql/user.txt")
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at textFile at <console>:41
scala> val schemaString="name age"
schemaString: String = name age
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.types.{StructType, StructField, StringType}
scala> val schema=StructType(schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
scala> val rowRDD=people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[18] at map at <console>:45
scala> val peopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)
peopleDataFrame: org.apache.spark.sql.DataFrame = [name: string, age: string]
Because the file used here is empty, the query below returns an empty result.
scala> peopleDataFrame.registerTempTable("peopleTempTab")
scala> val personsRDD=sqlContext.sql("select name,age from peopleTempTab where age >20").rdd
personsRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[22] at rdd at <console>:45
scala> personsRDD.collect
res7: Array[org.apache.spark.sql.Row] = Array()
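As a follow-up not shown in the original transcript, each Row returned by the SQL query could be formatted into a readable string; a minimal sketch, assuming the peopleTempTab query above:

// A sketch, not from the original transcript: format each returned Row.
// With the empty input file above, this prints nothing.
personsRDD.map(p => "Name: " + p(0) + ", Age: " + p(1)).collect.foreach(println)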
Conditional Queries
where
scala> case class Rating(userId:Int,movieId:Int,rating:Int,timestamp:Long)
defined class Rating
scala> val ratingData=sc.textFile("hdfs://master/user/root/sparksql/ratings.dat").map(_.split("::"))
ratingData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[25] at map at <console>:43
scala> val rating=ratingData.map(r=>Rating(r(0).trim.toInt,r(1).trim.toInt,r(2).trim.toInt,r(3).trim.toLong)).toDF()
rating: org.apache.spark.sql.DataFrame = [userId: int, movieId: int, rating: int, timestamp: bigint]
scala> case class User(userId:Int,gender:String,age:Int,occupation:Int,zip:String)
defined class User
scala> val userData=sc.textFile("hdfs://master/user/root/sparksql/users.dat").map(_.split("::"))
userData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:43
scala> val user=userData.map(u=>User(u(0).trim.toInt,u(1),u(2).trim.toInt,u(3).trim.toInt,u(4))).toDF()
user: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
Query the user DataFrame for users whose gender is female and age is 18, returning the first three rows.
scala> val userWhere=user.where("gender='F' and age=18")
userWhere: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> userWhere.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation| zip|
+------+------+---+----------+-----+
| 18| F| 18| 3|95825|
| 34| F| 18| 0|02135|
| 38| F| 18| 4|02215|
+------+------+---+----------+-----+
only showing top 3 rows
filter
filter works the same as where; the same condition returns the same rows:
scala> val userFilter=user.filter("gender='F' and age=18")
userFilter: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> userFilter.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation| zip|
+------+------+---+----------+-----+
| 18| F| 18| 3|95825|
| 34| F| 18| 0|02135|
| 38| F| 18| 4|02215|
+------+------+---+----------+-----+
only showing top 3 rows
Querying specified columns
select: retrieve the values of the specified columns
scala> val userSelect=user.select("userId","gender")
userSelect: org.apache.spark.sql.DataFrame = [userId: int, gender: string]
scala> userSelect.show(3)
+------+------+
|userId|gender|
+------+------+
| 1| F|
| 2| M|
| 3| M|
+------+------+
only showing top 3 rows
col/apply
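The transcript gives no example here. col and apply both look up a single Column by name, which can then be passed to select; a minimal sketch, assuming the user DataFrame above:

// col and apply are equivalent ways to get a Column by name.
val idCol = user.col("userId")  // explicit col method
val genderCol = user("gender")  // apply sugar for user.col("gender")
val userColSelect = user.select(idCol, genderCol)
// userColSelect.show(3) would print the same columns as the select example above.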
limit
scala> val userLimit=user.limit(3)
userLimit: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> userLimit.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation| zip|
+------+------+---+----------+-----+
| 1| F| 1| 10|48067|
| 2| M| 56| 16|70072|
| 3| M| 25| 15|55117|
+------+------+---+----------+-----+
orderBy/sort
orderBy sorts in ascending order by default. For a descending sort, you can use desc("userId"), $"userId".desc, or -user("userId"), as the three equivalent calls below show (each sorts user by userId in descending order).
scala> val userOrderBy=user.orderBy(desc("userId"))
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> val userOrderBy=user.orderBy($"userId".desc)
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> val userOrderBy=user.orderBy(-user("userId"))
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> userOrderBy.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation| zip|
+------+------+---+----------+-----+
| 6041| M| 25| 7|11107|
| 6040| M| 25| 6|11106|
| 6039| F| 45| 0|01060|
+------+------+---+----------+-----+
only showing top 3 rows
The sort method (sorting user by userId in ascending order); the three calls below are equivalent:
scala> val userSort=user.sort(asc("userId"))
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> val userSort=user.sort($"userId".asc)
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> val userSort=user.sort(user("userId"))
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]
scala> userSort.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation| zip|
+------+------+---+----------+-----+
| 1| F| 1| 10|48067|
| 2| M| 56| 16|70072|
| 3| M| 25| 15|55117|
+------+------+---+----------+-----+
only showing top 3 rows
groupBy
Group by gender and count the number of male and female users.
scala> val userGroupByCount=user.groupBy("gender").count
userGroupByCount: org.apache.spark.sql.DataFrame = [gender: string, count: bigint]
scala> userGroupByCount.show()
+------+-----+
|gender|count|
+------+-----+
| F| 1709|
| M| 4332|
+------+-----+
Other aggregations can also be applied to the grouped data, such as max, min, mean, and sum; see the sketch below.
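A minimal sketch, not in the original transcript, of aggregating the grouped data with functions from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{max, min, mean, sum}
// Per-gender age statistics; column names taken from the user DataFrame above.
val userGroupByAgg = user.groupBy("gender").agg(max("age"), min("age"), mean("age"), sum("age"))
// userGroupByAgg.show() would print one row per gender with the four aggregates.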
join
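The transcript does not show how dfjoin was built. Judging from the output columns below, a plausible construction (an assumption, not confirmed by the source) is an inner join of user and rating on their userId columns:

// Assumed construction of dfjoin, inferred from the output below:
// inner join of user and rating on userId (both userId columns are kept).
val dfjoin = user.join(rating, user("userId") === rating("userId"))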
scala> dfjoin.show(3)
+------+------+---+----------+-----+------+-------+------+---------+
|userId|gender|age|occupation| zip|userId|movieId|rating|timestamp|
+------+------+---+----------+-----+------+-------+------+---------+
| 1| F| 1| 10|48067| 1| 1193| 5|978300760|
| 1| F| 1| 10|48067| 1| 661| 3|978302109|
| 1| F| 1| 10|48067| 1| 914| 3|978301968|
+------+------+---+----------+-----+------+-------+------+---------+
only showing top 3 rows
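Note that the joined result keeps both userId columns. As a sketch, not in the original transcript, joining on the column name instead keeps a single userId column:

// Joining on the column name deduplicates the join key.
val dfjoinByName = user.join(rating, "userId")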