1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet

Posted 吃地瓜的喵酱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet相关的知识,希望对你有一定的参考价值。

第1章 Spark SQL概述

  • Spark SQLSpark 用于结构化数据(structured data)处理的 Spark 模块
  • SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是 SparkSQLSpark SQL 为了简化 RDD 的开发,提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD:
    ➢ (1).DataFrame
    ➢ (2).DataSet

1.1 DataFrame(数据帧)简介

  • Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrameRDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。
  • DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张来对待。


上图直观地体现了 DataFrameRDD 的区别。


1.2 DataSet(数据集)简介

DataSet 是分布式数据集合。DataSetDataFrame 的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 mapflatMapfilter 等等)。

  • DataSetDataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象。
  • 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性。
  • 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称。
  • DataSet 是强类型的。比如可以有 DataSet[Car]DataSet[Person]
  • DataFrameDataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将DataFrame 转换为 DataSetRow 是一个类型,跟 CarPerson 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序。

第2章 Spark SQL核心编程

2.1 新的起点

  • Spark Core 中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContextSpark SQL其实可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
  • SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContextHiveContext的组合,所以在 SQLContexHiveContext上可用的 APISparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 sparkSparkSession 对 象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样。

2.2 DataFrame(重要)

  • Spark SQLDataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation操作也有 action操作

2.2.1 创建 DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL入口,创建 DataFrame有三种方式:

  • (1) 通过 Spark 的数据源进行创建;
  • (2) 从一个存在的 RDD 进行转换;
  • (3) 从 Hive Table 进行查询返回。

2.2.1.1 从 Spark 数据源进行创建

  • (1) 查看 Spark 支持创建文件的数据源格式。
scala> spark.read.	
# 注意:"."后面按 Tab 键,而不是回车键

csv format jdbc json load option options orc parquet schema table text textFile

注:spark-shell.cmd 打开方式请参考: Spark运行模式 中第 5.2小节

  • (2) 在 spark 的 bin/input 目录中创建 user.json 文件。
{"username":"谢清照", "age":"21"}
{"username":"朱玮琦", "age":"21"}
{"username":"信鸽", "age":"20"}
  • (3) 读取 json 文件创建 DataFrame
scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。

  • (4) 展示结果
scala> df.show

+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|     信鸽|
+---+--------+

2.2.1.2 从 RDD 进行转换

  • 暂时跳过…

2.2.1.3 从 Hive Table 进行查询返回

  • 暂时跳过…

2.2.2 SQL 语法

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

  • (1) 读取 JSON 文件创建 DataFrame
scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  • (2) 对 DataFrame 创建一个临时表
scala> df.createOrReplaceTempView("user")
  • (3) 通过 SQL 语句实现查询全表
scala> val sqlDF = spark.sql("select * from user")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • (4) 结果展示
scala> sqlDF.show

+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+

注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使 用全局临时表时需要全路径访问,如:global_temp.people。

  • (5) 对于 DataFrame 创建一个全局表
scala> df.createOrReplaceGlobalTempView("user")

**注**:由于博主使用的Hadoop版本与当前使用的Spark版本不兼容,这里报了错误。重新安装Hadoop环境后,又由于Hadoop安装路径以及JDK安装路径上包含空格而报错,重新安装后解决了此问题。

  • (6) 通过 SQL 语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+

scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age|username|
+---+--------+
| 21|   谢清照|
| 21|   朱玮琦|
| 20|   小鸽子|
+---+--------+

2.2.3 DSL 语法(了解)

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, PythonR 中使用 DSL,使用 DSL语法风格不必去创建临时视图了。

  • (1) 创建一个 DataFrame
scala> val df = spark.read.json("input/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • (2) 查看 DataFrameSchema 信息
scala> df.printSchema

root
 |-- age: long (nullable = true)
 |-- username: string (nullable = true)
  • (3) 只查看 “username” 列数据
scala> df.select("username").show()
+--------+
|username|
+--------+
|   谢清照|
|   朱玮琦|
|   小鸽子|
+--------+
  • (4) 查看 “username” 列数据以及 “age-1” 数据
    注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名。
scala> df.select($"username",$"age" - 1).show
scala> df.select('username, 'age - 1).show()
scala> df.select('username, 'age - 1 as "newage").show()

+--------+---------+
|username|(age - 1)|
+--------+---------+
|   谢清照|       20|
|   朱玮琦|       20|
|   小鸽子|       19|
+--------+---------+
  • (5) 查看 “age” 小于" 21" 的数据
scala> df.filter($"age"<21).show

+---+---------+
|age| username|
+---+---------+
| 20|    小鸽子|
+---+---------+
  • (6) 按照 “age” 分组,查看数据条数
scala> df.groupBy("age").count.show

+---+-----+
|age|count|
+---+-----+
| 21|    2|
| 20|    1|
+---+-----+

2.2.4 RDD 转换为 DataFrame

  • 在 IDEA 中开发程序时,如果需要 RDDDF 或者 DS 之间互相操作,那么需要引入import spark.implicits._
  • 这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持val 修饰的对象的引入。
  • spark-shell 中无需导入,自动完成此操作。
scala> val rdd = sc.textFile("input/id.txt")
scala> rdd.toDF("id").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

实际开发中,一般通过样例类将 RDD 转换为 DataFrame。

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF.show
+--------+---+
|    name|age|
+--------+---+
|   谢清照| 21|
|   朱玮琦| 20|
+--------+---+

2.2.5 DataFrame 转换为 RDD

DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD

scala> val df = sc.makeRDD(List(("谢清照",21), ("朱玮琦",20))).map(t=>User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([谢清照,21], [朱玮琦,20])

注意:此时得到的 RDD 存储类型为 Row。

scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]

scala> array(0)(0)
res29: Any = zhangsan

scala> array(0).getAs[String]("name")
res30: String = 谢清照

2.3 DataSet(重要)

  • DataSet是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建 DataSet

  • (1) 使用样例类集合创建 DataSet
scala> case class Person(name: String, age: Long)
defined class Person

scala> val list = List(Person("谢清照",21), Person("朱玮琦",20))
list: List[Person] = List(Person(谢清照,21), Person(朱玮琦,20))

scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.show
+------+---+
|  name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+
  • (2) 使用基本类型的序列创建 DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。

2.3.2 RDD 转换为 DataSet

  • SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSetcase 类定义了table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。
# 样例类
scala> case class User(name:String, age:Int)
defined class User

scala> val rdd = sc.makeRDD(List(User("谢清照", 21), User("朱玮琦", 20)))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[42] at makeRDD at <console>:26

# RDD 转换为 DataSet
scala> val ds = rdd.toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds.show
+------+---+
|  name|age|
+------+---+
| 谢清照| 21|
| 朱玮琦| 20|
+------+---+

2.3.3 DataSet 转换为 RDD

DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD。

# ds 为上面例子中的 DataSet
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

scala> rdd.collect
res14: Array[User] = Array(User(谢清照,21), User(朱玮琦,20))

2.4 DataFrame 和 DataSet 转换

  • DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

2.4.1 DataFrame 转换为 DataSet

# DataFrame
scala>  val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

# 样例类
scala> case class User(age:Long, username:String)
defined class User

# 转换为 DataSet
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]

2.4.2 DataSet 转换为 DataFrame

# Dataset[User] => DataFrame 

# 转换为 DataFrame, 只保留了结构,去掉了 User 的数据类型
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.5 RDD、DataFrame、DataSet 三者的关系

SparkSQLSpark 为我们提供了两个新的抽象,分别是 DataFrameDataSet。他们和 RDD有什么区别呢?首先从版本的产生上来看:

  • Spark1.0 => RDD
  • Spark1.3 => DataFrame
  • Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDDDataFrame 成为唯一的 API 接口。

2.5.1 三者的区别

2.5.1.1 RDD

  • RDD 一般和 spark mllib 同时使用
  • RDD 不支持 sparksql 操作

2.5.1.2 DataFrame

  • RDDDataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。
  • DataFrameDataSet 一般不与 spark mllib 同时使用
  • DataFrameDataSet 均支持 SparkSQL 的操作,比如 selectgroupby 之类,还能注册临时表/视窗,进行 sql 语句操作。
  • DataFrameDataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)。

2.5.1.3 DataSet

  • DatasetDataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
  • DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息

声明:本文是学习时记录的笔记,如有侵权请告知删除!
原视频地址:https://www.bilibili.com/video/BV11A411L7CK

以上是关于1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet的主要内容,如果未能解决你的问题,请参考以下文章

从多个 parquet 路径创建 Spark SQL 表

Spark Context 概述

Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))

Spark SQL 高级编程之 HadoopHiveSpark 环境搭建

Spark SQL 高级编程之 HadoopHiveSpark 环境搭建

Spark SQL执行计划到RDD全流程记录