Job提交流程
Posted lyr999736
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Job提交流程相关的知识,希望对你有一定的参考价值。
1.【Driver.class】-- Job job = Job.getInstance(conf);
-->【job.class】getInstance(conf)
--> new JobConf(conf) //构建一个空集群配置对象
说明:将默认configuration(4个配置文件)包装成Jobconf
2.设置相关参数项:
job.setJarByClass(AirMapper.class); --> 【MRJobConfig】JobContext.JAR:mapreduce.job.jar
job.setJobName("Text local"); --> 【MRJobConfig】JobContext.JOB_NAME:mapreduce.job.name
job.setMapperClass(AirMapper.class); --> 【MRJobConfig】MAP_CLASS_ATTR:mapreduce.job.map.class
job.setReducerClass(AirReducer.class); --> 【MRJobConfig】REDUCE_CLASS_ATTR:mapreduce.job.reduce.class
job.setOutputKeyClass(Text.class); --> 【MRJobConfig】JobContext.OUTPUT_KEY_CLASS:mapreduce.job.output.key.class
job.setOutputValueClass(IntWritable.class); --> 【MRJobConfig】JobContext.OUTPUT_VALUE_CLASS:mapreduce.job.output.value.class
job.setCombinerClass(AirCombiner.class); --> 【MRJobConfig】COMBINE_CLASS_ATTR:mapreduce.job.combine.class
FileInputFormat.addInputPath(job, new Path("file:///D:/airdata"));
-->【FileInputFormat.class】 INPUT_DIR:"mapreduce.input.fileinputformat.inputdir";
FileOutputFormat.setOutputPath(job,outfile);
-->【FileOutputFormat.class】FileOutputFormat.OUTDIR :mapreduce.output.fileoutputformat.outputdir
3.job.waitForCompletion(true)
-->【job.class】waitForCompletion() //作用:提交job至cluster,并等待完成
-->判定当前State是否为JobState.DEFINE()定义阶段,如果定义阶段调用submit()
-->【job.class】submit()
-->【job.class】connect() //说明:通过UGI(用户组信息对象)构建集群对象(cluster)
-->【job.class】new Cluster(getConfiguration());
-->【 Cluster.class】通过静态代码块,加载【mapred-site.xml和yarn-site.xml】和默认的default文件
通过conf的mapreduce.framework.name的值来返回构建cluster集群对象的客户端协议:local=LocalJobRunner;yarn=YARNRunner
-->【job.class】JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
说明:构建job提交器,然后执行submitter.submitJobInternal()
-->【JobSubmitter.class】submitter.submitJobInternal()
作用:
1.检查作业的输入和输出规范
2.计算作业的InputSplit(逻辑分片)。
3.如果需要,设置必要的缓存信息;
4.将作业的jar和配置复制到map-reduce系统分布文件系统上的目录。
5.提交job至ResourceManager,并监控其状态
说明:检查;生成JobID,设置job相关参数:
a.MRJobConfig.JOB_SUBMITHOST :mapreduce.job.submithostname
b.MRJobConfig.JOB_SUBMITHOSTADDR :mapreduce.job.submithostaddress
c.MRJobConfig.USER_NAME :mapreduce.job.user.name
d.MRJobConfig.MAPREDUCE_JOB_DIR : mapreduce.job.dir 【/tmp/hadoop-centos/mapred/staging/centos1104417307/.staging/job_local1104417307_0001】
e.MRJobConfig.NUM_MAPS : mapreduce.job.maps
-->【JobSubmitter.class】copyAndConfigureFiles(job, submitJobDir);
-->【JobResourceUploader.class】 rUploader.uploadFiles(job, jobSubmitDir);
说明:上传jobjar,配置信息等内容;
-->【JobSubmitter.class】writeSplits(job, submitJobDir);
说明:计算split;
-->【JobSubmitter.class】 writeConf(conf, submitJobFile);
说明:上传job.xml文件
-->【JobSubmitter.class】 submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
说明:真正提交Job
-->
-->如果State为JobState.RUNNING运行阶段,轮询job,按5000ms时间间隔
【mapreduce.client.completion.pollinterval=5000】
JOB提交流程:
创建一个job-->resourcemanager获得一个应用-->将运行作业所需要的资源复制到文件系统中-->提交作业-->
调度器分配一个容器-->启动APPMaster-->初始化作业-->接受输入分片-->若作业不适合作为uber任务运行,Appmaster向资源管理器请求容器-->
Appmaster启动容器-->资源本地化-->运行map与reduce
资源调度的相关配置属性:
------------------------------------------------
1.mapreduce.map.memory.mb = 1024 //每个map任务的调度程序请求的内存数量
2.mapreduce.map.cpu.vcores = 1 //每个map任务从调度程序请求的虚拟内核的数量。
3.mapreduce.reduce.memory.mb = 1024 //每个reduce任务的调度程序请求的内存数量
4.mapreduce.reduce.cpu.vcores = 1 //每个reduce任务从调度程序请求的虚拟内核的数量。
5.mapreduce.tasktracker.map.tasks.maximum = 2 //Nodemanager同时运行的map任务的最大数量。
6.mapreduce.tasktracker.reduce.tasks.maximum = 2 //Nodemanager同时运行的reduce任务的最大数量。
7.hadoop中每个守护进程(5个)默认分配1000m内存大小;
修改【hadoop-env.sh】中,export HADOOP_HEAPSIZE=1000(单位MB)
修改ResourceManager堆大小:【yarn-env.sh】export YARN_RESOURCEMANAGER_HEAPSIZE=1000
namenode内存计算原则:参照【P291】
修改namenode:export HADOOP_NAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
修改Sencondarynamenode:export HADOOP_SECONDARYNAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
修改datanode:export HADOOP_DATANODE_OPTS=
8.yarn.nodemanager.resource.memory-mb = 8192m //每个节点可用物理内存,单位MB,为容器
9.yarn.scheduler.minimum-allocation-mb = 1024m //单个任务可申请最少内存,默认1024MB
10.yarn.scheduler.maximum-allocation-mb = 8192m //单个任务可申请最大内存,默认8192MB
11.yarn.scheduler.minimum-allocation-vcores = 4 //单个任务可申请最小cpu核数,默认1
12.yarn.scheduler.maximum-allocation-vcores = 32 //单个任务可申请最大cpu核数,默认32
注意:yarn.scheduler.minimum-allocation-mb <= mapreduce.map.memory.mb <= yarn.scheduler.maximum-allocation-mb,否则抛InvalidResourceRequestException
一个nodemanager节点运行的map数量 <= yarn.nodemanager.resource.memory-mb / mapreduce.map.memory.mb = 8
一个nodemanager节点运行的reduce数量 <= yarn.nodemanager.resource.memory-mb / mapreduce.reduce.memory.mb = 8
以上是关于Job提交流程的主要内容,如果未能解决你的问题,请参考以下文章