sona:Spark on Angel任务启动流程分析
Posted zhongrui_fzr
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sona:Spark on Angel任务启动流程分析相关的知识,希望对你有一定的参考价值。
本文主要分析腾讯的分布式机器学习平台angel3.0版本在spark上的启动流程,本文讲解的比较详细甚至啰嗦,所以建议新手同学认真阅读,不足之处还请大佬指教 ~~
本文也有一些暂时还未理解十分透彻的地方,欢迎指正~~
官方文档参考:https://www.bookstack.cn/read/angel-v3.0/README_cn.md
以sona自带的JsonRunnerExamples为例分析sona的启动流程
下面这几行代码是用于启动spark并启动angel的
val spark = SparkSession.builder()
.appName("AngelClassification")
.getOrCreate()
val sparkConf = spark.sparkContext.getConf
val driverCtx = DriverContext.get(sparkConf)
driverCtx.startAngelAndPSAgent()
这段脚本是用于提交sona任务的,
source ./spark-on-angel-env.sh
$SPARK_HOME/bin/spark-submit \\
--master yarn-cluster \\
--conf spark.ps.jars=$SONA_ANGEL_JARS \\
--conf spark.ps.instances=10 \\
--conf spark.ps.cores=2 \\
--conf spark.ps.memory=6g \\
--conf spark.hadoop.fs.defaultFS=hdfs://ns3-backup \\
--conf spark.hadoop.angel.staging.dir=hdfs://ns3-backup/user \\
--conf spark.hadoop.angel.tmp.output.path.prefix=/user/XXX_bigdata_push/angel_stage \\
--conf spark.angel.save.model.path=/user/XXX_bigdata_push/zhongruix/angel_model \\
--conf spark.hadoop.angel.tmp.output.path=/user/XXX_bigdata_push/angel_stage \\
--conf spark.hadoop.sona.tmp=/user/XXX_bigdata_push/zhongruix/tmp2 \\
--conf spark.hadoop.angel.save.model.path=/user/XXX_bigdata_push/zhongruix/angel_model/ \\
--conf spark.sql.warehouse.dir=/user/XXX_bigdata_push/zhongrui3_spark_sql_warehouse/ \\
--jars $SONA_SPARK_JARS\\
--name "LR-spark-on-angel" \\
--files ./../jsons/logreg.json \\
--driver-memory 10g \\
--num-executors 10 \\
--executor-cores 2 \\
--executor-memory 4g \\
--class com.tencent.angel.sona.examples.JsonRunnerExamples \\
./../lib/angelml-$SONA_VERSION.jar \\
lr:0.01 \\
data:/user/XXX_bigdata_push/zhongruix/angel3_home/sona-test/data/mllib/sample_libsvm_data.txt \\
modelPath:hdfs://ns3-backup/user/XXX_bigdata_push/zhongruix/angel_model/ \\
jsonFile:./logreg.json
接下来会先分析这段脚本的含义然后分析这几行scala代码后面的所有启动流程
提交脚本
配置参数说明
1、脚本中的第一句’source ./spark-on-angel-env.sh’用于初始化sona的一些环境变量,包括Hadoop,spark,sona的本机安装目录以及sona的hdfs目录(用于上传sona的依赖包)以及sona,angle所需的所有jar包列表
2、第二句是提交angel任务,因为angel任务本质上也是一个spark application,所以任务提交方式跟spark一致。spark-submit后面的参数配置需要格外注意,在这些参数设置上真的是踩了好多的坑。
- '–master yarn-cluster’指定集群模式,这个是常规设置,比较简单
- ‘–conf spark.ps.jars=$SONA_ANGEL_JARS’ 这个是配置parameter server需要的jar包列表,其中SONA_ANGEL_JARS变量就是上面spark-on-angel-env文件里配置好的,注意3.0版本的angel里这个变量和之前版本里是不一样的,但是因为提交前都有一句source,所以不同版本也不会有影响
- ‘–conf spark.ps.instances=10’, ‘–conf spark.ps.cores=2’, ‘–conf spark.ps.memory=6g’ 配置parameter server的实例数,核数,内存
- '–conf spark.hadoop.fs.defaultFS=hdfs://ns3-backup’配置spark任务中使用的hdfs文件系统
- ‘–conf spark.hadoop.angel.staging.dir=hdfs://ns3-backup/user’ 配置angel的staging目录,关于angel的staging目录的具体功能可以参考另外一篇讲解(暂时还没写,稍后写~~ ),注意这里最后一级目录/user不能替换成/user_ext,由于我们集群上这两个目录都可以访问并且访问的同一个路径,但在这里配置的时候只能配置/user,原因尚未深究
- '–conf spark.hadoop.angel.tmp.output.path.prefix=/user/XXX_bigdata_push/angel_stage '配置angel的临时目录中输出目录的前缀路径
- ‘–conf spark.angel.save.model.path=/user/XXX_bigdata_push/zhongruix/angel_model’ 配置angel的模型保存路径,训练阶段使用,angel的模型保存路径最好是个全新且不存在的路径,默认会先清空路径,再创建一个全新的路径,所以如果有重要文件,请先备份!请先备份!请先备份!重要的事情说三遍。这个问题经过踩坑并多次验证后证实确实是angel默认情况会删除文件。
- ‘–conf spark.sql.warehouse.dir=/user/XXX_bigdata_push/zhongrui3_spark_sql_warehouse/’ spark sql的执行保存路径,加这个配置是因为默认的路径下执行程序的用户没有权限写
- ‘–jars $SONA_SPARK_JARS’ spark任务依赖的jar包,SONA_SPARK_JARS在spark-on-angel-env.sh中已经配置好的,这个变量也是3.0和之前版本取值不一样,所以submit之前都需要有脚本中的第一句
- ‘–name “LR-spark-on-angel”’ 任务名称
- ‘–files ./…/jsons/logreg.json’ 执行spark应用所需的文件,这个路径可以是机器本地路径,执行程序前会上传到前面配置的hdfs上的/tmp目录中,这里上传了angel模型需要的json文件,文件中定义了模型的结构信息,下面程序执行参数会用到
- ‘–driver-memory 10g’, ‘–num-executors 10’, ‘–executor-cores 2’, ‘–executor-memory 4g’ 配置spark任务所需的机器资源
- ‘–class com.tencent.angel.sona.examples.JsonRunnerExamples’, ’ ./…/lib/angelml-$SONA_VERSION.jar 'spark任务主类入口以及执行程序jar包,类路径需要是全路径
- ‘lr:0.01’ 学习率,执行程序参数,可以在代码中读取的参数
- ‘data:/user/XXX_bigdata_push/zhongruix/angel3_home/sona-test/data/mllib/sample_libsvm_data.txt’ 模型训练所需的数据文件,hdfs路径
- 'modelPath:hdfs://ns3-backup/user/XXX_bigdata_push/zhongruix/angel_model/ ’ 模型保存路径,这里可以设置模型保存路径,上面的conf参数中也可以设置,但要注意代码里的用法是不一样的,看个人需要
- ‘jsonFile:./logreg.json’ 指定json文件,用于从中读取angel模型的结构信息,这里angel程序执行时的目录是前面的临时目录,所以直接使用当前目录获取文件即可。
注意:要先确认清楚需要的配置应该以angel,ml,spark.ps,spark.hadoop还是spark.angel开头
任务提交
spark应用启动
- 读取配置信息,创建目录,上传文件,设置配置值等
- spark应用启动,分配资源,driver调度等,此处略过,参考spark启动流程
Angel启动
- driverContext初始化前会注册几个spark矩阵向量相关的用户自定义类型UDT
- 读取SparkConf对象中的spark配置信息,这里是我觉得angel比较坑的地方,angel在读取这些配置信息的时候设置了一些默认值,而在后面每个设置的使用场景中也有一些默认值,两个地方的默认值不全一致,导致在看源码的时候容易混淆,可以认为ConfUtils.scala中的默认值才是真正的默认值
# 以angel保存模型的源码为例,这里如果路径存在,会根据deleteOnExist判断是否删除现有路径,而
if (outFs.exists(outputPath))
if (deleteOnExist)
outFs.delete(outputPath, true);
if (outFs.exists(outputPath))
throw new IOException(
"output path " + outputPath + " already exist, remove it failed!!!");
# 这里deleteOnExist的默认值,AngelConf.DEFAULT_ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST=False,可以参考angel的AngelConf.java中的设置
boolean deleteOnExist = conf.getBoolean(AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST, AngelConf.DEFAULT_ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST);
## 综合上面的代码看,这里默认是不会将目录删除的
#但在sona的ConfUtils.scala类中,已经在读取spark配置的过程中将这个值默认设置为了TRUE,导致后面模型保存过程中的默认值根本无用,而且会误导看源码的同学
val psOutOverwrite = conf.getBoolean("spark.ps.out.overwrite", defaultValue = true)
hadoopConf.setBoolean(ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST, psOutOverwrite)
配置读取的时候会把"spark.ps",“spark.hadoop”,“angel”, "ml"开头的配置名都读入
具体的各项默认配置可参考当前使用版本sona项目的ConfUtil.scala文件中的定义,新手容易有各种参数的功能和使用上的困惑也可以参考这个文件。
-
获取配置中的共享配置,主要体现在以"ml."和"angel."开头的配置,或者有额外指定的json配置文件
-
根据上述配置生成DriverContext对象,在driver端进行全局的初始化和启动工作
-
启动angel和PS
- 基于上述配置创建AngelPSClient
- 然后是创建并启动AngelClient,根据angel的部署模式配置值选择实现AngelLocalClient,或是AngelKubernetesClient或是AngelYarnClient,默认配置是YARN模式
- 设置参数服务器的实例数,angel的运行模式(默认angel_ps, angel_ps_worker, angel_local)
- AngelClient中增加参数矩阵MatrixContext到nameToMatrixMap中,该矩阵名称是"init",行数为1,列数为参数服务器的实例数,块内最大行数为1,块内最大列数也为1,关于这个参数矩阵的功能,之后再细究。MatrixContext就是angel中模型参数矩阵的类型
- 启动PS服务器,本例中是执行AngelYarnClient的startPSServer()函数
- 设置用户,设置本地地址,设置staging目录(默认是提交脚本中配置的staging目录或者默认"/tmp/hadoop-yarn"、用户名、".staging"拼接而成的路径)
- 创建YarnClient对象,并根据配置信息配置yarnclient,然后启动一个yarn application并记录appId
- 在前面的staging目录基础上拼接这个appId得到任务目录,赋值给"angel.job.dir"配置
- 设置输入目录,使用dummy格式的输入数据或者使用angel_ps模式运行时这步跳过
- 设置输出目录,根据actionType设置配置中关键字"angel.output.path"的值,对于已经存在的路径会根据"angel.output.path.deleteonexist"配置决定要不要删除(覆盖写),这里需要重点强调默认配置值是true,并不是最里层模块代码里的false,所以此处目录最好指定一个全新不存在的目录。根据前面生成的path目录和配置的tmp路径前缀以及appId设置配置中"angel.tmp.output.path"的值
- predict:配置中关键字"angel.predict.out.path"对应的路径
- serving:配置中关键字"angel.serving.temp.path"对应的路径
- train或inctrain:配置中关键字"angel.save.model.path"对应的路径
- 添加用户认证信息,Hadoop的namenode token信息
- 验证conf配置中各项参数值是否合法,例如,cpu核数是否大于0,内存数值是否大于0等
- 处理废弃参数,angel.am.memory.mb废弃,angel.am.memory.gb替代,angel.worker.memory.mb废弃,angel.worker.memory.gb替代,angel.ps.memory.mb废弃,angel.ps.memory.gb替代
- 将依赖的资源文件如jar包等上传到上面的"angel.job.dir"路径下
- 读取文件式的配置信息并合并到conf中
- 基于conf中各项配置创建一个应用提交环境(ApplicationSubmissionContext)
- 将这个上下文提交到yarn,等待各ps实例的回复,回复成功则表示ps server启动成功
- 创建矩阵:初始化nameToMatrixMap中的参数矩阵的元数据
- 实例化分区类为RangePartitioner,分区类的初始化需要根据配置值设置属性
- 默认分区大小:根据"angel.model.partitioner.partition.size"设置,默认是50万
- 最大分区数:根据"angel.mdoel.partitioner.max.partition.number"设置,默认是10000
- ps实例数:根据"angel.ps.number"设置,默认1
- 每个实例上的分区数:根据"angel.model.partitioner.partition.number.perserver"设置,默认-1
- 实际的最大分区数取上面设置的最大分区数和ps实例数与每个实例上的分区数之积的较小值
- 从配置"angel.load.model.path"中的模型路径中加载参数矩阵的元数据,其中元数据格式可参考类MatrixFileMeta
- 从加载到的参数矩阵元数据中计算模型其他属性,如列数,行数,起始索引,结束索引等
- 校验行数列数起止索引等值与rowType是否兼容
- 将nameToMatrixMap中的每个矩阵元数据信息序列化为protobuf格式并重建到MasterProtocol中,这个过程比较冗长复杂,一时看的不是很懂,之后再详细梳理
- 等待重建完成
- 实例化分区类为RangePartitioner,分区类的初始化需要根据配置值设置属性
- 启动MasterService,执行App即worker内容,对于Kubernetes下angel_ps_worker模式需要额外配置executor的角色为worker
- 根据conf配置和master地址新建AngelContext并返回
-创建并初始化PS代理 - 根据master的IP,端口,分区id,任务环境创建PSAgent对象
- 启动PS代理
- 建立LocationManager用于管理代理环境
- 建立masterClient用于连接masterService,masterService中在前面的参数矩阵序列化过程中出现过,保存了所有序列化的矩阵元数据
- masterclient向masterService请求矩阵元数据, 并将元数据加入当前对象的矩阵元数据管理器matrixMetaManager中
- 保存其他ParameterServer的连接地址并按索引排序
- 建立矩阵传输连接,用户请求适配器,矩阵缓存等进程
- 如果是angel_ps_worker模式,还需要建立矩阵操作opLog缓存和时钟缓存等进程,并且建立矩阵存储管理器和一致性controller
-
返回AngelClient,angel和ps启动工作完成
-
进入正常的数据读取预处理以及模型训练保存等过程
以上是关于sona:Spark on Angel任务启动流程分析的主要内容,如果未能解决你的问题,请参考以下文章
sona:Spark on Angel大规模分布式机器学习平台介绍