Spark SQL: Structured Data File Processing 02

Posted by Weikun Xing


DataFrame Query Operations

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext=new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@30417948

scala> val people=sc.textFile("hdfs://master/user/root/sparksql/user.txt")
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at textFile at <console>:41

scala> val schemaString="name age"
schemaString: String = name age

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types.{StructType,StructField,StringType}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

scala> val schema=StructType(schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))

scala> val rowRDD=people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[18] at map at <console>:45

scala> val peopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)
peopleDataFrame: org.apache.spark.sql.DataFrame = [name: string, age: string]

Because the file I am using here is empty, the query below returns an empty result.

scala> peopleDataFrame.registerTempTable("peopleTempTab")

scala> val personsRDD=sqlContext.sql("select name,age from peopleTempTab where age >20").rdd
personsRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[22] at rdd at <console>:45

scala> personsRDD.collect
res7: Array[org.apache.spark.sql.Row] = Array()
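The parsing code above expects one comma-separated name,age pair per line. For illustration, a hypothetical non-empty user.txt such as

Alice,25
Bob,30

would make the same query return the rows whose age exceeds 20.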

Conditional Queries

where

scala> case class Rating(userId:Int,movieId:Int,rating:Int,timestamp:Long)
defined class Rating

scala> val ratingData=sc.textFile("hdfs://master/user/root/sparksql/ratings.dat").map(_.split("::"))
ratingData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[25] at map at <console>:43

scala> val rating=ratingData.map(r=>Rating(r(0).trim.toInt,r(1).trim.toInt,r(2).trim.toInt,r(3).trim.toLong)).toDF()
rating: org.apache.spark.sql.DataFrame = [userId: int, movieId: int, rating: int, timestamp: bigint]
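Note: converting an RDD of case-class instances to a DataFrame with toDF() relies on import sqlContext.implicits._. The Spark shell runs this import automatically for its built-in sqlContext; in a standalone application you must add it yourself.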

scala> case class User(userId:Int,gender:String,age:Int,occupation:Int,zip:String)
defined class User

scala> val userData=sc.textFile("hdfs://master/user/root/sparksql/users.dat").map(_.split("::"))
userData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:43

scala> val user=userData.map(u=>User(u(0).trim.toInt,u(1),u(2).trim.toInt,u(3).trim.toInt,u(4))).toDF()
user: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

Query the user DataFrame for users whose gender is female and whose age is 18, and show the first three rows.

scala> val userWhere=user.where("gender='F' and age=18")
userWhere: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> userWhere.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|    18|     F| 18|         3|95825|
|    34|     F| 18|         0|02135|
|    38|     F| 18|         4|02215|
+------+------+---+----------+-----+
only showing top 3 rows

filter

scala> val userFilter=user.filter("gender='F' and age=18")
userFilter: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> userFilter.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|    18|     F| 18|         3|95825|
|    34|     F| 18|         0|02135|
|    38|     F| 18|         4|02215|
+------+------+---+----------+-----+
only showing top 3 rows
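where and filter return identical results: where is simply an alias for filter, and both accept either a SQL-style condition string, as above, or a Column expression.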

Querying Specified Columns

select: retrieve the values of the specified columns

scala> val userSelect=user.select("userId","gender")
userSelect: org.apache.spark.sql.DataFrame = [userId: int, gender: string]

scala> userSelect.show(3)
+------+------+
|userId|gender|
+------+------+
|     1|     F|
|     2|     M|
|     3|     M|
+------+------+
only showing top 3 rows

col/apply
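col and apply both return a single Column rather than a DataFrame, so they are typically used to build expressions inside operators such as select, where, and orderBy. user("age") is shorthand for user.apply("age"), which is equivalent to user.col("age"). A minimal sketch:

// All three expressions produce the same Column
val c1 = user.col("age")   // explicit col method
val c2 = user.apply("age") // explicit apply method
val c3 = user("age")       // syntactic sugar for apply

// Columns are then consumed by other operators, for example:
user.select(user("userId"), user("age")).show(3)
user.where(user("gender") === "F" && user("age") === 18).show(3)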

limit

scala> val userLimit=user.limit(3)
userLimit: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> userLimit.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|     1|     F|  1|        10|48067|
|     2|     M| 56|        16|70072|
|     3|     M| 25|        15|55117|
+------+------+---+----------+-----+
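Unlike show(3), which only prints rows, limit(3) returns a new DataFrame containing just the first three rows, so it can be chained into further transformations.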

orderBy/sort

orderBy sorts in ascending order by default. For a descending sort you can use desc, $"col".desc, or negate the column with -, as shown in the code below (sorting user by userId in descending order). Note that the desc and asc functions live in org.apache.spark.sql.functions; import them if they are not already in scope.

scala> val userOrderBy=user.orderBy(desc("userId"))
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> val userOrderBy=user.orderBy($"userId".desc)
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> val userOrderBy=user.orderBy(-user("userId"))
userOrderBy: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> userOrderBy.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|  6041|     M| 25|         7|11107|
|  6040|     M| 25|         6|11106|
|  6039|     F| 45|         0|01060|
+------+------+---+----------+-----+
only showing top 3 rows

The sort method (sorting user by userId in ascending order):

scala> val userSort=user.sort(asc("userId"))
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> val userSort=user.sort($"userId".asc)
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> val userSort=user.sort(user("userId"))
userSort: org.apache.spark.sql.DataFrame = [userId: int, gender: string, age: int, occupation: int, zip: string]

scala> userSort.show(3)
+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|     1|     F|  1|        10|48067|
|     2|     M| 56|        16|70072|
|     3|     M| 25|        15|55117|
+------+------+---+----------+-----+
only showing top 3 rows
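sort and orderBy are interchangeable: orderBy is an alias for sort, so the desc and $ variants shown above work with either method.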

groupBy

Group by gender and count the number of male and female users:

scala> val userGroupByCount=user.groupBy("gender").count
userGroupByCount: org.apache.spark.sql.DataFrame = [gender: string, count: bigint]

scala> userGroupByCount.show()
+------+-----+                                                                  
|gender|count|
+------+-----+
|     F| 1709|
|     M| 4332|
+------+-----+

You can also apply other operations to the grouped data, such as max, min, mean, and sum, as sketched below.
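A minimal sketch (the agg variant assumes the aggregate functions from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.{max, min, mean, sum}

// Shortcuts defined directly on the grouped data
user.groupBy("gender").max("age").show()
user.groupBy("gender").mean("age").show()

// Or compute several aggregations in one pass with agg
user.groupBy("gender").agg(max("age"), min("age"), mean("age"), sum("age")).show()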

join
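The transcript below prints a DataFrame named dfjoin whose definition did not make it into the capture. A definition consistent with the output shown (both userId columns are kept, so the join condition is a Column equality rather than a shared column name) would be:

val dfjoin = user.join(rating, user("userId") === rating("userId"))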

scala> dfjoin.show(3)
+------+------+---+----------+-----+------+-------+------+---------+
|userId|gender|age|occupation|  zip|userId|movieId|rating|timestamp|
+------+------+---+----------+-----+------+-------+------+---------+
|     1|     F|  1|        10|48067|     1|   1193|     5|978300760|
|     1|     F|  1|        10|48067|     1|    661|     3|978302109|
|     1|     F|  1|        10|48067|     1|    914|     3|978301968|
+------+------+---+----------+-----+------+-------+------+---------+
only showing top 3 rows
