Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))

Posted 大数据Manor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))相关的知识,希望对你有一定的参考价值。


stypora-copy-images-to: img
typora-root-url: ./

Spark Day07:Spark SQL

01-[了解]-昨日课程内容回顾

主要讲解2个方面内容:Spark 调度内核和SparkSQL 快速体验。

1、Spark 内核调度
	讲解Spark框架如何对1个Job作业进行调度执行,将1个Job如何拆分为Task任务,放到Executor上执行。
	【以大数据经典案例:词频统计WordCount】
	- 每个Job是RDD Action函数触发,比如foreachPartition、saveAsTextFile、count等等
	- 预备知识:
		RDD 依赖关系,宽依赖(1对N,又称为Shuffle依赖)和窄依赖(1对1)
		DAG图,每个Job依据触发Job执行RDD,采用回溯法倒推,整个Job中所有RDD及依赖关系,构建DAG图
		Stage阶段,采用回溯法,从后向前,依据RDD之间依赖关系,如果是宽依赖,划分一个Stage
		每个Stage中都是一组Task任务
			RDD中1个分区数据被1个Task处理,1个Task运行1Core CPU并且以线程方式运行
			Stage中每个Task任务以pipeline管道计算模式处理数据
	- 综合Job调度
		- DAGScheduler,将DAG图划分为Stage,按照RDD之间依赖为宽依赖
			Stage层面调度
		- TaskScheduler,将每个Stage中Task任务打包为TaskSet发送队列中,等待Executor运行Task
			Task层面调度
	
2、SparkSQL 快速体验
	- SparkSession 程序入口
		从Spark 2.0提供类,加载数据,底层还是SparkContext
		spark.read.textFile("datas/wordcount.data")
		DataFrame和Dataset = RDD + Schema(字段名称和字段类型)
	- 实现词频统计WordCount
		- 基于DSL编程
			将数据封装到DataFrame或Dataset,调用API实现
            val resultDS: DataFrame = inputDS
                // hadoop spark hadoop spark spark  -> 分割单词,并且扁平化
                .select(explode(split(trim($"value"), "\\\\s+")).as("word"))
                .groupBy("word").count()
		- 基于SQL编程
			将数据封装到DataFrame或Dataset,注册为临时视图,编写SQL分析
			inputDS.createOrReplaceTempView("tmp_view_line")
			val resultDF: DataFrame = spark.sql(
                """
                  |WITH tmp AS(
                  |  SELECT explode(split(trim(value), "\\\\s+")) AS word FROM tmp_view_line
                  |)
                  |SELECT word, COUNT(1) AS total FROM tmp GROUP BY word
                  |""".stripMargin)
            resultDF.show(10, truncate = false)

02-[了解]-今日课程内容提纲

主要2个方面内容:DataFrame是什么和数据分析(案例讲解)

1、DataFrame是什么
	SparkSQL模块前世今生、官方定义和特性
	DataFrame是什么
		DataFrame = RDD[Row] + Schema,Row表示每行数据,抽象的,并不知道每行Row数据有多少列,弱类型
	案例演示,spark-shell命令行
	Row 表示每行数据,如何获取各个列的值
	RDD如何转换为DataFrame
		- 反射推断
		- 自定义Schema
	调用toDF函数,创建DataFrame

2、数据分析(案例讲解)
	编写DSL,调用DataFrame API(类似RDD中函数,比如flatMap和类似SQL中关键词函数,比如select)
	编写SQL语句
		注册DataFrame为临时视图
		编写SQL语句,类似Hive中SQL语句
	使用函数:
		org.apache.spark.sql.functions._
	电影评分数据分析
		分别使用DSL和SQL

03-[了解]-SparkSQL 概述之前世今生

SparkSQL模块一直到Spark 2.0版本才算真正稳定,发挥其巨大功能,发展经历如下几个阶段。

1、Spark 1.0之前
	Shark = Hive + Spark
	将Hive框架源码,修改其中转换SQL为MapReduce,变为转换RDD操作,称为Shark
	问题:
		维护成本太高,没有更多精力在于框架性能提升
		
2、Spark 1.0开始提出SparkSQL模块
	重新编写引擎Catalyst,将SQL解析为优化逻辑计划Logical Plan
	此时数据结构:SchemaRDD
	测试开发版本,不能用于生产环境
	
3、Spark 1.3版本,SparkSQL成为Release版本
	数据结构DataFrame,借鉴与Python和R中dataframe
	提供外部数据源接口
		方便可以从任意外部数据源加载load和保存save数据

4、Spark 1.6版本,SparkSQL数据结构Dataset
	坊间流传,参考Flink中DataSet数据结构而来
	Dataset = RDD + schema
	

5、Spark 2.0版本,DataFrame和Dataset何为一体
	Dataset = RDD + schema
	DataFrame = Dataset[Row]

​ Spark 2.x发布时,将Dataset和DataFrame统一为一套API,以Dataset数据结构为主(Dataset= RDD + Schema),其中DataFrame = Dataset[Row]

04-[了解]-SparkSQL 概述之官方定义及特性

SparkSQL模块官方定义:针对结构化数据处理Spark Module模块

主要包含三层含义:

  • 第一、针对结构化数据处理,属于Spark框架一个部分

  • 第二、抽象数据结构:DataFrame
DataFrame = RDD[Row] + Schema信息;
  • 第三、分布式SQL引擎,类似Hive框架

从Hive框架继承而来,Hive中提供bin/hive交互式SQL命令行及HiveServer2服务,SparkSQL都可以;

Spark SQL模块架构示意图如下:

Spark SQL是Spark用来处理结构化数据的一个模块,主要四个特性:

官方文档:http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html

05-[掌握]-DataFrame是什么及案例演示

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率

DataFrame有如下特性:

范例演示加载json格式数据

[root@node1 spark]# bin/spark-shell --master local[2]
21/04/26 09:26:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node1.itcast.cn:4040
Spark context available as 'sc' (master = local[2], app id = local-1619400386041).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\\ \\/ _ \\/ _ `/ __/  '_/
   /___/ .__/\\_,_/_/ /_/\\_\\   version 2.4.5
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val empDF = spark.read.json("/datas/resources/employees.json")
empDF: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> empDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)


scala> 

scala> empDF.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+


scala> 

scala> empDF.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:26

所以,可以看出:DataFrame = RDD[Row] + Schema信息

06-[掌握]-DataFrame中Schema和Row

查看DataFrame中Schema是什么,执行如下命令:

scala> empDF.schema

​ 可以发现Schema封装类:StructType,结构化类型,里面存储的每个字段封装的类型:StructField,结构化字段。

  • 其一、StructType 定义,是一个样例类,属性为StructField的数组

  • 其二、StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填

自定义Schema结构,官方提供实例代码:

DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。

如何获取Row中每个字段的值呢????

  • 方式一:下标获取,从0开始,类似数组下标获取

  • 方式二:指定下标,知道类型

  • 方式三:通过As转换类型, 此种方式开发中使用最多

如何创建Row对象呢???要么是传递value,要么传递Seq

07-[掌握]-RDD转换DataFrame之反射类型推断

​ 实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema

文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

范例演示说明:使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。

字段信息:user id 、 item id、 rating 、 timestamp。

​ 当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

package cn.itcast.spark.convert

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * 采用反射的方式将RDD转换为DataFrame和Dataset
 */
object _01SparkRDDInferring {
	
	def main(args: Array[String]): Unit = {
		
		// 构建SparkSession实例对象,设置应用名称和master
		val spark: SparkSession = SparkSession.builder()
    		.appName(this.getClass.getSimpleName.stripSuffix("$"))
    		.master("local[3]")
    		.getOrCreate()
		import spark.implicits._
		
		// 1. 加载电影评分数据,封装数据结构RDD
		val rawRatingRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data")
		
		// 2. 将RDD数据类型转化为 MovieRating
		/*
			将原始RDD中每行数据(电影评分数据)封装到CaseClass样例类中
		 */
		val ratingRDD: RDD[MovieRating] = rawRatingRDD.mapPartitions { iter =>
			iter.map { line =>
				// 按照制表符分割
				val arr: Array[String] = line.trim.split("\\\\t")
				// 封装样例对象
				MovieRating(
					arr(0), arr(1), arr(2).toDouble, arr(3).toLong
				)
			}
		}
		
		// 3. 通过隐式转换,直接将CaseClass类型RDD转换为DataFrame
		val ratingDF: DataFrame = ratingRDD.toDF()
		//ratingDF.printSchema()
		//ratingDF.show(10, truncate = false)
		
		/*
			Dataset 从Spark1.6提出
				Dataset = RDD + Schema
				
				DataFrame = RDD[Row] + Schema
				
				Dataset[Row] = DataFrame
		 */
		// 将DataFrame转换为Dataset,只需要加上CaseClass强类型即可
		val ratingDS: Dataset[MovieRating] = ratingDF.as[MovieRating]
		ratingDS.printSchema()
		ratingDS.show(10, truncate = false)
		
		
		// TODO: 将RDD转换为Dataset,可以通过隐式转, 要求RDD数据类型必须是CaseClass
		val dataset: Dataset[MovieRating] = ratingRDD.toDS()
		dataset.printSchema()
		dataset.show(10, truncate = false)
		
		// 将Dataset直接转换为DataFrame
		val dataframe = dataset.toDF()
		dataframe.printSchema()
		dataframe.show(10, truncate = false)
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

08-[掌握]-RDD转换DataFrame之自定义Schema

依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:

package cn.itcast.spark.convert

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 自定义Schema方式转换RDD为DataFrame
 */
object _02SparkRDDSchema {
	
	def main(args: Array[String]): Unit = {
		
		// 构建SparkSession实例对象,设置应用名称和master
		val spark: SparkSession = SparkSession.builder()
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.master("local[3]")
			.getOrCreate()
		import spark.implicits._
		
		// 1. 加载电影评分数据,封装数据结构RDD
		val rawRatingsRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
		
		// 2. TODO: step1. RDD中数据类型为Row:RDD[Row]
		val rowRDD: RDD[Row] = rawRatingsRDD.mapPartitions { iter =>
			iter.map {
				line =>
					// 按照制表符分割
					val arr: Array[String] = line.trim.split("\\\\t")
					// 封装样例对象
					Row(arr(0), arr(1), arr(2).toDouble, arr(3).toLong)
			}
		}
		
		// 3. TODO:step2. 针对Row中数据定义Schema:StructType
		val schema: StructType = StructType(
			Array(
				StructField("user_id", StringType, nullable = true),
				StructField("movie_id", StringType, nullable = true),
				StructField("rating", DoubleType, nullable = true),
				StructField("timestamp", LongType, nullable = true)
			)
		)
		
		// 4. TODO:step3. 使用SparkSession中方法将定义的Schema应用到RDD[Row]上
		val ratingDF: DataFrame = spark.createDataFrame(rowRDD, schema)
		ratingDF.printSchema()
		ratingDF.show(10, truncate = false)
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

09-[掌握]-toDF函数指定列名称转换为DataFrame

​ SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。

package cn.itcast.spark.todf

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
 */
object _03SparkSQLToDF {
	
	def main(args: Array[String]): Unit = {
		
		// 构建SparkSession实例对象,设置应用名称和master
		val spark: SparkSession = SparkSession.builder()
    		.appName(this.getClass.getSimpleName.stripSuffix("$"))
    		.master("local[3]")
    		.getOrCreate()
		import spark.implicits._
		
		// 1. 将数据类型为元组的RDD,转换为DataFrame
		val rdd: RDD[(Int, String, String)] = spark.sparkContext.parallelize(
			List((1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female"))
		)
		// 调用toDF方法,指定列名称,将RDD转换为DataFrame
		val dataframe: DataFrame = rdd.toDF("id", "name", "gender")
		dataframe.printSchema()
		dataframe.show(10, truncate = false)
		
		
		println("==========================================================")
		
		// 定义一个Seq序列,其中数据类型为元组
		val seq: Seq[(Int, String, String)] = Seq(
			(1001, "zhangsan", "male"), (1003, "lisi", "male"), (1003, "xiaohong", "female")
		)
		// 将数据类型为元组Seq序列转换为DataFrame
		val df: DataFrame = seq.toDF("id", "name", "gender")
		df.printSchema()
		df.show(10, truncate = false)
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

10-[了解]-SparkSQL中数据处理方式

​ 在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:

数据分析人员,尤其使用Python数据分析人员

尤其DBA和数据仓库分析人员擅长编写SQL语句,采用SQL编程

11-[掌握]-基于DSL分析(函数说明)和SQL分析

  • 基于DSL分析
    • 调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL
      语句函数,部分截图如下:

  • 基于SQL分析
    • 将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

其中SQL语句类似Hive中SQL语句,查看Hive官方文档,SQL查询分析语句语法,官方文档文档:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

12-[掌握]-电影评分数据分析之需求说明和加载数据

​ 使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6ypUaVpL-1627176341890)(/img/image-20210426105132291.png)]

数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:

数据处理分析步骤如下:

将分析结果,分别保存到mysql数据库表中及CSV文本文件中。

首先加载电影评分数据,封装到RDD中

		// 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
			.master("local[4]")
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.getOrCreate()
		// 导入隐式转换
		import spark.implicits._
		
		// TODO: step1. 读取电影评分数据,从本地文件系统读取,封装数据至RDD中
		val ratingRDD: RDD[String] = spark.read.textFile("datas/ml-1m/ratings.dat").rdd
		println(s"Count = ${ratingRDD.count()}")
		println(s"first:\\n\\t${ratingRDD.first()}")

13-[掌握]-电影评分数据分析之数据ETL

读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,采用toDF函数,代码下:

		val ratingDF: DataFrame = ratingRDD
			.filter(line => null != line && line.trim.split("::").length == 4)
			.mapPartitions{iter =>
				iter.map{line =>
					// a. 解析每条数据
					val arr = line.trim.split("::")
					// b. 构建元组对象
					(arr(0).toInt, arr(1).toInt, arr(2).toDouble, arr(3).toLong)
				}
			}
			// c. 调用toDF函数,指定列名称
    		.toDF("user_id", "item_id", "rating", "timestamp")
		ratingDF.printSchema()
		ratingDF.show(10, truncate = false)
		println(s"count = ${ratingDF.count()}")

将RDD转换为DataFrame数据集,方便采用DSL或SQL分析数据。

14-[掌握]-电影评分数据分析之SQL分析

首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下;

		// TODO: step3. 基于SQL方式分析
		/*
			a. 注册为临时视图
			b. 编写SQL,执行分析
		 */
		// a. 将DataFrame注册为临时视图
		ratingDF.createOrReplaceTempView("view_temp_ratings")
		
		// b. 编写SQL,执行分析
		val top10MovieDF: DataFrame = spark.sql(
			"""
			  |SELECT
			  |  item_id, ROUND(AVG(rating), 2) AS avg_rating, COUNT(1) AS cnt_rating
			  |FROM
			  |  view_temp_ratings
			  |GROUP BY
			  |  item_id
			  |HAVING
			  |  cnt_rating >= 2000
			  |ORDER BY
			  |  avg_rating DESC
			  |LIMIT
			  |  10
			  |""".stripMargin)
		/*
			root
			 |-- item_id: integer (nullable = false)
			 |-- avg_rating: double (nullable = true)
			 |-- count_rating: long (nullable = false)
		 */
		top10MovieDF.printSchema()
		/*
			+--------+----------+------------+
			|movie_id|avg_rating|count_rating|
			+--------+----------+------------+
			|318     |4.55      |2227        |
			|858     |4.52      |2223        |
			|527     |4.51      |2304        |
			|1198    |4.48      |2514        |
			|260     |4.45      |2991        |
			|2762    |4.41      |2459        |
			|593     |4.35      |2578        |
			|2028    |4.34      |2653        |
			|2858    |4.32      |3428        |
			|2571    |4.32      |2590        |
			+--------+----------+------------+
		 */
		top10MovieDF.show(10, truncate = false)

15-[掌握]-电影评分数据分析之Shuffle分区数

​ 运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值

好消息:在Spark3.0开始,不用关心参数值,程序自动依据Shuffle时数据量,合理设置分区数目

16-[掌握]-电影评分数据分析之DSL分析

调用Dataset中函数,采用链式编程分析数据,核心代码如下:

		val resultDF: DataFrame = ratingDF
			// a. 按照电影ID分组
			.groupBy($"item_id")
			// b. 组合数据进行聚合,评分平均值和评分的次数
			.agg(
				round(avg($"rating"), 2).as("avg_rating"), //
				count($"user_id").as("cnt_rating")
			)
			// c. 过滤评分次数大于2000
			.filter($"cnt_rating" > 2000)
			// d. 按照评分平均值降序排序
			.orderBy($"avg_rating".desc)
			// e. 获取前10条数据
			.limit(10)
		resultDF.printSchema()
		resultDF.show(10, truncate = false)

使用需要导入函数库:import org.apache.spark.sql.functions._

使用DSL编程分析和SQL编程分析,哪一种方式性能更好呢?实际开发中如何选择呢???

无论是DSL编程还是SQL编程,性能一模一样,底层转换为RDD操作时,都是一样的:Catalyst

17-[掌握]-电影评分数据分析之保存结果至MySQL

将分析数据保持到MySQL表中,直接调用Dataframe中writer方法,写入数据到MYSQL表中

		// TODO: step 4. 将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中
		resultDF.persist(StorageLevel.MEMORY_AND_DISK)
		
		// 保存结果数据至MySQL表中
		val props = new Properties()
		props.put("user", "root")
		props.put("password", "123456")
		props.put("driver", "com.mysql.cj.jdbc.Driver")
		resultDF
			.coalesce(1) // 对结果数据考虑降低分区数
			.write
    		.mode(SaveMode.Overwrite)
			.jdbc(
				"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
				"db_test.tb_top10_movies", //
				props
			)
		/*
		mysql> select * from tb_top10_movies ;
		+---------+------------+------------+
		| item_id | avg_rating | cnt_rating |
		+---------+------------+------------+
		|     318 |       4.55 |       2227 |
		|     858 |       4.52 |       2223 |
		|     527 |       4.51 |       2304 |
		|    1198 |       4.48 |       2514 |
		|     260 |       4.45 |       2991 |
		|    2762 |       4.41 |       2459 |
		|     593 |       4.35 |       2578 |
		|    2028 |       4.34 |       2653 |
		|    2858 |       4.32 |       3428 |
		|    2571 |       4.32 |       2590 |
		+---------+------------+------------+
		 */
		

		// 保存结果数据至CSv文件中
		
		
		// 数据不在使用时,释放资源
		resultDF.unpersist()

18-[掌握]-电影评分数据分析之保存结果至CSV文件

将结果DataFrame保存值CSV文件中,文件首行为列名称,核心代码如下:

// 保存结果

以上是关于Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))的主要内容,如果未能解决你的问题,请参考以下文章

Note_Spark_Day11:Spark Streaming

Note_Spark_Day01:Spark 框架概述和Spark 快速入门

Note_Spark_Day10:Spark Streaming

Note_Spark_Day12: StructuredStreaming入门

Note_Spark_Day09:离线综合实战

Note_Spark_Day13:Structured Streaming