撸论文系列之——MapReduce

Posted 移轴人生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了撸论文系列之——MapReduce相关的知识,希望对你有一定的参考价值。

初识—— 
MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。 
用途—— 
可以用下图表示: 

好处—— 
采用MapReduce架构可以使得没有并行计算和分布式处理系统开发经验的人员有效利用分布式系统的丰富资源。 
背景—— 
在处理海量原始数据过程中,由于输入的数据量巨大,如果需要在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。而分发数据和处理错误就成了难题,综合起来的代码非常复杂,原本简单的运算也变得难以处理。

编程模型—— 
利用一个输入key/value pair集合来产生一个输出的key/value pair集合。 
下面的伪代码诠释了整个过程:

map(String key,String value):    //key: document name
    //value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key,Iterator values):    //key: a word
    //value: a list of counts
    int result = 0;    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));


在概念上,用户定义的Map和Reduce函数都有相关联的类型: 
map(k1,v1)->list(k2,v2) 
reduce(k2,list(v2))->list(v2) 
一些常用的例子中可以体现出的MapReduce模型: 
分布式的Grep:Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数,即把中间数据复制到输出。 
计算URL访问频率:Map函数处理日志中web页面请求的记录,然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果。 
倒排索引:Map函数分析每个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。 
分布式排序:Map函数从每个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制和排序属性。 
执行过程—— 

撸论文系列之——MapReduce

使用分区函数将Map调用产生的中间key值分成R个不同分区,Reduce调用也被分布到多台机器上执行。由Master负责将一个Map任务或Reduce任务分配给一个空闲的worker。分配了了map任务的worker读取输入数据片段,解析出k/v pair,然后传递给用户自定义的map函数,生成并输出中间k/v pair,缓存在内存中。之后通过分区函数分成R个趋于,周期性写入本地磁盘,然后将磁盘上的存储位置回传给master,master负责将存储位置再传送给Reduce worker。Reduce worker通过存储位置,使用RPC从Map worker的磁盘上读取缓存数据。按照相同key值聚合,必须进行排序才可以。最后Reduce函数根据中间key值得到输出,完成后master唤醒用户程序调用MapReduce。 
容错—— 
1 worker故障 
master将一段时间内没有收到信息的worker标记为失效,在其上执行的map或reduce任务被重置为空闲状态,等待重新调度。而在其上已经完成的Map任务由于存储在这台机器(的本地磁盘)上,Map任务输出已不可访问,必须重新执行。而已经完成的Reduce任务存储在全局文件系统上,因此不需要重新执行。而执行调度过程中,也会通知所有执行Reduce任务的worker,之后从该机器读取数据的Reduce任务将从新的worker上读取。 
2 master失败 
还是检查点(checkpoint)方法,master会周期性将R个中间文件存储区域的大小和位置写入磁盘,如果失效,则从最后一个checkpoint开始启动另一个master进程。然而只有一个master进程,所以失效后应当立即终止mapreduce运算。 
存储位置,任务粒度和备用任务—— 
由于带宽很昂贵,所以我们尽量将GFS上存储的数据存放在本地磁盘上来节省网络带宽。而且master在调度map任务时也尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行。具体实现中对M和R的取值(将Map拆分为M个片段,Reduce拆分为R个片段)有一定的客观限制,master必须执行O(M+R)次调度,并且在内存中保存O(M*R)个状态。当一个MR操作接近完成时master调度备用任务进程来执行剩下的、处于处理中状态的任务。采用这种机制对于减少超大MR操作的总处理时间效果显著。 
性能—— 
举个例子,在一个大型集群上运行两个计算来衡量MR的性能。一个在大约1TB的数据中进行特定的模式匹配,另一个计算对大约1TB的数据进行排序。 

上图显示,随着参与MR计算的机器数量增加,处理速度也相应增加,当1764台worker参与计算时,处理速度达到了30GB/s。整个时间包括初始启动消耗(包括传送到worker,GFS打开文件,获取本地位置的时间),处理时间及结束时间。 

上中下三幅图分别表示输入数据读取、中间数据的发送以及输出最终文件的过程。 
可以看出时间主要消耗在磁盘写入上,总共花费了891s,而其他两个过程总共750s,远小于写入的时间。输入数据读取速度之所以比输出数据写入速度快很多,是因为做了优化,大多数数据都是在本地磁盘读取的,节省了网络带宽。而同时写入是写了两份,包括一个备份,这样保证了数据可靠性和可用性。如果底层文件系统使用类似容错编码,也可降低网络带宽的使用。第二列图是关闭backup之后的表现,明显会有还没完成的Reduce任务在执行,拖长了时间。 
最后一列图,大家可能没看明白,其实它是在执行过程中通过手动kill掉进程来模拟部分失效的场景,可以看出会有一些波动,是因为调度会重启worker的处理进程。

几个应用—— 
1 大规模机器学习问题 
2 google news的集群问题 
3 从公众查询产品的报告中抽取数据 
4 从网页中提取有用信息 
5 大规模图形计算

最后做个总结,MR之所以能成功的应用于多个领域归结于:首先,它封装了并行处理、容错处理、数据本地化优化、负载均衡等技术难点的细节,使得易于使用;其次,大量不同类型问题都可以通过使用MR轻易解决。第三,在大规模集群上运行部署的MR可以有效利用计算资源,适合涉及大量运算的问题。 
我们可以从MR开发过程中学到不少东西。 
约束编程模式使得并行和分布式计算很容易,网络带宽很昂贵,多次执行相同任务可解决硬件配置不平衡问题带来的隐患。


以上是关于撸论文系列之——MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

《撸轮子系列》之LoadPE

手撸系列之——ORM(对象关系映射)

MapReduce之单词计数

Hadoop MapReduce编程 API入门系列之join(二十五)(未完)

Hadoop MapReduce编程 API入门系列之统计学生成绩版本1(十七)

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)