MIT6.5840(6.824) Lab1: MapReduce

1 实验介绍

本次实验是实现一个简易版本的MapReduce编程框架,官方文档在这里:lab1文档, 强烈建议先阅读MapReduce论文, 难度主要体现在设计上, 实际的代码实现倒是相对简单, 这也得益于go的语言特性, 比CMU15445使用Cpp写代码方便多了。实验需要实现的是CoordinatorWorker的设计, 具体实现细节十分自由(无从下手)

我的代码实现点这里: https://github.com/ToniXWD/MIT6.5840/tree/lab1

2 既有框架解读

解读现有的框架设计是第一步。

2.1 代码解读

  1. 阅读src/main/mrcoordinator.go可知:
    服务进程通过MakeCoordinator启动了一个Coordinator c, c.server()中启用了一个协程用于接受RPC调用:go http.Serve(l, nil), 需要注意的是, 在 Go 的 net/http 包中, 使用 http.Serve(l, nil) 启动 HTTP 服务器以侦听和处理请求时,服务器会为每个进来的请求自动启动一个新的协程。这意味着每个 RPC 调用都是在其自己的独立协程中被处理的,允许并发处理多个请求。因此, 我们的设计可能需要使用锁等同步原语实现共享资源的保护, 同时Coordinator不会主动与Worker通信(除非自己额外实现), 只能通过WorkerRPC通信来完成任务。同时, 当所有任务完成时, Done方法将会返回false, 从而将Coordinator关闭。
  2. 阅读src/main/mrworker.go
    可以得知,mrworker.go仅仅通过Worker函数来运行, 因此Worker函数需要完成请求任务、执行任务、报告任务执行状态等多种任务。因此可以猜测,Worker需要再这个函数里不断地轮训Coordinator,根据Coordinator的不同回复驱使当前Worker完成各种任务。

2.2 任务误区解读

  1. MapReduce任务、CoordinatorWorker的关系如何?
    这些任务(文中此后称为Task)与Worker是什么关系呢? 是否存在对应关系? 这些对应关系需要记录吗? 通常, 在常见的主从关系中, 主节点需要记录从节点的信息,例如线程id等表名身份的信息, 但在我们的MapReduce中却没有这样的必要, 因为Worker节点是可以扩容的, 而CoordinatorWorker之间只有传递Task相关信息的需求, 因此Coordinator只需要记录Task任务的状态即可, Task分配给Worker后执行可能成功或失败, 因此Coordinator还需要维护任务执行的时间信息, 以便在超时后重新分配任务。因此,MapReduce任务、CoordinatorWorker的关系可以参考下图:

    MapReduce_relation

    Worker可能在不同时间执行不同的Task, 也可能什么也不做(初始状态或等候所有Map Task完成时可能会闲置)

  2. MapReduce任务有多少个? 如何分配?

    • Map Task实际上在此实验中被简化了, 每个Map Task的任务就是处理一个.txt文件, 因此Map Task的数量实际上就是.txt文件的数量。 因此, 每个.txt文件对应的Map Task需要Coordinator记录其执行情况并追踪。
    • Reduce Task的数量是nReduce。由于Map Task会将文件的内容分割为指定的nReduce份, 每一份应当由序号标明, 拥有这样的序号的多个Map Task的输出汇总起来就是对应的Reduce Task的输入。
  3. 中间文件的格式是怎么样的? Reduce任务如何选择中间文件作为输入?
    因为Map Task分割采用的是统一的哈希函数ihash, 所以相同的key一定会被Map Task输出到格式相同的中间文件上。例如在wc任务中, Map Task 1Map Task 2输入文件中都存在hello这个词, Map Task 1中所有的hello会被输出到mr-out-1-5这个中间文件, 1代表Map Task序号, 5代表被哈希值取模的结果。那么,Map Task 2中所有的hello会被输出到mr-out-2-5这个中间文件。那么Reduce Task 5读取的就是形如mr-out-*-5这样的文件。

3 设计与实现

3.1 RPC设计

3.1.1. 消息类型

,通信时首先需要确定这个消息是什么类型, 通过前述分析可知, 通信的信息类型包括:

  • Worker请求任务
  • Coordinator分配ReduceMap任务
  • Worker报告ReduceMap任务的执行情况(成功或失败)
  • Coordinator告知Worker休眠(暂时没有任务需要执行)
  • Coordinator告知Worker退出(所有任务执行成功)

每一种消息类型会需要附带额外的信息, 例如Coordinator分配任务需要告知任务的ID, Map任务还需要告知NReduce,和输入文件名。
综上考虑, 消息类型的定义如下(SendReply是从Worker视角出发的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const (
AskForTask MsgType = iota // `Worker`请求任务
MapTaskAlloc // `Coordinator`分配`Map`任务
ReduceTaskAlloc // `Coordinator`分配`Reduce`任务
MapSuccess // `Worker`报告`Map`任务的执行成功
MapFailed // `Worker`报告`Map`任务的执行失败
ReduceSuccess // `Worker`报告`Reduce`任务的执行成功
ReduceFailed //`Worker`报告`Reduce`任务的执行失败
Shutdown // `Coordinator`告知`Worker`退出(所有任务执行成功)
Wait //`Coordinator`告知`Worker`休眠(暂时没有任务需要执行)
)

type MessageSend struct {
MsgType MsgType
TaskID int // `Worker`回复的消息类型如MapSuccess等需要使用
}

type MessageReply struct {
MsgType MsgType
NReduce int // MapTaskAlloc需要告诉Map Task 切分的数量
TaskID int // 任务Id用于选取输入文件
TaskName string // MapSuccess专用: 告知输入.txt文件的名字
}

3.1.2 通信函数设计

在我的设计中,Worker只需要有2个动作:

  • Coordinator请求Task
  • Coordinator报告之前的Task的执行情况

因此, worker.go中通信函数应该是下面的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func CallForReportStatus(succesType MsgType, taskID int) error {
// 报告Task执行情况
// declare an argument structure.
args := MessageSend{
MsgType: succesType,
TaskID: taskID,
}

err := call("Coordinator.NoticeResult", &args, nil)

return err
}
func CallForTask() *MessageReply {
// 请求一个Task
// declare an argument structure.
args := MessageSend{
MsgType: AskForTask,
}

// declare a reply structure.
reply := MessageReply{}

// send the RPC request, wait for the reply.
err := call("Coordinator.AskForTask", &args, &reply)
if err == nil {
// fmt.Printf("TaskName %v, NReduce %v, taskID %v\n", reply.TaskName, reply.NReduce, reply.TaskID)
return &reply
} else {
// log.Println(err.Error())
return nil
}
}

coordinator.go有相应的处理函数:

1
2
func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {}
func (c *Coordinator) NoticeResult(req *MessageSend, reply *MessageReply) error {}

这些处理函数则需要进一步的设计。

3.2 Worker设计

3.2.1 Worker主函数设计

由之前的分析可以看出,Woker所做的内容就是不断的请求任务、执行任务和回复任务执行情况,因此,可以很容易地写出Worker函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.

for {
// 循环请求
replyMsg := CallForTask()

switch replyMsg.MsgType {
case MapTaskAlloc:
err := HandleMapTask(replyMsg, mapf)
if err == nil {
_ = CallForReportStatus(MapSuccess, replyMsg.TaskID)
} else {
// log.Println("Worker: Map Task failed")
_ = CallForReportStatus(MapFailed, replyMsg.TaskID)
}
case ReduceTaskAlloc:
err := HandleReduceTask(replyMsg, reducef)
if err == nil {
_ = CallForReportStatus(ReduceSuccess, replyMsg.TaskID)
} else {
// log.Println("Worker: Map Task failed")
_ = CallForReportStatus(ReduceFailed, replyMsg.TaskID)
}
case Wait:
time.Sleep(time.Second * 10)
case Shutdown:
os.Exit(0)
}
time.Sleep(time.Second)
}
}

3.2.2 Map Task执行函数

HandleMapTask函数是执行具体的MapTask, 这样部分很简单, 可以从mrsequential.go中偷代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {
file, err := os.Open(reply.TaskName)
if err != nil {
return err
}
defer file.Close()

content, err := io.ReadAll(file)
if err != nil {
return err
}

// 进行mapf
kva := mapf(reply.TaskName, string(content))
sort.Sort(ByKey(kva))

oname_prefix := "mr-out-" + strconv.Itoa(reply.TaskID) + "-"

key_group := map[string][]string{}
for _, kv := range kva {
key_group[kv.Key] = append(key_group[kv.Key], kv.Value)
}

// 先清理可能存在的垃圾
// TODO: 原子重命名的方法
_ = DelFileByMapId(reply.TaskID, "./")

for key, values := range key_group {
redId := ihash(key)
oname := oname_prefix + strconv.Itoa(redId%reply.NReduce)
var ofile *os.File
if _, err := os.Stat(oname); os.IsNotExist(err) {
// 文件夹不存在
ofile, _ = os.Create(oname)
} else {
ofile, _ = os.OpenFile(oname, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
}
enc := json.NewEncoder(ofile)
for _, value := range values {
err := enc.Encode(&KeyValue{Key: key, Value: value})
if err != nil {
ofile.Close()
return err
}
}
ofile.Close()
}
return nil
}

虽然偷了很多代码, 但是有家店需要注意, 因为之前的Worker可能写入了一部分数据到中间文件后失败的情况, 之后Coordinator重新分配任务时, 文件是可能存在脏数据的, 因此需要先执行清理:

1
_ = DelFileByMapId(reply.TaskID, "./")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func DelFileByMapId(targetNumber int, path string) error {
// 创建正则表达式,X 是可变的指定数字
pattern := fmt.Sprintf(`^mr-out-%d-\d+$`, targetNumber)
regex, err := regexp.Compile(pattern)
if err != nil {
return err
}

// 读取当前目录中的文件
files, err := os.ReadDir(path)
if err != nil {
return err
}

// 遍历文件,查找匹配的文件
for _, file := range files {
if file.IsDir() {
continue // 跳过目录
}
fileName := file.Name()
if regex.MatchString(fileName) {
// 匹配到了文件,删除它
filePath := filepath.Join(path, file.Name())
err := os.Remove(filePath)
if err != nil {
return err
}
}
}
return nil
}

DelFileByMapId函数删除特定Map Task的输出文件, 但这样的执行存在一定隐患:

  • 首先是Coordinator只能重新分配一个Worker执行Coordinator认为死掉的任务, 这一点可以通过加锁和记录时间戳完成, 后续会讲到
  • 其次, 如果之前死掉的Worker又活了,其可能和现在的Worker的输出又有交叉了

因此, 目前的实现是存在一定漏洞的, 有以下的改进方案:

  1. 参考官方的提示, 先为当前的中间文件使用临时名, 完成操作后再进行原子重命名
  2. 通过加文件锁的方式保护文件

Ps: 后续有时间再改代码吧

3.2.3 Reduce Task执行函数

Reduce Task手机对应序号的中间文件, 汇总后应用指定的reduce函数,实现也比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func HandleReduceTask(reply *MessageReply, reducef func(string, []string) string) error {
key_id := reply.TaskID

k_vs := map[string][]string{}

fileList, err := ReadSpecificFile(key_id, "./")

if err != nil {
return err
}

// 整理所有的中间文件
for _, file := range fileList {
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
k_vs[kv.Key] = append(k_vs[kv.Key], kv.Value)
}
file.Close()
}

// 获取所有的键并排序
var keys []string
for k := range k_vs {
keys = append(keys, k)
}
sort.Strings(keys)

oname := "mr-out-" + strconv.Itoa(reply.TaskID)
ofile, err := os.Create(oname)
if err != nil {
return err
}
defer ofile.Close()

for _, key := range keys {
output := reducef(key, k_vs[key])
_, err := fmt.Fprintf(ofile, "%v %v\n", key, output)
if err != nil {
return err
}
}

DelFileByReduceId(reply.TaskID, "./")

return nil
}

需要注意的是, 我收集文件内容是使用了map, 而迭代map时, key的顺序是随机的, 因此需要先进行对key排序的操作:

1
2
3
4
5
6
// 获取所有的键并排序
var keys []string
for k := range k_vs {
keys = append(keys, k)
}
sort.Strings(keys)

其实这里也存在漏洞:同样就是死了的Worker突然复活了怎么办的问题,相比Map Task来说, 这里虽然相对不容易出错, 因为这里对多个中间文件只存在读取而不存在写入, 将内容读取到内存中不会有冲突的。出错只可能在将数据在写入到指定的文件时, os.Create(oname)也会存在竞争条件:因为网络等问题,Coordinator启动了多个Worker, 多个Worker同时运行,并且都尝试创建同一个文件名oname, 假设一个Worker先创建了oname并写入了一部分数据,当另一个Worker再次调用os.Create(oname)时,之前的数据将会被清空。这意味着第一个Worker在接下来的写操作中不会出现错误,但它写入的部分数据会丢失,因为第二个Worker已经截断了文件。

因此, 目前的实现是也存在一定漏洞的, 有以下的改进方案:

  1. 参考官方的提示, 先为当前的中间文件使用临时名, 完成操作后再进行原子重命名
  2. 通过加文件锁的方式保护文件

3.3 Coordinator设计

3.3.1 TaskInfo设计

首先需要考虑的是, 如何维护Task的执行信息, Task执行状态包括了: 未执行、执行者、执行失败、执行完成。
这里有一个很重要的问题需要考虑, 超时的任务时什么状态呢?因为在我的设计中,CoordinatorWorker是通过RPC来驱动彼此运行的, 当然你也可以启动一个goroutine间隔地检查是否超时, 但为了使设计更简单, 我们可以这样设计检查超时的方案:

  1. 为每个Worker分配Task时需要记录Task被分配的时间戳, 并将其状态置为running
  2. 为每个Worker分配Task, 遍历存储TaskInfo的数据结构, 检查每一个状态为runningTask的时间戳是否与当前时间戳差距大于10s, 如果是, 则代表这个Task超时了, 立即将它分配给当前请求的Worker, 并更新其时间戳
  3. 如果导致Task超时的老旧的Woker之后又完成了, 结果也就是这个Task返回了多次执行成功的报告而已, 可忽略

PS: Worker执行失败有2种, 一种是Worker没有崩溃但发现了error, 这时Worker会将错误报告给Coordinator, Coordinator会将其状态设置为failed, 另一种情况是Worker崩溃了, 连通知都做不到, 这就以超时体现出来, 处理好超时即可

因此, 我如下设计TaskInfo的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type taskStatus int

// Task 状态
const (
idle taskStatus = iota // 闲置未分配
running // 正在运行
finished // 完成
failed //失败
)

// Map Task 执行状态
type MapTaskInfo struct {
TaskId int // Task 序号
Status taskStatus // 执行状态
StartTime int64 // 开始执行时间戳
}

// Reduce Task 执行状态
type ReduceTaskInfo struct {
// ReduceTask的 序号 由数组下标决定, 不进行额外存储
Status taskStatus // 执行状态
StartTime int64 // 开始执行时间戳
}

type Coordinator struct {
// Your definitions here.
NReduce int // the number of reduce tasks to use.
MapTasks map[string]*MapTaskInfo //MapTaskInfo
mu sync.Mutex // 一把大锁保平安
ReduceTasks []*ReduceTaskInfo // ReduceTaskInfo
}

添加TaskInfo的初始化方法, 并在MakeCoordinator中调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *Coordinator) initTask(files []string) {
for idx, fileName := range files {
c.MapTasks[fileName] = &MapTaskInfo{
TaskId: idx,
Status: idle,
}
}
for idx := range c.ReduceTasks {
c.ReduceTasks[idx] = &ReduceTaskInfo{
Status: idle,
}
}
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
NReduce: nReduce,
MapTasks: make(map[string]*MapTaskInfo),
ReduceTasks: make([]*ReduceTaskInfo, nReduce),
}

// Your code here.
// 由于每一个文件名就是一个map task ,需要初始化任务状态
c.initTask(files)

c.server()
return &c
}

3.3.2 RPC 响应函数-AskForTask

这部分算是较为复杂的, 其逻辑如下:

  1. 如果有闲置的任务(idle)和之前执行失败(failed)的Map Task, 选择这个任务进行分配
  2. 如果检查到有超时的任务Map Task, 选择这个任务进行分配
  3. 如果以上的Map Task均不存在, 但Map Task又没有全部执行完成, 告知Worker先等待
  4. Map Task全部执行完成的情况下, 按照12相同的逻辑进行Reduce Task的分配
  5. 所有的Task都执行完成了, 告知Worker退出

因此, AskForTask代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {
if req.MsgType != AskForTask {
return BadMsgType
}
// 选择一个任务返回给worker
c.mu.Lock()
defer c.mu.Unlock()

count_map_success := 0
for fileName, taskinfo := range c.MapTasks {
alloc := false

if taskinfo.Status == idle || taskinfo.Status == failed {
// 选择闲置或者失败的任务
alloc = true
} else if taskinfo.Status == running {
// 判断其是否超时, 超时则重新派发
curTime := time.Now().Unix()
if curTime-taskinfo.StartTime > 10 {
taskinfo.StartTime = curTime
alloc = true
}
} else {
count_map_success++
}

if alloc {
// 将未分配的任务和已经失败的任务分配给这个worker
reply.MsgType = MapTaskAlloc
reply.TaskName = fileName
reply.NReduce = c.NReduce
reply.TaskID = taskinfo.TaskId

// log.Printf("coordinator: apply Map Task: taskID = %v\n", reply.TaskID)

// 修改状态信息
taskinfo.Status = running
taskinfo.StartTime = time.Now().Unix()
return nil
}
}

if count_map_success < len(c.MapTasks) {
// map任务没有可以分配的, 但都还未完成
reply.MsgType = Wait
return nil
}

count_reduce_success := 0
// 运行到这里说明map任务都已经完成
for idx, taskinfo := range c.ReduceTasks {
alloc := false
if taskinfo.Status == idle || taskinfo.Status == failed {
alloc = true
} else if taskinfo.Status == running {
// 判断其是否超时, 超时则重新派发
curTime := time.Now().Unix()
if curTime-taskinfo.StartTime > 10 {
taskinfo.StartTime = curTime
alloc = true
}
} else {
count_reduce_success++
}

if alloc {
// 分配给其一个Reduce任务
reply.MsgType = ReduceTaskAlloc
reply.TaskID = idx

// log.Printf("coordinator: apply Reduce Task: taskID = %v\n", reply.TaskID)

taskinfo.Status = running
taskinfo.StartTime = time.Now().Unix()
return nil
}
}

if count_reduce_success < len(c.ReduceTasks) {
// reduce任务没有可以分配的, 但都还未完成
reply.MsgType = Wait
return nil
}

// 运行到这里说明所有任务都已经完成
reply.MsgType = Shutdown

return nil
}

在这里, 我对数据的保护是一把大锁保平安, 这其实可以优化的

3.3.3 RPC 响应函数-NoticeResult

这个函数就简单很多了, 只需要改变对应TaskInfo的状态即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *Coordinator) NoticeResult(req *MessageSend, reply *MessageReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if req.MsgType == MapSuccess {
for _, v := range c.MapTasks {
if v.TaskId == req.TaskID {
v.Status = finished
// log.Printf("coordinator: map task%v finished\n", v.TaskId)
break
}
}
} else if req.MsgType == ReduceSuccess {
c.ReduceTasks[req.TaskID].Status = finished
// log.Printf("coordinator: reduce task%v finished\n", req.TaskID)
} else if req.MsgType == MapFailed {
for _, v := range c.MapTasks {
if v.TaskId == req.TaskID {
v.Status = failed
// log.Printf("coordinator: map task%v failed\n", v.TaskId)
break
}
}
} else if req.MsgType == ReduceFailed {
c.ReduceTasks[req.TaskID].Status = failed
// log.Printf("coordinator: reduce task%v failed\n", req.TaskID)
}
return nil
}

3.3.4 Done方法

Done方法是最简单的, 遍历TaskInfo的数据结构, 如果全部完成则返回True,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *Coordinator) Done() bool {
// Your code here.
// 先确认mapTask完成
for _, taskinfo := range c.MapTasks {
if taskinfo.Status != finished {
return false
}
}

// fmt.Println("Coordinator: All map task finished")

// 再确认Reduce Task 完成
for _, taskinfo := range c.ReduceTasks {
if taskinfo.Status != finished {
return false
}
}

// fmt.Println("Coordinator: All reduce task finished")

// time.Sleep(time.Second * 5)

return true
}

有一个小细节, time.Sleep(time.Second * 5)是为了让Coordinator延迟关闭, 这样可以留出时间告知Worker退出, 也可以直接注释掉它, 让测试跑得更快

4 测试和优化

4.1 原实现测试

运行测试:

1
$ time bash test-mr.sh

结果如下图, 耗时3m3s

MapReduce-Test

4.2 优化

4.2.1 原子重命名

按照官方提示, 使用原子重命名避免竞争, 修改HandleMapTask函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {
file, err := os.Open(reply.TaskName)
if err != nil {
return err
}
defer file.Close()

content, err := io.ReadAll(file)
if err != nil {
return err
}

kva := mapf(reply.TaskName, string(content))
sort.Sort(ByKey(kva))

tempFiles := make([]*os.File, reply.NReduce)
encoders := make([]*json.Encoder, reply.NReduce)

for _, kv := range kva {
redId := ihash(kv.Key) % reply.NReduce
if encoders[redId] == nil {
tempFile, err := ioutil.TempFile("", fmt.Sprintf("mr-map-tmp-%d", redId))
if err != nil {
return err
}
defer tempFile.Close()
tempFiles[redId] = tempFile
encoders[redId] = json.NewEncoder(tempFile)
}
err := encoders[redId].Encode(&kv)
if err != nil {
return err
}
}

for i, file := range tempFiles {
if file != nil {
fileName := file.Name()
file.Close()
newName := fmt.Sprintf("mr-out-%d-%d", reply.TaskID, i)
if err := os.Rename(fileName, newName); err != nil {
return err
}
}
}

return nil
}

运行测试:

1
$ time bash test-mr.sh

结果如下图, 耗时2m52s, 原子重命名节省了原实现中的清理耗费的时间, 节约了差不多10s的样子

MapReduce-Test-2

4.2.2 锁细化

我们可以发现, 在Worker请求任务时, Map Task是需要先全部执行成功的, 因此我们可以增加一个字段记录Map Task是否全部完成, 同时为MapTaskInfoReduceTaskInfo分别实现设计一个锁来取代原来的大锁, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
type Coordinator struct {
// Your definitions here.
NReduce int // the number of reduce tasks to use.
MapTasks map[string]*MapTaskInfo //MapTaskInfo
MapSuccess bool // Map Task 是否全部完成
muMap sync.Mutex // Map 锁, 保护 MapTasks
ReduceTasks []*ReduceTaskInfo // ReduceTaskInfo
ReduceSuccess bool // Reduce Task 是否全部完成
muReduce sync.Mutex // Reduce 锁, 保护 ReduceTasks
}

func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {
if req.MsgType != AskForTask {
return BadMsgType
}
if !c.MapSuccess {
// 选择一个 Map Task 返回给worker

c.muMap.Lock()

count_map_success := 0
for fileName, taskinfo := range c.MapTasks {
alloc := false

if taskinfo.Status == idle || taskinfo.Status == failed {
// 选择闲置或者失败的任务
alloc = true
} else if taskinfo.Status == running {
// 判断其是否超时, 超时则重新派发
curTime := time.Now().Unix()
if curTime-taskinfo.StartTime > 10 {
taskinfo.StartTime = curTime
alloc = true
}
} else {
count_map_success++
}

if alloc {
// 将未分配的任务和已经失败的任务分配给这个worker
reply.MsgType = MapTaskAlloc
reply.TaskName = fileName
reply.NReduce = c.NReduce
reply.TaskID = taskinfo.TaskId

// log.Printf("coordinator: apply Map Task: taskID = %v\n", reply.TaskID)

// 修改状态信息
taskinfo.Status = running
taskinfo.StartTime = time.Now().Unix()
c.muMap.Unlock()
return nil
}
}

c.muMap.Unlock()

if count_map_success < len(c.MapTasks) {
// map任务没有可以分配的, 但都还未完成
reply.MsgType = Wait
return nil
} else {
c.MapSuccess = true
}
}

if !c.ReduceSuccess {
// 选择一个 Reduce Task 返回给worker
c.muReduce.Lock()

count_reduce_success := 0
// 运行到这里说明map任务都已经完成
for idx, taskinfo := range c.ReduceTasks {
alloc := false
if taskinfo.Status == idle || taskinfo.Status == failed {
alloc = true
} else if taskinfo.Status == running {
// 判断其是否超时, 超时则重新派发
curTime := time.Now().Unix()
if curTime-taskinfo.StartTime > 10 {
taskinfo.StartTime = curTime
alloc = true
}
} else {
count_reduce_success++
}

if alloc {
// 分配给其一个Reduce任务
reply.MsgType = ReduceTaskAlloc
reply.TaskID = idx

// log.Printf("coordinator: apply Reduce Task: taskID = %v\n", reply.TaskID)

taskinfo.Status = running
taskinfo.StartTime = time.Now().Unix()

c.muReduce.Unlock()
return nil
}
}

c.muReduce.Unlock()

if count_reduce_success < len(c.ReduceTasks) {
// reduce任务没有可以分配的, 但都还未完成
reply.MsgType = Wait
return nil
} else {
c.ReduceSuccess = true
}
}

// 运行到这里说明所有任务都已经完成
reply.MsgType = Shutdown

return nil
}

运行测试:

1
$ time bash test-mr.sh

结果如下图, 耗时2m39s, 原子重命名节省了原实现中的清理耗费的时间, 又节约了差不多13s的样子
另外别忘了NoticeResult也要相应地修改, 由于比较简单就不列出来了, 可以直接看仓库代码。

MapReduce-Test-3