Spark SQL的介绍和DataFrame的建立及使用
Posted Code_exploration
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL的介绍和DataFrame的建立及使用相关的知识,希望对你有一定的参考价值。
1.
Spark SQL定位处理结构化数据的模块。SparkSQL提供相应的优化机制,并支持不同语言的开发API。
java、scala、Python,类SQL的方法调用(DSL)
2.
RDD与Spark SQL的比较说明:
使用Spark SQL的优势:a.面向结构化数据;b.优化机制;
RDD缺点:a.没有优化机制,如对RDD执行Filter操作;
b.RDD类型转换后无法进行模式推断
3.
DataFrame/SchemaRDD
DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。
Dateframe=RDD(数据集)+Schema(元数据/模型)
SchemaRDD就是DataFrame的前身,在1.3.0版本后。
DataFrame存放的是ROW对象。每个Row 对象代表一行记录。
SchemaRDD还包含记录的结构信息(即数据字段)
4.
创建Spark SQL环境
a.将SparkSQL依赖库添加至pom.xml文件中
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.2</version>
</dependency>
b.创建SparkSQL Context-->SparkSession
通过SparkSession.builder()创建构造器;
并调用.appName("sparkSQL").master("local")设置集群模式以及app名称
最后必须调用getOrCreate()方法创建SparkSession对象。
val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
c.加载外部数据源:
通过SparkSession的read()方法加载不同的数据源:json、CVS、jdbc、textfile、parquert等
val df = spark.read.textFile("file:///d:/测试数据/users.txt").toDF()
df.show()
DF的创建方式 |
(1)通过SparkSession的createDataFrame(...)方法创建DF对象
a.将Seq序列转换为DF
b.将RDD[Product]多元素转换为DF
(2)通过SparkSession的read读取外部文件调用toDF()
(3)通过导入隐式转换,可直接将Scala中的序列转换为DF
val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
import spark.implicits._
val list = List(("zhangsan",12,"changchun"),("lilei",25,"haerbin"))
val df_implicits = list.toDF()
查看DF的Schema |
1.案例说明:
val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>(x(0),x(1),x(2)))
val df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
df_rdd.select("_1","_2").where("_1 like ‘%o%‘").show()
df_rdd.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
通过case用例类可以对DF进行Schema匹配
case class Person(name:String,age:Int,address:String)
val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>new Person(x(0),x(1).toInt,x(2)))
val df_rdd = spark.createDataFrame(rdd)
df_rdd.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)
df_rdd.show()
+------+---+-------+
| name|age|address|
+------+---+-------+
| anne| 22| NY|
| joe| 39| CO|
|alison| 35| NY|
+------+---+-------+
2.实现简单的select操作
df_rdd.select("name","age").where("name like ‘%o%‘").show()
+------+---+
| name|age|
+------+---+
| joe| 39|
|alison| 35|
| bob| 71|
+------+---+
DF的操作方式 |
1.显示:
df_rdd.show()
2.查询:
df_rdd.select("name").show()
3.条件查询:
df_rdd.select($"name",$"age").where("name like ‘%o%‘").show() //注:引入spark.implicits._
+------+---+
| name|age|
+------+---+
| joe| 39|
|alison| 35|
| bob| 71|
+------+---+
4.条件查询:
df_rdd.select($"name",$"age"+1).where("name like ‘%o%‘").show() //$是scala的用法,需要隐式转换 import spark.implicits._
+------+---------+
| name|(age + 1)|
+------+---------+
| joe| 40|
|alison| 36|
| bob| 72|
+------+---------+
5.过滤操作
a.通过过滤表达式:
df_rdd.filter("age > 36").show()
b.通过func式编程进行处理,DF中每个元素均为ROW
df_rdd.filter(x=>{if(x.getAs[Int]("age") > 36) true else false }).show()
6.分组操作
df_rdd.groupBy("address").count().show
+-------+-----+
|address|count|
+-------+-----+
| OR| 2|
| VA| 2|
| CA| 2|
| NY| 3|
| CO| 1|
+-------+-----+
以上是关于Spark SQL的介绍和DataFrame的建立及使用的主要内容,如果未能解决你的问题,请参考以下文章
[Spark][Python][DataFrame][SQL]Spark对DataFrame直接执行SQL处理的例子