如何将java类对象作为mapreduce中map函数的输入?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何将java类对象作为mapreduce中map函数的输入?相关的知识,希望对你有一定的参考价值。

1.首先介绍一下wordcount 早mapreduce框架中的 对应关系
大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的
三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?

一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);


我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。

二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;

FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类

在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象

那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

下面继续看看这些RecordReader是如何被MapReduce框架使用的

终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException
protected void cleanup(Context context ) throws IOException, InterruptedException
public void run(Context context) throws IOException, InterruptedException

我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split)
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;


RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?
还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,

还是继续分析 下 mapreduce 框架调用吧:

1.在 job提交 任务之后 首先由jobtrack 分发任务,

在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper

在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。
参考技术A 1首先介绍一下wordcount 早mapreduce框架中的 对应关系 大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce; 大家都明白 map接受一个参数,经过map处如何将java类对象作为mapreduce中map函数的输入?本回答被提问者采纳

十MapReduce--InputFormat以及RecordReader抽象类

一、基本原理

? 在map执行之前,需要将数据进行切片,每个切片对应一个map任务。而每个map任务并不是直接处理这些切片数据的,它是处理KV的。所以问题有两个:数据是如何切片的、切片是如何转为KV给map处理的。
? 这就涉及到两个抽象类,InputFormat以及 RecordReader。具体为什么是这两个抽象类,请看之前input的源码分析

1、InputFormat

public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

我们看到,这个抽象类就两个方法
getSplits:看名字就知道是用来将数据处理成切片的了
createRecordReader:就是用来创建RecordReader对象的。
所以这就是一个InputFormat基本的功能

2、 RecordReader

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public RecordReader() {
    }

    //初始化,一般就是读取切片的数据
    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

    //检查是否还有下一对KV,并且如果有,实际上会将其处理成KV,并赋值给this.key和this.value
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    //返回一个key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    //返回一个value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    //返回是否在处理
    public abstract float getProgress() throws IOException, InterruptedException;

    //关闭reader
    public abstract void close() throws IOException;
}

这个抽象类就涉及到读取切片的数据,处理成KV结构。而在input源码分析中说到,mapper.run方法中通过 context.getCurrentKey() 类似的方法获取key其实就是调用这个RecordReader中的这些get方法而已。

3、InputFormat以及 RecordReader的关系

从上面的源码可以看到。
InputFormat:负责规划切片信息,以及创建RecordReader对象
RecordReader:负责按照切片规划去读取当前mapper处理的切片数据,并将其处理成KV形式,然后通过context传递给mapper。

二、InputFormat以及 RecordReader常用实现类

常用的有:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat(自定义有另外的文章讲)

1、TextInputFormat

? 这是默认的InputFormat,切片方式是按数据块的方式切割,默认大小block大小。一个文件至少是一个切片(无论多小)。因为这个类继承FileInputFormat,使用的是其父类定义的getsplit() 方法进行切片。
? 使用的RecordReader是LineRecordReader。处理切片成KV时,每条记录是一行输入。键K是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。

2、KeyValueTextInputFormat

? 这个类也是使用父类FileInputFormat的getsplit() 方法进行切片,所以切片方式和上面一致。
? 使用的RecordReader是KeyValueLineRecordReader。每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");来设定分隔符。默认分隔符是tab( )。

3、NLineInputFormat

? 这个类虽然继承了FileInputFormat,但是自己重写了getSplit方法,使用另外的方式来切片。是按指定的行数来切片,比如5行,那就5行作为一个切片,无论数据大小。通过mapreduce.input.lineinputformat.linespermap 这个参数设置切片行数。
? 使用的RecordReader是LineRecordReader。和上面类似,不重复说。

4、CombineTextInputFormat

? 这个类继承于 CombineFileInputFormat,父类继承于FileInputFormat。在CombineFileInputFormat中重写了 getSplits方法。因为FileInputFormat默认无论多小的文件,一个文件至少是一个切片。如果遇到很多小文件,就会导致很多切片。而这里的切片方式就是严格按照大小来切片,会将小文件集合在一起,达到指定大小,才作为一个切片。
? 使用的RecordReader是CombineFileRecordReader。处理方式和 LineRecordReader类似,只不过切片可能是来自多个文件,读取方式上略显麻烦。

三、设置使用指定的inputformat

job.setInputFormatClass(xxxInputFormat.class);

以上是关于如何将java类对象作为mapreduce中map函数的输入?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Result 对象中获取 HBase 表名作为 mapreduce 参数?

java MapReduce - Map类

Map可以将java类ByteArrayoutputstream作为值吗? Can List可以保存javax.mail.MimeMessage对象吗?

在map中怎么删除一个对象

十MapReduce--InputFormat以及RecordReader抽象类

将JFrame对象类作为值传递给java中的HashMap