Lab1 MapReduce学习笔记

Posted 技术随想录

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lab1 MapReduce学习笔记相关的知识,希望对你有一定的参考价值。

1. 序言

Lab1是参照论文MapReduce:Simplifed Data Processing on Large Clusters,实现一个简易单机版MapReduce系统。

MapReduce实现依赖于分布式文件系统(诸如GFS)。


2. 模型抽象

论文作者将大数据处理抽象为两个函数:Map以及Reduce

Map: 将用户输入的数据,按照用户提供的Map Function分解为一系列的key/value数据对;通过MapReduce library将众多的数据对按照key汇聚发送至Reduce function;

Reduce: 根据用户提供的Reduce Function将指定key对应的value数值汇聚;用户根据迭代器遍历value集合,以避免value集合过于庞大耗尽内存。


3. 执行过程


MapReduce系统中有诸多节点,其中有一个节点是master,其余节点是worker(6.824-2021课程中将master更正为coordinator)。

(1)map task: master将input data划分为M个task,worker节点通过rpc访问master领取map task(主要是读取数据);
(2)map phase: worker节点根据Map Function将input data分解为key/value数据对,并写入本地文件(文件名按照一定规则生成,避免重复),最终将写入完成的文件名发送至master节点;
(3)reduce task: 执行完map task的worker节点通过rpc访问matster节点领取reduce task;
(4)reduce phase: worker节点根据分配的reduce task(文件名列表)执行远程读取,根据用户编写的Reduce Function收集key/value数据对,merge的结果写入最终文件。


4. 任务总览

实验文件夹下有若干个以pg-*.txt规则命名的文件(每个文件都是电子书),我们的任务就是统计出电子书中出现过的单词,以及它们出现的次数。

具体执行过程,可参见下图:


以下是试验提供的Map/Reduce函数,以动态库形式链接至测试程序。

4.1 Map Function

func Map(filename string, contents string) []mr.KeyValue { // function to detect word separators. ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words. words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{} for _, w := range words { kv := mr.KeyValue{w, "1"} kva = append(kva, kv) } return kva}
Map Function执行逻辑如上所示,每读取一个单词,即构造以单词为key的key/value对。

4.2 Reduce Function

func Reduce(key string, values []string) string { // return the number of occurrences of this word. return strconv.Itoa(len(values))}

Reduce Function求取key对应的value数组长度,从而得到key出现的次数。


5. Worker节点

worker节点主要逻辑为,通过rpc向master节点申请map task或者reduce task,任务执行完毕后将结果同步至master节点。
具体如下所示:

func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
for { reply := getCallResp() switch reply.TaskType { case Finish: break case Free: time.Sleep(100 * time.Microsecond) case MapFunc: results := handleMapFunc(mapf, reply.TaskId, reply.Filenames[0], reply.ReduceNum) syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results) case ReduceFunc: results := handleReduceFunc(reducef, reply.TaskId, reply.Filenames) syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results) } }}
注意事项:

(1) handleMapFunc

  • 需要将key根据ReduceNum(master节点给定)进行hash,分配写入到不同的中间文件中;

  • 中间文件名称,建议使用ioutil.TempFile进行写入,以确保中间文件名称不会同名(任务可能因为容错等因素重复认领)。

(2)handleReduceFunc

  • 读取reduce task分配的中间文件,统计key/value数据对,结果依然使用ioutil.TempFile写入临时文件;

  • 待master节点确认reduce task执行结果,会将上报的临时文件重命名为结果文件。

MapReduce通过底层的分布式文件系统,保证Reduce任务执行结果只会保存一份。


6. master节点

master节点主要任务为分派map task/reduce task,重命名reduce task上报的结果。
在生产环境中,MapReduce需要考虑诸多的容错措施,诸如任务运行状态监控,master节点状态保存(checkpoint等),异常数据标记处理,备份任务执行等。当前实验,仅要求
执行超时任务的重新分配

因此,master的代码主要分三个部分:

(1)任务分配

func (c *Coordinator) TaskSchedule(request *RpcRequest, reply *RpcReply) error { c.lock.Lock() defer c.lock.Unlock()
switch c.stage { case MapStage: if task := c.dispatchMapTask(); task != nil { reply.TaskId = uint(task.Index) reply.TaskType = MapFunc reply.Filenames = []string{task.Files} reply.ReduceNum = c.reduceNum reply.Version = task.Version
go c.monitorTaskTimeout(task) } else { reply.TaskType = Free // 没有任务可以分配 } case ReduceStage: if task := c.dispatchReduceTask(); task != nil { reply.TaskId = uint(task.Index) reply.TaskType = ReduceFunc reply.Filenames = task.Files reply.ReduceNum = c.reduceNum reply.Version = task.Version
go c.monitorTaskTimeout(task) } else { reply.TaskType = Free // 没有任务可以分配 } case CoordinatorDone: reply.TaskType = Finish } return nil}

(2)监控任务是否超时

/**监控任务是否超时,或者已完成*/func (c *Coordinator) monitorTaskTimeout(task interface{}) { switch t := task.(type) { case *mapTask: { select { case <-time.After(10 * time.Second): { c.lock.Lock() t.State = Idle t.Version++ // 提升版本号,避免过期任务提交数据 c.lock.Unlock() } case <-t.done: return } } case *reduceTask: select { case <-time.After(10 * time.Second): { c.lock.Lock() t.State = Idle t.Version++ c.lock.Unlock() } case <-t.done: return } }}

(3)任务执行进度监控

/**使用信号量完成Coordinator状态更新*/func (c *Coordinator) updateStage() { c.cond.L.Lock() for !c.checkAllMapTaskFinished() { c.cond.Wait() }
c.stage = ReduceStage c.prepareReduceTasks() for !c.checkAllReduceTaskFinished() { c.cond.Wait() }
c.stage = CoordinatorDone c.lock.Unlock()}


7. 一点思考

MapReduce是我看的第一篇分布式论文,其在大数据领域前期表现十分出色,通过分布式架构完成了海量数据的处理;也为大型程序跑在廉价易崩溃的PC上运行提供了思路。

就我现在的浅显理解,在分布式这个场景下其存在以下缺点:

  • master节点是单节点,如果出现故障,需要人工介入进行处理,在此过程中系统停止服务;

  • master节点需要与众多worker节点进行交互,进行map/reduce task分发,维护任务执行状态,在规模越来越大的数据中心下,难以满足要求;

  • 整个系统的容错度比较低;

这大概就是paxos/raft等算法兴起的原因吧。


以上是关于Lab1 MapReduce学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

[6][lab] lab1: map reduce

ucore lab1实验笔记

MapReduce学习笔记,理解学习Hadoop的MapReduce计算系统

Hadoop学习笔记:使用Mrjob框架编写MapReduce

大数据学习笔记—MapReduce

学习笔记:python3,代码片段(2017)