Hadoop: Managing Multiple Jobs with JobControl

Posted by 健哥说编程


1. Introduction to JobControl

JobControl chains multiple Jobs together and submits them as a group. The dependencies between those Jobs are expressed by wrapping each Job in a ControlledJob.

In the MapReduce v1 API, the relevant classes are:

org.apache.hadoop.mapred.jobcontrol.Job

org.apache.hadoop.mapred.jobcontrol.JobControl

 

In the MapReduce v2 API, the corresponding JobControl and ControlledJob classes are:

org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl

org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob
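
Used together, the pattern looks roughly like this (a minimal fragment, assuming job1 and job2 are fully configured Job instances; section 2 below walks through a complete program):

ControlledJob cjob1 = new ControlledJob(conf);
cjob1.setJob(job1);
ControlledJob cjob2 = new ControlledJob(conf);
cjob2.setJob(job2);
cjob2.addDependingJob(cjob1);            // job2 starts only after job1 succeeds

JobControl jc = new JobControl("demo");  // "demo" is an arbitrary group name
jc.addJob(cjob1);
jc.addJob(cjob2);
new Thread(jc).start();                  // JobControl implements Runnable
while (!jc.allFinished()) {
    Thread.sleep(500);                   // may throw InterruptedException
}
jc.stop();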


2. A Simple Application of JobControl

This example's JobControl contains two Jobs. The first is a plain word count, with one Mapper and one Reducer. The second runs after the first completes and sorts the words by how often they occur: it contains only a Mapper, which emits the count as an IntWritable key, so the shuffle sorts the records by count automatically.
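
For example (hypothetical data, not taken from the original run), the second Job transforms the first Job's output like this:

// Job1 output (word<TAB>count)      Job2 map output (count, word)
// "hadoop\t3"                -->    (3, "hadoop")
// "hdfs\t1"                  -->    (1, "hdfs")
// "yarn\t2"                  -->    (2, "yarn")
// After the shuffle, the records arrive in ascending IntWritable order:
// (1, "hdfs"), (2, "yarn"), (3, "hadoop")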

The key implementation steps are as follows:

1. Declare the first Job and configure it, e.g.: Job job1 = Job.getInstance(config, "Job1");

2. Declare the second Job and configure it, e.g.: Job job2 = Job.getInstance(config, "Job2");

3. Wrap the two Jobs in ControlledJob instances (ControlledJob takes a Configuration; the Job is attached with setJob), e.g.:

ControlledJob cjob1 = new ControlledJob(config);
cjob1.setJob(job1);

ControlledJob cjob2 = new ControlledJob(config);
cjob2.setJob(job2);

4. Now declare the dependency between the two Jobs, e.g.:

cjob2.addDependingJob(cjob1);

5. Add the two ControlledJob instances to a JobControl, e.g.:

JobControl jobControl = new JobControl("SomeName");

jobControl.addJob(cjob1);

jobControl.addJob(cjob2);

6. Since JobControl implements the Runnable interface, start it on a thread, then poll allFinished() and call stop() once everything is done (see the loop in the full source below). That is:

Thread thread = new Thread(jobControl);

thread.start();

 

Full source code:

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Controlling a chain of MapReduce jobs with JobControl
 * @author wangjian
 * @version 1.0 2018/5/13
 */
public class Demo06_JobControl extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("请输入至少两个参数:<input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        // 1: obtain the Configuration; getConf() returns the one ToolRunner populated
        Configuration config = getConf();
        // 2: declare the two Jobs
        Job job1 = Job.getInstance(config, "Job1");
        Job job2 = Job.getInstance(config, "Job2");
        // 3: configure the first Job (word count)
        job1.setJarByClass(Demo06_JobControl.class);
        job1.setMapperClass(Mapper1.class);// set the Mapper
        job1.setReducerClass(Reducer1.class);// set the Reducer
        job1.setOutputKeyClass(Text.class);// final output key type
        job1.setOutputValueClass(IntWritable.class);// final output value type
        job1.setInputFormatClass(TextInputFormat.class);// input/output file formats
        job1.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));// input/output directories
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // 4: wrap the first Job in a ControlledJob
        ControlledJob controlledJob1 = new ControlledJob(config);
        controlledJob1.setJob(job1);

        // 5: configure the second Job (sort by count)
        job2.setJarByClass(Demo06_JobControl.class);
        job2.setMapperClass(Mapper2.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job2,new Path(args[1]));// the previous Job's output is this Job's input
        FileOutputFormat.setOutputPath(job2,new Path(args[1]+"_2"));// and a separate output directory

        // 6: declare the JobControl and register the first ControlledJob
        JobControl jobControl = new JobControl("My JobControl");
        jobControl.addJob(controlledJob1);

        // 7: wrap the second Job in a ControlledJob
        ControlledJob controlledJob2 = new ControlledJob(config);
        controlledJob2.setJob(job2);

        // 8: add the second ControlledJob to the JobControl
        jobControl.addJob(controlledJob2);

        // 9: declare the dependency between the ControlledJobs: job2 depends on job1
        controlledJob2.addDependingJob(controlledJob1);
        // 10: start a thread, since JobControl implements Runnable
        Thread thread = new Thread(jobControl);
        thread.start();
        int code = -1;
        while (true) {
            if (jobControl.allFinished()) {
                System.out.println("All jobs finished: " + jobControl.getSuccessfulJobList());
                jobControl.stop();
                code = 0;
                break;
            }
            if (jobControl.getFailedJobList().size() > 0) {// abort if any job failed
                System.out.println("Failed jobs: " + jobControl.getFailedJobList());
                jobControl.stop();
                code = 1;
                break;
            }
            Thread.sleep(500);// poll instead of busy-waiting
        }
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo06_JobControl(), args);
        System.exit(code);
    }
    // Mapper for the first Job: splits each line of the input text into words
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text key = new Text();
        private IntWritable val = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\s+");
            for (String str : strs) {
                this.key.set(str);
                val.set(1);
                context.write(this.key, val);
            }
        }
    }
    // Reducer for the first Job: sums the counts per word
    public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable sum = new IntWritable(0);
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable w : values) {
                sum += w.get();
            }
            this.sum.set(sum);
            context.write(key,this.sum);
        }
    }
    // Mapper for the second Job: emits (count, word) so the shuffle sorts by count
    public static class Mapper2 extends Mapper<LongWritable,Text,IntWritable,Text>{
        private IntWritable key = new IntWritable(0);
        private Text val = new Text();
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] str = value.toString().split("\s+");
            this.key.set(Integer.parseInt(str[1]));
            this.val.set(str[0]);
            context.write(this.key,this.val);
        }
    }
}
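
One caveat: the shuffle sorts IntWritable keys in ascending order, so the most frequent words come last. If descending order is wanted, a custom sort comparator could be registered on job2. A minimal sketch (this class is my addition, not part of the original program):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public static class DescendingIntComparator extends WritableComparator {
    protected DescendingIntComparator() {
        super(IntWritable.class, true);// instantiate keys so compare() can deserialize them
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);// invert the natural ascending order
    }
}

// registered before submission with: job2.setSortComparatorClass(DescendingIntComparator.class);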

 

Result of a local test run:

All jobs finished: [job name:Job1

job id:My JobControl0

job state:SUCCESS

job mapred id:job_local1896504438_0001

job message:just initialized

job has no depending job:

, job name:Job2

job id:My JobControl1

job state:SUCCESS

job mapred id:job_local2112760435_0002

job message:just initialized

job has 1 dependeng jobs:

 depending job 0:Job1

]


Testing on HDFS:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo06_JobControl /test/ /out001

Output directories:

[wangjian@hadoop31 ~]$ hdfs dfs -ls /

Found 4 items

drwxr-xr-x   - wangjian supergroup          0 2018-05-13 19:24 /out001

drwxr-xr-x   - wangjian supergroup          0 2018-05-13 19:25 /out001_2

drwxr-xr-x   - wangjian supergroup          0 2018-05-13 19:15 /test
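
To inspect the sorted result, something like the following should work (the part file name assumes the second Job's default single reducer):

$ hdfs dfs -cat /out001_2/part-r-00000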

 

