Part1 Map/Reduce input and output
主要是实现 mapreduce/common_map.go 中的 doMap() 函数以及 mapreduce/common_reduce.go 中的 doReduce() 函数
doMap()函数:
- 读取 inFile 文件的内容,然后经过用户定义的 mapF() 函数得到一个 KeyValue列表
- 对于每个KeyValue值,经过ihash平均分配给nReduce个中间文件,中间文件可由reduceName() 得到名字,注意读写文件需要用json来序列化
疑问:
- 如果文件特别大,不可能一次性全读到内存中啊
1 | func doMap( |
doReduce
- 从所有中间文件中读取到内存中,读出KeyValue值后将相同key的value合并,这里可以使用map。然后按照key进行排序
- 对新的key-value使用 reduceF() 函数,即可得到例如 hello: 2, word: 3 的词频结果,写入新的merge文件,merge文件可由mergeName()得到名字
1 | func doReduce( |
最后在终端输入 go test -run Sequential
进行测试,如果出现
1 | $ cd 6.824 |
注释掉 master_rpc.go
的48行 debug("RegistrationServer: accept error", err)
即可
Part2 Single-worker word count
实现main/wc.go
中的mapF() and reduceF()
方法,统计单词出现次数
1 | func mapF(filename string, contents string) []mapreduce.KeyValue { |
1 | func reduceF(key string, values []string) string { |
运行:
1 | cd 6.824 |
Part III: Distributing MapReduce tasks
文案翻译:
您当前的实现只能一次运行一个任务。 Map / Reduce最大的卖点之一是,它可以自动并行化普通的顺序代码,而无需开发人员进行任何额外的工作。 在本部分的实验中,您将完成一个MapReduce版本,该版本将工作划分为一组在多个内核上并行运行的工作线程。 虽然不像实际的Map / Reduce部署那样分布在多台计算机上,但是您的实现将使用RPC来模拟分布式计算。
mapreduce / master.go
中的代码完成了管理MapReduce作业的大部分工作。 我们还在mapreduce / worker.go
中为您提供了工作线程的完整代码,并在mapreduce / common_rpc.go
中为您提供了处理RPC的一些代码。
您的工作是在mapreduce / schedule.go
中实现schedule()
。 Master在MapReduce作业期间两次调用schedule()
,一次在Map阶段,一次在Reduce阶段。 schedule()
的工作是将任务分发给可用的worker。 通常tasks数超过worker数,因此schedule()
必须给每个工作线程一系列任务, schedule()
应等待所有任务完成,然后返回。
schedule()
从registerChan
中获取空闲的worker的地址,然后将任务非配给空闲的worker执行.
schedule()
通过Worker.DoTask
RPC发送给worder来通知worker执行任务。 该RPC的参数由DoTaskArgs
在mapreduce / common_rpc.go
中定义。 File元素仅由Map任务使用,是要读取的文件的名称; schedule()可以在mapFiles中找到这些文件名。
使用mapreduce / common_rpc.go
中的call()
函数将RPC发送给工作程序。 第一个参数是worker的地址,从registerChan读取。 第二个参数应为Worker.DoTask
。 第三个参数应为DoTaskArgs结构,最后一个参数应为nil。
使用go test -run TestParallel
来测试您的解决方案。 这将执行两个测试,TestParallelBasic和TestParallelCheck
; 后者验证您的scheduler程序执行task的并行度
思路:
- 使用for循环启动需要执行的任务
- 使用goroutine执行每个任务
- 等待所有任务执行结束
1 | func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { |
参考:
Go官方库RPC开发指南
Package rpc
谈谈 Golang 中的 Data Race
sync.Mutex 互斥锁
Golang中WaitGroup使用的一点坑
Part IV: Handling worker failures
schedule调度的任务可能会发生错误,需要将发生错误的任务在一个空闲的worker上重新执行。这里可以对发生错误的任务使用递归的方式在一个新的worker执行。
1 | func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { |
Part V: Inverted index generation (optional, does not count in grade)
倒排索引
1 | func mapF(document string, value string) (res []mapreduce.KeyValue) { |