6.5840-lab1-mr记录

6.5840分布式(前6.824)—Lab1 MapReduce 学习记录
MapReduce
其次论文讨论了大量减少网络通信的优化技巧,因为带宽是瓶颈。
Lab任务
- 实现一个分布式mr,其中包括一个coordinate和多个worker,每个worker通过RPC与coordinate通信
- 在
mr/coordinator.go
,mr/worker.go
,mr/rpc.go
补充内容 - worker负责从coordinator中读入内容,然后进行加工写入worker本地
- coordinator负责发送任务给worker,同时如果coordinator发现一个worker无法在规定时间(10s)内完成任务,就将任务分配给其他worker
- 给出了
main/mrcoordinator.go
和main/mrworker.go
两个样例,不要修改他们。 - 运行
go build -buildmode=plugin ../mrapps/wc.go
来将这个go文件打包成插件wc.so
供Map使用。然后运行go run mrcoordinator.go pg-*.txt
建立并启动coordinator,并将要处理的文本文件名传给coordinator。然后运行go run mrworker.go wc.so
,读取之前生成的插件中含有的map和reduce函数,将其传给worker,让他们调用。 - 然后给了一个测试方法,即
bash ~/6.5840/src/main/test-mr.sh
lab的一些要求
- map将输入文件分成多组中间文件,供多个reduce使用,同时要传递
nReduce
这个变量来创建coordinator - reduce的输出文件名格式为
mr-out-X
,其中X为该reduce任务的编号 - 每个reduce文件里面的每一行,格式为
"%v %v"
导出的一行,分别对应key、value,方式为`fmt.Fprintf(ofile, “%v %v\n”, intermediate[i].Key, output) - worker的map要输出到当前目录,以方便reduce读取
- 对于
mr/coordinator.go
中的Done方法要返回true,才表示这次mapreduce结束了,mrcoordinator.go
会终止 - mapreduce结束,coordinator会终止,worker也要终止,有很多种设计方法,包括当worker联系不到coordinator时就自行终止。
Lab的一些提示
- 建议先修改
mr/worker.go'
的Worker()
来实现worker通过rpc从coordinator请求任务文件,然后修改coordinator的代码来返回给worker任务对应的文件名,然后再实现worker读取文件内容并调用map的操作 - 中间文件的命名可以是
mr-X-Y
,其中X是Map任务的编号,Y是Reduce任务的编号,所以每个mapper都会产生reducer个文件的中间文件 - 虽然reduce的输出是一行一行的kv,但是map要给reduce的输出不一定是这样,可以采用读写json的方式来让map的输出有效地被reduce读入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//写
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)
//读
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
} - worker可以使用ihash函数来选择map的输出让哪个reduce接收
mrsequential.go
里的读入、写入、排序操作可以自行借用- coordinator要注意并发问题
- 使用
go run -race. test-mr.sh
来检测并发问题 - reducer必须在所有的mapper结束之后才能开始操作,所以必须请求coordinator,对于这类轮询可以采用nio的模式,用一个handler loop来接收请求,灵活使用
time.Sleep()
或sync.Cond
,一把大锁保平安 - coordinator不能识别worker是正常、崩掉还是处理过慢,这次lab里允许worker通过每10s的发送心跳检测来告知coordinator自己没挂,可以多加一个字段以合并请求操作和心跳操作,比如isHeart的布尔类型;或者coordinator对每一个分配出去的任务设一个定时器,时间到没做完就算worker出问题了
- Go的rpc发送结构体时,只能发送字段名开头大写的字段,子结构体也一样
- rpc调用call返回信息时,reply对象所有的字段都是默认值即可,不要有非默认的赋值,不然可能会出问题,就像下面这样,空构造即可
1
2reply := SomeType{}
call(..., &reply) - 一些测试的东西,懒得翻译了
完成过程
配置环境
- Go(参考我写的deepin搭建go开发环境(git、go、neovim、NvChad、Nerd Font),只要看go部分就行了)
deepin和ubuntu都是用apt
注意我的go版本是1.20.4,所以部分代码可能不能在较低版本的Go上编译成功 - Goland(因为我vsc总是报权限不足,nvim也用的繁琐,果然还得是正宗IDE靠谱)
链接:Jetbrains全家桶破解文件 - 配一下Goland运行程序的方式
参考了下别人配的Goland教程,可以减少很多的测试时间,而且也是第一次学着搭配命令行参数配置运行程序
1
2
3go build -buildmode=plugin ../mrapps/wc.go
rm -f mr-out*
rm -f mr-*.json
注意Working directory很关键,因为很多程序是基于相对路径来的,你设置错运行目录了,他可能就找不到其他文件了
虽然课程作业里面 用命令行可以以pg-*.txt
作为入参,但是Goland不惯着你,只能老老实实把每个参数敲上去
不知道为什么有的博主在Go tool arguments
那里写了-i
,我的go版本是1.20.4,好像没这个参数了
1 | pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt |
同时注意这个Before launch这里,就可以执行那个删除之前生成的文件和编译插件的作用
下面这个是test的
注意打开下面这个可以开启竞态检查
具体实现步骤
- 搞清楚哪些代码是我们需要的,哪些是要改和补充的。
实际上我们只需要看main/mrcoordinator.go
、main/mrsequential.go
、main/mrworker.go
,好了上面这三个相当于启动类,mrsequential是一个没有用rpc的单机mr例子,可以给我们参考和代码借鉴。然后就是mr/coordinate.go
、mr/rpc.go
、mr/worker.go
,这三个才是我们真正要大量写代码的地方。当然可以自己加一些go文件,比如我加了一个common.go
写了一些枚举状态常量,反正只要通过测试就好,然后测试脚本是main/test-mr.sh
- 定义好一些状态枚举值
这个根据具体实现来吧,而且其实也不急,写到后面自然会加上的,主要就是当前是Map还是Reduce阶段的状态枚举、任务是哪种类型的状态枚举 - 知道怎么使用RPC,他那有例子,还挺方便的,传一个参数对象的引用和一个返回值对象的引用
- 知道要把初始传入coordinator的所有文件各自转成Map任务,是的他们每个文件就是一个Map任务。
- 知道封装Task,反正ID和文件名和Type是需要的。
- 知道对于这种发放任务的场景,要支持并发,又类似队列一个一个发放,这就可以联想到Go的chan,待发送的任务都保存在chan里面,而且发放任务的方法里最好加个锁保险
- 知道worker根据任务类型,用switch进行不同的操作。注意因为要等Map全搞完才能搞Reduce,所以需要多一个Wait状态,即worker空闲,但也要减少它的询问次数,即这种情况下Sleep。
- 要知道Map就是分词,然后要把每个词根据
key的hash值 % nReduce
归类,hash值由ihash函数得来,别忘了一开始就传入的nReduce
。要会借鉴mrsequential.go
和课程的提示,他给了将这些kv值转成json格式的代码。注意map这边只需要将输出写入本地,Reduce那边是根据文件名的格式(其实可以正则?)获取到这些输出文件的 - Reduce就是对这一系列reduceID是这个Worker的文件进行处理,把kv全部拿出来,排序,再用他给出的代码(双指针统计相同key的数目),然后再按他的要求写入最终的结果文件
- 注意!每个reduce各自有个输出就行了,不需要最后再Reduce成一个
- 再补充一下,理解一下Coordinator是一个公司高管,Worker是召之即来、挥之即去的短工,所以这些ID啊、阶段啊、还没完成的Map或Reduce任务数啊,都是由Coordinator掌管,其他人所谓的MapperID、ReducerID,都是Coordinator临时赋给他们的,不是他们天生固定的。
- 注意Worker处理完后还要再发rpc告知coordinator,他已经做完了,Coordinator算着这个阶段还差多少任务没干,干完了就前往下一阶段,注意Reduce阶段的开头,coordinator要把所有的mapper生成的文件转为Reduce阶段的任务。
- 基本做完上述就能只剩最后一个测试trace不通过了,他会随机关掉Worker,看coordinator能不能继续分配任务并最终完成这次MR。有人是worker发心跳给coordinator来证明自己活着。我是对每个发出的任务监控,如果超出10秒钟(作业里讲了10秒)就认定这个worker有问题,然后把这个任务再放到待发放队列里,让有能力的worker来拉取,反正ReduceID不变,即便那个原来的Worker在断网情况下正常完成任务,这两个worker的结果也顶多相互覆盖,结果还是一样的。我的监控机制是把发出的任务再换个队列存,记录各自的发放时间,然后每秒去检查,如果当前时间-发放时间>10,说明有问题,就将它再放入chan中,但这么写就还要改好几个地方,别忘了,比如发放时要再存一次,做完之后要删掉存的这一份。
- 一些小细节就不说了,我也忘了,比如对终止时的处理啥的
源码如下(已通过race,作为一个Go新手敲的,很多地方的使用不是很优雅)
mr/common.go
状态枚举
1 | package mr |
mr/rpc.go
1 | package mr |
mr/coordinator.go
1 | package mr |
mr/worker.go
1 | package mr |
- 标题: 6.5840-lab1-mr记录
- 作者: urlyy
- 创建于 : 2023-02-19 01:18:22
- 更新于 : 2025-03-16 01:04:15
- 链接: https://urlyy.github.io/2023/02/19/6-5840-mr-lab1记录/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论