Hadoop Input Types: InputFormats

Posted by 健哥说编程


Input

The input of a Hadoop MapReduce job is the form of file that the job reads.

 


 

1. TextInputFormat

TextInputFormat is the default InputFormat. Its record types are (LongWritable, Text): the LongWritable key is the byte offset of the line within the file, and the Text value is the content of that line. For example:

(0,SomeLine)

(23,SomeLine)

 

[Code example omitted in the original]
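Since the original omits the code, below is a minimal driver sketch (the class and package names, and the paths, are assumed for illustration only). It relies on the default TextInputFormat, so the identity Mapper simply passes the (LongWritable, Text) records through:

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Hypothetical example: a minimal driver that uses the default TextInputFormat.
 */
public class Demo10_TextInputFormat extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "TextInputFormat");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TextInputFormat.class); // the default, shown explicitly for clarity
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(Mapper.class); // identity Mapper: emits (LongWritable offset, Text line)
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Demo10_TextInputFormat(), args));
    }
}

It would be run like the other examples in this article, e.g. hadoop jar mr.jar cn.wangjian.mapreduce.Demo10_TextInputFormat /test /out001 (paths are examples).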

 

2. KeyValueTextInputFormat

1. Often each line of a file is a key-value pair, separated by some delimiter such as a tab (the default TextOutputFormat writes its output with a tab separator).

2. KeyValueTextInputFormat is the right choice for handling such files.

3. The separator can be specified with the mapreduce.input.keyvaluelinerecordreader.key.value.separator property; its default value is a tab character.
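For example, a job that wanted a comma as the separator could set the property on its Configuration before submitting (a sketch; the comma is just an assumed example, not from the original code):

Configuration config = getConf();
// Assumed example: use a comma instead of the default tab as the key/value separator
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");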

For example, given the following file (-> represents a tab character):

Line1->This is line One

Line2->This is line Two

After reading, the records obtained are:

(Line1, This is line One)

(Line2, This is line Two)

KeyValueTextInputFormat returns records of type <Text, Text>; internally it uses the KeyValueLineRecordReader class.

Code example:

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Example using KeyValueTextInputFormat.
 * @author wangjian
 * @version 1.0 2018/5/19 0019
 */
public class Demo11_KeyValueTextInputFormat extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if(args.length!=2){
            System.out.println("用法:input ouput");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        // Delete the output directory if it already exists
        Path path = new Path(args[1]);
        if(fs.exists(path)){
            fs.delete(path,true);
        }
        Job job = Job.getInstance(config,"KeyValueTextInputFormat");
        job.setInputFormatClass(KeyValueTextInputFormat.class); // set the input format
        KeyValueTextInputFormat.addInputPath(job,new Path(args[0]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        int code = job.waitForCompletion(true)?0:1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo11_KeyValueTextInputFormat(),args);
        System.exit(code);
    }
}

Given a file with the following contents (-> again represents a tab):

Line1->Jack And Mary

Line2->Rose And Alex

Line3->Mary And Mark

Since the program above does no processing of its own and uses the default (identity) Mapper, the output is the same as the input:

Line1->Jack And Mary

Line2->Rose And Alex

Line3->Mary And Mark
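If the pairs needed real processing rather than pass-through, the Mapper's input types would have to be (Text, Text), because that is what KeyValueTextInputFormat delivers. A hypothetical sketch (not part of the original program) that simply swaps key and value:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Hypothetical Mapper: KeyValueTextInputFormat delivers (Text key, Text value),
 * so both input types are Text. This one emits each pair swapped.
 */
public class SwapMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, key);
    }
}

It would be registered in the driver with job.setMapperClass(SwapMapper.class).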

 

 

3. Binary input: SequenceFileInputFormat

1. Hadoop MapReduce can process not only text files but also files in binary formats.

2. Hadoop's sequence file format stores sequences of binary key-value pairs.

3. Sequence files are splittable, which makes them well suited as MapReduce data, and they also support compression.

4. When using SequenceFileInputFormat, you only need to make sure the map input types match the key/value types stored in the sequence file.

 

Example code 1: writing a SequenceFile

The following example reads all the files under a directory and saves them into a single SequenceFile.

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Reads the files under a directory and saves them into a single sequence file
 * in the format (LongWritable, Text).
 * @author wangjian
 * @version 1.0 2018/5/20 0020
 */
public class Demo13_WriteSequenceFile extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage : input output");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(config);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "WriteSequenceFile");
        job.setInputFormatClass(TextInputFormat.class); // the input read here is plain text
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(Mapper.class);
        // Specify SequenceFileOutputFormat so the output is saved as a sequence file
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Note the output types when no custom Mapper/Reducer is set: the identity Mapper passes (LongWritable, Text) through
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo13_WriteSequenceFile(), args);
        System.exit(code);
    }
}
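For reference, a sequence file can also be created directly with the SequenceFile.Writer API instead of running a MapReduce job. The sketch below is an assumption for illustration (the path and records are made up), not part of the original article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
/**
 * Hypothetical example: write a (LongWritable, Text) sequence file directly.
 */
public class WriteSeqDirectly {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. /seq001/demo.seq (example path)
        // The writer options declare the key/value classes stored in the file
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            writer.append(new LongWritable(0), new Text("first record"));
            writer.append(new LongWritable(13), new Text("second record"));
        }
    }
}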

 

Example code 2: reading a SequenceFile

Develop a MapReduce driver that reads the contents of a SequenceFile. Note that a SequenceFile stores its contents as (key, value) pairs, so you only need to set the Mapper's input types (K1, V1) to the key/value types stored in the SequenceFile.

The following program reads the contents of a SequenceFile and performs a word count:

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Uses SequenceFileInputFormat to read the data from a sequence file and writes
 * the result to the specified output.<br>
 * Because TextOutputFormat is used for output, the result is saved as
 * (Text, LongWritable) text.
 * @author wangjian
 * @version 1.0 2018/5/21 0021
 */
public class Demo14_ReadSequenceFile extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage : Input output");
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "ReadSequenceFile");
        job.setJarByClass(getClass());
        // Set the input format
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(ReadSequenceFileMapper.class);
        // Set the output file type to plain text
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Set the Reducer
        job.setReducerClass(ReadSequenceFileReduce.class);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo14_ReadSequenceFile(), args);
        System.exit(code);
    }
    /**
     * This Mapper's input key/value types must match the key/value types stored in the SequenceFile.
     */
    public static class ReadSequenceFileMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private Text key = new Text();
        private LongWritable val = new LongWritable();
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\\s+");
            for(String str:strs){
                this.key.set(str);
                this.val.set(1);
                context.write(this.key,this.val);
            }
        }
    }
    /**
     * The Reducer sums the count for each word.
     */
    public static class ReadSequenceFileReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
        private LongWritable val = new LongWritable();
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for(LongWritable w:values){
                sum+=w.get();
            }
            this.val.set(sum);
            context.write(key,this.val);
        }
    }
}

 

Testing on the cluster:

Use the Demo13_WriteSequenceFile class to generate a SequenceFile on HDFS:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo13_WriteSequenceFile /test /seq001

After generating the sequence file with the class above, you can inspect its contents with hdfs dfs -text; since there is a lot of data, pipe it through head to show only the first few lines:

[wangjian@hadoop41 jar]$ hdfs dfs -text /seq001/* | head
0	总用量 96
0	package cn.wangjian.mapreduce;
0	package cn.wangjian.mapreduce;
0	package cn.wangjian.mapreduce;
0	package cn.wangjian.mapreduce;
0	package cn.wangjian.mapreduce;
0	package cn.wangjian.mapreduce;
13	drwxr-xr-x.  4 wangjian hadoop  4096 4月  19 16:39 app
32	import org.apache.hadoop.conf.Configuration;
32	import org.apache.hadoop.conf.Configured;

 

Then use Demo14_ReadSequenceFile to process the data in the SequenceFile, performing the word count described above:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo14_ReadSequenceFile /seq001 /out002

Then view the result with cat:

[wangjian@hadoop41 jar]$ hdfs dfs -cat /out002/* | head
	3575650
"ChainMapperDemo");	11002
"ChainMapperWithTool");	11002
"Job1");	11002
"Job2");	11002
"WordCountWithTool");	11002
(IntWritable	11002
(LongWritable	11002
(String	33006
(args.length	33006

 

 

 

4. Multiple inputs: MultipleInputs

 

MultipleInputs can accept several different inputs, and each input can be handled by its own Mapper. The outputs of all these Mappers are then merged and passed to a single Reducer for the final output.

For example:

MultipleInputs.addInputPath(job, new Path(path1),
        TextInputFormat.class,   // the input format for this directory
        Some1Mapper.class);      // the Mapper that processes this directory
MultipleInputs.addInputPath(job, new Path(path2),
        SequenceFileInputFormat.class, // a different input format
        Some2Mapper.class);            // the Mapper that processes this directory

Code example:

package cn.wangjian.mapreduce;
import cn.wangjian.mapreduce.mapper.SequenceFileWordCountMapper;
import cn.wangjian.mapreduce.mapper.WordCountMapper;
import cn.wangjian.mapreduce.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Multiple inputs example.
 * @author wangjian
 * @version 1.0 2018/5/22 0022
 */
public class Demo15_MultipleInputs extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if(args.length!=3){
            System.out.println("usage : input1 input2 output");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path path = new Path(args[2]);
        if(fs.exists(path)){
            fs.delete(path,true);
        }
        Job job = Job.getInstance(config,"MultipleInputs");
        job.setJarByClass(getClass());
        // Declare MultipleInputs and register the first input
        MultipleInputs.addInputPath(job,   // the Job
                new Path(args[0]),         // the directory or file for this input
                TextInputFormat.class,     // the input format of the first input
                WordCountMapper.class);    // the Mapper that processes the first input
        // Register the second input
        MultipleInputs.addInputPath(job,
                new Path(args[1]),
                SequenceFileInputFormat.class,       // a different input format
                SequenceFileWordCountMapper.class);  // the Mapper for the second input
        // Optional: set the Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);
        // Set the output
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job,new Path(args[2]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Start the job
        int code = job.waitForCompletion(true)?0:1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo15_MultipleInputs(),args);
        System.exit(code);
    }
}
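The WordCountMapper, SequenceFileWordCountMapper, and WordCountReducer classes imported above are not listed in the original. Plausible sketches are shown below; they are assumptions, written only to match the job's (Text, LongWritable) output types. Because the sequence file written by Demo13 also stores (LongWritable, Text), SequenceFileWordCountMapper could have exactly the same signature and body as WordCountMapper.

package cn.wangjian.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Sketch: splits each text line into words and emits (word, 1).
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String s : value.toString().split("\\s+")) {
            word.set(s);
            context.write(word, one);
        }
    }
}

package cn.wangjian.mapreduce.reducer;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Sketch: sums the per-word counts produced by the Mappers.
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable total = new LongWritable();
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}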

 

Run on the cluster:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo15_MultipleInputs /test /seq001 /out002

View the result:

[wangjian@hadoop41 jar]$ hdfs dfs -cat /out002/* | head
	7151300
"ChainMapperDemo");	22004
"ChainMapperWithTool");	22004

 

 

