Managing Multiple Jobs with Hadoop JobControl
1. Introduction to JobControl
JobControl chains multiple Jobs together and submits them as a group. Dependencies among these Jobs are expressed by wrapping each Job in a ControlledJob and declaring which jobs it depends on.
In MapReduce v1, the JobControl classes are:
org.apache.hadoop.mapred.jobcontrol.Job
org.apache.hadoop.mapred.jobcontrol.JobControl
In MapReduce v2, JobControl and ControlledJob are:
org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl
org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob
2. A Simple Application of JobControl
This JobControl contains two Jobs. The first Job is a simple WordCount with one Mapper and one Reducer. The second Job runs after the first one completes and sorts the words by occurrence count; it contains only a Mapper and emits IntWritable as the output key, so the shuffle sorts the records by count automatically.
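A side note on sort order: by default the shuffle sorts IntWritable keys ascending, so the least frequent words come first. If you want the most frequent words first, you can plug in a descending comparator. The sketch below is our own illustration, not part of the article's source (the class name DescendingIntComparator is ours):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public static class DescendingIntComparator extends WritableComparator {
    protected DescendingIntComparator() {
        super(IntWritable.class, true); // instantiate keys for comparison
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b); // negate the natural ascending order
    }
}
// when configuring the second job:
job2.setSortComparatorClass(DescendingIntComparator.class);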
The key implementation steps are as follows:
1. Declare a Job and configure it, e.g.: Job job1 = Job.getInstance(config, "Job1");
2. Declare a second Job and configure it, e.g.: Job job2 = Job.getInstance(config, "Job2");
3. Wrap each of the two Jobs in a ControlledJob (the MR2 ControlledJob takes a Configuration; the Job is attached with setJob), e.g.:
ControlledJob cjob1 = new ControlledJob(config);
cjob1.setJob(job1);
ControlledJob cjob2 = new ControlledJob(config);
cjob2.setJob(job2);
4. Now declare the dependency between the two Jobs, e.g.:
cjob2.addDependingJob(cjob1);
5. Add both ControlledJobs to a JobControl, e.g.:
JobControl jobControl = new JobControl("SomeName");
jobControl.addJob(cjob1);
jobControl.addJob(cjob2);
6. Since JobControl implements the Runnable interface, start it on a thread; see the polling sketch after this list for how to wait on it:
Thread thread = new Thread(jobControl);
thread.start();
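Because thread.start() returns immediately, the driver must then wait for the JobControl to report that all jobs are done. A minimal polling sketch (the full loop, with failure handling, appears in the source below):
while (!jobControl.allFinished()) {
    Thread.sleep(500); // check twice a second instead of busy-waiting
}
jobControl.stop(); // stops the JobControl's own management thread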
Source code:
package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Controlling a group of MapReduce jobs with JobControl.
 * The first job is a plain WordCount; the second, map-only job
 * re-keys the output by count so the shuffle sorts it.
 * @author wangjian
 * @version 1.0 2018/5/13
 */
public class Demo06_JobControl extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Please supply at least two arguments: <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        // 1: get the Configuration (reusing the one ToolRunner parsed keeps generic options)
        Configuration config = getConf();
        // 2: create the two Jobs
        Job job1 = Job.getInstance(config, "Job1");
        Job job2 = Job.getInstance(config, "Job2");
        // 3: configure the first Job
        job1.setJarByClass(Demo06_JobControl.class);
        job1.setMapperClass(Mapper1.class);                 // set the mapper
        job1.setReducerClass(Reducer1.class);               // set the reducer
        job1.setOutputKeyClass(Text.class);                 // final output key type
        job1.setOutputValueClass(IntWritable.class);        // final output value type
        job1.setInputFormatClass(TextInputFormat.class);    // input and output file formats
        job1.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));   // input and output paths
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        // 4: wrap the first Job in a ControlledJob
        ControlledJob controlledJob1 = new ControlledJob(config);
        controlledJob1.setJob(job1);
        // 7: configure the second Job (map-only, keyed by count)
        job2.setJarByClass(Demo06_JobControl.class);
        job2.setMapperClass(Mapper2.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));          // the first job's output is this job's input
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "_2")); // a separate output directory
        // 5: create the JobControl
        JobControl jobControl = new JobControl("My JobControl");
        jobControl.addJob(controlledJob1);
        // 8: wrap the second Job in a ControlledJob
        ControlledJob controlledJob2 = new ControlledJob(config);
        controlledJob2.setJob(job2);
        // 9: add the second ControlledJob to the JobControl
        jobControl.addJob(controlledJob2);
        // 10: declare the dependency between the ControlledJobs: job2 depends on job1
        controlledJob2.addDependingJob(controlledJob1);
        // 6: run the JobControl on its own thread, since JobControl implements Runnable
        Thread thread = new Thread(jobControl);
        thread.start();
        int code = -1;
        while (true) {
            if (jobControl.allFinished()) {
                System.out.println("All jobs finished: " + jobControl.getSuccessfulJobList());
                jobControl.stop();
                code = 0;
                break;
            }
            if (jobControl.getFailedJobList().size() > 0) { // bail out if any job failed
                System.out.println("Failed jobs: " + jobControl.getFailedJobList());
                jobControl.stop();
                code = 1;
                break;
            }
            Thread.sleep(500); // poll instead of spinning in a busy loop
        }
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo06_JobControl(), args);
        System.exit(code);
    }

    // Mapper for the first job: tokenizes each input line and emits (word, 1)
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text key = new Text();
        private IntWritable val = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\\s+"); // split on whitespace
            for (String str : strs) {
                this.key.set(str);
                val.set(1);
                context.write(this.key, val);
            }
        }
    }

    // Reducer for the first job: sums the counts for each word
    public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable sum = new IntWritable(0);

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable w : values) {
                sum += w.get();
            }
            this.sum.set(sum);
            context.write(key, this.sum);
        }
    }

    // Mapper for the second job: swaps each "word<TAB>count" line to (count, word),
    // so the shuffle sorts the records by occurrence count
    public static class Mapper2 extends Mapper<LongWritable, Text, IntWritable, Text> {
        private IntWritable key = new IntWritable(0);
        private Text val = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] str = value.toString().split("\\s+");
            this.key.set(Integer.parseInt(str[1])); // str[1] is the count
            this.val.set(str[0]);                   // str[0] is the word
            context.write(this.key, this.val);
        }
    }
}
Result of a local test run:
All jobs finished: [job name:Job1
job id:My JobControl0
job state:SUCCESS
job mapred id:job_local1896504438_0001
job message:just initialized
job has no depending job:
, job name:Job2
job id:My JobControl1
job state:SUCCESS
job mapred id:job_local2112760435_0002
job message:just initialized
job has 1 dependeng jobs:
depending job 0:Job1
]
Testing locally produces two output directories: <output> for the first job and <output>_2 for the second.
Testing on HDFS:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo06_JobControl /test/ /out001
The output directories:
[wangjian@hadoop31 ~]$ hdfs dfs -ls /
Found 4 items
drwxr-xr-x - wangjian supergroup 0 2018-05-13 19:24 /out001
drwxr-xr-x - wangjian supergroup 0 2018-05-13 19:25 /out001_2
drwxr-xr-x - wangjian supergroup 0 2018-05-13 19:15 /test
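For comparison: without JobControl, a linear two-job chain like this one is often just run sequentially with waitForCompletion. A minimal sketch, assuming job1 and job2 are configured as in the source above:
// Manual alternative to JobControl: run job2 only if job1 succeeds.
if (job1.waitForCompletion(true)) {
    return job2.waitForCompletion(true) ? 0 : 1;
}
return 1;
JobControl earns its keep once the dependency graph grows beyond a simple chain, since it tracks each job's state and launches whichever jobs become ready.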