The Source Code of Hadoop File Splitting



TextInputFormat

The rules Hadoop uses to split input files:

1. Splitting is done per file; a split never spans two files.

2. If (remaining file size) / (split size) <= 1.1, the remainder becomes a single split; otherwise one full-sized split is cut off and the check repeats (see the sketch after this list).

3. One split is processed by one MapTask, and one MapTask represents one unit of parallelism.
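A quick numeric sketch of rule 2, assuming the default 128 MB HDFS block size (so the split size is also 128 MB): a 129 MB file gives 129 / 128 = 1.008 <= 1.1 and stays one split, while a 141 MB file gives 141 / 128 = 1.10 > 1.1 and is cut into two. The class below is only illustrative; it mirrors the SPLIT_SLOP loop shown later in getSplits():

public class SplitSlopExample {
    static final double SPLIT_SLOP = 1.1;               // same constant FileInputFormat uses
    static final long SPLIT_SIZE = 128L * 1024 * 1024;  // assumed default 128 MB block/split size

    // Returns how many splits a file of the given length would produce.
    static int countSplits(long length) {
        int splits = 0;
        long remaining = length;
        while (((double) remaining) / SPLIT_SIZE > SPLIT_SLOP) {
            remaining -= SPLIT_SIZE;   // cut off one full-sized split
            splits++;
        }
        if (remaining != 0) {
            splits++;                  // the tail (at most 1.1 * splitSize) becomes the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(countSplits(129L * 1024 * 1024)); // 1 (129/128 <= 1.1)
        System.out.println(countSplits(141L * 1024 * 1024)); // 2 (141/128 >  1.1)
    }
}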

Default split settings

By default, minSize is 1 (getFormatMinSplitSize() returns 1 and mapreduce.input.fileinputformat.split.minsize defaults to 1) and maxSize is Long.MAX_VALUE (mapreduce.input.fileinputformat.split.maxsize is unset), so the split size falls back to the HDFS block size, 128 MB by default.

The core source code for computing splits (FileInputFormat.getSplits)

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  // iterate over the input files; each file is split on its own
  for (FileStatus file : files) {
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }

    // get the file's path and length
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }

      // can this file be split at all?
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // split size derived from the cluster's block size and the configured
        // minimum and maximum; see computeSplitSize() below
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // keep cutting full-sized splits while the remainder exceeds
        // 1.1 times the split size (SPLIT_SLOP); anything smaller stays one split
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                               blkLocations[blkIndex].getHosts(),
                               blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                               blkLocations[blkIndex].getHosts(),
                               blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                             blkLocations[0].getCachedHosts()));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}

Changing the split size

// take the smaller of maxSize and blockSize, then the larger of that and minSize;
// the split size can therefore be tuned by adjusting minSize and maxSize
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
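With the defaults (minSize = 1, maxSize = Long.MAX_VALUE) this formula simply returns the block size. Lowering the maximum below the block size makes splits smaller; raising the minimum above the block size makes them larger. A minimal driver-side sketch of both knobs (the class name and the concrete sizes are only illustrative):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfig {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        // Option 1: splits smaller than a block by lowering the maximum
        // (writes mapreduce.input.fileinputformat.split.maxsize, default Long.MAX_VALUE)
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB splits

        // Option 2: splits larger than a block by raising the minimum
        // (writes mapreduce.input.fileinputformat.split.minsize, default 1)
        // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // 256 MB splits
    }
}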

Summary of the flow

1. Compute minSize and maxSize from the configuration, then list the input files.

2. For each non-empty, splittable file, look up its block locations and compute splitSize = max(minSize, min(maxSize, blockSize)).

3. While the remaining bytes exceed 1.1 times splitSize, cut off a splitSize-sized split located on the block containing its start offset; the remainder (at most 1.1 times splitSize) becomes the last split.

4. Non-splittable files become exactly one split; empty files produce a split with an empty host list.

5. One MapTask is launched per split.
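To see the result of this flow end to end, you can call getSplits() from a small standalone driver and print the splits it produces. A sketch, assuming the input path is passed as the first command-line argument (the class name PrintSplits is made up for the example):

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class PrintSplits {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();                        // default configuration
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Run the same split computation shown above and print each split
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            System.out.println(split);                      // path, start offset and length
        }
    }
}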

CombineTextInputFormat

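TextInputFormat never puts two files into the same split, so a directory of many small files produces one MapTask per file. CombineTextInputFormat instead packs small files together into splits up to a configurable maximum size. A driver-side sketch, assuming the input directory comes from the first argument and using an illustrative 4 MB limit:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CombineSmallFiles {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Use CombineTextInputFormat instead of the default TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Small files are packed together until a split reaches roughly 4 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);
    }
}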

