spark任务的提交流程(yarn)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark任务的提交流程(yarn)相关的知识,希望对你有一定的参考价值。

参考技术A spark一般都是部署到yarn上使用的,所以就说y问的最多的就是arn的提交流程,两种模式最大的区别就是driver端的执行位置.

Yarn Client 模式

第一步,Driver端在任务提交的本地机上运行

第二步,Driver启动之后就会和ResourceManager通讯,申请启动一个ApplicationMaster

第三步,ResourceManager就会分配container容器,在合适的nodemanager上启动ApplicationMaster,负责向ResourceManager申请Executor内存

第四步,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程

第五步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数

第六步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

Yarn Cluster 模式

第一步,在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster

第二步, 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。

第三步, Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程

第四步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,

第五步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

如何通过Java程序提交yarn的MapReduce计算任务

  1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。

  2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<textpair,value>,TextPair的格式为<key1,key2>。

  3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用
参考技术A 1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的不同,这里变为了,TextPair的格式为。3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用 参考技术B

 下面为MapReduce主程序,有几点须要提一下:

1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不正确文件进行切分。


2、为了控制reduce的处理过程。map的输出键的格式为组合键格式。

与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。

3、为了适应组合键,又一次设定了分组函数。即GroupComparator。分组规则为,仅仅要TextPair中的key1同样(不要求key2同样),则数据被分配到一个reduce容器中。这样,当同样key1的数据进入reduce容器后,key2起到了一个数据标识的作用。


package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import util.Utils;

public class GEMIMain

public GEMIMain()
job = null;


public Job job;
public static class NamePartitioner extends
Partitioner<TextPair, BytesWritable>
@Override
public int getPartition(TextPair key, BytesWritable value,
int numPartitions)
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;



/**
* 分组设置类。仅仅要两个TextPair的第一个key同样。他们就属于同一组。


他们的Value就放到一个Value迭代器中,
* 然后进入Reducer的reduce方法中。
*
* @author hduser
*
*/
public static class GroupComparator extends WritableComparator
public GroupComparator()
super(TextPair.class, true);


@Override
public int compare(WritableComparable a, WritableComparable b)
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
// 比较同样则返回0,比较不同则返回-1
return t1.getFirst().compareTo(t2.getFirst()); // 仅仅要是第一个字段同样的就分成为同一组




public  boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException

Configuration conf = new Configuration();
// 在conf中设置outputath变量,以在reduce函数中能够获取到该参数的值
conf.set("outputPath", args[args.length - 1].toString());
//设置HDFS中,每次任务生成产品的质量文件所在目录。args数组的倒数第二个原数为质量文件所在目录
conf.set("qualityFolder", args[args.length - 2].toString());
//假设在Server中执行。则须要获取web项目的根路径;假设以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件
//MapReduceProgress mprogress = new MapReduceProgress();
//String rootPath= mprogress.rootPath;
String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
conf.addResource(new Path(rootPath+"yarn-site.xml"));
conf.addResource(new Path(rootPath+"core-site.xml"));
conf.addResource(new Path(rootPath+"hdfs-site.xml"));
conf.addResource(new Path(rootPath+"mapred-site.xml"));
this.job = new Job(conf);

job.setJobName("Job name:" + args[0]);
job.setJarByClass(GEMIMain.class);

job.setMapperClass(GEMIMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(BytesWritable.class);
// 设置partition
job.setPartitionerClass(NamePartitioner.class);
// 在分区之后依照指定的条件分组
job.setGroupingComparatorClass(GroupComparator.class);

job.setReducerClass(GEMIReducer.class);

job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(Text.class);
job.setNumReduceTasks(8);


// 设置计算输入数据的路径
for (int i = 1; i < args.length - 2; i++)
FileInputFormat.addInputPath(job, new Path(args[i]));

// args数组的最后一个元素为输出路径
FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
boolean flag = job.waitForCompletion(true);
return flag;


@SuppressWarnings("static-access")
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException

String[] inputPaths = new String[] "normalizeJob",
"hdfs://192.168.168.101:9000/user/hduser/red1/",
"hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
"hdfs://192.168.168.101:9000/user/hduser/test" ;
GEMIMain test = new GEMIMain();
boolean result = test.runJob(inputPaths);      


下面为TextPair类


public class TextPair implements WritableComparable<TextPair>
private Text first;
private Text second;

public TextPair()
set(new Text(), new Text());


public TextPair(String first, String second)
set(new Text(first), new Text(second));


public TextPair(Text first, Text second)
set(first, second);


public void set(Text first, Text second)
this.first = first;
this.second = second;


public Text getFirst()
return first;


public Text getSecond()
return second;


@Override
public void write(DataOutput out) throws IOException
first.write(out);
second.write(out);


@Override
public void readFields(DataInput in) throws IOException
first.readFields(in);
second.readFields(in);


@Override
public int hashCode()
return first.hashCode() * 163 + second.hashCode();


@Override
public boolean equals(Object o)
if (o instanceof TextPair)
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);

return false;


@Override
public String toString()
return first + "\\t" + second;


@Override
/**A.compareTo(B)
* 假设比较同样,则比较结果为0
* 假设A大于B,则比较结果为1
* 假设A小于B。则比较结果为-1
*
*/
public int compareTo(TextPair tp)
int cmp = first.compareTo(tp.first);
if (cmp != 0)
return cmp;

//此时实现的是升序排列
return second.compareTo(tp.second);



下面为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分



package web.hadoop;

import java.io.IOException;  

import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable>

@Override
public RecordReader<Text, BytesWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException
// TODO Auto-generated method stub
return new WholeFileRecordReader();


@Override
protected boolean isSplitable(JobContext context, Path filename)
// TODO Auto-generated method stub
return false;
 
 

下面为WholeFileRecordReader类



package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable>

private FileSplit fileSplit;
private FSDataInputStream fis;

private Text key = null;
private BytesWritable value = null;

private boolean processed = false;

@Override
public void close() throws IOException
// TODO Auto-generated method stub
// fis.close();


@Override
public Text getCurrentKey() throws IOException, InterruptedException
// TODO Auto-generated method stub
return this.key;


@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException
// TODO Auto-generated method stub
return this.value;


@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
throws IOException, InterruptedException

fileSplit = (FileSplit) inputSplit;
Configuration job = tacontext.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
fis = fs.open(file);


@Override
public boolean nextKeyValue()

if (key == null)
key = new Text();


if (value == null)
value = new BytesWritable();


if (!processed)
byte[] content = new byte[(int) fileSplit.getLength()];

Path file = fileSplit.getPath();

System.out.println(file.getName());
key.set(file.getName());

try
IOUtils.readFully(fis, content, 0, content.length);
// value.set(content, 0, content.length);
value.set(new BytesWritable(content));
catch (IOException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
IOUtils.closeStream(fis);


processed = true;
return true;


return false;


@Override
public float getProgress() throws IOException, InterruptedException
// TODO Auto-generated method stub
return processed ? fileSplit.getLength() : 0;



以上是关于spark任务的提交流程(yarn)的主要内容,如果未能解决你的问题,请参考以下文章

Spark on yarn遇到的问题

如何通过Java程序提交yarn的MapReduce计算任务

如何通过Java程序提交yarn的MapReduce计算任务

Yarn基础组件及提交流程

Yarn任务提交流程(源码分析)

怎么查看spark正在运行的任务