Lab 1: MapReduce

6.824 Lab 1: MapReduce

Part1 Map/Reduce input and output

主要是实现 mapreduce/common_map.go 中的 doMap() 函数以及 mapreduce/common_reduce.go 中的 doReduce() 函数

doMap()函数:

  1. 读取 inFile 文件的内容,然后经过用户定义的 mapF() 函数得到一个 KeyValue列表
  2. 对于每个KeyValue值,经过ihash平均分配给nReduce个中间文件,中间文件可由reduceName() 得到名字,注意读写文件需要用json来序列化

疑问:

  • 如果文件特别大,不可能一次性全读到内存中啊
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
// 读取文件,并处理产生键值对
cont, _ := ioutil.ReadFile(inFile)
contents := string(cont)
resultKV := mapF(inFile, contents)

reduceFiles := make([]*os.File, nReduce)
for i := 0; i < nReduce; i++ {
file, _ := os.OpenFile(reduceName(jobName, mapTask, i), os.O_CREATE|os.O_WRONLY, 0666)
reduceFiles[i] = file
defer file.Close()
}

// 将键值对写入nReduce个文件中
for _, keyValue := range resultKV {
r := ihash(keyValue.Key) % nReduce
file := reduceFiles[r]
enc := json.NewEncoder(file)
_ = enc.Encode(&keyValue)
}
}

doReduce

  1. 从所有中间文件中读取到内存中,读出KeyValue值后将相同key的value合并,这里可以使用map。然后按照key进行排序
  2. 对新的key-value使用 reduceF() 函数,即可得到例如 hello: 2, word: 3 的词频结果,写入新的merge文件,merge文件可由mergeName()得到名字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
var keyValues []KeyValue
for i := 0; i < nMap; i++ {
reduceFileName := reduceName(jobName, i, reduceTask)
file, err := os.Open(reduceFileName)
if err != nil {
fmt.Println("doReduce can not open:", reduceFileName)
return
}
defer file.Close()
br := bufio.NewReader(file)
for {
line, _, next := br.ReadLine()
var tmp KeyValue
json.Unmarshal(line, &tmp)
if next == io.EOF {
break
}
keyValues = append(keyValues, tmp)
}
}
sort.Slice(keyValues, func(i, j int) bool {
return keyValues[i].Key < keyValues[j].Key
})

oFile, _ := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0666)
defer oFile.Close()
enc := json.NewEncoder(oFile)

tmp := keyValues[0].Key
var values []string
for _, keyValue := range keyValues {
if keyValue.Key == tmp {
values = append(values, keyValue.Value)
} else {
enc.Encode(KeyValue{tmp, reduceF(tmp, values)})
values = []string{keyValue.Value}
tmp = keyValue.Key
}
}
enc.Encode(KeyValue{tmp, reduceF(tmp, values)})
}

最后在终端输入 go test -run Sequential进行测试,如果出现

1
2
3
4
5
6
7
$ cd 6.824
$ export "GOPATH=$PWD" # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential

.\master_rpc.go:48: debug call has arguments but no formatting directives
FAIL 6.824/src/mapreduce [build failed]

注释掉 master_rpc.go的48行 debug("RegistrationServer: accept error", err) 即可

Part2 Single-worker word count

实现main/wc.go 中的mapF() and reduceF() 方法,统计单词出现次数

1
2
3
4
5
6
7
8
9
10
11
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
words := strings.FieldsFunc(contents, func(r rune) bool {
return !unicode.IsLetter(r)
})
var kvs []mapreduce.KeyValue
for _, word := range words {
kvs = append(kvs, mapreduce.KeyValue{Key: word, Value: "1"})
}
return kvs
}
1
2
3
4
func reduceF(key string, values []string) string {
// Your code here (Part II).
return strconv.Itoa(len(values))
}

运行:

1
2
3
4
$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt

Part III: Distributing MapReduce tasks

文案翻译:

您当前的实现只能一次运行一个任务。 Map / Reduce最大的卖点之一是,它可以自动并行化普通的顺序代码,而无需开发人员进行任何额外的工作。 在本部分的实验中,您将完成一个MapReduce版本,该版本将工作划分为一组在多个内核上并行运行的工作线程。 虽然不像实际的Map / Reduce部署那样分布在多台计算机上,但是您的实现将使用RPC来模拟分布式计算。

mapreduce / master.go中的代码完成了管理MapReduce作业的大部分工作。 我们还在mapreduce / worker.go中为您提供了工作线程的完整代码,并在mapreduce / common_rpc.go中为您提供了处理RPC的一些代码。

您的工作是在mapreduce / schedule.go中实现schedule()。 Master在MapReduce作业期间两次调用schedule(),一次在Map阶段,一次在Reduce阶段。 schedule()的工作是将任务分发给可用的worker。 通常tasks数超过worker数,因此schedule()必须给每个工作线程一系列任务, schedule()应等待所有任务完成,然后返回。

schedule()registerChan中获取空闲的worker的地址,然后将任务非配给空闲的worker执行.

schedule()通过Worker.DoTaskRPC发送给worder来通知worker执行任务。 该RPC的参数由DoTaskArgsmapreduce / common_rpc.go中定义。 File元素仅由Map任务使用,是要读取的文件的名称; schedule()可以在mapFiles中找到这些文件名。

使用mapreduce / common_rpc.go中的call()函数将RPC发送给工作程序。 第一个参数是worker的地址,从registerChan读取。 第二个参数应为Worker.DoTask。 第三个参数应为DoTaskArgs结构,最后一个参数应为nil。

使用go test -run TestParallel来测试您的解决方案。 这将执行两个测试,TestParallelBasic和TestParallelCheck; 后者验证您的scheduler程序执行task的并行度

思路:

  • 使用for循环启动需要执行的任务
  • 使用goroutine执行每个任务
  • 等待所有任务执行结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
var waitGroup sync.WaitGroup
for i := 0; i < ntasks; i++ {
waitGroup.Add(1)
var taskArgs DoTaskArgs
taskArgs.JobName = jobName
taskArgs.Phase = phase
taskArgs.TaskNumber = i
taskArgs.NumOtherPhase = n_other
if phase == mapPhase {
taskArgs.File = mapFiles[i]
}
go func() {
defer waitGroup.Done()
worker := <-registerChan
if !call(worker, "Worker.DoTask", &taskArgs, nil) {
log.Fatal("call RPC failed")
}
go func() { registerChan <- worker }()
}()
}
waitGroup.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}

参考:
Go官方库RPC开发指南
Package rpc
谈谈 Golang 中的 Data Race
sync.Mutex 互斥锁
Golang中WaitGroup使用的一点坑

Part IV: Handling worker failures

schedule调度的任务可能会发生错误,需要将发生错误的任务在一个空闲的worker上重新执行。这里可以对发生错误的任务使用递归的方式在一个新的worker执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
var waitGroup sync.WaitGroup
for i := 0; i < ntasks; i++ {
waitGroup.Add(1)
var taskArgs DoTaskArgs
taskArgs.JobName = jobName
taskArgs.Phase = phase
taskArgs.TaskNumber = i
taskArgs.NumOtherPhase = n_other
if phase == mapPhase {
taskArgs.File = mapFiles[i]
}
go func() {
defer waitGroup.Done()
for {
worker := <-registerChan
if call(worker, "Worker.DoTask", taskArgs, nil) {
go func() {
registerChan <- worker
}()
break
}

}

}()
}
waitGroup.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}

Part V: Inverted index generation (optional, does not count in grade)

倒排索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
DocMaps := make(map[string]string)
var kv []mapreduce.KeyValue
for _, word := range words {
DocMaps[word] = document
}
for k, doc := range DocMaps {
kv = append(kv, mapreduce.KeyValue{Key: k, Value: doc})
}
return kv
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string value (merged across all inputs). The return value
// should be a single output value for that key.
func reduceF(key string, values []string) string {
// Your code here (Part V).
nDoc := len(values)
sort.Strings(values)
resString := strconv.Itoa(nDoc) + " "
for _, v := range values {
resString = resString + v + ","
}
return resString[:(len(resString)-1)]
}

参考

用go实现MapReduce部分功能
MIT 6.824 Distributed Systems Lab-1 笔记