6.5840-lab1-mr记录

urlyy

6.5840分布式(前6.824)—Lab1 MapReduce 学习记录

MapReduce

在这里插入图片描述
其次论文讨论了大量减少网络通信的优化技巧,因为带宽是瓶颈。

Lab任务

  • 实现一个分布式mr,其中包括一个coordinate和多个worker,每个worker通过RPC与coordinate通信
  • mr/coordinator.go,mr/worker.go,mr/rpc.go补充内容
  • worker负责从coordinator中读入内容,然后进行加工写入worker本地
  • coordinator负责发送任务给worker,同时如果coordinator发现一个worker无法在规定时间(10s)内完成任务,就将任务分配给其他worker
  • 给出了main/mrcoordinator.gomain/mrworker.go两个样例,不要修改他们。
  • 运行go build -buildmode=plugin ../mrapps/wc.go来将这个go文件打包成插件wc.so供Map使用。然后运行go run mrcoordinator.go pg-*.txt建立并启动coordinator,并将要处理的文本文件名传给coordinator。然后运行go run mrworker.go wc.so,读取之前生成的插件中含有的map和reduce函数,将其传给worker,让他们调用。
  • 然后给了一个测试方法,即bash ~/6.5840/src/main/test-mr.sh

在这里插入图片描述

lab的一些要求

  • map将输入文件分成多组中间文件,供多个reduce使用,同时要传递nReduce这个变量来创建coordinator
  • reduce的输出文件名格式为mr-out-X,其中X为该reduce任务的编号
  • 每个reduce文件里面的每一行,格式为"%v %v"导出的一行,分别对应key、value,方式为`fmt.Fprintf(ofile, “%v %v\n”, intermediate[i].Key, output)
  • worker的map要输出到当前目录,以方便reduce读取
  • 对于mr/coordinator.go中的Done方法要返回true,才表示这次mapreduce结束了,mrcoordinator.go会终止
  • mapreduce结束,coordinator会终止,worker也要终止,有很多种设计方法,包括当worker联系不到coordinator时就自行终止。

Lab的一些提示

  • 建议先修改mr/worker.go'Worker()来实现worker通过rpc从coordinator请求任务文件,然后修改coordinator的代码来返回给worker任务对应的文件名,然后再实现worker读取文件内容并调用map的操作
  • 中间文件的命名可以是mr-X-Y,其中X是Map任务的编号,Y是Reduce任务的编号,所以每个mapper都会产生reducer个文件的中间文件
  • 虽然reduce的输出是一行一行的kv,但是map要给reduce的输出不一定是这样,可以采用读写json的方式来让map的输出有效地被reduce读入
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //写
    enc := json.NewEncoder(file)
    for _, kv := ... {
    err := enc.Encode(&kv)


    //读
    dec := json.NewDecoder(file)
    for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
    break
    }
    kva = append(kva, kv)
    }
  • worker可以使用ihash函数来选择map的输出让哪个reduce接收
  • mrsequential.go里的读入、写入、排序操作可以自行借用
  • coordinator要注意并发问题
  • 使用go run -race. test-mr.sh来检测并发问题
  • reducer必须在所有的mapper结束之后才能开始操作,所以必须请求coordinator,对于这类轮询可以采用nio的模式,用一个handler loop来接收请求,灵活使用time.Sleep()sync.Cond,一把大锁保平安
  • coordinator不能识别worker是正常、崩掉还是处理过慢,这次lab里允许worker通过每10s的发送心跳检测来告知coordinator自己没挂,可以多加一个字段以合并请求操作和心跳操作,比如isHeart的布尔类型;或者coordinator对每一个分配出去的任务设一个定时器,时间到没做完就算worker出问题了
  • Go的rpc发送结构体时,只能发送字段名开头大写的字段,子结构体也一样
  • rpc调用call返回信息时,reply对象所有的字段都是默认值即可,不要有非默认的赋值,不然可能会出问题,就像下面这样,空构造即可
    1
    2
    reply := SomeType{}
    call(..., &reply)
  • 一些测试的东西,懒得翻译了
    在这里插入图片描述

完成过程

配置环境

  1. Go(参考我写的deepin搭建go开发环境(git、go、neovim、NvChad、Nerd Font),只要看go部分就行了)
    deepin和ubuntu都是用apt
    注意我的go版本是1.20.4,所以部分代码可能不能在较低版本的Go上编译成功
  2. Goland(因为我vsc总是报权限不足,nvim也用的繁琐,果然还得是正宗IDE靠谱)
    链接:Jetbrains全家桶破解文件
  3. 配一下Goland运行程序的方式
    参考了下别人配的Goland教程,可以减少很多的测试时间,而且也是第一次学着搭配命令行参数配置运行程序
    在这里插入图片描述在这里插入图片描述
    1
    2
    3
    go build -buildmode=plugin ../mrapps/wc.go
    rm -f mr-out*
    rm -f mr-*.json

注意Working directory很关键,因为很多程序是基于相对路径来的,你设置错运行目录了,他可能就找不到其他文件了
在这里插入图片描述
虽然课程作业里面 用命令行可以以pg-*.txt作为入参,但是Goland不惯着你,只能老老实实把每个参数敲上去
不知道为什么有的博主在Go tool arguments那里写了-i我的go版本是1.20.4,好像没这个参数了

1
pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt

同时注意这个Before launch这里,就可以执行那个删除之前生成的文件和编译插件的作用
在这里插入图片描述
在这里插入图片描述

下面这个是test的

image-20230728004926670

注意打开下面这个可以开启竞态检查

image-20230728004852814

具体实现步骤

  1. 搞清楚哪些代码是我们需要的,哪些是要改和补充的。
    实际上我们只需要看main/mrcoordinator.gomain/mrsequential.gomain/mrworker.go,好了上面这三个相当于启动类,mrsequential是一个没有用rpc的单机mr例子,可以给我们参考和代码借鉴。然后就是mr/coordinate.gomr/rpc.gomr/worker.go,这三个才是我们真正要大量写代码的地方。当然可以自己加一些go文件,比如我加了一个common.go写了一些枚举状态常量,反正只要通过测试就好,然后测试脚本是main/test-mr.sh
  2. 定义好一些状态枚举值
    这个根据具体实现来吧,而且其实也不急,写到后面自然会加上的,主要就是当前是Map还是Reduce阶段的状态枚举、任务是哪种类型的状态枚举
  3. 知道怎么使用RPC,他那有例子,还挺方便的,传一个参数对象的引用和一个返回值对象的引用
  4. 知道要把初始传入coordinator的所有文件各自转成Map任务,是的他们每个文件就是一个Map任务。
  5. 知道封装Task,反正ID和文件名和Type是需要的。
  6. 知道对于这种发放任务的场景,要支持并发,又类似队列一个一个发放,这就可以联想到Go的chan,待发送的任务都保存在chan里面,而且发放任务的方法里最好加个锁保险
  7. 知道worker根据任务类型,用switch进行不同的操作。注意因为要等Map全搞完才能搞Reduce,所以需要多一个Wait状态,即worker空闲,但也要减少它的询问次数,即这种情况下Sleep。
  8. 要知道Map就是分词,然后要把每个词根据key的hash值 % nReduce归类,hash值由ihash函数得来,别忘了一开始就传入的nReduce。要会借鉴mrsequential.go和课程的提示,他给了将这些kv值转成json格式的代码。注意map这边只需要将输出写入本地,Reduce那边是根据文件名的格式(其实可以正则?)获取到这些输出文件的
  9. Reduce就是对这一系列reduceID是这个Worker的文件进行处理,把kv全部拿出来,排序,再用他给出的代码(双指针统计相同key的数目),然后再按他的要求写入最终的结果文件
  10. 注意!每个reduce各自有个输出就行了,不需要最后再Reduce成一个
  11. 再补充一下,理解一下Coordinator是一个公司高管,Worker是召之即来、挥之即去的短工,所以这些ID啊、阶段啊、还没完成的Map或Reduce任务数啊,都是由Coordinator掌管,其他人所谓的MapperID、ReducerID,都是Coordinator临时赋给他们的,不是他们天生固定的。
  12. 注意Worker处理完后还要再发rpc告知coordinator,他已经做完了,Coordinator算着这个阶段还差多少任务没干,干完了就前往下一阶段,注意Reduce阶段的开头,coordinator要把所有的mapper生成的文件转为Reduce阶段的任务。
  13. 基本做完上述就能只剩最后一个测试trace不通过了,他会随机关掉Worker,看coordinator能不能继续分配任务并最终完成这次MR。有人是worker发心跳给coordinator来证明自己活着。我是对每个发出的任务监控,如果超出10秒钟(作业里讲了10秒)就认定这个worker有问题,然后把这个任务再放到待发放队列里,让有能力的worker来拉取,反正ReduceID不变,即便那个原来的Worker在断网情况下正常完成任务,这两个worker的结果也顶多相互覆盖,结果还是一样的。我的监控机制是把发出的任务再换个队列存,记录各自的发放时间,然后每秒去检查,如果当前时间-发放时间>10,说明有问题,就将它再放入chan中,但这么写就还要改好几个地方,别忘了,比如发放时要再存一次,做完之后要删掉存的这一份。
  14. 一些小细节就不说了,我也忘了,比如对终止时的处理啥的

源码如下(已通过race,作为一个Go新手敲的,很多地方的使用不是很优雅)

mr/common.go 状态枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package mr

const (
MapTask = iota
ReduceTask = iota
WaitTask = iota
ExitTask = iota
)

const (
MapPhase = iota
ReducePhase = iota
EndPhase = iota
)

mr/rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package mr

import (
"os"
"strconv"
)

type Task struct {
ID int
BelongPhase int
FileNames []string
}

func MakeTask(ID int, ty int, fileNames []string) *Task {
task := Task{
ID,
ty,
fileNames,
}
return &task
}

type CallArgs struct {
BelongPhase int
TaskID int
}

type CallReply struct {
Task *Task
TaskType int
ReducerNum int
}

func coordinatorSock() string {
s := "/var/tmp/5840-mr-"
s += strconv.Itoa(os.Getuid())
return s
}

mr/coordinator.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package mr

import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"strings"
"sync"
"time"
)

type Coordinator struct {
// Your definitions here.
TaskChan chan *Task
CurPhase int
ReducerNum int
mu *sync.Mutex
MapTaskNumTODO int
ReduceTaskNumTODO int
Task2Check map[int]*CheckItem
checkTimer *time.Ticker
}

type CheckItem struct {
task *Task
startTime time.Time
}

// Your code here -- RPC handlers for the worker to call.
// 根据当前阶段分发任务
func (c *Coordinator) Assign(args *CallArgs, reply *CallReply) error {
//一把大锁防止竞争
c.mu.Lock()
defer c.mu.Unlock()
switch c.CurPhase {
case MapPhase:
if len(c.TaskChan) > 0 {
reply.ReducerNum = c.ReducerNum
reply.Task = <-c.TaskChan
ci := CheckItem{reply.Task, time.Now()}
c.Task2Check[reply.Task.ID] = &ci
reply.TaskType = MapTask
} else {
reply.TaskType = WaitTask
}
case ReducePhase:
if len(c.TaskChan) > 0 {
reply.Task = <-c.TaskChan
ci := CheckItem{reply.Task, time.Now()}
c.Task2Check[reply.Task.ID] = &ci
reply.TaskType = ReduceTask
} else {
reply.TaskType = WaitTask
}
case EndPhase:
reply.TaskType = ExitTask
}
return nil
}

func (c *Coordinator) check() {
go func() {
for {
select {
case <-c.checkTimer.C:
var expireTaskIDs []int
var expireTasks []*Task
now := time.Now()
c.mu.Lock()
for taskID, taskItem := range c.Task2Check {
//fmt.Printf("%f\n", now.Sub(taskItem.startTime).Seconds())
if now.Sub(taskItem.startTime).Seconds() > 10 {
expireTasks = append(expireTasks, taskItem.task)
expireTaskIDs = append(expireTaskIDs, taskID)
}
}
for idx, taskID := range expireTaskIDs {
delete(c.Task2Check, taskID)
c.TaskChan <- expireTasks[idx]
}
c.mu.Unlock()
}
}
}()
}

// 任务结束的通知,像个回调
func (c *Coordinator) TaskDone(args *CallArgs, reply *CallReply) error {
//fmt.Println(args.BelongPhase)
//fmt.Println(args.TaskID, args.BelongPhase)
c.mu.Lock()
if args.BelongPhase != c.CurPhase {
c.mu.Unlock()
return nil
}
switch c.CurPhase {
case MapPhase:
c.MapTaskNumTODO--
delete(c.Task2Check, args.TaskID)
if c.MapTaskNumTODO == 0 {
//开始构建Reduce任务
c.CurPhase = ReducePhase
c.makeAllReduceTask()
//fmt.Println("进入reduce")
}
case ReducePhase:
c.ReduceTaskNumTODO--
delete(c.Task2Check, args.TaskID)
if c.ReduceTaskNumTODO == 0 {
c.CurPhase = EndPhase
c.checkTimer.Stop()
}
case EndPhase:
fmt.Println("不收了")
}
c.mu.Unlock()
return nil
}

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
// Your code here.
c.mu.Lock()
defer c.mu.Unlock()
return c.CurPhase == EndPhase
}

// 初始生成所有的map任务
func (c *Coordinator) makeAllMapTask(files []string) {
for idx, file := range files {
filenames := []string{file}
t := MakeTask(idx, MapPhase, filenames)
c.TaskChan <- t
}
}

// 进入第二阶段的开始时,生成所有的reduce任务
func (c *Coordinator) makeAllReduceTask() {
path, _ := os.Getwd()
files, _ := os.ReadDir(path)
//遍历ReducerNum遍
for reduceTaskID := 0; reduceTaskID < c.ReducerNum; reduceTaskID++ {
var filenames []string
for _, file := range files {
// 匹配对应的reduce文件
if strings.HasPrefix(file.Name(), "mr-") && strings.HasSuffix(file.Name(), strconv.Itoa(reduceTaskID)+".json") {
filenames = append(filenames, file.Name())
}
}
reduceTask := MakeTask(reduceTaskID, ReducePhase, filenames)
c.TaskChan <- reduceTask
}
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
TaskChan: make(chan *Task, 10),
CurPhase: MapPhase,
ReducerNum: nReduce,
mu: new(sync.Mutex),
MapTaskNumTODO: len(files),
ReduceTaskNumTODO: nReduce,
Task2Check: make(map[int]*CheckItem),
checkTimer: time.NewTicker(1 * time.Second), // 创建一个每隔1秒触发一次的定时器
}
c.makeAllMapTask(files)
//开始定时检查任务
c.check()
// Your code here.
c.server()
return &c
}

mr/worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package mr

import (
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"sort"
"strconv"
"time"
)

// Map functions return a slice of KeyValue.
type KeyValue struct {
Key string
Value string
}

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// Map任务只涉及一个File
func doMap(mapf func(string, string) []KeyValue, task *Task, reducerNum int) {
//读取文件内容
intermediate := []KeyValue{}
file, err := os.Open(task.FileNames[0])
if err != nil {
log.Fatalf("cannot open %v", task.FileNames[0])
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", task.FileNames[0])
}
file.Close()
//分词
kva := mapf(task.FileNames[0], string(content))
intermediate = append(intermediate, kva...)
sort.Sort(ByKey(intermediate))
//第一维是按key分的桶,第二维是桶内的数组
hashedKV := make([][]KeyValue, reducerNum)
for _, kv := range intermediate {
hashKey := ihash(kv.Key) % reducerNum
hashedKV[hashKey] = append(hashedKV[hashKey], kv)
}
//生成这个Map能生成的所有的reduce任务
for hashKey, kvs := range hashedKV {
output := "mr-" + strconv.Itoa(task.ID) + "-" + strconv.Itoa(hashKey) + ".json"
ofile, err := os.Create(output)
if err != nil {
log.Fatalf("cannot create %v", output)
}
enc := json.NewEncoder(ofile)
for _, kv := range kvs {
enc.Encode(kv)
}
ofile.Close()
}
}

// Reduce任务涉及多个File
func doReduce(reducef func(string, []string) string, task *Task) {
//1.取出这个reduce任务负责的所有的kv,然后按key排序
var kvs []KeyValue
for _, filename := range task.FileNames {
file, _ := os.Open(filename)
dec := json.NewDecoder(file)
for {
var kv KeyValue
//一报错就跳出循环
if err := dec.Decode(&kv); err != nil {
break
}
kvs = append(kvs, kv)
}
file.Close()
}
sort.Sort(ByKey(kvs))
oname := "mr-out-" + strconv.Itoa(task.ID)
ofile, _ := os.Create(oname)
//2.双指针 统计每个key的个数
i := 0
for i < len(kvs) {
j := i + 1
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
//将统计结果写入文件
output := reducef(kvs[i].Key, values)
fmt.Fprintf(ofile, "%v %v\n", kvs[i].Key, output)
//继续下一个key
i = j
}
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
for {
reply := AskTask()
switch reply.TaskType {
case WaitTask:
time.Sleep(1 * time.Second)
case MapTask:
doMap(mapf, reply.Task, reply.ReducerNum)
callDone(MapPhase, reply.Task.ID)
case ReduceTask:
doReduce(reducef, reply.Task)
callDone(ReducePhase, reply.Task.ID)
case ExitTask:
os.Exit(66)
default:
break
}
}
}

func AskTask() CallReply {
args := CallArgs{}
reply := CallReply{}
call("Coordinator.Assign", &args, &reply)
return reply
}

func callDone(belongPhase int, taskID int) CallReply {
args := CallArgs{belongPhase, taskID}
reply := CallReply{}
ok := call("Coordinator.TaskDone", &args, &reply)
if !ok {
fmt.Printf("coordinator把我抛弃了\n")
os.Exit(66)
}
return reply
}

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()

err = c.Call(rpcname, args, reply)
if err == nil {
return true
}

fmt.Println(err)
return false
}
  • 标题: 6.5840-lab1-mr记录
  • 作者: urlyy
  • 创建于 : 2023-02-19 01:18:22
  • 更新于 : 2023-07-29 01:59:51
  • 链接: https://urlyy.github.io/2023/02/19/6-5840-mr-lab1记录/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。