Handling Small Files in MapReduce with CombineFileInputFormat
In practice, MapReduce jobs often have to read inputs made up of very small files (a few hundred KB to a few tens of MB each). By default, Hadoop requests one YARN container and launches one map task per input file, and starting and tearing down containers is expensive.
Hadoop provides CombineFileInputFormat, an abstract class that packs multiple small files into a single map task. To use it, we only need to implement three classes:
CompressedCombineFileInputFormat
CompressedCombineFileRecordReader
CompressedCombineFileWritable
Maven dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0-cdh5.2.1</version>
</dependency>
CompressedCombineFileInputFormat.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;

/**
 * InputFormat that packs many small (possibly compressed) files into a single
 * CombineFileSplit and hands each file to CompressedCombineFileRecordReader.
 */
public class CompressedCombineFileInputFormat
        extends CombineFileInputFormat<CompressedCombineFileWritable, Text> {

    public CompressedCombineFileInputFormat() {
        super();
    }

    @Override
    public RecordReader<CompressedCombineFileWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader creates one CompressedCombineFileRecordReader
        // per file contained in the CombineFileSplit.
        return new CombineFileRecordReader<CompressedCombineFileWritable, Text>(
                (CombineFileSplit) split, context, CompressedCombineFileRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Each small file is read as a whole; it is never split further.
        return false;
    }
}
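Two details are worth calling out: isSplitable returns false so that every small file is consumed whole by a single reader, and createRecordReader does no reading itself; it returns a CombineFileRecordReader, which instantiates one CompressedCombineFileRecordReader per file packed into the CombineFileSplit.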
CompressedCombineFileRecordReader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Reads one file of a CombineFileSplit line by line. Compressed files are
 * decompressed to a temporary copy on the same FileSystem first.
 */
public class CompressedCombineFileRecordReader
        extends RecordReader<CompressedCombineFileWritable, Text> {

    private long startOffset;
    private long end;
    private long pos;
    private FileSystem fs;
    private Path path;      // original (possibly compressed) file
    private Path dPath;     // file actually read (decompressed copy if needed)
    private CompressedCombineFileWritable key = new CompressedCombineFileWritable();
    private Text value;
    private long rlength;   // length of the decompressed copy
    private FSDataInputStream fileIn;
    private LineReader reader;

    public CompressedCombineFileRecordReader(CombineFileSplit split,
            TaskAttemptContext context, Integer index) throws IOException {
        Configuration currentConf = context.getConfiguration();
        this.path = split.getPath(index);
        boolean isCompressed = findCodec(currentConf, path);
        if (isCompressed) {
            // Writes a decompressed copy next to the original and sets dPath/rlength.
            codecWiseDecompress(currentConf);
        }

        fs = this.path.getFileSystem(currentConf);
        this.startOffset = split.getOffset(index);
        if (isCompressed) {
            this.end = startOffset + rlength;
        } else {
            this.end = startOffset + split.getLength(index);
            dPath = path;
        }

        boolean skipFirstLine = false;
        fileIn = fs.open(dPath);
        if (isCompressed) {
            // Remove the temporary decompressed copy when the task JVM exits.
            fs.deleteOnExit(dPath);
        }
        if (startOffset != 0) {
            // Not at the beginning of the file: the previous reader already
            // consumed the line that straddles the boundary, so skip it here.
            skipFirstLine = true;
            --startOffset;
            fileIn.seek(startOffset);
        }
        reader = new LineReader(fileIn);
        if (skipFirstLine) {
            startOffset += reader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
        }
        this.pos = startOffset;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // All setup happens in the constructor, which is the pattern
        // CombineFileRecordReader expects.
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    public float getProgress() throws IOException {
        if (startOffset == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (key.fileName == null) {
            key = new CompressedCombineFileWritable();
            key.fileName = dPath.getName();
        }
        key.offset = pos;
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        if (pos < end) {
            newSize = reader.readLine(value);
            pos += newSize;
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        }
        return true;
    }

    @Override
    public CompressedCombineFileWritable getCurrentKey()
            throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Decompresses {@code path} into a sibling file with the codec suffix
     * removed and records its length in {@code rlength}.
     */
    private void codecWiseDecompress(Configuration conf) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(path);
        if (codec == null) {
            System.err.println("No codec found for " + path);
            System.exit(1);
        }
        String outputUri = CompressionCodecFactory.removeSuffix(
                path.toString(), codec.getDefaultExtension());
        dPath = new Path(outputUri);

        InputStream in = null;
        OutputStream out = null;
        fs = this.path.getFileSystem(conf);
        try {
            in = codec.createInputStream(fs.open(path));
            out = fs.create(dPath);
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
            rlength = fs.getFileStatus(dPath).getLen();
        }
    }

    private boolean findCodec(Configuration conf, Path p) {
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        return factory.getCodec(p) != null;
    }
}
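Note how compression is handled: when a file has a registered codec, the constructor first writes a fully decompressed copy next to the original (the codec suffix is stripped from the name), reads lines from that copy, and relies on fs.deleteOnExit to clean it up when the task JVM exits. This keeps the line-offset bookkeeping simple, at the cost of one extra write and read of the decompressed data on the underlying FileSystem.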
CompressedCombineFileWritable.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Map input key: the name of the small file a line came from plus the byte
 * offset of that line within the (decompressed) file.
 */
public class CompressedCombineFileWritable
        implements WritableComparable<CompressedCombineFileWritable> {

    public long offset;
    public String fileName;

    public CompressedCombineFileWritable() {
        super();
    }

    public CompressedCombineFileWritable(long offset, String fileName) {
        super();
        this.offset = offset;
        this.fileName = fileName;
    }

    public void readFields(DataInput in) throws IOException {
        this.offset = in.readLong();
        this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(offset);
        Text.writeString(out, fileName);
    }

    public int compareTo(CompressedCombineFileWritable that) {
        // Order by file name first, then by offset within the file.
        int f = this.fileName.compareTo(that.fileName);
        if (f == 0) {
            return (int) Math.signum((double) (this.offset - that.offset));
        }
        return f;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CompressedCombineFileWritable) {
            return this.compareTo((CompressedCombineFileWritable) obj) == 0;
        }
        return false;
    }

    @Override
    public int hashCode() {
        final int hashPrime = 47;
        int hash = 13;
        hash = hashPrime * hash + (this.fileName != null ? this.fileName.hashCode() : 0);
        hash = hashPrime * hash + (int) (this.offset ^ (this.offset >>> 16));
        return hash;
    }

    @Override
    public String toString() {
        return this.fileName + "-" + this.offset;
    }
}
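As a quick sanity check of the key type, the following sketch (not part of the original post; the class name and sample values are made up for illustration) exercises the Writable's serialization round-trip with Hadoop's DataOutputBuffer and DataInputBuffer:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class CompressedCombineFileWritableCheck {
    public static void main(String[] args) throws Exception {
        CompressedCombineFileWritable original =
                new CompressedCombineFileWritable(42L, "part-00000");

        // Serialize the key the same way the MapReduce framework would.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance and compare.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        CompressedCombineFileWritable copy = new CompressedCombineFileWritable();
        copy.readFields(in);

        System.out.println(original + " == " + copy + " ? " + original.equals(copy));
    }
}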
CFWordCount.java (MapReduce test driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

/** Word-count driver that reads its input through CompressedCombineFileInputFormat. */
public class CFWordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CFWordCount(), args));
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Cap each combined split at 128 MB; without this the whole input
        // collapses into a single split (see the note below).
        conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);
        // Compress intermediate map output with gzip.
        conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
        conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf);
        job.setJobName("CombineFile Demo");
        job.setJarByClass(CFWordCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(CompressedCombineFileInputFormat.class);
        job.setMapperClass(TestMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(IntSumReducer.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class TestMapper
            extends Mapper<CompressedCombineFileWritable, Text, Text, IntWritable> {

        private Text txt = new Text();
        private IntWritable count = new IntWritable(1);

        public void map(CompressedCombineFileWritable key, Text val, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(val.toString());
            while (st.hasMoreTokens()) {
                txt.set(st.nextToken());
                context.write(txt, count);
            }
        }
    }
}
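With the four classes packaged together, the job is launched like any other MapReduce program, for example hadoop jar cfwordcount.jar CFWordCount <input dir of small files> <output dir> (the jar name and paths here are placeholders). Because the input format combines files, the job counters should show far fewer launched map tasks than there are input files.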
Note: while using CombineFileInputFormat I noticed that no matter how much small-file data accumulated, even well beyond the HDFS block size, there was still only one map split. Reading the Hadoop source code explains why: if CombineFileInputFormat.SPLIT_MAXSIZE is not explicitly set, CombineFileInputFormat never breaks the input into multiple splits. The fix is as follows:
conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);
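The same limit can also be set through a helper method instead of the raw property. The following sketch (not from the original post; the SplitSizeConfig class name is made up for illustration) uses the new-API FileInputFormat.setMaxInputSplitSize, which writes the same mapreduce.input.fileinputformat.split.maxsize property into the job configuration:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfig {
    // Equivalent to conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024).
    public static void applyMaxSplitSize(Job job) {
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    }
}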