Spark基础学习笔记23:DataFrame与Dataset

Posted howard2005

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记23:DataFrame与Dataset相关的知识,希望对你有一定的参考价值。

文章目录

零、本讲学习目标

  1. 了解Spark SQL的基本概念
  2. 掌握DataFrame的基本概念
  3. 掌握Dataset的基本概念
  4. 会基于DataFrame执行SQL查询

一、Spark SQL

(一)Spark SQL概述

  • Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSONParquetAvroCSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

(二)Spark SQL主要特点

1、将SQL查询与Spark应用程序无缝组合

  • Spark SQL允许使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。
  • 在Spark应用程序中嵌入SQL语句
val results = spark.sql( "SELECT * FROM users")

2、Spark SQL以相同方式连接多种数据源

  • Spark SQL提供了访问各种数据源的通用方法,数据源包括HiveAvroParquetORCJSONJDBC等。
  • 读取HDFS中的JSON文件,基于文件内容创建临时视图,最后与其他表根据指定的字段关联查询
// 读取JSON文件
val userScoreDF = spark.read.json("hdfs://master:9000/users.json")
// 创建临时视图user_score
userScoreDF.createTempView("user_score")
// 根据name关联查询
val resDF = spark.sql("SELECT i.age, i.name, c.score FROM user_info i INNER JOIN user_score c ON i.name = c.name")

3、在现有数据仓库上运行SQL或HiveQL查询

  • Spark SQL支持HiveQL语法以及Hive SerDesUDF(用户自定义函数),允许访问现有的Hive仓库。

二、数据帧 - DataFrame

(一)DataFrame概述

  • DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合,但与RDD不同,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

(二)将RDD转成DataFrame

  • DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。
  • 一个RDD中有3行数据
  • 将该RDD转成DataFrame后,数据可能如下图所示
  • 使用DataFrame API结合SQL处理结构化数据比RDD更加容易,而且通过DataFrame API或SQL处理数据,Spark优化器会自动对其优化,即使写的程序或SQL不高效,也可以运行得很快。

三、数据集 - Dataset

(一)Dataset概述

  • Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用Dataset API同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

(二)将RDD转成DataSet

  • 一个RDD中有3行数据
  • 将其转换为Dataset后的数据可能如下图所示

(三)DataFrame与Dataset的关系

  • 在Spark中,一个DataFrame所代表的是一个元素类型为RowDataset,即DataFrame只是Dataset[Row]的一个类型别名。

四、简单使用Spark SQL

(一)了解SparkSession

  • Spark Shell启动时除了默认创建一个名为scSparkContext的实例外,还创建了一个名为sparkSparkSession实例,该spark变量可以在Spark Shell中直接使用。
  • SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。

(二)准备数据文件

  • 创建student.txt文件并上传到HDFS的/input目录

(三)加载数据为Dataset

1、读取文本文件,返回数据集

  • 调用SparkSession的API read.textFile()可以读取指定路径中的文件内容,并加载为一个Dataset
  • 执行命令:val ds = spark.read.textFile("hdfs://master:9000/input/student.txt")
  • 从变量ds的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了使用textFile()方法读取文本内容外,还可以使用csv()jdbc()json()等方法读取CSV文件、JDBC数据源、JSON文件等数据。

2、显示数据集的内容

  • 调用Dataset中的show()方法可以输出Dataset中的数据内容
  • 执行命令:ds.show()
  • 从上述内容可以看出,Dataset将文件中的每一行看作一个元素,并且所有元素组成了一列,列名默认为value

(四)给数据集添加元数据信息

1、定义样例类

  • 定义一个样例类Student,用于存放数据描述信息(Schema
  • 执行命令:case class Student(id: Int, name: String, age: Int)

2、导入隐式转换

  • 导入SparkSession的隐式转换,以便后续可以使用Dataset的算子
  • 执行命令:import spark.implicits._

3、将数据集内容存入样例类

  • 调用Dataset的map()算子将每一个元素拆分并存入Student类中
  • 执行命令
 val studentDataset = ds.map(line => 
     val fields = line.split(",")
     val id = fields(0).toInt
     val name = fields(1)
     val age = fields(2).toInt
     Student(id, name, age)
)

4、查看数据集内容

  • 执行命令:studentDataset.show()
  • 可以看到,studentDataset中的数据类似于一张关系型数据库的表。

(五)将数据集转为数据帧

  • Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame。
  • 执行命令:val sdf = studentDataset.toDF()

(六)基于数据帧进行SQL查询

1、基于数据帧创建临时视图

  • 在DataFrame上创建一个临时视图student_view
  • 执行命令:sdf.createTempView("student_view")

2、使用SparkSession对象执行SQL查询

  • 执行命令:val result = spark.sql("select * from student_view order by age desc")
  • 执行命令:result.show(),显示查询结果
  • 可以看到,结果数据已按照age字段降序排列。

以上是关于Spark基础学习笔记23:DataFrame与Dataset的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Spark—— Spark SQL应用—— Spark DataSet基础操作

Spark基础学习笔记32:Spark Streaming概述

2022年Spark基础学习笔记

2022年Spark基础学习笔记目录

Spark基础学习笔记04:Scala简介与安装

学习笔记Spark—— Spark编程基础(创建RDDRDD算子文件读取与存储)