spark计算引擎之SPARK详解

Posted 虎子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark计算引擎之SPARK详解相关的知识,希望对你有一定的参考价值。

一、Spark SQL

1Spark SQL概述

1.1Spark SQL的前世今生Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来。这个方法使得Shark的用户可以加速Hive的查询,但是Shark继承了Hive的大且复杂的代码使得Shark很难优化和维护,同时Shark依赖于Spark的版本。随着我们遇到了性能优化的上限,以及集成SQL的一些复杂的分析功能,我们发现HiveMapReduce设计的框架限制了Shark的发展。在201471日的SparkSummit上,Databricks宣布终止对Shark的开发,将重点放到SparkSQL上。

1.2.什么是Spark SQL

Spark SQLSpark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。相比于Spark RDD APISpark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优

化,使对结构化数据的操作更加高效和方便。

有多种方式去使用Spark SQL,包括SQLDataFrames APIDatasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此你可以在不同的API之间随意切换,它们各有各的特点,看你喜欢那种风格。1.3.为什么要学习Spark SQL 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群中去执行,大大简化了编写MapReduce程序的复杂

性,由于MapReduce这种计算模型执行效率比较慢,所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行效率非常快!1.易整合

sql查询与spark程序无缝混合,可以使用javascalapythonR等语言的API操作。2.统一的数据访问

以相同的方式连接到任何数据源。3.兼容Hive

支持hiveSQL的语法。4.标准的数据连接

2DataFrame

2.1 .什么是DataFrame

DataFrame 的前身是SchemaRDD ,从Spark 1.3.0 开始SchemaRDD 更名为DataFrame 。与SchemaRDD 的主要区别是:DataFrame 不再直接继承自RDD ,而是自己实现了RDD 的绝大多数功能。你仍旧可以在DataFrame 上调用rdd 方法将其转换为一个RDD

Spark 中,DataFrame 是一种以RDD 为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame 带有Schema 元信息,即DataFrame 所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。DataFrame 可以从很多数据源构建,比如:已经存在的RDD 、结构化文件、外部数据库、Hive 表。

2.2 DataFrame RDD 的区别

RDD 可看作是分布式的对象的集合,Spark 并不知道对象的详细模式信息,DataFrame 可看作是分布式的Row 对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL 可以进行某些形式的执行优化。DataFrame 和普通的RDD 的逻辑框架区别如下所示:

上图直观地体现了DataFrame RDD 的区别。左侧的RDD[Person] 虽然以Person 为类型参数,但Spark 框架本身不了解Person 类的内部结构。而右侧的DataFrame 却提供了详细的结构信息,使得Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各

是什么。DataFrame 多了数据的结构信息,即schema 。这样看起来就像一张表了,DataFrame 还配套了新的操作数据的方

法,DataFrame API(如df.select()) SQL(select id, name from xx_table where ...) 。此外DataFrame 还引入了off-heap, 意味着JVM 堆以外的内存, 这些内存直接受操作系统管理(而不是JVM )。Spark 能够以二进制的形式序列化数据(不包括结构)off-heap , 当要操作数据时, 就直接操作off-heap内存. 由于Spark 理解schema, 所以知道该如何操作。

RDD 是分布式的Java 对象的集合。DataFrame 是分布式的Row 对象的集合。DataFrame 除了提供了比RDD 更丰富的算子以

外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。有了DataFrame 这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL 来处理数据了,对开发者来说,易用性有了很大的提升。

不仅如此,通过DataFrame API SQL 处理数据,会自动经过Spark 优化器(Catalyst )的优化,即使你写的程序或SQL 不高效,也可以运行的很快。2.3 DataFrame RDD 的优缺点RDD 的优缺点:优点:

1)编译时类型安全编译时就能检查出类型错误(2)面向对象的编程风格直接通过对象调用方法的形式来操作数据缺点: 1)序列化和反序列化的性能开销无论是集群间的通信, 还是IO 操作都需要对对象的结构和数据进行序列化和反序列化。(2GC 的性能开销频繁的创建和销毁对象, 势必会增加GC DataFrame 通过引入schema off-heap (不在堆里面的内存,指的是除了不在堆的内存,使用操作系统上的内存),解决了

RDD 的缺点, Spark 通过schame 就能够读懂数据, 因此在通信和IO 时就只需要序列化和反序列化数据, 而结构的部分就可以省略

了;通过off-heap 引入,可以快速的操作数据,避免大量的GC 。但是却丢了RDD 的优点,DataFrame 不是类型安全的, APIń

不是面向对象风格的。

2.4 .读取数据源创建DataFram.
2.4.1
读取文本文件创建DataFram.
spark2.0 版本之前,Spark SQL SQLContext 是创建DataFrame 和执行SQL 的入口,利用hiveContext 通过hive sql 语句操
 

hive 表数据,兼容hive 操作,并且hiveContext 继承自SQLContext 。在spark2.0 之后,这些都统一于SparkSession SparkSession 封装了SparkContext SqlContext ,通过SparkSession 可以获取到SparkConetxt,SqlContext 对象。

1)在本地创建一个文件,有三列,分别是id name age ,用空格分隔,然后上传到hdfs 上。person.txt 内容为:上传数据文件到HDFS 上:hdfs dfs -put person.txt / 2)在spark shell 执行下面命令,读取数据,将每一行的数据使用列分隔符分割先执行spark-shell --master local[2] val lineRDD= sc.textFile("/person.txt").map(_.split(" "))

3)定义case class (相当于表的schemaİ case class Person(id:Int, name:String, age:Int)

4)将RDD case classŵ val personRDD = lineRDD.map(x => Person( x(0).toInt, x(1) , x(2).toInt )) 5)将RDD 转换成DataFrame val personDF = personRDD.toDF

personDF.printSchema

7)、通过SparkSession 构建DataFram.
使用spark-shell 中已经初始化好的SparkSession 对象spark 生成DataFram.
val dataFrame=spark.read.text("/person.txt"1

 

2.4.2 读取json 文件创建DataFrame 1)数据文件使用spark 安装包下的/opt/bigdata/spark/examples/src/main/resources / people.json ĉ 2)在spark shell 执行下面命令,读取数据

3)接下来就可以使用DataFrame 的函数操作

2.4.3 读取parquet 列式存储格式文件创建DataFrame 3)数据文件使用spark 安装包下的/opt/bigdata/spark/examples/src/main/resources / users.parquet ĉ

2)在spark shell 执行下面命令,读取数据

3)接下来就可以使用DataFrame 的函数操作

3.DataFrame 常用操作3.1. DSL 风格语法DataFrame 提供了一个领域特定语言(DSL) 以方便操作结构化数据。下面是一些使用示例(1)查看DataFrame 中的内容,通过调用show 方法personDF.show

2)查看DataFrame 部分列中的内容查看name 字段的数据personDF.select(personDF.col("name")).show

查看name 字段的另一种写法

查看name age 字段数据personDF.select(col("name") , col("age")) .show

3)打印DataFrame Schema 信息personDF.printSchema

(4)查询所有的name 和age ,并将age+1

personDF.select(col("id"), col("name"), col ("age") + 1) . show

personDF.select(personDF("id"), personDF("name") , personDF("age") + 1) . show

5)过滤age 大于等于25 的,使用filter 方法过滤personDF.filter(col("age") >= 25).show

6)统计年龄大于30 的人数personDF.filter(col("age")>30).count()

7)按年龄进行分组并统计相同年龄的人数personDF.groupBy("age").count().show

3.2. SQL 风格语法

 DataFrame 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL

查询,结果将作为一个DataFrame 返回。

如果想使用SQL 风格的语法,需要将DataFrame 注册成表,采用如下的方式:

personDF.registerTempTable("t_person")

1)查询年龄最大的前两名

spark.sql("select * from t_person order by age desc limit 2" ). show

2)显示表的Schema 信息spark.sql("desc t_person").show

3)查询年龄大于30 的人的信
spark.sql("select * from t_personwhere age > 30 ") .sho,

 

4.DataSet 4.1. 什么是DataSet DataSet 是分布式的数据集合。DataSet 是在Spark1.6 中添加的新的接口。它集中了RDD 的优点(强类型和可以用强大lambda

函数)以及Spark SQL 优化的执行引擎。DataSet 可以通过JVM 的对象进行构建,可以用函数式的转换(map/flatmap/filter İ 进行多种操作。4.2. DataFrame DataSet RDD 的区别假设RDD 中的两行数据长这样:

那么DataFrame 中的数据长这样:

那么Dataset 中的数据长这样:

或者长这样(每行数据是个Object:

DataSet 包含了DataFrame 的功能,Spark2.0 中两者统一,DataFrame 表示为DataSet[Row] ,即DataSet 的子集。(1DataSet 可以在编译时检查类型(2)并且是面向对象的编程接口相比DataFrame Dataset 提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运

行),到提交到集群运行时才发现错误,这会浪费大量的时间,这也是引入Dataset 的一个重要原因。4.3. DataFrame DataSet 的互转DataFrame DataSet 可以相互转化。(1DataFrame 转为DataSet df.as[ElementType] 这样可以把DataFrame 转化为DataSet 。(2DataSet 转为DataFrame ds.toDF() 这样可以把DataSet 转化为DataFrame 4.4. 创建DataSet 1)通过spark.createDataset 创建(2)通toDS 方法生成DataSet

3)通过DataFrame 转化生成使用as[] 转换为DataSet

三、以编程方式执行Spark SQL 查询1.编写Spark SQL 程序实现RDD 转换DataFrame 前面我们学习了如何在Spark Shell 中使用SQL 完成查询,现在我们来实现在自定义的程序中编写Spark SQL 查询程序。在Spark SQL 中有两种方式可以在DataFrame RDD 进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD ,通

过反射将其转换为指定类型的DataFrame ,适用于提前知道RDD schema 。第二种方法通过编程接口与RDD 进行交互获取schema ,并动态创建DataFrame ,在运行时决定列及其类型。首先在maven 项目的pom.xml 中添加Spark SQL 的依赖1.1 .通过反射推断Schema Scala 支持使用case class 类型导入RDD 转换为DataFrame ,通过case class 创建schema case class 的参数名称会被反射读

取并成为表的列名。这种RDD 可以高效的转换为DataFrame 并注册为表。代码如下:1.2 .通过StructType 直接指定Schema case class 不能提前定义好时,可以通过以下三步通过代码创建DataFrame 1)将RDD 转为包含row 对象的RDD 2)基于structType 类型创建schema ,与第一步创建的RDD 相匹配(3)通过sparkSession createDataFrame 方法对第一步的RDD 应用schema 创建DataFrame 2.编写Spark SQL 程序操作HiveContext

HiveContext 是对应spark-hive 这个项目,hive 有部分耦合, 支持hql, SqlContext 的子类,也就是说兼容SqlContext; 2.1 .添加pom 依赖2.2 .代码实现四、数据源1JDBC Spark SQL 可以通过JDBC 从关系型数据库中读取数据的方式创建DataFrame ,通过对DataFrame 一系列的计算后,还可以将

数据再写回关系型数据库中。1.1 SparkSql mysql 中加载数据1.1.1 通过IDEA 编写SparkSql 代码执行查看效果:

1.1.2 通过spark-shell 运行(1)、启动spark-shell(必须指定mysql 的连接驱动包) 2)、从mysql 中加载数据(3)、执行查询

1.2 SparkSql 将数据写入到MySQL 1.2.1 通过IDEA 编写SparkSql 代码(1)编写代码(2)用maven 将程序打包通过IDEA 工具打包即可(3)将Jar 包提交到spark 集群

(4)查看mysql 中表的数据

 

以上是关于spark计算引擎之SPARK详解的主要内容,如果未能解决你的问题,请参考以下文章

上:Spark VS Flink – 下一代大数据计算引擎之争,谁主沉浮?

Spark比拼Flink:下一代大数据计算引擎之争,谁主沉浮?

Spark之 spark简介生态圈详解

FineBI学习系列之FineBI与Spark数据连接(图文详解)

大数据进阶之Spark计算运行流程

如何计算 Spark SQL(Databricks)中表中的列数?