LAB1 partI
Posted chendatong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LAB1 partI相关的知识,希望对你有一定的参考价值。
序言
1. master.go 可以正常调用Distributed() 和 Sequential()。
Sequential 顺序执行主要用于调试。
2. master : 创建一个 RPC server(master_rpc.go). 等待 worker 注册服务(master.go Register() RPC 调用). schedule() (schedule.go) 决定调度一个任务给worker 和处理worker 工作失败的状况。
3. master 任务每个输入文件就是一个 map 任务 和调用 doMap() [common_map.go] .可以通过Sequential()直接调用或者 PRC DoTask 分配给worker 处理(worker.go) 。
每个doMap 会生成 nReduce 个中间文件 。 有 nMap 个map任务。那么在 所有map任务处理完,也就一共又 nMap X nReduce 个中间文件。
mrtmp.xxx-0-0 mrtmp.xxx-0-1 mrtmp.xxx-0-2 mrtmp.xxx-1-0 mrtmp.xxx-1-1 mrtmp.xxx-1-2
4. 接下来,master 会调用doReduce()[common_reduce.go], 像doMap() 一样可以直接调用或者通过一个worker . doReduce() 会为 第r 个 reduce 任务 搜集 每个 map任务生成的第r 个中间文件。和调用 reduce 函数处理每个 key的value值,生成 nReduce 结果文件。
5. master 调用 mr.merge() [master_splitmerge.go] 。合并前一步所生成的 nReduce 结果文件输出一个最终结果文件。
6. master 给 每个worker 发送Shutdown RPC, 和master自己的RPC server.
part I
实现 doMap()[ common_map.go] 和 doReduce() [ common_reduce.go]
通过 test_test.go 测试修改的程序正确性。
$ cd 6.824 $ export "GOPATH=$PWD" # go needs $GOPATH to be set to the project‘s working directory $ cd "$GOPATH/src/mapreduce" $ go test -run Sequential
可以 查看详细运行的信息
set debugEnabled = true in common.go, and add -v to the test command above
$ env "GOPATH=$PWD/../../" go test -v -run Sequential === RUN TestSequentialSingle master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 master: Map/Reduce task completed --- PASS: TestSequentialSingle (1.34s) === RUN TestSequentialMany master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 Merge: read mrtmp.test-res-1 Merge: read mrtmp.test-res-2 master: Map/Reduce task completed --- PASS: TestSequentialMany (1.33s) PASS ok mapreduce 2.672s
[email protected]:~/study/6.824/src/mapreduce$ go test -run Sequential
# runtime/cgo
exec: "gcc": executable file not found in $PATH
FAIL mapreduce [build failed]
安装 gcc
sudo apt-get update
sudo apt-get install gcc
[email protected]:~/study/6.824/src/mapreduce$ env "GOPATH=$PWD/../../" go test -v -run Sequential
# mapreduce
./master_rpc.go:48:10: debug call has arguments but no formatting directives
调试 遇到问题
[email protected]:~/study/6.824/src/mapreduce$ go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x557207]
goroutine 19 [running]:
encoding/json.(*Encoder).Encode(0x0, 0x7a8f60, 0xc0000f8020, 0xe, 0xc000043dd8)
/usr/local/go/src/encoding/json/stream.go:196 +0x37
mapreduce.doMap(0x846c2c, 0x4, 0x0, 0xc000110080, 0x11, 0x1, 0x864d90)
/home/master/study/6.824/src/mapreduce/common_map.go:94 +0x331
mapreduce.Sequential.func1(0x849c4f, 0x8)
/home/master/study/6.824/src/mapreduce/master.go:69 +0x101
mapreduce.(*Master).run(0xc00011e0a0, 0x846c2c, 0x4, 0xc0000fa140, 0x1, 0x1, 0x1, 0xc0000f8100, 0xc0000f6540)
/home/master/study/6.824/src/mapreduce/master.go:142 +0x17b
created by mapreduce.Sequential
/home/master/study/6.824/src/mapreduce/master.go:65 +0x241
exit status 2
多了 : 号
common_map.go
package mapreduce import ( "hash/fnv" "io" "os" "io/ioutil" "log" "encoding/json" ) func doMap( jobName string, // the name of the MapReduce job mapTask int, // which map task this is inFile string, nReduce int, // the number of reduce task that will be run ("R" in the paper) mapF func(filename string, contents string) []KeyValue, ) { bs,err:=ioutil.ReadFile(inFile) if err != io.EOF && err != nil { log.Fatal(err) return } filecontent := string(bs) fmap := make(map[string]*json.Encoder) fo := make([]*os.File,nReduce) defer func(){ for _, fff := range fo { fff.Close() } }() kvs := mapF(inFile, filecontent) for _ , kv := range kvs { k := kv.Key kh := ihash(k) r := kh % nReduce tfileName := reduceName(jobName, mapTask, r) var encoder *json.Encoder encoder = fmap[tfileName] if encoder == nil { fe , err := os.Create(tfileName) if err != nil { log.Fatal(err) return } fo = append(fo,fe) encoder = json.NewEncoder(fe) fmap[tfileName] = encoder } encoder.Encode(&kv) } } func ihash(s string) int { h := fnv.New32a() h.Write([]byte(s)) return int(h.Sum32() & 0x7fffffff) }
common_reduce.go
package mapreduce import ( "io" "os" "log" "encoding/json" ) func doReduce( jobName string, // the name of the whole MapReduce job reduceTask int, // which reduce task this is outFile string, // write the output here nMap int, // the number of map tasks that were run ("M" in the paper) reduceF func(key string, values []string) string, ) { kvm := make(map[string][]string) fo := make([]*os.File,nMap) defer func(){ for _, fff := range fo { fff.Close() } }() for i := 0 ; i< nMap; i++ { tf := reduceName(jobName, i, reduceTask) ff, err := os.Open(tf) if err != nil { log.Fatal(err) panic(err) } fo=append(fo,ff) decoder := json.NewDecoder(ff) var ky KeyValue for { if err := decoder.Decode(&ky); err == io.EOF { break } else if(err != nil) { log.Fatal(err) panic(err) } vlist := kvm[ky.Key]; vlist = append(vlist, ky.Value) kvm[ky.Key] = vlist } } tfileName := mergeName(jobName, reduceTask) fe,err := os.Create(tfileName) if err != nil { log.Fatal(err) panic(err) } defer func(){ fe.Close(); }() encoder := json.NewEncoder(fe) for k , v := range kvm { encoder.Encode(KeyValue{k, reduceF(k,v)}) } }
以上是关于LAB1 partI的主要内容,如果未能解决你的问题,请参考以下文章