MIT6.5840(6.824) Lab2: Raft 2C

本文将介绍lab2C部分的实现, lab2C要求实现raft中的持久化功能, 相比lab2A,和lab2B, 本节的难度其实很小, 但复杂的是lab2A,和lab2B中的一些微小的bug会在2C中显现, 并且相对不太容易注意到。

Lab文档见: https://pdos.csail.mit.edu/6.824/labs/lab-raft.html

我的代码: https://github.com/ToniXWD/MIT6.5840/tree/lab2C

raft原论文

1 bug修复:重复的RPC

在之前的AppendEntries中有这样的代码:

1
2
3
4
5
6
if len(args.Entries) != 0 && len(rf.log) > args.PrevLogIndex+1 && rf.log[args.PrevLogIndex+1].Term != args.Entries[0].Term {
// 发生了冲突, 移除冲突位置开始后面所有的内容
DPrintf("server %v 的log与args发生冲突, 进行移除\n", rf.me)
rf.log = rf.log[:args.PrevLogIndex+1]
}
rf.log = append(rf.log, args.Entries...)

这段代码所做的事情是, 如果将要追加的位置存在日志项, 且日志项与RPC中的日子切片的第一个发生冲突(Term不匹配), 则将冲突位置及其之后的日志项清除掉。

这段代码看起来没有问题,但在高并发的场景下,会存在如下问题:

  1. Leader先发送了AppendEntries RPC, 我们记为RPC1
  2. Follower收到RPC1, 发生上述代码描述的冲突, 将冲突部分的内容清除, 并追加RPC1中的日志切片
  3. 由于并发程度高, LeaderRPC1没有收到回复时又发送了下一个AppendEntries RPC, 由于nextIndexmatchIndex只有在收到回复后才会修改, 因此这个新的AppendEntries RPC, 我们记为RPC2, 与RPC1是一致的
  4. Follower收到RPC2, 由于RPC2RPC1完全相同, 因此其一定不会发生冲突, 结果是Follower将相同的一个日志项切片追加了2次!

在考虑另一个场景:

  1. Leader先发送了AppendEntries RPC, 我们记为RPC1
  2. 由于网络问题, RPC1没有即时到达Follower
  3. Leader又追加了新的log, 此时又发送了AppendEntries RPC, 我们记为RPC2
  4. 由于网络问题, 后发送的RPC2先到达Follower, FollowerRPC2的日志项追加
  5. 此时RPC1到达了Follower, FollowerRPC1的日志项强行追加时将导致log被缩短

解决方案:
也就是说, 除了考虑冲突外, 还需要考虑重复的RPC以及顺序颠倒的RPC, 因此需要检查每个位置的log是否匹配, 不匹配就覆盖, 否则不做更改。

这样对于重复的RPC就不会重复追加, 并且如果RPC顺序颠倒,也就是让Leader多通过一次心跳同步

1
2
3
4
5
6
7
8
9
10
11
12
13
for idx, log := range args.Entries {
ridx := args.PrevLogIndex + 1 + idx
if ridx < len(rf.log) && rf.log[ridx].Term != log.Term {
// 某位置发生了冲突, 覆盖这个位置开始的所有内容
rf.log = rf.log[:ridx]
rf.log = append(rf.log, args.Entries[idx:]...)
break
} else if ridx == len(rf.log) {
// 没有发生冲突但长度更长了, 直接拼接
rf.log = append(rf.log, args.Entries[idx:]...)
break
}
}

这里的bug是完成lab3后才发现的bug, lab2分支的代码还没有进行这样的修改也能通过测例。原因是在我的设计中,Start并没有立即广播心跳, 因此不会存在RPC顺序颠倒的情况, 如果想看最新的代码修改, 参见: https://github.com/ToniXWD/MIT6.5840/blob/lab3A/src/raft/raft.go#L606

2 优化: 快速回退

在之前的回退实现中, 如果有Follower的日志不匹配, 每次RPC中, Leader会将其nextIndex自减1来重试, 但其在某些情况下会导致效率很低, 因此需要AppendEntries RPC的回复信息携带更多的字段以加速回退, 核心思想就是:Follower返回更多信息给Leader,使其可以以Term为单位来回退

教授在课堂上已经介绍了快速回退的实现机制, 可以看我整理的笔记

我的实现和课堂的介绍基本一致, 只是将XLen空白的Log槽位数改为Log的长度:

2.1 结构体定义

1
2
3
4
5
6
7
8
type AppendEntriesReply struct {
// Your data here (2A).
Term int // currentTerm, for leader to update itself
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
XTerm int // Follower中与Leader冲突的Log对应的Term
XIndex int // Follower中,对应Term为XTerm的第一条Log条目的索引
XLen int // Follower的log的长度
}

2.2 Follower侧的AppendEntries

发现冲突时, 回复的逻辑为:

  1. 如果PrevLogIndex位置不存在日志项, 通过设置reply.XTerm = -1告知Leader, 并将reply.XLen设置为自身日志长度
  2. 如果PrevLogIndex位置日志项存在但Term冲突, 通过reply.XTermreply.XIndex分别告知冲突位置的Term和这个TermFollower中第一次出现的位置

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
...
isConflict := false

// 校验PrevLogIndex和PrevLogTerm不合法
// 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogIndex >= len(rf.log) {
// PrevLogIndex位置不存在日志项
reply.XTerm = -1
reply.XLen = len(rf.log) // Log长度
isConflict = true
DPrintf("server %v 的log在PrevLogIndex: %v 位置不存在日志项, Log长度为%v\n", rf.me, args.PrevLogIndex, reply.XLen)
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
// PrevLogIndex位置的日志项存在, 但term不匹配
reply.XTerm = rf.log[args.PrevLogIndex].Term
i := args.PrevLogIndex
for rf.log[i].Term == reply.XTerm {
i -= 1
}
reply.XIndex = i + 1
isConflict = true
DPrintf("server %v 的log在PrevLogIndex: %v 位置Term不匹配, args.Term=%v, 实际的term=%v\n", rf.me, args.PrevLogIndex, args.PrevLogTerm, reply.XTerm)
}

if isConflict {
reply.Term = rf.currentTerm
reply.Success = false
return
}
...
}

2.3 leader侧的handleAppendEntries

如果需要回退, leader的处理逻辑是:

  1. 如果XTerm == -1, 表示PrevLogIndex位置在Follower中不存在log, 需要将nextIndex设置为Followerlog长度即XLen
  2. 如果XTerm != -1, 表示PrevLogIndex位置在Follower中存在log但其TermXTerm, 与prevLogTerm不匹配, 同时XIndex表示这个TermFollower中第一次出现的位置, 需要如下进行判断:
    1. 如果Follower中存在XTerm, 将nextIndex设置为Follower中最后一个term == XTerm的日志项的下一位
    2. 否则, 将nextIndex设置为XIndex

具体代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (rf *Raft) handleAppendEntries(serverTo int, args *AppendEntriesArgs) {
// 目前的设计, 重试自动发生在下一次心跳函数, 所以这里不需要死循环

...

rf.mu.Lock()
defer rf.mu.Unlock()

...

if reply.Term == rf.currentTerm && rf.role == Leader {
// term仍然相同, 且自己还是leader, 表名对应的follower在prevLogIndex位置没有与prevLogTerm匹配的项
// 快速回退的处理
if reply.XTerm == -1 {
// PrevLogIndex这个位置在Follower中不存在
DPrintf("leader %v 收到 server %v 的回退请求, 原因是log过短, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, serverTo, rf.nextIndex[serverTo], serverTo, reply.XLen)
rf.nextIndex[serverTo] = reply.XLen
return
}

i := rf.nextIndex[serverTo] - 1
for i > 0 && rf.log[i].Term > reply.XTerm {
i -= 1
}
if rf.log[i].Term == reply.XTerm {
// 之前PrevLogIndex发生冲突位置时, Follower的Term自己也有

DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的最后一个XTerm索引为%v, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, i, serverTo, rf.nextIndex[serverTo], serverTo, i+1)
rf.nextIndex[serverTo] = i + 1
} else {
// 之前PrevLogIndex发生冲突位置时, Follower的Term自己没有
DPrintf("leader %v 收到 server %v 的回退请求, 冲突位置的Term为%v, server的这个Term从索引%v开始, 而leader对应的XTerm不存在, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, reply.XTerm, reply.XIndex, serverTo, rf.nextIndex[serverTo], serverTo, reply.XIndex)
rf.nextIndex[serverTo] = reply.XIndex
}
return
}
}

3 持久化

持久化的内容只包括: votedFor, currentTerm, log, 为什么只需要持久化这三个变量, 也可以参考课堂笔记

3.1 持久化函数

persist函数和readPersist函数很简单, 只需要根据注释的提示完成即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (rf *Raft) persist() {
// DPrintf("server %v 开始持久化, 最后一个持久化的log为: %v:%v", rf.me, len(rf.log)-1, rf.log[len(rf.log)-1].Cmd)

w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.votedFor)
e.Encode(rf.currentTerm)
e.Encode(rf.log)
raftstate := w.Bytes()
rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
if data == nil || len(data) == 0 {
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)

var votedFor int
var currentTerm int
var log []Entry
if d.Decode(&votedFor) != nil ||
d.Decode(&currentTerm) != nil ||
d.Decode(&log) != nil {
DPrintf("readPersist failed\n")
} else {
rf.votedFor = votedFor
rf.currentTerm = currentTerm
rf.log = log
}
}

3.2 持久化位置

lab的持久化方案很粗糙, 只要修改了votedFor, currentTerm, log中的任意一个, 则进行持久化, 因此只需要在相应位置调用persist即可, 这里旧不给出代码了, 感兴趣可以直接看我提供的仓库。

特别需要说明的是,当崩溃恢复时,其调用的函数仍然是Make函数, 而nextIndex需要在执行了readPersist后再初始化, 因为readPersist修改了log, 而nextIndex需初始化为log长度

3.3 持久化时是否需要锁?

按照我的理解, 持久化时不需要锁保护log, 原因如下:

  • Leader视角
    Leader永远不会删除自己的log(此时没有快照), 因此不需要锁保护
  • Follower视角
    尽管Follower可能截断log, 但永远不会截断在commitlog之前, 而持久化只需要保证已经commitlog, 因此也不需要锁

4 测试

4.1 常规测试

  1. 2B测试
    由于我们实现了快速回退, 此时可以测试2B, 看看是否速度有显著提升:
    执行测试命令
    1
    go test -v -run 2B
    结果如下:

lab2-2C-test2B

相比于之前快了15s左右, 勉勉强强满足了任务书中的一份子以内… 看了实现还是不够精妙啊

  1. 2C测试
    执行测试命令
    1
    go test -v -run 2C
    结果如下:

lab2-2C-test2C

比官方的示例满了4s, 还不错

4.2 多次测试

raft的许多特性导致其一次测试并不准确, 有些bug需要多次测试才会出现, 编写如下脚本命名为manyTest_2B.sh:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/bin/bash

# 初始化计数器
count=0
success_count=0
fail_count=0

# 设置测试次数
max_tests=50

for ((i=1; i<=max_tests; i++))
do
echo "Running test iteration $i of $max_tests..."

# 运行 go 测试命令
go test -v -run 2C &> output2C.log

# 检查 go 命令的退出状态
if [ "$?" -eq 0 ]; then
# 测试成功
success_count=$((success_count+1))
echo "Test iteration $i passed."
# 如果想保存通过的测试日志,取消下面行的注释
# mv output2C.log "success_$i.log"
else
# 测试失败
fail_count=$((fail_count+1))
echo "Test iteration $i failed, check 'failure2C_$i.log' for details."
mv output2C.log "failure2C_$i.log"
fi
done

# 报告测试结果
echo "Testing completed: $max_tests iterations run."
echo "Successes: $success_count"
echo "Failures: $fail_count"

再次进行测试:

1
./manyTest_2C.sh

结果:

lab2-2B-test2