Lab-1 MapReduce
My previous version drove everything with goroutines and got it wrong, so here is a fresh pass over the design:
First, the Master starts up, initializes its state, and waits for Workers to call in. At this point the Master has no idea how many Workers will show up, so it maintains an array of WorkersState entries, initially empty. When a Worker contacts the Master for the first time, the Master appends a state entry for it, assigns it an ID, and returns that ID in the response. From then on the Worker includes this ID in every request so that the Master can track its state.
Each time a Worker requests and receives a task, the Master starts a timer for it to detect whether the work runs past a deadline. A timeout means the Worker has presumably crashed, so the task it was running is marked Idle again and handed to the next Worker that asks. The plan is to implement the timers with goroutines, one per Worker, using a channel to signal whether that Worker timed out and reacting accordingly. When all Map tasks are finished but the Reduce data is not ready yet, the Master answers requesting Workers with a Wait status so they back off and ask again later.
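StartTimer is referenced by the Master code later in this post but never shown. A minimal sketch of such a watchdog goroutine, using the Coordinator fields defined further down; the 10-second deadline and the exact reset behavior are my assumptions:

```go
// Watchdog for one handed-out task: if the Worker does not report back
// (via the done channel) before the deadline, put its task back to Idle.
func (c *Coordinator) StartTimer(wid int, done chan int) {
    select {
    case <-done:
        // The Worker finished in time; nothing to do.
    case <-time.After(10 * time.Second):
        // Presumed crash: requeue whatever this Worker was running.
        c.WorkerLock.RLock()
        state := c.WStates[wid]
        c.WorkerLock.RUnlock()

        switch state.WStatus {
        case RunMapTask:
            c.MapStateLock.Lock()
            c.MapState[state.MTask.MapID] = Idle
            c.MapStateLock.Unlock()
        case RunReduceTask:
            c.ReduceStateLock.Lock()
            c.ReduceState[state.RTask.ReduceID] = Idle
            c.ReduceStateLock.Unlock()
        }
    }
}
```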
The Master also has to take the intermediate files produced by the Map phase and partition them into the corresponding buckets. I plan to do this asynchronously in a goroutine: once the Reduce data has been distributed into its buckets, the goroutine notifies the Master, and from then on Workers that come asking are handed Reduce tasks.
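A minimal sketch of that bucketing goroutine, assuming KeyValue and ihash from the lab skeleton, a ReduceBucket that is just a slice of pairs, and a hypothetical loadMapOutput helper that reads one finished map task's intermediate output back in:

```go
// ReduceBucket is never defined in the snippets below; assume it is
// simply the set of pairs destined for one reduce task.
type ReduceBucket []KeyValue

// Sketch of the asynchronous bucketing step: partition every
// intermediate pair by ihash, then signal RequestTask through the
// IsReduceReady flag. loadMapOutput is hypothetical.
func (c *Coordinator) FillBuckets() {
    c.Buckets = make([]ReduceBucket, c.nReduce)
    for mapID := 0; mapID < c.nMap; mapID++ {
        for _, kv := range loadMapOutput(mapID) {
            b := ihash(kv.Key) % c.nReduce
            c.Buckets[b] = append(c.Buckets[b], kv)
        }
    }
    c.IsReduceReady.Store(true)
}
```

This version assumes FillBuckets only runs once every map task has completed; the channel-based notification mentioned above is elided.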
While some Reduce tasks are still unfinished, the Master must not answer a requesting Worker with Exit right away, because another Worker might still crash. Instead it returns Wait to keep the Worker around; if a Worker does die, the Master takes the task it had recorded for that Worker and hands it to the next Worker that asks, to be rerun.
Definitions of some of the data structures:
```go
type MapTask struct {
    MapID    int
    FileName string
}
```
```go
type ReduceTask struct {
    ReduceID int
    Bucket   ReduceBucket
}
```
```go
type WorkersState struct {
    WID     int
    WStatus int
    MTask   MapTask
    RTask   ReduceTask
}
```
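The snippets above and below also lean on a set of status constants that the post never shows. A plausible definition, inferred from how they are used; the concrete values, and Complete as the terminal task state, are my assumptions:

```go
// Task states (MapState/ReduceState entries) and Worker/task statuses,
// inferred from usage throughout this post.
const (
    Idle = iota
    Progress
    Complete
    Wait
    RunMapTask
    RunReduceTask
    Exit
)
```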
The structs used for RPC between the Master and the Workers are defined as follows:
```go
type TaskRequest struct {
    WID int
}

type TaskResponse struct {
    WID        int
    TaskStatus int
    MapTask    MapTask
    ReduceTask ReduceTask
}
```
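The Worker code below issues these RPCs through a call helper that is not shown; the lab skeleton implements it with net/rpc over a Unix domain socket, roughly like this (coordinatorSock is the skeleton's socket-path helper):

```go
// Dial the Coordinator's Unix socket, issue one RPC, and report
// whether it succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
    sockname := coordinatorSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()

    if err := c.Call(rpcname, args, reply); err != nil {
        fmt.Println(err)
        return false
    }
    return true
}
```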
The state maintained by the Master:
```go
type Coordinator struct {
    nMap    int
    nReduce int

    MapStateLock sync.RWMutex
    MapState     []int

    ReduceStateLock sync.RWMutex
    ReduceState     []int

    Files   []string
    Buckets []ReduceBucket

    WStates    []WorkersState
    WorkerLock sync.RWMutex

    TimerChans []chan int

    IsReduceReady atomic.Value
    TaskEnd       atomic.Value
}
```
The state maintained by the Worker:
```go
type WorkerManager struct {
    WID     int
    MapF    func(string, string) []KeyValue
    ReduceF func(string, []string) string
}
```
The Worker's behavior is fairly simple: it just keeps sending RPC requests to the Master, one after each completed task:
```go
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    var manager WorkerManager
    manager.WID = -1 // -1 means "not registered with the Master yet"
    manager.MapF = mapf
    manager.ReduceF = reducef

    for {
        req := TaskRequest{WID: manager.WID}
        rsp := TaskResponse{}
        if !call("Coordinator.RequestTask", &req, &rsp) {
            // The Coordinator is unreachable; assume the job is over.
            return
        }

        // The first reply carries the ID the Master assigned us.
        if manager.WID == -1 {
            manager.WID = rsp.WID
        }

        switch rsp.TaskStatus {
        case Wait:
            fmt.Printf("[Wait] Worker %v waits.\n", manager.WID)
            time.Sleep(1 * time.Second)
        case RunMapTask:
            fmt.Printf("[Map Task] Worker %v runs map task %v.\n", manager.WID, rsp.MapTask.MapID)
            RunMapJob(rsp.MapTask, manager.MapF)
        case RunReduceTask:
            fmt.Printf("[Reduce Task] Worker %v runs reduce task %v.\n", manager.WID, rsp.ReduceTask.ReduceID)
            RunReduceJob(rsp.ReduceTask, manager.ReduceF)
        case Exit:
            fmt.Printf("[Exit] Worker %v exits.\n", manager.WID)
            RunExitJob()
            return
        }
    }
}
```
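RunMapJob, RunReduceJob, and RunExitJob are not shown in this post. As an illustration, a hedged sketch of RunMapJob under the design above, where the Master does the bucketing, so the map side just dumps every pair into one intermediate file; the mr-map-<ID> file name and the JSON encoding (encoding/json) are my assumptions:

```go
// Hypothetical map-side job: apply the map function to the input file
// and write all resulting pairs to a single intermediate file that the
// Master later reads back for bucketing.
func RunMapJob(task MapTask, mapf func(string, string) []KeyValue) {
    content, err := os.ReadFile(task.FileName)
    if err != nil {
        log.Fatalf("cannot read %v: %v", task.FileName, err)
    }
    kva := mapf(task.FileName, string(content))

    out, err := os.Create(fmt.Sprintf("mr-map-%d", task.MapID))
    if err != nil {
        log.Fatalf("cannot create intermediate file: %v", err)
    }
    defer out.Close()

    enc := json.NewEncoder(out)
    for _, kv := range kva {
        if err := enc.Encode(&kv); err != nil {
            log.Fatalf("encode: %v", err)
        }
    }
}
```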
The Master's core logic is more involved, since it has to schedule Workers onto different kinds of tasks and detect Workers that have died.
```go
func (c *Coordinator) RequestTask(req *TaskRequest, rsp *TaskResponse) error {
    // Whatever this Worker was doing before this request is now done.
    c.UpdateTaskState(req.WID)

    // A Worker calling in with WID == -1 is new: register it and
    // assign it the next ID.
    if req.WID == -1 {
        c.WorkerLock.Lock()
        rsp.WID = len(c.WStates)
        c.WStates = append(c.WStates, WorkersState{})
        c.TimerChans = append(c.TimerChans, make(chan int, 1))
        c.WorkerLock.Unlock()
    }

    var WID int
    if req.WID == -1 {
        WID = rsp.WID
    } else {
        WID = req.WID
    }

    // Hand out an idle map task if any remain. The check and the
    // transition to Progress happen under one lock so the same task
    // cannot be handed to two Workers.
    for i := 0; i < len(c.MapState); i++ {
        c.MapStateLock.Lock()
        if c.MapState[i] != Idle {
            c.MapStateLock.Unlock()
            continue
        }
        c.MapState[i] = Progress
        c.MapStateLock.Unlock()

        fmt.Printf("[Map Task] Worker %v runs map task %v.\n", WID, i)

        rsp.TaskStatus = RunMapTask
        rsp.MapTask = MapTask{
            MapID:    i,
            FileName: c.Files[i],
        }

        c.WorkerLock.Lock()
        c.WStates[WID].WStatus = RunMapTask
        c.WStates[WID].MTask = rsp.MapTask
        c.WorkerLock.Unlock()

        go c.StartTimer(WID, c.TimerChans[WID])
        return nil
    }

    // Every map task is handed out but the buckets are not ready yet:
    // tell the Worker to wait.
    if ready, _ := c.IsReduceReady.Load().(bool); !ready {
        rsp.WID = WID
        rsp.TaskStatus = Wait
        c.WorkerLock.Lock()
        c.WStates[WID].WStatus = Wait
        c.WorkerLock.Unlock()
        return nil
    }

    // Hand out an idle reduce task if any remain.
    for i := 0; i < len(c.ReduceState); i++ {
        c.ReduceStateLock.Lock()
        if c.ReduceState[i] != Idle {
            c.ReduceStateLock.Unlock()
            continue
        }
        c.ReduceState[i] = Progress
        c.ReduceStateLock.Unlock()

        fmt.Printf("[Reduce Task] Worker %v runs reduce task %v.\n", WID, i)

        rsp.TaskStatus = RunReduceTask
        rsp.ReduceTask = ReduceTask{
            ReduceID: i,
            Bucket:   c.Buckets[i],
        }

        c.WorkerLock.Lock()
        c.WStates[WID] = WorkersState{
            WID:     WID,
            WStatus: RunReduceTask,
            RTask:   rsp.ReduceTask,
        }
        c.WorkerLock.Unlock()

        go c.StartTimer(WID, c.TimerChans[WID])
        return nil
    }

    // Reduce tasks are all handed out but not all finished: keep the
    // Worker around in case a crashed Worker's task must be rerun.
    if done, _ := c.TaskEnd.Load().(bool); !done {
        rsp.WID = WID
        rsp.TaskStatus = Wait
        c.WorkerLock.Lock()
        c.WStates[WID] = WorkersState{WID: WID, WStatus: Wait}
        c.WorkerLock.Unlock()
        return nil
    }

    // Everything is done: tell the Worker to exit.
    rsp.TaskStatus = Exit
    c.WorkerLock.Lock()
    c.WStates[WID].WStatus = Exit
    c.WorkerLock.Unlock()

    return nil
}
```
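RequestTask also relies on UpdateTaskState, which is not shown. My reading of its job: a Worker only asks for a new task after finishing the previous one, so the Master can mark that task Complete and cancel its watchdog. A sketch under that assumption (Complete comes from the hypothetical constant definitions earlier):

```go
// Presumed completion bookkeeping: the requesting Worker's previous
// task, if any, is now done.
func (c *Coordinator) UpdateTaskState(wid int) {
    if wid == -1 {
        return // first contact: no previous task to account for
    }

    c.WorkerLock.RLock()
    state := c.WStates[wid]
    c.WorkerLock.RUnlock()

    switch state.WStatus {
    case RunMapTask:
        c.MapStateLock.Lock()
        c.MapState[state.MTask.MapID] = Complete
        c.MapStateLock.Unlock()
        c.TimerChans[wid] <- 1 // buffered, so this never blocks
    case RunReduceTask:
        c.ReduceStateLock.Lock()
        c.ReduceState[state.RTask.ReduceID] = Complete
        c.ReduceStateLock.Unlock()
        c.TimerChans[wid] <- 1
        // The real code would also flip TaskEnd once every reduce
        // task has reached Complete.
    }
}
```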
Test cases passed:
- [x] wc
- [x] indexer
- [x] jobcount
- [x] mtiming
- [x] rtiming
- [x] early_exit
- [x] nocrash
- [x] crash