大数据高级开发工程师——Spark学习笔记
Posted 斗志昂-杨先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据高级开发工程师——Spark学习笔记相关的知识,希望对你有一定的参考价值。
文章目录
Spark内存计算框架
Spark SQL
SparkSQL架构设计
- SparkSQL 是 Spark 技术栈当中又一非常出彩的模块,通过引入 SQL 的支持,大大降低了开发人员和学习人员的使用成本,让我们开发人员直接使用 SQL 的方式就能够实现大数据的开发。
- 它同时支持 DSL 以及 SQL 的语法风格,目前在 spark 的整个架构设计当中,所有的 spark 模块,例如SQL、SparkML、sparkGrahpx 以及 Structed Streaming 等都是基于 Catalyst Optimization & Tungsten Execution模块之上运行,如下图所示就显示了 spark 的整体架构模块设计。
1. SparkSQL的架构设计实现
- 总的来讲,SparkSQL 执行会进过:
- SQL Parser 解析;
- Catalyst 优化器优化处理;
- Spark 执行
- 而 Catalyst 的过程又分为很多过程,其中包括:
- 由 SQL Parser 将 SQL 通过词法、语法解析(检查表、字段是否存在)生成未绑定的逻辑执行计划(Unresolved Logical plan);
- **Analyzer(分析器)**使用 Analysis Rules,配合元数据(Catalog 信息或 Hive Metastore 信息)将 Unresolved Logical plan 解析成 Analyzed logical plan(解析的逻辑计划);——解析SQL语句
- Logical Optimizer(逻辑计划调优器) 使用一些 Optimization Rules(调优规则:合并、列裁剪、过滤器下推等)将 Analyzed logical plan 解析成 Optimized Logical Plan(优化的逻辑计划);——逻辑计划调优过程
- Physical Planner(物理计划生成器) 使用 Planning Strategies:——物理执行计划
- 将前面的 optimized logical plan(还不能被 Spark 执行) 生成可执行的物理计划 physical plan;
- 这个过程是把 logical plan 转换成多个 physical plans;
- 根据过去的性能统计数据,利用代价模型(cost model),选择最佳的物理执行计划 physical plan。
- Code Generation(代码生成器):这个过程会把 SQL 查询生成 Java 字节码。——代码生成阶段
- 举个🌰:以下面这段SQL为例,数据文件 source_data.sql
-- 根据班级查询学生成绩平均值和总和
SELECT tmp.class, SUM(tmp.degree), AVG(tmp.degree)
FROM (
SELECT
students.sno AS ssno,
students.sname,
students.ssex,
students.sbirthday,
students.class,
scores.sno,
scores.degree,
scores.cno
FROM students
LEFT JOIN scores ON students.sno = scores.sno
WHERE degree > 60 AND sbirthday > '1973-01-01 00:00:00'
) tmp GROUP BY tmp.class
- 代码实现:通过 explain 方法来查看 sql 的执行计划
object Case14_DataFrommysqlPlan
def main(args: Array[String]): Unit =
// 1. 创建 SparkConf 对象
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
// 2. 创建 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
// 3. 读取 MySQL 表
val url = "jdbc:mysql://192.168.254.132:3306/mydb?characterEncoding=UTF-8"
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
val studentDF: DataFrame = spark.read.jdbc(url, "students", props)
val scoreDF: DataFrame = spark.read.jdbc(url, "scores", props)
// 4. 将DataFrame注册成临时视图
studentDF.createTempView("students")
scoreDF.createTempView("scores")
// 5. 使用Spark SQL查询数据
val resultDF: DataFrame = spark.sql(
"""
|SELECT tmp.class, SUM(tmp.degree), AVG(tmp.degree)
|FROM (
| SELECT
| students.sno AS ssno,
| students.sname,
| students.ssex,
| students.sbirthday,
| students.class,
| scores.sno,
| scores.degree,
| scores.cno
| FROM students
| LEFT JOIN scores ON students.sno = scores.sno
| WHERE degree > 60 AND sbirthday > '1973-01-01 00:00:00'
|) tmp GROUP BY tmp.class
""".stripMargin)
// 6. 查看执行计划
resultDF.explain(true)
// 7. 展示数据
resultDF.show()
spark.stop()
2. Catalyst执行过程
- 从上面的查询计划我们可以看得出来,我们编写的 sql 语句,经过多次转换,最终进行编译成为字节码文件进行执行,这一整个过程经过了好多个步骤,其中包括以下几个重要步骤:
- SQL 解析阶段 Parser
- 绑定逻辑计划 Analyzer
- SQL 语句调优阶段 Optimizer
- 生成物理查询计划 Planner
SQL 解析阶段 Parser
- 在 spark 2.x 的版本当中,为了解析 sparkSQL 的 sql 语句,引入了Antlr。
- Antlr 是一款强大的语法生成器工具,可用于读取、处理、执行和翻译结构化的文本或二进制文件,是当前 Java 语言中使用最为广泛的语法生成器工具。
- 我们常见的大数据 SQL 解析都用到了这个工具,包括 Hive、Cassandra、Phoenix、Pig 以及 presto 等。
- 目前最新版本的 Spark 使用的是ANTLR4,通过这个对 SQL 进行词法分析并构建语法树。
- 可以通过github去查看spark的源码,具体路径如下:https://github.com/apache/spark/tree/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser。
- 查看得到 sparkSQL 支持的 SQL 语法,所有 sparkSQL 支持的语法都定义在了这个文件当中。
- 如果我们需要重构 sparkSQL 的语法,那么我们只需要重新定义好相关语法,然后使用 Antlr4 对 SqlBaseLexer.g4 和 SqlBaseParser.g4 进行语法解析,生成相关的 java 类:语法解析器 SqlBaseLexer.java 和 语法解析器SqlBaseParser.java。
- 在我们运行上面的java的时候,第一步就是通过SqlBaseLexer来解析关键词以及各种标识符,然后使用SqlBaseParser来构建语法树。
- 最终通过 Lexer 以及 parse 解析之后,生成语法树。生成语法树之后,使用 AstBuilder 将语法树转换成为 LogicalPlan,这个LogicalPlan 也被称为 Unresolved LogicalPlan。解析之后的逻辑计划如下:
- 从上图可以看得到,两个表被 join 之后生成了 UnresolvedRelation。选择的列以及聚合的字段都有了,sql 解析的第一个阶段就已经完成,接着准备进入到第二个阶段。
绑定逻辑计划 Analyzer
- 在 sql 解析 parse 阶段,生成了很多的 unresolvedalias、UnresolvedRelation 等很多未解析出来的某些关键字,这些都是属于 Unresolved LogicalPlan 解析的部分。
- Unresolved LogicalPlan 仅是一种数据结构,不包含任何数据信息(例如:数据源、数据类型、不同的列来自哪张表等)。
- Analyzer 阶段会使用事先定义好的 Rule 以及 SessionCatalog 等信息对 Unresolved LogicalPlan 进行 transform。
- SessionCatalog 主要用于各种函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。而 Rule 是定义在 Analyzer 里面的,具体的类的源码路径:https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
- 查看源码可以看出,多个性质类似的 Rule 组成一个 Batch,而多个 Batch 构成一个 batches。这些 batches 会由 RuleExecutor 执行,先按一个一个 Batch 顺序执行,然后对 Batch 里面的每个 Rule 顺序执行。每个 Batch 会执行一次(Once)或多次(FixedPoint,由spark.sql.optimizer.maxIterations参数决定),执行过程如下:
- 所以上面的 SQL 经过这个阶段生成的 Analyzed Logical Plan 如下:
总结来看 Analyzed Logical Plan 主要就是干了一些这些事情:
- ① 确定最终返回字段名称以及返回类型:class: string, sum(degree): decimal(20,1), avg(degree): decimal(14,5)
- ② 确定聚合函数:Aggregate [class#4], [class#4, sum(degree#12) AS sum(degree)#27, avg(degree#12) AS avg(degree)#28]
- ③ 确定表当中获取的查询字段:Project [sno#0 AS ssno#16, sname#1, ssex#2, sbirthday#3, class#4, sno#10, degree#12, cno#11]
- ④ 确定过滤条件:Filter ((cast(degree#12 as decimal(10,1)) > cast(cast(60 as decimal(2,0)) as decimal(10,1))) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00))
- ⑤ 确定join方式:Join LeftOuter, (sno#0 = sno#10)
- ⑥ 确定表当中的数据来源以及分区个数:JDBCRelation(students) [numPartitions=1] 和 JDBCRelation(scores) [numPartitions=1]
到这里, Analyzed LogicalPlan 就完全生成了。
逻辑优化阶段 Optimizer
- 上边对 Unresolved LogicalPlan 进行相关 transform 操作得到了 Analyzed Logical Plan,这个 Analyzed Logical Plan 是可以直接转换成 Physical Plan 然后在 Spark 中执行的。
- 但如果直接这么弄的话,得到的 Physical Plan 很可能不是最优的,因为在实际应用中,很多低效的写法会带来执行效率的问题,需要进一步对 Analyzed Logical Plan 进行处理,得到更优的逻辑算子树。于是, 针对 SQL 逻辑算子树的优化器 Optimizer 应运而生。
- 这个阶段的优化器主要是基于规则的(Rule-Based Optimizer,简称 RBO),而绝大部分的规则都是启发式规则,也就是基于直观或经验而得出的规则,比如列裁剪(过滤掉查询不需要使用到的列)、谓词下推(将过滤尽可能地下沉到数据源端)、常量累加(比如 1 + 2 这种事先计算好) 以及常量替换(比如
SELECT * FROM table WHERE i = 5 AND j = i + 3
可以转换成SELECT * FROM table WHERE i = 5 AND j = 8
)等等。 - 与前文介绍绑定逻辑计划阶段类似,这个阶段所有的规则也是实现 Rule 抽象类,多个规则组成一个 Batch,多个 Batch 组成一个 batches,同样也是在 RuleExecutor 中进行执行。
- 谓词下推:在 SparkQL 是由 PushDownPredicate 实现的,这个过程主要将过滤条件尽可能地下推到底层,最好是数据源。所以针对我们上面介绍的 SQL,使用谓词下推优化得到的逻辑计划:上图是从下往上看,谓词下推将 Filter 算子直接下推到 Join 之前了。也就是在扫描 student 表的时候使用条件过滤条件过滤出满足条件的数据;同时在扫描 score 表的时候会先使用过滤条件过滤出满足条件的数据。经过这样的操作,可以大大减少 Join 算子处理的数据量,从而加快计算速度。
- **列裁剪:**在 Spark SQL 是由 ColumnPruning 实现的。因为我们查询的表可能有很多个字段,但是每次查询我们很大可能不需要扫描出所有的字段,这个时候利用列裁剪可以把那些查询不需要的字段过滤掉,使得扫描的数据量减少。所以针对我们上面介绍的 SQL,使用列裁剪优化得到的逻辑计划:
- 经过列裁剪后,students 表只需要查询 sno和 class 两个字段;scores 表只需要查询 sno 和 degree 字段。
- 这样减少了数据的传输,而且如果底层的文件格式为列存(比如 Parquet),可以大大提高数据的扫描速度的。
- **常量替换:**在 Spark SQL 是由 ConstantPropagation 实现的。也就是将变量替换成常量,比如
SELECT * FROM table WHERE i = 5 AND j = i + 3
可以转换成SELECT * FROM table WHERE i = 5 AND j = 8
。这个看起来好像没什么的,但是如果扫描的行数非常多可以减少很多的计算时间的开销的。 - **常量累加:**在 Spark SQL 是由 ConstantFolding 实现的。这个和常量替换类似,也是在这个阶段把一些常量表达式事先计算好。这个看起来改动的不大,但是在数据量非常大的时候可以减少大量的计算,减少 CPU 等资源的使用。
- 到此为止,优化逻辑阶段基本完成,另外更多的其他优化,参见spark源码:https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L59
生成可执行的物理计划阶段 Physical Plan
- 经过前面多个步骤,包括 parse,analyzer 以及 Optimizer 等多个阶段,得到经过优化之后的sql语句,但是这个sql语句仍然不能执行。
- 为了能够执行这个sql,最终必须得要翻译成为可以被执行的物理计划,到这个阶段 spark 就知道该如何执行这个 sql 了。
- 和前面逻辑计划绑定和优化不一样,这个阶段使用的是 strategy(策略),而且经过前面介绍的逻辑计划绑定和 Transformations 动作之后,树的类型并没有改变。也就是说:Expression(表达式) 经过 Transformations 之后得到的还是 Expression(表达式) ;Logical Plan 经过 Transformations 之后得到的还是 Logical Plan。
- 而到了这个阶段,经过 Transformations 动作之后,树的类型改变了,由 Logical Plan 转换成 Physical Plan 了。一个逻辑计划(Logical Plan)经过一系列的策略处理之后,得到多个物理计划(Physical Plans)。
- 物理计划在 Spark 是由 SparkPlan 实现的。多个物理计划再经过代价模型(Cost Model)得到选择后的物理计划(Selected Physical Plan),整个过程如下所示:
- Cost Model 对应的就是基于代价的优化(Cost-based Optimizations,CBO,主要由华为的大佬们实现的,详见 SPARK-16026:https://www.iteblog.com/redirect.php?url=aHR0cHM6Ly9pc3N1ZXMuYXBhY2hlLm9yZy9qaXJhL2Jyb3dzZS9TUEFSSy0xNjAyNg==&article=true ,核心思想是计算每个物理计划的代价,然后得到最优的物理计划。
- SPARK-16026 引入的 CBO 优化主要是在前面介绍的优化逻辑计划阶段 - Optimizer 阶段进行的,对应的 Rule 为 CostBasedJoinReorder,并且默认是关闭的,需要通过 spark.sql.cbo.enabled或 spark.sql.cbo.joinReorder.enabled 参数开启。
- 最后得到的物理计划如下:
- 从上面的结果可以看出,物理计划阶段已经知道数据源是从 JDBC 里面读取了,也知道文件的路径,数据类型等。而且在读取文件的时候,直接将过滤条件(PushedFilters)加进去了。同时,这个 Join 变成了 SortMergeJoin。
- 到这里, Physical Plan 就完全生成了。
3. 代码生成阶段
- 从以上多个过程执行完成之后,最终我们得到的物理执行计划。这个物理执行计划表明了整个的代码执行过程当中我们代码层面的执行过程,以及最终要得到的数据字段以及字段类型,也包含了我们对应的数据源的位置。
- 虽然得到了物理执行计划,但是这个物理执行计划想要被执行,最终还是得要生成完整的代码,底层还是基于 spark RDD 去进行处理的。
- spark 最后也还会有一些 Rule 对生成的物理执行计划进行处理,这个处理过程就是 prepareForExecution,这些 rule 规则定义在 org.apache.spark.sql.execution.QueryExecution 这个类当中的 preparations 方法
protected def prepareForExecution(plan: SparkPlan): SparkPlan =
preparations.foldLeft(plan) case (sp, rule) => rule.apply(sp)
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
python.ExtractPythonUDFs, //抽取python的自定义函数
PlanSubqueries(sparkSession), //子查询物理计划处理
EnsureRequirements(sparkSession.sessionState.conf), //确保执行计划分区排序正确
CollapseCodegenStages(sparkSession.sessionState.conf), //收集生成代码
ReuseExchange(sparkSession.sessionState.conf), //节点重用
ReuseSubquery(sparkSession.sessionState.conf)) //子查询重用
- 上面的 Rule 中 CollapseCodegenStages 是重头戏,这就是大家熟知的全代码阶段生成,Catalyst 全阶段代码生成的入口就是这个规则。当然,如果需要 Spark 进行全阶段代码生成,需要将 spark.sql.codegen.wholeStage 设置为 true(默认)。
生成代码与sql解析引擎的区别
- 在 SparkSQL 中,通过生成代码,来实现 sql 语句的最终生成,说白了最后底层执行的还是代码。那么为什么要这么麻烦,使用代码的方式来执行我们的 sql 语句,难道没有 sql 的解析引擎直接执行 sql 语句嘛?
- 当然是有的,在spark2.0版本之前使用的都是基于 Volcano Iterator Model(参见 《Volcano-An Extensible and Parallel Query Evaluation System》http://paperhub.s3.amazonaws.com/dace52a42c07f7f8348b08dc2b186061.pdf) 来实现 sql 的解析的,这个是由 Goetz Graefe 在 1993 年提出的,当今绝大多数数据库系统处理 SQL 在底层都是基于这个模型的。这个模型的执行可以概括为:
- 首先数据库引擎会将 SQL 翻译成一系列的关系代数算子或表达式,然后依赖这些关系代数算子逐条处理输入数据并产生结果。
- 每个算子在底层都实现同样的接口,比如都实现了 next() 方法,然后最顶层的算子 next() 调用子算子的 next(),子算子的 next() 在调用孙算子的 next(),直到最底层的 next(),具体过程如下图表示。
- Volcano Iterator Model 的优点是抽象起来很简单,很容易实现,而且可以通过任意组合算子来表达复杂的查询。但是缺点也很明显,存在大量的虚函数调用,会引起 CPU 的中断,最终影响了执行效率。
- databricks的官方博客对比过使用Volcano Iterator Model和手写代码的执行效率,结果发现手写的代码执行效率要高出十倍!
- 所以总结起来就是将 sql 解析成为代码,比 sql 引擎直接解析 sql 语句效率要快,所以 spark2.0 最终选择使用代码生成的方式来执行 sql 语句。
- 基于上面的发现,从 Apache Spark 2.0 开始,社区开始引入了 Whole-stage Code Generation,参见 SPARK-12795,主要就是想通过这个来模拟手写代码,从而提升 Spark SQL 的执行效率。Whole-stage Code Generation 来自于2011年 Thomas Neumann 发表的 Efficiently Compiling Efficient Query Plans for Modern Hardware论文,这个也是 Tungsten 计划的一部分。
Tungsten 代码生成
Tungsten 代码生成分为三部分
- 表达式代码生成(expression codegen)
- 全阶段代码生成(Whole-stage Code Generation)
- 加速序列化和反序列化(speed up serialization/deserialization)
- 表达式代码生成(expression codegen)
- 这个其实在 Spark 1.x 就有了,表达式代码生成的基类是
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
,其下有七个子类。 - 前文的 SQL 生成的逻辑计划中的 **(isnotnull(sbirthday#3) && (cast(sbirthday#3 as string) > 1973-01-01 00:00:00)**就是最基本的表达式。
- 它也是一种 Predicate(谓词),所以会调用
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
来生成表达式的代码。
- 这个其实在 Spark 1.x 就有了,表达式代码生成的基类是
- 全阶段代码生成(Whole-stage Code Generation)
- 全阶段代码生成用来将多个处理逻辑整合到单个代码模块中,其中也会用到上面的表达式代码生成。
- 和前面介绍的表达式代码生成不一样,这个是对整个 SQL 过程进行代码生成,前面的表达式代码生成仅对于表达式的。
- 全阶段代码生成都是继承自
org.apache.spark.sql.execution.BufferedRowIterator
的,生成的代码需要实现 processNext() 方法,这个方法会在org.apache.spark.sql.execution.WholeStageCodegenExec
里面的 doExecute 方法里面被调用。 - 而这个方法(doExecute)里面的 rdd 会将数据传进生成的代码里面 ,比如我们上文 SQL 这个例子的数据源是 JDBC文件,底层使用
org.apache.spark.sql.execution.RowDataSourceScanExec
这个类读取文件,然后生成 inputRDD,这个 rdd 在 WholeStageCodegenExec 类中的 doExecute 方法里面调用生成的代码,然后执行我们各种判断得到最后的结果。WholeStageCodegenExec 源码注释说明如下:
- 相比 Volcano Iterator Model,全阶段代码生成的执行过程如下:通过引入全阶段代码生成,大大减少了虚函数的调用,减少了 CPU 的调用,使得 SQL 的执行速度有很大提升。
代码编译
- 生成代码之后需要解决的另一个问题是如何将生成的代码进行编译然后加载到同一个 JVM中去。
- 在早期 Spark 版本是使用 Scala 的 Reflection 和 Quasiquotes 机制来实现代码生成的。Quasiquotes 是一个简洁的符号,可以让我们轻松操作 Scala 语法树,具体参见 https://docs.scala-lang.org/overviews/quasiquotes/intro.html。虽然 Quasiquotes 可以很好的为我们解决代码生成等相关的问题,但是带来的新问题是编译代码时间比较长(大约 50ms - 500ms)!所以社区不得不默认关闭表达式代码生成。
- 为了解决这个问题,Spark 引入了 Janino 项目,参见 SPARK-7956。Janino 是一个超级小但又超级快的 Java™ 编译器,它不仅能像 javac 工具那样将一组源文件编译成字节码文件,还可以对一些 Java 表达式,代码块,类中的文本(class body)或者内存中源文件进行编译,并把编译后的字节码直接加载到同一个 JVM中运行。
- Janino 不是一个开发工具,而是作为运行时的嵌入式编译器,比如作为表达式求值的翻译器或类似于 JSP 的服务端页面引擎,关于 Janino 的更多知识请参见:https://janino-compiler.github.io/janino/。
- 通过引入了 Janino 来编译生成的代码,结果显示 SQL 表达式的编译时间减少到 5ms。在 Spark 中使用了 ClassBodyEvaluator 来编译生成之后的代码,参见
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
。 - 注意:代码生成是在 Driver 端进行的,而代码编译是在 Executor 端进行的。
SQL执行
- 终于到了 SQL 真正执行的地方了。这个时候 Spark 会执行上阶段生成的代码,然后得到最终的结果,DAG 执行图如下:
4. SparkSQL执行过程深度总结
SparkSQL调优
1. 数据缓存
- 主要是将数据放入内存中操作,spark 缓存注册表的方法
// 缓存表
spark.catalog.cacheTable("tableName")
// 释放缓存表
spark.catalog.uncacheTable("tableName")
2. 性能优化相关参数
- Spark sql仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力。
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。 |
spark.sql.files.maxPartitionBytes | 128MB | 读取文件时单个分区可容纳的最大字节数(不推荐手动修改) |
spark.sql.files.openCostInBytes | 4M | 打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。 |
3. 表数据广播
- 在进行表 join 的时候,将小表广播可以提高性能,spark2.x 中可以调整以下参数
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.broadcastTimeout | 300 | 广播等待超时时间,单位秒 |
spark.sql.autoBroadcastJoinThreshold | 10M | 用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE < tableName > COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。 |
4. 分区数的控制
- spark 任务并行度的设置中,有两个参数可以设置
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.shuffle.partitions | 200 | 用于配置 join 或aggregate shuffle数据时使用的分区数。 |
spark.default.parallelism | 对于分布式shuffle操作像reduceByKey和join,父RDD中分区的最大数目。对于无父RDD的并行化等操作,它取决于群集管理器: ①本地模式:本地计算机上的核心数 ②Mesos fine grained mode:8 ③其他:所有执行节点上的核心总数或2,以较大者为准 | 分布式shuffle操作的分区数 |
- 看起来它们的定义似乎也很相似,但在实际测试中
- spark.default.parallelism 只有在处理 RDD 时才会起作用,对 Spark SQL 的无效。
- spark.sql.shuffle.partitions 则是对 sparks SQL 专用的设置
5. 文件与分区
- 有两个参数可以调整:
- 读取文件的时候一个分区接受多少数据;
- 文件打开的开销,通俗理解就是小文件合并的阈值。
- 文件打开是有开销的,开销的衡量,Spark 采用了一个比较好的方式就是打开文件的开销,用相同时间能扫描的数据的字节数来衡量。
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 打包传入一个分区的最大字节,在读取文件的时候 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度) |
-
spark.sql.files.maxPartitionBytes 该值的调整要结合你想要的并发度及内存的大小来进行。
-
spark.sql.files.openCostInBytes 说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并
6. 数据的本地性
- 分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。
- 移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络 IO,也消耗了磁盘 IO,降低了整个计算的效率。
- 为了提高数据的本地性,除了优化算法(也就是修改 spark 内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值- 。
- 来看看一个 stage 里所有 task 运行的一些性能指标:
- Scheduler Delay:spark 分配 task 所花费的时间
- Executor Computing Time:executor 执行 task 所花费的时间
- Getting Result Time:获取 task 执行结果所花费的时间
- Result Serialization Time:task 执行结果序列化时间
- Task Deserialization Time:task 反序列化时间
- Shuffle Write Time:shuffle 写数据时间
- Shuffle Read Time:shuffle 读数据所花费时间
- Input Size:每个批次处理输入数据大小
- Locality Level:读取级别,通常读取数据 PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY,尽量使数据以PROCESS_LOCAL 或 NODE_LOCAL 方式读取。其中 PROCESS_LOCAL 还和 cache 有关。
- PROCESS_LOCAL:是指读取缓存在本地节点的数据
- NODE_LOCAL:是指读取本地节点硬盘数据
- RACK_LOCAL:是指读取所在物理机架节点数据
- ANY:是指读取非本地节点数据
7. SparkSQL参数调优总结
- Hive 参数:
// 是否允许动态生成分区
set hive.exec.dynamic.partition=true;
// 是否容忍指定分区全部动态生成
set hive.exec.dynamic.partition.mode=nonstrict;
// 动态生成的最多分区数
set hive.exec.max.dynamic.partitions = 100;
- 运行行为参数:
// 大表 JOIN 小表,小表做广播的阈值
set spark.sql.autoBroadcastJoinThreshold;
// 开启动态资源分配
set spark.dynamicAllocation.enabled;
// 开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.maxExecutors;
// 开启动态资源分配后,最少可分配的Executor数
set spark.dynamicAllocation.minExecutors;
// 需要shuffle是mapper端写出的partition个数
set spark.sql.shuffle.partitions;
// 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.enabled;
// 开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize;
// 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.sql.adaptive.minNumPostShufflePartitions;
// 当几个stripe的大小大于该值时,会合并到一个task中处理
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;
- Executor 参数:
// executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.executor.memory;
// Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.yarn.executor.memoryOverhead;
// 当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.sql.windowExec.buffer.spill.threshold;
// 单个executor上可以同时运行的task数
set spark.executor.cores;
以上是关于大数据高级开发工程师——Spark学习笔记的主要内容,如果未能解决你的问题,请参考以下文章