可能是全网最详细的 Spark Sql Aggregate 源码剖析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了可能是全网最详细的 Spark Sql Aggregate 源码剖析相关的知识,希望对你有一定的参考价值。
参考技术A纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
这可能是全网最详细的python安装教程(windows)
python安装是学习pyhon第一步,很多刚入门小白不清楚如何安装python,今天我来带大家完成python安装与配置,跟着我一步步来,很简单,你肯定能完成。
第一部分:python安装
(一)准备工作
1、下载和安装python(认准官方网站)
(二)开始安装
对于Windows操作系统,可以下载“executable installer”。需要注意的是,如果在Windows 7环境下安装Python 3,需要先安装ServicePack 1补丁包,大家可以在Windows的“运行”中输入winver命令,从弹出的窗口上可以看到你的系统是否安装了该补丁包。如果没有该补丁包,一定要先通过“Windows Update”或者类似“CCleaner”这样的工具自动安装该补丁包,安装完成后通常需要重启你的Windows系统,然后再开始安装Python环境。
双击运行刚才下载的安装程序,会打开Python环境的安装向导。在执行安装向导的时候,记得勾选“Add Python 3.x to PATH”选项,这个选项会帮助我们将Python的解释器添加到PATH环境变量中(不理解没关系,照做就行),具体的步骤如下图所示。
安装完成后可以按win+R(win就是开始菜单的那个键)打开Windows的“命令行提示符”工具并输入python --version或python -V来检查安装是否成功,命令行提示符可以在“运行”中输入cmd来打开或者在“开始菜单”的附件中找到它。如果看了Python解释器对应的版本号(如:Python 3.7.8),说明你的安装已经成功了,如下图所示。
说明:如果安装过程显示安装失败或执行上面的命令报错,很有可能是因为你的Windows系统缺失了一些动态链接库文件而导致的问题。如果系统显示api-ms-win-crt*.dll文件缺失,可以在微软官网下载Visual C++ Redistributable for Visual Studio 2015文件进行修复,64位的系统需要下载有x64标记的安装文件。如果是因为安装游戏时更新了Windows的DirectX之后导致某些动态链接库文件缺失问题,可以下载一个DirectX修复工具进行修复。
这样我们python软件已经下载安装好了,电脑已经学会了python语言。
输入print(‘Hello World!’),写下你的第一句Python代码。
(三)增加环境变量
下面介绍下怎么手动添加环境变量,因为不添加python没法用。首先得知道在哪添加,按照这个顺序点进去就能找到环境变量在怎么增加呢 很简单,
首先找到你python的安装路径——复制哪增加。
怎么增加呢 很简单,首先找到你python的安装路径——复制
再找到刚刚那个环境变量的窗口,找到Path进行编辑,然后新增,将路径复制一路保存就好了
现在我们就可以愉快的使用python了!
第二部分:安装编辑器:pycharm安装
(一)准备工作
1、官网下载一个安装包
这个工具也可以在文末获取,同时相关的学习资料一起免费分享!
(二)开始安装
安装的时候讲几个点。
一直点NEXT就可以了,等待他安装完毕即可。
之后我们打开Pycharm,进去后先创建一个新项目。
这里不要把项目文件往C盘里面存,占内存还影响电脑运行。
之后我们来测试一下是否安装完毕,在编辑框输入print(‘测试’)。
按下shift+F10,运行一下。
没有出现乱码的话,就代表着测试成功!
如果你跟着我以上的步骤完整执行下来,大概率是不会出什么问题,有的话可以评论区留言,看到就会帮你解决。
关于Python技术储备
学好 Python 不论是就业还是做副业赚钱都不错,但要学会 Python 还是要有一个学习规划。最后大家分享一份全套的 Python 学习资料,给那些想学习 Python 的小伙伴们一点帮助!
一、Python所有方向的学习路线
Python所有方向路线就是把Python常用的技术点做整理,形成各个领域的知识点汇总,它的用处就在于,你可以按照上面的知识点去找对应的学习资源,保证自己学得较为全面。
二、学习软件
工欲善其事必先利其器。学习Python常用的开发软件都在这里了,给大家节省了很多时间。
三、入门学习视频
我们在看视频学习的时候,不能光动眼动脑不动手,比较科学的学习方法是在理解之后运用它们,这时候练手项目就很适合了。
四、实战案例
光学理论是没用的,要学会跟着一起敲,要动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。
五、面试资料
我们学习Python必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有阿里大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。
这份完整版的Python全套学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费
】
以上是关于可能是全网最详细的 Spark Sql Aggregate 源码剖析的主要内容,如果未能解决你的问题,请参考以下文章
Python进阶可能是全网最详细的defaultdict讲解
Python进阶可能是全网最详细的defaultdict讲解
Python进阶可能是全网最详细的defaultdict讲解