Hadoop Input Types: InputFormats
Input
The input of a Hadoop MapReduce job is whatever form of file it reads; the InputFormat determines how that file is turned into records.
1. TextInputFormat
TextInputFormat is the default InputFormat. Its input key and value types are LongWritable and Text: the LongWritable key is the byte offset of the line within the file, and the Text value is the content of that line. For example:
(0, SomeLine)
(23, AnotherLine)
[Detailed code omitted]
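As a minimal sketch (not from the original article; the class name is an illustrative assumption), a Mapper that receives the (byte offset, line) pairs produced by TextInputFormat could look like this:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Receives the (byte offset, line text) pairs produced by TextInputFormat. */
public class OffsetLineMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // The key is the byte offset of the line in the file and the value is
        // the line's content; here the pair is simply passed through.
        context.write(offset, line);
    }
}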
2. KeyValueTextInputFormat
1. Often each line of a file is a key-value pair separated by a delimiter such as a tab (the default TextOutputFormat also uses a tab as the separator when writing its output).
2. KeyValueTextInputFormat is the right choice for processing this kind of file.
3. The separator can be specified with mapreduce.input.keyvaluelinerecordreader.key.value.separator; its default value is a tab character.
For example, given the following file (-> denotes a tab character):
Line1->This is line One
Line2->This is line Two
the records obtained after reading are:
(Line1, This is line One)
(Line2, This is line Two)
KeyValueTextInputFormat returns records of type <Text, Text>. Internally it uses the KeyValueLineRecordReader class as its RecordReader.
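If the file uses a delimiter other than a tab, the property above can be set on the job's Configuration before the Job object is created. A minimal sketch (the comma separator is only an illustrative assumption, not part of the original example):
Configuration config = getConf();
// Use a comma instead of the default tab as the key/value separator
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
Job job = Job.getInstance(config, "KeyValueTextInputFormat");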
Code example:
package cn.wangjian.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * An example using KeyValueTextInputFormat.
 * @author wangjian
 * @version 1.0 2018/5/19
 */
public class Demo11_KeyValueTextInputFormat extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        // Delete the output directory if it already exists
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "KeyValueTextInputFormat");
        job.setInputFormatClass(KeyValueTextInputFormat.class); // set the input format
        KeyValueTextInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo11_KeyValueTextInputFormat(), args);
        System.exit(code);
    }
}
Given a file with the following content (again, -> denotes a tab):
Line1->Jack And Mary
Line2->Rose And Alex
Line3->Mary And Mark
Since the program above does not set a Mapper and therefore uses the default (identity) Mapper, the output contains the same key-value pairs:
Line1->Jack And Mary
Line2->Rose And Alex
Line3->Mary And Mark
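If the records were to be processed rather than passed through, a Mapper whose input types are <Text, Text> could be plugged in with job.setMapperClass(...). A hypothetical sketch (class name and behavior are assumptions, not part of the original example):
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Receives the (key, value) pairs produced by KeyValueTextInputFormat. */
public class KeyValueSwapMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the text before the tab (e.g. "Line1"), value is the text
        // after it; for illustration the two are simply swapped.
        context.write(value, key);
    }
}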
3. Binary input: SequenceFileInputFormat
1. Hadoop MapReduce can process not only text files but also binary files.
2. Hadoop's sequence file format stores sequences of key-value pairs.
3. Sequence files are splittable, so they fit the MapReduce data model well, and they also support compression (see the snippet after this list).
4. When using SequenceFileInputFormat, you only need to make sure the map input types match the types stored in the file.
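Since sequence files support compression, block compression could be enabled on the writing job below before it is submitted. A minimal sketch (not part of the original code; it assumes the default codec and requires org.apache.hadoop.io.SequenceFile):
// Enable compressed output and use block compression for the sequence file
FileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);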
Example code 1 - writing a SequenceFile:
The following example reads all the files under a directory and saves them into a single SequenceFile.
package cn.wangjian.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Writes a SequenceFile: reads every file under a directory and saves the
 * records into a single sequence file in (LongWritable, Text) format.
 * @author wangjian
 * @version 1.0 2018/5/20
 */
public class Demo13_WriteSequenceFile extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage : input output");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(config);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "WriteSequenceFile");
        job.setInputFormatClass(TextInputFormat.class); // the input is plain text
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(Mapper.class);
        // Save the output as a sequence file
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Note the output types when no custom Mapper or Reducer is set
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo13_WriteSequenceFile(), args);
        System.exit(code);
    }
}
Example code 2 - reading a SequenceFile:
Write a MapReduce driver that reads the contents of the SequenceFile. Note that a SequenceFile stores its contents as (key, value) pairs, so the Mapper's K1 and V1 only need to match the key/value types stored in the SequenceFile.
The following program reads the SequenceFile and performs a word count:
package cn.wangjian.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Reads the data in a sequence file directly with SequenceFileInputFormat and
 * writes the result to the given output path.<br>
 * Since TextOutputFormat is used for the output, the result is saved as plain
 * text (Text, LongWritable) pairs.
 * @author wangjian
 * @version 1.0 2018/5/21
 */
public class Demo14_ReadSequenceFile extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage : input output");
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "ReadSequenceFile");
        job.setJarByClass(getClass());
        // Set the input format
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(ReadSequenceFileMapper.class);
        // The output is written as a text file
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Set the Reducer
        job.setReducerClass(ReadSequenceFileReduce.class);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo14_ReadSequenceFile(), args);
        System.exit(code);
    }

    /**
     * The Mapper's input types (K1, V1) must match the key/value types stored
     * in the SequenceFile.
     */
    public static class ReadSequenceFileMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private Text word = new Text();
        private LongWritable val = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each record's text into words and emit (word, 1)
            String[] strs = value.toString().split("\\s+");
            for (String str : strs) {
                this.word.set(str);
                this.val.set(1);
                context.write(this.word, this.val);
            }
        }
    }

    /**
     * The Reducer sums the counts for each word.
     */
    public static class ReadSequenceFileReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable val = new LongWritable();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable w : values) {
                sum += w.get();
            }
            this.val.set(sum);
            context.write(key, this.val);
        }
    }
}
Testing on the cluster:
Use the Demo13_WriteSequenceFile class to generate a SequenceFile on HDFS:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo13_WriteSequenceFile /test /seq001
After the sequence file has been generated, its contents can be inspected with -text; since there is a lot of data, head limits the output to the first few lines:
[wangjian@hadoop41 jar]$ hdfs dfs -text /seq001/* | head
0     总用量 96
0     package cn.wangjian.mapreduce;
0     package cn.wangjian.mapreduce;
0     package cn.wangjian.mapreduce;
0     package cn.wangjian.mapreduce;
0     package cn.wangjian.mapreduce;
0     package cn.wangjian.mapreduce;
13    drwxr-xr-x. 4 wangjian hadoop 4096 4月 19 16:39 app
32    import org.apache.hadoop.conf.Configuration;
32    import org.apache.hadoop.conf.Configured;
Then use Demo14_ReadSequenceFile to process the data in the sequence file, performing the word count shown above:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo14_ReadSequenceFile /seq001 /out002
The result can then be viewed with cat:
[wangjian@hadoop41 jar]$ hdfs dfs -cat /out002/* | head
3575650
"ChainMapperDemo");      11002
"ChainMapperWithTool");  11002
"Job1");                 11002
"Job2");                 11002
"WordCountWithTool");    11002
(IntWritable             11002
(LongWritable            11002
(String                  33006
(args.length             33006
4. Multiple inputs: MultipleInputs
MultipleInputs accepts several different inputs, and each input can be handled by its own Mapper; the outputs of these Mappers are then merged into a single Reducer.
For example:
MultipleInputs.addInputPath(job, new Path(path1),
        TextInputFormat.class,   // the input format for this directory
        Some1Mapper.class);      // the Mapper that processes this directory
MultipleInputs.addInputPath(job, new Path(path2),
        SequenceFileInputFormat.class, // a different input format
        Some2Mapper.class);            // the Mapper that processes this directory
Code example:
package cn.wangjian.mapreduce;

import cn.wangjian.mapreduce.mapper.SequenceFileWordCountMapper;
import cn.wangjian.mapreduce.mapper.WordCountMapper;
import cn.wangjian.mapreduce.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Multiple inputs.
 * @author wangjian
 * @version 1.0 2018/5/22
 */
public class Demo15_MultipleInputs extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage : input1 input2 output");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path path = new Path(args[2]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(config, "MultipleInputs");
        job.setJarByClass(getClass());
        // Declare the first input with MultipleInputs
        MultipleInputs.addInputPath(job,       // the Job
                new Path(args[0]),             // the directory or file
                TextInputFormat.class,         // the input format for this input
                WordCountMapper.class);        // the Mapper that processes this input
        // Declare the second input
        MultipleInputs.addInputPath(job,
                new Path(args[1]),
                SequenceFileInputFormat.class,
                SequenceFileWordCountMapper.class);
        // Optionally, set the Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);
        // Set the output
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Run the job
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo15_MultipleInputs(), args);
        System.exit(code);
    }
}
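WordCountMapper, SequenceFileWordCountMapper and WordCountReducer are imported from other packages of the project and their source is not shown in the article. A plausible sketch, assuming they follow the same word-count pattern as Demo14 (the class bodies below are assumptions, not the original code); since Demo13 wrote the sequence file as (LongWritable, Text) records, SequenceFileWordCountMapper could have exactly the same shape as WordCountMapper:
package cn.wangjian.mapreduce.mapper;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Hypothetical sketch: splits each line into words and emits (word, 1). */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String s : value.toString().split("\\s+")) {
            word.set(s);
            context.write(word, one);
        }
    }
}

package cn.wangjian.mapreduce.reducer;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** Hypothetical sketch: sums the counts emitted by both Mappers. */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable total = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}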
Run it on the cluster:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo15_MultipleInputs /test /seq001 /out002
View the result:
[wangjian@hadoop41 jar]$ hdfs dfs -cat /out002/* | head
7151300
"ChainMapperDemo");      22004
"ChainMapperWithTool");  22004