Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Posted 大数据Manor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))相关的知识,希望对你有一定的参考价值。
stypora-copy-images-to: img
typora-root-url: ./
Spark Day07:Spark SQL
01-[了解]-昨日课程内容回顾
主要讲解2个方面内容:Spark 调度内核和SparkSQL 快速体验。
1、Spark 内核调度
讲解Spark框架如何对1个Job作业进行调度执行,将1个Job如何拆分为Task任务,放到Executor上执行。
【以大数据经典案例:词频统计WordCount】
- 每个Job是RDD Action函数触发,比如foreachPartition、saveAsTextFile、count等等
- 预备知识:
RDD 依赖关系,宽依赖(1对N,又称为Shuffle依赖)和窄依赖(1对1)
DAG图,每个Job依据触发Job执行RDD,采用回溯法倒推,整个Job中所有RDD及依赖关系,构建DAG图
Stage阶段,采用回溯法,从后向前,依据RDD之间依赖关系,如果是宽依赖,划分一个Stage
每个Stage中都是一组Task任务
RDD中1个分区数据被1个Task处理,1个Task运行1Core CPU并且以线程方式运行
Stage中每个Task任务以pipeline管道计算模式处理数据
- 综合Job调度
- DAGScheduler,将DAG图划分为Stage,按照RDD之间依赖为宽依赖
Stage层面调度
- TaskScheduler,将每个Stage中Task任务打包为TaskSet发送队列中,等待Executor运行Task
Task层面调度
2、SparkSQL 快速体验
- SparkSession 程序入口
从Spark 2.0提供类,加载数据,底层还是SparkContext
spark.read.textFile("datas/wordcount.data")
DataFrame和Dataset = RDD + Schema(字段名称和字段类型)
- 实现词频统计WordCount
- 基于DSL编程
将数据封装到DataFrame或Dataset,调用API实现
val resultDS: DataFrame = inputDS
// hadoop spark hadoop spark spark -> 分割单词,并且扁平化
.select(explode(split(trim($"value"), "\\\\s+")).as("word"))
.groupBy("word").count()
- 基于SQL编程
将数据封装到DataFrame或Dataset,注册为临时视图,编写SQL分析
inputDS.createOrReplaceTempView("tmp_view_line")
val resultDF: DataFrame = spark.sql(
"""
|WITH tmp AS(
| SELECT explode(split(trim(value), "\\\\s+")) AS word FROM tmp_view_line
|)
|SELECT word, COUNT(1) AS total FROM tmp GROUP BY word
|""".stripMargin)
resultDF.show(10, truncate = false)
02-[了解]-今日课程内容提纲
主要2个方面内容:DataFrame是什么和数据分析(案例讲解)
1、DataFrame是什么
SparkSQL模块前世今生、官方定义和特性
DataFrame是什么
DataFrame = RDD[Row] + Schema,Row表示每行数据,抽象的,并不知道每行Row数据有多少列,弱类型
案例演示,spark-shell命令行
Row 表示每行数据,如何获取各个列的值
RDD如何转换为DataFrame
- 反射推断
- 自定义Schema
调用toDF函数,创建DataFrame
2、数据分析(案例讲解)
编写DSL,调用DataFrame API(类似RDD中函数,比如flatMap和类似SQL中关键词函数,比如select)
编写SQL语句
注册DataFrame为临时视图
编写SQL语句,类似Hive中SQL语句
使用函数:
org.apache.spark.sql.functions._
电影评分数据分析
分别使用DSL和SQL
03-[了解]-SparkSQL 概述之前世今生
SparkSQL模块一直到Spark 2.0版本才算真正稳定,发挥其巨大功能,发展经历如下几个阶段。
1、Spark 1.0之前
Shark = Hive + Spark
将Hive框架源码,修改其中转换SQL为MapReduce,变为转换RDD操作,称为Shark
问题:
维护成本太高,没有更多精力在于框架性能提升
2、Spark 1.0开始提出SparkSQL模块
重新编写引擎Catalyst,将SQL解析为优化逻辑计划Logical Plan
此时数据结构:SchemaRDD
测试开发版本,不能用于生产环境
3、Spark 1.3版本,SparkSQL成为Release版本
数据结构DataFrame,借鉴与Python和R中dataframe
提供外部数据源接口
方便可以从任意外部数据源加载load和保存save数据
4、Spark 1.6版本,SparkSQL数据结构Dataset
坊间流传,参考Flink中DataSet数据结构而来
Dataset = RDD + schema
5、Spark 2.0版本,DataFrame和Dataset何为一体
Dataset = RDD + schema
DataFrame = Dataset[Row]
Spark 2.x发布时,将Dataset和DataFrame统一为一套API,以Dataset数据结构为主(Dataset= RDD + Schema),其中
DataFrame = Dataset[Row]
。
04-[了解]-SparkSQL 概述之官方定义及特性
SparkSQL模块官方定义:
针对结构化数据处理Spark Module模块
。
主要包含三层含义:
- 第一、针对结构化数据处理,属于Spark框架一个部分
- 第二、抽象数据结构:DataFrame
DataFrame = RDD[Row] + Schema信息;
- 第三、分布式SQL引擎,类似Hive框架
从Hive框架继承而来,Hive中提供bin/hive交互式SQL命令行及HiveServer2服务,SparkSQL都可以;
Spark SQL模块架构示意图如下:
Spark SQL是Spark用来处理结构化数据的一个模块,主要四个特性:
官方文档:http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html
05-[掌握]-DataFrame是什么及案例演示
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于,前者
带有schema元信息
,即DataFrame所表示的二维表数据集的每一列都带有名称和类型
。
使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率
DataFrame有如下特性:
范例演示:
加载json格式数据
[root@node1 spark]# bin/spark-shell --master local[2]
21/04/26 09:26:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node1.itcast.cn:4040
Spark context available as 'sc' (master = local[2], app id = local-1619400386041).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\\ \\/ _ \\/ _ `/ __/ '_/
/___/ .__/\\_,_/_/ /_/\\_\\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val empDF = spark.read.json("/datas/resources/employees.json")
empDF: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> empDF.printSchema()
root
|-- name: string (nullable = true)
|-- salary: long (nullable = true)
scala>
scala> empDF.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
scala>
scala> empDF.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:26
所以,可以看出:DataFrame = RDD[Row] + Schema信息
06-[掌握]-DataFrame中Schema和Row
查看DataFrame中Schema是什么,执行如下命令:
scala> empDF.schema
可以发现Schema封装类:
StructType
,结构化类型,里面存储的每个字段封装的类型:StructField
,结构化字段。
- 其一、StructType 定义,是一个样例类,属性为StructField的数组
- 其二、StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填
自定义Schema结构,官方提供实例代码:
DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。
如何获取Row中每个字段的值呢????
- 方式一:下标获取,从0开始,类似数组下标获取
- 方式二:指定下标,知道类型
- 方式三:通过As转换类型, 此种方式开发中使用最多
如何创建Row对象呢???要么是传递value,要么传递Seq
07-[掌握]-RDD转换DataFrame之反射类型推断
实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:
类型推断和自定义Schema
。文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds
范例演示说明:
使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。
字段信息:user id 、 item id、 rating 、 timestamp。
当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。
package cn.itcast.spark.convert
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* 采用反射的方式将RDD转换为DataFrame和Dataset
*/
object _01SparkRDDInferring {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,设置应用名称和master
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// 1. 加载电影评分数据,封装数据结构RDD
val rawRatingRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data")
// 2. 将RDD数据类型转化为 MovieRating
/*
将原始RDD中每行数据(电影评分数据)封装到CaseClass样例类中
*/
val ratingRDD: RDD[MovieRating] = rawRatingRDD.mapPartitions { iter =>
iter.map { line =>
// 按照制表符分割
val arr: Array[String] = line.trim.split("\\\\t")
// 封装样例对象
MovieRating(
arr(0), arr(1), arr(2).toDouble, arr(3).toLong
)
}
}
// 3. 通过隐式转换,直接将CaseClass类型RDD转换为DataFrame
val ratingDF: DataFrame = ratingRDD.toDF()
//ratingDF.printSchema()
//ratingDF.show(10, truncate = false)
/*
Dataset 从Spark1.6提出
Dataset = RDD + Schema
DataFrame = RDD[Row] + Schema
Dataset[Row] = DataFrame
*/
// 将DataFrame转换为Dataset,只需要加上CaseClass强类型即可
val ratingDS: Dataset[MovieRating] = ratingDF.as[MovieRating]
ratingDS.printSchema()
ratingDS.show(10, truncate = false)
// TODO: 将RDD转换为Dataset,可以通过隐式转, 要求RDD数据类型必须是CaseClass
val dataset: Dataset[MovieRating] = ratingRDD.toDS()
dataset.printSchema()
dataset.show(10, truncate = false)
// 将Dataset直接转换为DataFrame
val dataframe = dataset.toDF()
dataframe.printSchema()
dataframe.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
08-[掌握]-RDD转换DataFrame之自定义Schema
依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:
package cn.itcast.spark.convert
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* 自定义Schema方式转换RDD为DataFrame
*/
object _02SparkRDDSchema {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,设置应用名称和master
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// 1. 加载电影评分数据,封装数据结构RDD
val rawRatingsRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
// 2. TODO: step1. RDD中数据类型为Row:RDD[Row]
val rowRDD: RDD[Row] = rawRatingsRDD.mapPartitions { iter =>
iter.map {
line =>
// 按照制表符分割
val arr: Array[String] = line.trim.split("\\\\t")
// 封装样例对象
Row(arr(0), arr(1), arr(2).toDouble, arr(3).toLong)
}
}
// 3. TODO:step2. 针对Row中数据定义Schema:StructType
val schema: StructType = StructType(
Array(
StructField("user_id", StringType, nullable = true),
StructField("movie_id", StringType, nullable = true),
StructField("rating", DoubleType, nullable = true),
StructField("timestamp", LongType, nullable = true)
)
)
// 4. TODO:step3. 使用SparkSession中方法将定义的Schema应用到RDD[Row]上
val ratingDF: DataFrame = spark.createDataFrame(rowRDD, schema)
ratingDF.printSchema()
ratingDF.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
09-[掌握]-toDF函数指定列名称转换为DataFrame
SparkSQL中提供一个函数:
toDF
,通过指定列名称,将数据类型为元组
的RDD或Seq转换为DataFrame,实际开发中也常常使用。
范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。
package cn.itcast.spark.todf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
*/
object _03SparkSQLToDF {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,设置应用名称和master
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// 1. 将数据类型为元组的RDD,转换为DataFrame
val rdd: RDD[(Int, String, String)] = spark.sparkContext.parallelize(
List((1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female"))
)
// 调用toDF方法,指定列名称,将RDD转换为DataFrame
val dataframe: DataFrame = rdd.toDF("id", "name", "gender")
dataframe.printSchema()
dataframe.show(10, truncate = false)
println("==========================================================")
// 定义一个Seq序列,其中数据类型为元组
val seq: Seq[(Int, String, String)] = Seq(
(1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female")
)
// 将数据类型为元组Seq序列转换为DataFrame
val df: DataFrame = seq.toDF("id", "name", "gender")
df.printSchema()
df.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
10-[了解]-SparkSQL中数据处理方式
在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:
- 第一种:DSL(domain-specific language)编程
- 调用DataFrame/Dataset API(函数),类似RDD中函数;
- DSL编程中,调用函数更多是类似SQL语句关键词函数,比如select、groupBy,同时要使用函数处理
数据分析人员,尤其使用Python数据分析人员
- 第二种:SQL 编程
- 将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;
- 分为2步操作,先将DataFrame注册为临时视图,然后再编写SQL
尤其DBA和数据仓库分析人员擅长编写SQL语句,采用SQL编程
11-[掌握]-基于DSL分析(函数说明)和SQL分析
- 基于DSL分析
- 调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL
语句函数,部分截图如下:
- 基于SQL分析
- 将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:
其中SQL语句类似Hive中SQL语句,查看Hive官方文档,SQL查询分析语句语法,官方文档文档:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
12-[掌握]-电影评分数据分析之需求说明和加载数据
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6ypUaVpL-1627176341890)(/img/image-20210426105132291.png)]
数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:
数据处理分析步骤如下:
将分析结果,分别保存到mysql数据库表中及CSV文本文件中。
首先加载电影评分数据,封装到RDD中
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// TODO: step1. 读取电影评分数据,从本地文件系统读取,封装数据至RDD中
val ratingRDD: RDD[String] = spark.read.textFile("datas/ml-1m/ratings.dat").rdd
println(s"Count = ${ratingRDD.count()}")
println(s"first:\\n\\t${ratingRDD.first()}")
13-[掌握]-电影评分数据分析之数据ETL
读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,采用toDF函数,代码下:
val ratingDF: DataFrame = ratingRDD
.filter(line => null != line && line.trim.split("::").length == 4)
.mapPartitions{iter =>
iter.map{line =>
// a. 解析每条数据
val arr = line.trim.split("::")
// b. 构建元组对象
(arr(0).toInt, arr(1).toInt, arr(2).toDouble, arr(3).toLong)
}
}
// c. 调用toDF函数,指定列名称
.toDF("user_id", "item_id", "rating", "timestamp")
ratingDF.printSchema()
ratingDF.show(10, truncate = false)
println(s"count = ${ratingDF.count()}")
将RDD转换为DataFrame数据集,方便采用DSL或SQL分析数据。
14-[掌握]-电影评分数据分析之SQL分析
首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下;
// TODO: step3. 基于SQL方式分析
/*
a. 注册为临时视图
b. 编写SQL,执行分析
*/
// a. 将DataFrame注册为临时视图
ratingDF.createOrReplaceTempView("view_temp_ratings")
// b. 编写SQL,执行分析
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| item_id, ROUND(AVG(rating), 2) AS avg_rating, COUNT(1) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| item_id
|HAVING
| cnt_rating >= 2000
|ORDER BY
| avg_rating DESC
|LIMIT
| 10
|""".stripMargin)
/*
root
|-- item_id: integer (nullable = false)
|-- avg_rating: double (nullable = true)
|-- count_rating: long (nullable = false)
*/
top10MovieDF.printSchema()
/*
+--------+----------+------------+
|movie_id|avg_rating|count_rating|
+--------+----------+------------+
|318 |4.55 |2227 |
|858 |4.52 |2223 |
|527 |4.51 |2304 |
|1198 |4.48 |2514 |
|260 |4.45 |2991 |
|2762 |4.41 |2459 |
|593 |4.35 |2578 |
|2028 |4.34 |2653 |
|2858 |4.32 |3428 |
|2571 |4.32 |2590 |
+--------+----------+------------+
*/
top10MovieDF.show(10, truncate = false)
15-[掌握]-电影评分数据分析之Shuffle分区数
运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(
spark.sql.shuffle.partitions
)为200
,在实际项目中要合理的设置
。在构建SparkSession实例对象时,设置参数的值
好消息:
在Spark3.0开始,不用关心参数值,程序自动依据Shuffle时数据量,合理设置分区数目
。
16-[掌握]-电影评分数据分析之DSL分析
调用Dataset中函数,采用链式编程分析数据,核心代码如下:
val resultDF: DataFrame = ratingDF
// a. 按照电影ID分组
.groupBy($"item_id")
// b. 组合数据进行聚合,评分平均值和评分的次数
.agg(
round(avg($"rating"), 2).as("avg_rating"), //
count($"user_id").as("cnt_rating")
)
// c. 过滤评分次数大于2000
.filter($"cnt_rating" > 2000)
// d. 按照评分平均值降序排序
.orderBy($"avg_rating".desc)
// e. 获取前10条数据
.limit(10)
resultDF.printSchema()
resultDF.show(10, truncate = false)
使用需要导入函数库:import org.apache.spark.sql.functions._
使用DSL编程分析和SQL编程分析,哪一种方式性能更好呢?实际开发中如何选择呢???
17-[掌握]-电影评分数据分析之保存结果至MySQL
将分析数据保持到MySQL表中,直接调用Dataframe中writer方法,写入数据到MYSQL表中
// TODO: step 4. 将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 保存结果数据至MySQL表中
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
props.put("driver", "com.mysql.cj.jdbc.Driver")
resultDF
.coalesce(1) // 对结果数据考虑降低分区数
.write
.mode(SaveMode.Overwrite)
.jdbc(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
"db_test.tb_top10_movies", //
props
)
/*
mysql> select * from tb_top10_movies ;
+---------+------------+------------+
| item_id | avg_rating | cnt_rating |
+---------+------------+------------+
| 318 | 4.55 | 2227 |
| 858 | 4.52 | 2223 |
| 527 | 4.51 | 2304 |
| 1198 | 4.48 | 2514 |
| 260 | 4.45 | 2991 |
| 2762 | 4.41 | 2459 |
| 593 | 4.35 | 2578 |
| 2028 | 4.34 | 2653 |
| 2858 | 4.32 | 3428 |
| 2571 | 4.32 | 2590 |
+---------+------------+------------+
*/
// 保存结果数据至CSv文件中
// 数据不在使用时,释放资源
resultDF.unpersist()
18-[掌握]-电影评分数据分析之保存结果至CSV文件
将结果DataFrame保存值CSV文件中,文件首行为列名称,核心代码如下:
// 保存结果以上是关于Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))的主要内容,如果未能解决你的问题,请参考以下文章
Note_Spark_Day11:Spark Streaming
Note_Spark_Day01:Spark 框架概述和Spark 快速入门
Note_Spark_Day10:Spark Streaming