Lab1 MapReduce学习笔记
Posted 技术随想录
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lab1 MapReduce学习笔记相关的知识,希望对你有一定的参考价值。
1. 序言
Lab1是参照论文MapReduce:Simplifed Data Processing on Large Clusters,实现一个简易单机版MapReduce系统。
MapReduce实现依赖于分布式文件系统(诸如GFS)。
2. 模型抽象
论文作者将大数据处理抽象为两个函数:Map
以及Reduce
。
Map: 将用户输入的数据,按照用户提供的Map Function
分解为一系列的key/value数据对;通过MapReduce library
将众多的数据对按照key
汇聚发送至Reduce function
;
Reduce: 根据用户提供的Reduce Function
将指定key
对应的value
数值汇聚;用户根据迭代器遍历value
集合,以避免value
集合过于庞大耗尽内存。
3. 执行过程
MapReduce系统中有诸多节点,其中有一个节点是master,其余节点是worker(6.824-2021课程中将master更正为coordinator)。
(1)map task: master将input data划分为M个task,worker节点通过rpc访问master领取map task(主要是读取数据);
(2)map phase: worker节点根据Map Function
将input data分解为key/value
数据对,并写入本地文件(文件名按照一定规则生成,避免重复),最终将写入完成的文件名发送至master节点;
(3)reduce task: 执行完map task的worker节点通过rpc访问matster节点领取reduce task;
(4)reduce phase: worker节点根据分配的reduce task(文件名列表)执行远程读取,根据用户编写的Reduce Function
收集key/value
数据对,merge的结果写入最终文件。
4. 任务总览
实验文件夹下有若干个以pg-*.txt规则命名的文件(每个文件都是电子书),我们的任务就是统计出电子书中出现过的单词,以及它们出现的次数。
具体执行过程,可参见下图:
以下是试验提供的Map/Reduce函数,以动态库形式链接至测试程序。
4.1 Map Function
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
Map Function执行逻辑如上所示,每读取一个单词,即构造以单词为key的key/value对。
4.2 Reduce Function
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}
Reduce Function求取key对应的value数组长度,从而得到key出现的次数。
5. Worker节点
worker节点主要逻辑为,通过rpc向master节点申请map task或者reduce task,任务执行完毕后将结果同步至master节点。
具体如下所示:
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
for {
reply := getCallResp()
switch reply.TaskType {
case Finish:
break
case Free:
time.Sleep(100 * time.Microsecond)
case MapFunc:
results := handleMapFunc(mapf, reply.TaskId, reply.Filenames[0], reply.ReduceNum)
syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results)
case ReduceFunc:
results := handleReduceFunc(reducef, reply.TaskId, reply.Filenames)
syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results)
}
}
}
注意事项:
(1) handleMapFunc
需要将key根据ReduceNum(master节点给定)进行hash,分配写入到不同的中间文件中;
中间文件名称,建议使用ioutil.TempFile进行写入,以确保中间文件名称不会同名(任务可能因为容错等因素重复认领)。
(2)handleReduceFunc
读取reduce task分配的中间文件,统计key/value数据对,结果依然使用ioutil.TempFile写入临时文件;
待master节点确认reduce task执行结果,会将上报的临时文件重命名为结果文件。
MapReduce通过底层的分布式文件系统,保证Reduce任务执行结果只会保存一份。
6. master节点
master节点主要任务为分派map task
/reduce task
,重命名reduce task
上报的结果。
在生产环境中,MapReduce需要考虑诸多的容错措施,诸如任务运行状态监控,master节点状态保存(checkpoint等),异常数据标记处理,备份任务执行等。当前实验,仅要求执行超时任务的重新分配。
因此,master的代码主要分三个部分:
(1)任务分配
func (c *Coordinator) TaskSchedule(request *RpcRequest, reply *RpcReply) error {
c.lock.Lock()
defer c.lock.Unlock()
switch c.stage {
case MapStage:
if task := c.dispatchMapTask(); task != nil {
reply.TaskId = uint(task.Index)
reply.TaskType = MapFunc
reply.Filenames = []string{task.Files}
reply.ReduceNum = c.reduceNum
reply.Version = task.Version
go c.monitorTaskTimeout(task)
} else {
reply.TaskType = Free // 没有任务可以分配
}
case ReduceStage:
if task := c.dispatchReduceTask(); task != nil {
reply.TaskId = uint(task.Index)
reply.TaskType = ReduceFunc
reply.Filenames = task.Files
reply.ReduceNum = c.reduceNum
reply.Version = task.Version
go c.monitorTaskTimeout(task)
} else {
reply.TaskType = Free // 没有任务可以分配
}
case CoordinatorDone:
reply.TaskType = Finish
}
return nil
}
(2)监控任务是否超时
/**
监控任务是否超时,或者已完成
*/
func (c *Coordinator) monitorTaskTimeout(task interface{}) {
switch t := task.(type) {
case *mapTask:
{
select {
case <-time.After(10 * time.Second):
{
c.lock.Lock()
t.State = Idle
t.Version++ // 提升版本号,避免过期任务提交数据
c.lock.Unlock()
}
case <-t.done:
return
}
}
case *reduceTask:
select {
case <-time.After(10 * time.Second):
{
c.lock.Lock()
t.State = Idle
t.Version++
c.lock.Unlock()
}
case <-t.done:
return
}
}
}
(3)任务执行进度监控
/**
使用信号量完成Coordinator状态更新
*/
func (c *Coordinator) updateStage() {
c.cond.L.Lock()
for !c.checkAllMapTaskFinished() {
c.cond.Wait()
}
c.stage = ReduceStage
c.prepareReduceTasks()
for !c.checkAllReduceTaskFinished() {
c.cond.Wait()
}
c.stage = CoordinatorDone
c.lock.Unlock()
}
7. 一点思考
MapReduce
是我看的第一篇分布式论文,其在大数据领域前期表现十分出色,通过分布式架构完成了海量数据的处理;也为大型程序跑在廉价易崩溃的PC上运行提供了思路。
就我现在的浅显理解,在分布式这个场景下其存在以下缺点:
master节点是单节点,如果出现故障,需要人工介入进行处理,在此过程中系统停止服务;
master节点需要与众多worker节点进行交互,进行map/reduce task分发,维护任务执行状态,在规模越来越大的数据中心下,难以满足要求;
整个系统的容错度比较低;
这大概就是paxos/raft等算法兴起的原因吧。
以上是关于Lab1 MapReduce学习笔记的主要内容,如果未能解决你的问题,请参考以下文章
MapReduce学习笔记,理解学习Hadoop的MapReduce计算系统