1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet
Posted 吃地瓜的喵酱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet相关的知识,希望对你有一定的参考价值。
本文目录如下:
第1章 Spark SQL概述
Spark SQL
是Spark
用于结构化数据(structured data)处理的Spark
模块。SparkSQL
可以简化RDD
的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是SparkSQL
。Spark SQL
为了简化RDD
的开发,提高开发效率,提供了 2 个编程抽象,类似Spark Core
中的RDD
:
➢ (1).DataFrame
➢ (2).DataSet
1.1 DataFrame(数据帧)简介
- 在
Spark
中,DataFrame
是一种以RDD
为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame
与RDD
的主要区别在于,前者带有schema
元信息,即DataFrame
所表示的二维表数据集的每一列都带有名称和类型。 DataFrame
是为数据提供了Schema
的视图。可以把它当做数据库中的一张表来对待。
上图直观地体现了 DataFrame
和 RDD
的区别。
1.2 DataSet(数据集)简介
DataSet
是分布式数据集合。DataSet
是 DataFrame
的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda
函数的能力)以及 Spark SQL
优化执行引擎的优点。DataSet
也可以使用功能性的转换(操作 map
,flatMap
,filter
等等)。
DataSet
是DataFrame API
的一个扩展,是SparkSQL
最新的数据抽象。- 用户友好的
API
风格,既具有类型安全检查也具有DataFrame
的查询优化特性。 - 用样例类来对
DataSet
中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet
中的字段名称。 DataSet
是强类型的。比如可以有DataSet[Car]
,DataSet[Person]
。DataFrame
是DataSet
的特列,DataFrame=DataSet[Row]
,所以可以通过as
方法将DataFrame
转换为DataSet
。Row
是一个类型,跟Car
、Person
这些的类型一样,所有的表结构信息都用Row
来表示。获取数据时需要指定顺序。
第2章 Spark SQL核心编程
2.1 新的起点
Spark Core
中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext
,Spark SQL
其实可以理解为对Spark Core
的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。SparkSession
是 Spark 最新的 SQL 查询起始点,实质上是SQLContext
和HiveContext
的组合,所以在SQLContex
和HiveContext
上可用的API
在SparkSession
上同样是可以使用的。SparkSession
内部封装了SparkContext
,所以计算实际上是由SparkContext
完成的。当我们使用spark-shell
的时候,spark
框架会自动的创建一个名称叫做spark
的SparkSession
对 象, 就像我们以前可以自动获取到一个sc
来表示SparkContext
对象一样。
2.2 DataFrame
(重要)
Spark SQL
的DataFrame API
允许我们使用DataFrame
而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API
既有transformation操作
也有action操作
。
2.2.1 创建 DataFrame
在 Spark SQL 中 SparkSession
是创建 DataFrame
和执行 SQL
的入口,创建 DataFrame
有三种方式:
- (1) 通过 Spark 的数据源进行创建;
- (2) 从一个存在的 RDD 进行转换;
- (3) 从 Hive Table 进行查询返回。
2.2.1.1 从 Spark 数据源进行创建
- (1) 查看
Spark
支持创建文件的数据源格式。
scala> spark.read.
# 注意:"."后面按 Tab 键,而不是回车键
csv format jdbc json load option options orc parquet schema table text textFile
注:spark-shell.cmd
打开方式请参考: Spark运行模式 中第 5.2小节
。
- (2) 在 spark 的
bin/input
目录中创建user.json
文件。
{"username":"谢清照", "age":"21"}
{"username":"朱玮琦", "age":"21"}
{"username":"信鸽", "age":"20"}
- (3) 读取
json
文件创建DataFrame
。
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。
- (4) 展示结果
scala> df.show
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 信鸽|
+---+--------+
2.2.1.2 从 RDD 进行转换
- 暂时跳过…
2.2.1.3 从 Hive Table 进行查询返回
- 暂时跳过…
2.2.2 SQL 语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
- (1) 读取
JSON
文件创建DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
- (2) 对
DataFrame
创建一个临时表
scala> df.createOrReplaceTempView("user")
- (3) 通过
SQL
语句实现查询全表
scala> val sqlDF = spark.sql("select * from user")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- (4) 结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使 用全局临时表时需要全路径访问,如:global_temp.people。
- (5) 对于
DataFrame
创建一个全局表
scala> df.createOrReplaceGlobalTempView("user")
**注**
:由于博主使用的Hadoop
版本与当前使用的Spark
版本不兼容,这里报了错误。重新安装Hadoop
环境后,又由于Hadoop
安装路径以及JDK
安装路径上包含空格而报错,重新安装后解决了此问题。
- (6) 通过
SQL
语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21| 谢清照|
| 21| 朱玮琦|
| 20| 小鸽子|
+---+--------+
2.2.3 DSL 语法(了解)
DataFrame
提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala
, Java
, Python
和 R
中使用 DSL
,使用 DSL
语法风格不必去创建临时视图了。
- (1) 创建一个
DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- (2) 查看
DataFrame
的Schema
信息
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- username: string (nullable = true)
- (3) 只查看 “
username
” 列数据
scala> df.select("username").show()
+--------+
|username|
+--------+
| 谢清照|
| 朱玮琦|
| 小鸽子|
+--------+
- (4) 查看 “
username
” 列数据以及 “age-1
” 数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名。
scala> df.select($"username",$"age" - 1).show
scala> df.select('username, 'age - 1).show()
scala> df.select('username, 'age - 1 as "newage").show()
+--------+---------+
|username|(age - 1)|
+--------+---------+
| 谢清照| 20|
| 朱玮琦| 20|
| 小鸽子| 19|
+--------+---------+
- (5) 查看 “
age
” 小于"21
" 的数据
scala> df.filter($"age"<21).show
+---+---------+
|age| username|
+---+---------+
| 20| 小鸽子|
+---+---------+
- (6) 按照 “
age
” 分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 21| 2|
| 20| 1|
+---+-----+
2.2.4 RDD 转换为 DataFrame
- 在 IDEA 中开发程序时,如果需要
RDD
与DF
或者DS
之间互相操作,那么需要引入import spark.implicits._
- 这里的
spark
不是Scala
中的包名,而是创建的sparkSession
对象的变量名称,所以必须先创建SparkSession
对象再导入。这里的spark
对象不能使用var
声明,因为Scala
只支持val
修饰的对象的引入。 spark-shell
中无需导入,自动完成此操作。
scala> val rdd = sc.textFile("input/id.txt")
scala> rdd.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
实际开发中,一般通过样例类将 RDD 转换为 DataFrame。
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF.show
+--------+---+
| name|age|
+--------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+--------+---+
2.2.5 DataFrame 转换为 RDD
DataFrame
其实就是对 RDD
的封装,所以可以直接获取内部的 RDD
。
scala> val df = sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25
scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([谢清照,21], [朱玮琦,20])
注意:此时得到的 RDD 存储类型为 Row。
scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]
scala> array(0)(0)
res29: Any = zhangsan
scala> array(0).getAs[String]("name")
res30: String = 谢清照
2.3 DataSet(重要)
DataSet
是具有强类型的数据集合,需要提供对应的类型信息。
2.3.1 创建 DataSet
- (1) 使用样例类集合创建
DataSet
scala> case class Person(name: String, age: Long)
defined class Person
scala> val list = List(Person("谢清照",21), Person("朱玮琦",20))
list: List[Person] = List(Person(谢清照,21), Person(朱玮琦,20))
scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> ds.show
+------+---+
| name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
- (2) 使用基本类型的序列创建
DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。
2.3.2 RDD 转换为 DataSet
SparkSQL
能够自动将包含有case
类的RDD
转换成DataSet
,case
类定义了table
的结构,case
类属性通过反射变成了表的列名。Case
类可以包含诸如Seq
或者Array
等复杂的结构。
# 样例类
scala> case class User(name:String, age:Int)
defined class User
scala> val rdd = sc.makeRDD(List(User("谢清照", 21), User("朱玮琦", 20)))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[42] at makeRDD at <console>:26
# RDD 转换为 DataSet
scala> val ds = rdd.toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> ds.show
+------+---+
| name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
2.3.3 DataSet 转换为 RDD
DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD。
# ds 为上面例子中的 DataSet
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25
scala> rdd.collect
res14: Array[User] = Array(User(谢清照,21), User(朱玮琦,20))
2.4 DataFrame 和 DataSet 转换
DataFrame
其实是DataSet
的特例,所以它们之间是可以互相转换的。
2.4.1 DataFrame 转换为 DataSet
# DataFrame
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
# 样例类
scala> case class User(age:Long, username:String)
defined class User
# 转换为 DataSet
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]
2.4.2 DataSet 转换为 DataFrame
# Dataset[User] => DataFrame
# 转换为 DataFrame, 只保留了结构,去掉了 User 的数据类型
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
2.5 RDD、DataFrame、DataSet 三者的关系
在 SparkSQL
中 Spark
为我们提供了两个新的抽象,分别是 DataFrame
和 DataSet
。他们和 RDD
有什么区别呢?首先从版本的产生上来看:
- Spark1.0 => RDD
- Spark1.3 => DataFrame
- Spark1.6 => Dataset
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark
版本中,DataSet
有可能会逐步取代 RDD
和 DataFrame
成为唯一的 API 接口。
2.5.1 三者的区别
2.5.1.1 RDD
RDD
一般和spark mllib
同时使用RDD
不支持sparksql
操作
2.5.1.2 DataFrame
- 与
RDD
和Dataset
不同,DataFrame
每一行的类型固定为Row
,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。 DataFrame
与DataSet
一般不与spark mllib
同时使用DataFrame
与DataSet
均支持SparkSQL
的操作,比如select
,groupby
之类,还能注册临时表/视窗,进行sql
语句操作。DataFrame
与DataSet
支持一些特别方便的保存方式,比如保存成csv
,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)。
2.5.1.3 DataSet
Dataset
和DataFrame
拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame
其实就是DataSet
的一个特例type DataFrame = Dataset[Row]
。DataFrame
也可以叫Dataset[Row]
,每一行的类型是Row
,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS
方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset
中,每一行是什么类型是不一定的,在自定义了case class
之后可以很自由的获得每一行的信息
声明:本文是学习时记录的笔记,如有侵权请告知删除!
原视频地址:https://www.bilibili.com/video/BV11A411L7CK
以上是关于1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet的主要内容,如果未能解决你的问题,请参考以下文章
Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark SQL 高级编程之 HadoopHiveSpark 环境搭建