最近终于自己动手做了一下大名鼎鼎的 MIT 6.824 Lab。因为 6.824 的资料已经有太多了,像介绍 Raft、介绍 Lab 之类的东西就不必再提了,另外 Raft 确实是名副其实的 understandable,算法很容易看懂,而且实现细节在 paper 中都体现得很好(Figure 2 is extremely precise),真的非常难得。不过还是有一些自己的收获和踩坑经历想记录一下,把我思考的过程写出来。

实现思路

尽管 Raft paper 的 Figure 2 算是一个相当详细的 specification 了,但是对我一个工程小白来说,中间有不少“不重要”的实现细节可能还是需要想一想。目前都是自己写的,还没参考过别人的实现,可能不是 best practice,可能会很 naive。

Election timeout

我们需要一个能 reset 的计时器。Lab 指导里建议不要用 golang 的 Timer,说很难用(还不知道为什么,以后学习一下)。

一个自然的想法是:每次 sleep(timeout),然后检查是否发生事件。严格地说这跟 reset 并不等价,例如 timeout 1s,在 0.1s 和 1.9s 发生事件,严格计时的话是会 timer fired。不过这对我们的应用无伤大雅……我一开始感觉实现严格的倒计时有点难,所以就按照睡醒检查的思路实现了。

检查事件的发生也有多种方式。Lab hint 里的方式是记录最后一次事件时间(用这个方式貌似也可以实现严格计时了,只需要减少下次 sleep 的时间)。我一开始没这么做是觉得事件通知这个需求不想用共享状态的方式来做,肯定可以用消息传递的方式来实现。

第一想法是用一个消息队列,但是 golang 的 channel 不管有无缓冲都会阻塞,而且我还需要每次清空消息,感觉很麻烦,所以我一开始就直接放弃了这条路(当然后来发现其实用 select 很容易实现非阻塞和清空,还是一开始对 golang 不熟……)。而且我觉得用消息有点“大材小用”,因为并没有消息要传递,只是一个等待与通知的模式,我只需要知道事件发生过,被通知一次。这样的情景应该是 condition variable 更直接吧?

但是 Cond 没唤醒我又不能进行操作,怎么办?我需要一个 select。不过 golang 里的 select 只能用于 channel,要与 condvar 配合使用的话还得包装一个 channel 出来,有点绕。于是,我写出来了大概这样的代码:

for !rf.killed() {
    heardLeaderChan := make(chan struct{})
    go func() {
        rf.heardLeaderCond.Wait()
        rf.mu.Unlock()
        close(heardLeaderChan)
    }()

    time.Sleep(timeout)

    select {
    case <-heardLeaderChan:
        continue
    default:
        rf.beginElection()
    }
}

当然现在发现有很大槽点:为什么不直接 select 里用 time.After?用了的话严格倒计时也有了。当然是因为一开始不知道有这个东西,还有点思维定势,确定了睡醒检查事件的思路,就一直在想怎么记录事件发生了,也没有想到可以直接把 select 同时用在时钟和时间上,后来知道以后又没想到再回来改这块了(又不是不能用)……另外,正如上面所说,可以直接用一个大小为 1 的 channel,每次非阻塞地 send,不必使用 Cond,写起来更直接一些。

所以简言之,倒计时这个需求这样就可以很好的解决了:

select {
case <-time.After(timeout):
    ...
case <-eventChan:
    ...
}

Election

我想单独写个选举函数,但是除了选举完成,还有可能发生遇到 leader、超时,等情况,虽然可以放在选举函数里一起处理,不过我感觉不够优雅。后来我意识到可以用一个 channel 参数 beginElection(resultChan) 来异步传结果。这样选举函数就可以专心选举,不用管其他情况了。

关于如何发选票和收集统计选票我也纠结了一会儿。可以选择在发 RPC 请求的 goroutine 里原地处理 reply,但我选择了把 reply 送到一个 channel 里,让另一个人统一处理。我主要是担心同步起来麻烦,计数器不好维护(一开始想用 atomic,其实加一把大锁就行),通知投票完成/失败也有点麻烦。我还想过用 waitGroup,不过我并不需要所有人都返回结果。

有一点值得一提的是选举完成/失败后更新状态要小心,有可能已经变成 follower 了,不能更新为 leader。这个问题的关键原因是收到 RPC reply 时不能假设状态和发送 RPC request 时一致,在每个处理 RPC reply 的地方都要小心这个问题。

心跳和复制日志

我一开始是完全把心跳和复制日志分开的,虽然它们使用相同的 RPC,但是性质稍有不同,心跳需要单纯定时发送空日志,而复制日志我希望请求来时能立刻发送,并且失败后要重试,这样才比较快。

不过这样做当然代码会显得比较乱。更关键其实我没意识到一点:复制日志也可以视作一次心跳,就是没必要区分的。

主要是我仍然有点怀疑:没有失败后直接重试,而且每次新的 RPC 都要再复制一次需要发送的 entries,会不会性能很差?我还是有点想加一个重传机制。后来我还是试了一下,发现只靠心跳间隔重传完全没有问题,测试都能通过,时间也挺合理。所以说在一开始真的可以不用想太多(细节的性能问题),避免 premature optimization

但我另一个(被遗忘的)担心的点其实真的是重要的:复制日志确实需要能立刻发送。这个问题直到做 Lab 3 kvraft 的时候才发现:TestSnapshotSize3B 竟然花了 80s(Lab 里说 20s 以内是合理的,实际上只要不到 5s)。这个问题卡了我很久。最后我发现这个测试是 200 次循环,每次先请求指令,等完成了再发下一条,所以对 latency 有要求。之前别的测试其实都只看 throughput,所以都能顺利通过。要解决这个问题,就在请求来时主动额外多发一次心跳就可以了。

Commit & Apply

apply 肯定是弄一个 background worker 等 commit 之后唤醒比较好。一个可以优化的点是先复制 log,然后 unlock,然后 apply(send to channel),这么做是假设 apply 会比较慢,甚至阻塞。其实也不太有必要。

而 commit,因为我一开始 replicate 写的是要完成一个 index 的死任务(不断失败重传,直到这个 index 成功),所以完成任务后 commit 就行了。将 replicate 和心跳统一后,每次心跳给每个 follower 发送的内容都是动态调整的,因此还是把 commit 也做成了单独的一个 background worker,每次唤醒时检查 matchIndex。修改后大概写成了下面这样:

Nid := (len(rf.peers) - 1) / 2
rf.commitCond.Wait()
for !rf.killed() {
    if rf.lastTerm() == rf.currentTerm {
        matchIndex := append([]int(nil), rf.matchIndex...)
        sort.Ints(matchIndex)
        N := matchIndex[Nid]
        if N > rf.commitIndex {
            rf.commitIndex = N
            rf.applyCond.Broadcast()
        }
    }
    rf.commitCond.Wait()
}

这里面我犯了一个严重的错误:仅当最后一项的 Term 是 currentTerm 时才试图 commit,这没有问题;问题是,这并不保证检查出来的多数已达到的最大 index N 就可以 commit!还需要再检查 rf.log[N].Term == rf.currentTerm 才可以 commit。这里其实是因为小聪明(加了一个 lastTerm 的判断,就不判断 index N 的 Term 了)导致了大错误……这当然就会挂掉 Figure 8 的 test,因为 commit 了旧 term 的 entries。

我一开始单独 replicate,完成之后 commit 的实现里是没有这个问题的,当时实现完了唯一的问题是 fail to reach agreement,应该是性能不太够的原因。然后我就着手重构代码,首先做的就是把 replicate 完全合进了 heartbeat,单独拎出 committer。结果就出现了 Figure 8 测试的 apply error。

然后我没有立刻意识到问题,就一边重新过了一边 paper 和代码,一边优化重构,一边 debug。通常来说 apply error 要么是 persist 有问题,要么是违反了 Figure 2 的 rules,但我感觉这两块怎么也看不出问题。最后一共花了 10 几个小时,终于突然意识到了这个问题,一下子就解决了……

反思:在一开始出现问题的时候就盯准了趁早 debug 或许会更好。因为改动还较少,等到后面改的多了更找不到问题了。另外对问题的原因不够敏感,其他测试能过,但是 Figure 8 挂了,应该立刻想到是 commit 了 previous term 的 entries,毕竟 Figure 8 谈的就是这个问题。Debug 的时候真的要多思考,不能盲目地瞎看代码。

测试脚本

分布式系统需要进行大量的测试,这让我第一次切实地感受到用 script 减少重复劳动的需求,否则我只能一次次地手打 go test XXTest > filename,看一下结果,然后再删掉。虽然我多少会一点命令行使用,不过还是没正经写过 bash。这次是个好机会,真的需要“必先利其器”了,所以我去学了(收藏已久的)MIT the Missing Semester 的前两节课,终于会写 shell script 了 O(∩_∩)O

(写的比较挫,能用就行,就不贴出来献丑了。)

踩坑记录

死锁

虽然在一定程度上,好的代码风格可以避免死锁,不过难免还是可能写出死锁。于是我找到个在线检测死锁的工具,还是挺好用的。

我被死锁卡住的最大一个问题其实很愚蠢,就是我不知道 Cond 被唤醒以后是持有锁的……

RPC

  1. 不要对自己 RPC,循环的时候记得加个 if i != rf.me……
  2. RPC 参数的字段一定要大写开头。对 go 用大小写管理可见性的设计有点无力吐槽……
  3. RPC 的 reply 不要复用,每次调用都重新创建一个空的,因为 labgob warning: Decoding into a non-default variable/field %v may not work。
  4. RPC 调用的返回值还是有用的,要区分一下调用失败和丢包。比如AppendEntries 丢包的情况不应该更新 nextIndex,不然可能会出错 index out of range。
  5. 上文说过,发送和接受 RPC 的时候状态很可能发生了变化,要小心过时 reply。

Election restriction

我在 RPC handler 里检查了 restriction,但是忘记改 election 的代码,RPC 参数没加上相应的内容……

所以每次要加点什么,或者改点什么的时候,一定得 find all usage 看看,不能遗漏了地方没改,否则就会出现不一致。

Partition 恢复以后选不出 leader

现象是一个 follower disconnect 以后自己不断选举,Term 变得超大,然后 reconnect 以后它赢不了,因为 log 不 up-to-date。别人的 Term 太小,也赢不了。

我马上意识到问题:其他人会更新到这个很大的 term 啊。看了一下,果然是我没更新,我只想到选举会被 leader 打断,不 grant vote 的 reply 可以直接无视,但这也需要更新 Term。

commitIndex > len(log)

说明 committed entries 竟然被删掉了。这也是个容易犯的错误,是 AppendEntries 仅当有冲突的时候才删除多余的 log。一开始我写成了 log = append(log[:lastIndex], args.Entries...),也就是会删除超出 leader 送过来的 entries 之后的以后 entries。这当 leader 重启后 replay 的时候就可能会出现问题了。

小结

虽然还有不少可以优化的,不过 Lab 做成这样差不多了,我也基本没怎么参考别人的实现,有那精力不如去看生产级的代码(etcd、TiKV)。

在著名的 Students’ Guide to Raft 里有很多常见问题,不过我自己 debug 去看的时候发现倒基本没踩里面的坑(反而踩了一堆其他乱七八糟的坑),所以说关键还是还原 paper、细心 debug 吧。

Tags:

Categories:

Updated: