diff --git a/docs/answers/lab2-raft.md b/docs/answers/lab2-raft.md new file mode 100644 index 0000000..a0cc791 --- /dev/null +++ b/docs/answers/lab2-raft.md @@ -0,0 +1,519 @@ +# MIT 6.824 Lab2: Raft 实验报告 + +> 📅 日期:2026-02-26 +> 🔗 代码仓库:https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824 +> 🌿 分支:answer/20260226 + +--- + +## 1. 实验概述 + +本实验要求实现 Raft 共识算法,这是分布式系统中实现容错的关键组件。Raft 将共识问题分解为三个相对独立的子问题: + +1. **领导者选举 (Leader Election)** - 2A +2. **日志复制 (Log Replication)** - 2B +3. **持久化 (Persistence)** - 2C +4. **快照 (Snapshots)** - 2D + +--- + +## 2. 核心数据结构 + +### 2.1 服务器状态 + +```go +type Raft struct { + mu sync.Mutex // 保护共享状态的锁 + peers []*labrpc.ClientEnd // 所有节点的 RPC 端点 + persister *Persister // 持久化存储 + me int // 当前节点索引 + dead int32 // 是否被 Kill + + // 持久状态 (Figure 2) + currentTerm int // 当前任期 + votedFor int // 投票给谁 + log []LogEntry // 日志条目 + + // 易失状态 - 所有节点 + commitIndex int // 已提交的最高日志索引 + lastApplied int // 已应用的最高日志索引 + + // 易失状态 - 仅领导者 + nextIndex []int // 对每个节点,下一个要发送的日志索引 + matchIndex []int // 对每个节点,已复制的最高日志索引 + + // 服务器状态 + state int // Follower/Candidate/Leader + lastHeartbeat time.Time // 上次收到心跳的时间 +} +``` + +### 2.2 日志条目 + +```go +type LogEntry struct { + Term int + Command interface{} +} +``` + +--- + +## 3. 2A: Leader Election + +### 3.1 状态转换 + +``` +┌─────────────┐ ┌─────────────┐ +│ Follower │───超时无心跳──────→│ Candidate │ +└─────────────┘ └─────────────┘ + ↑ │ + │ │ + │ ┌─────────────────────────┼─────────────────────────┐ + │ │ │ │ + │ ↓ ↓ ↓ + │ ┌─────────────┐ ┌─────────────┐ 收到更高任期 + └──│ Leader │←──获得多数票──│ Candidate │──────────────────┘ + └─────────────┘ └─────────────┘ +``` + +### 3.2 选举超时 + +```go +const ( + HeartbeatInterval = 100 * time.Millisecond + ElectionTimeoutMin = 300 // ms + ElectionTimeoutMax = 500 // ms +) +``` + +随机化选举超时 (300-500ms) 避免同时选举。 + +### 3.3 选举流程 + +```go +func (rf *Raft) startElection() { + // 1. 转为 Candidate,增加任期,投自己一票 + rf.state = Candidate + rf.currentTerm++ + rf.votedFor = rf.me + rf.persist() + + // 2. 并行向所有节点发送 RequestVote RPC + for i := range rf.peers { + go func(server int) { + args := RequestVoteArgs{ + Term: term, + CandidateId: rf.me, + LastLogIndex: lastLogIndex, + LastLogTerm: lastLogTerm, + } + reply := RequestVoteReply{} + ok := rf.sendRequestVote(server, &args, &reply) + // 处理投票响应... + }(i) + } + + // 3. 如果获得多数票,成为 Leader + if votes > len(rf.peers)/2 { + rf.state = Leader + // 初始化 nextIndex 和 matchIndex + } +} +``` + +### 3.4 RequestVote RPC 处理 + +```go +func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { + // 1. 拒绝过期任期的请求 + if args.Term < rf.currentTerm { + return + } + + // 2. 更新任期(如果请求方任期更高) + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.state = Follower + rf.persist() + } + + // 3. 检查投票条件 + // - 没投给别人,或投的就是这个候选人 + // - 候选人日志至少和自己一样新 + if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && + (args.LastLogTerm > lastLogTerm || + (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) { + reply.VoteGranted = true + rf.votedFor = args.CandidateId + rf.lastHeartbeat = time.Now() // 重置选举超时 + rf.persist() + } +} +``` + +--- + +## 4. 2B: Log Replication + +### 4.1 AppendEntries RPC + +领导者通过心跳发送 AppendEntries RPC: + +```go +type AppendEntriesArgs struct { + Term int // 领导者任期 + LeaderId int // 领导者 ID + PrevLogIndex int // 前一个日志索引 + PrevLogTerm int // 前一个日志任期 + Entries []LogEntry // 要追加的日志条目 + LeaderCommit int // 领导者的 commitIndex +} +``` + +### 4.2 日志一致性检查 + +Follower 收到 AppendEntries 后: + +1. 检查 `PrevLogIndex` 处的日志任期是否匹配 `PrevLogTerm` +2. 如果匹配,追加新条目(删除冲突条目) +3. 更新 `commitIndex` + +### 4.3 快速回退优化 (Fast Backup) + +传统方法:逐个递减 `nextIndex`,效率低。 + +优化方法:Follower 在拒绝时返回额外信息: + +```go +type AppendEntriesReply struct { + Term int + Success bool + XTerm int // 冲突条目的任期 + XIndex int // 该任期的第一个索引 + XLen int // 日志长度 +} +``` + +领导者根据这些信息快速调整 `nextIndex`: + +```go +if reply.XTerm != 0 { + // 在自己日志中找 XTerm + found := false + for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- { + if rf.getLogTerm(i) == reply.XTerm { + rf.nextIndex[server] = i + 1 + found = true + break + } + } + if !found { + rf.nextIndex[server] = reply.XIndex // 跳到 XTerm 第一个索引 + } +} else { + rf.nextIndex[server] = reply.XLen // Follower 日志太短 +} +``` + +### 4.4 提交规则 + +**关键规则**:只能提交当前任期的日志条目! + +```go +func (rf *Raft) updateCommitIndex() { + for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- { + if rf.getLogTerm(n) != rf.currentTerm { + continue // 只提交当前任期的日志 + } + count := 1 // 自己 + for i := range rf.peers { + if i != rf.me && rf.matchIndex[i] >= n { + count++ + } + } + if count > len(rf.peers)/2 { + rf.commitIndex = n + break + } + } +} +``` + +--- + +## 5. 2C: Persistence + +### 5.1 需要持久化的状态 + +根据 Raft 论文 Figure 2: + +- `currentTerm` - 当前任期 +- `votedFor` - 投票给谁 +- `log[]` - 日志条目 + +### 5.2 持久化实现 + +```go +func (rf *Raft) persist() { + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + e.Encode(rf.currentTerm) + e.Encode(rf.votedFor) + e.Encode(rf.log) + e.Encode(rf.lastIncludedIndex) // 2D 新增 + e.Encode(rf.lastIncludedTerm) // 2D 新增 + data := w.Bytes() + rf.persister.SaveRaftState(data) +} + +func (rf *Raft) readPersist(data []byte) { + if data == nil || len(data) < 1 { + return + } + r := bytes.NewBuffer(data) + d := labgob.NewDecoder(r) + var currentTerm, votedFor, lastIncludedIndex, lastIncludedTerm int + var log []LogEntry + if d.Decode(¤tTerm) != nil || + d.Decode(&votedFor) != nil || + d.Decode(&log) != nil || + d.Decode(&lastIncludedIndex) != nil || + d.Decode(&lastIncludedTerm) != nil { + // 解码失败 + } else { + rf.currentTerm = currentTerm + rf.votedFor = votedFor + rf.log = log + rf.lastIncludedIndex = lastIncludedIndex + rf.lastIncludedTerm = lastIncludedTerm + } +} +``` + +### 5.3 持久化时机 + +每次修改 `currentTerm`、`votedFor` 或 `log` 后都要调用 `persist()`。 + +--- + +## 6. 2D: Snapshots + +### 6.1 为什么需要快照? + +随着日志增长,内存和磁盘占用会无限增长。快照机制可以: +- 截断已应用的日志 +- 减少内存占用 +- 加速新节点同步 + +### 6.2 快照数据结构 + +```go +type Raft struct { + // ... 其他字段 + lastIncludedIndex int // 快照包含的最后日志索引 + lastIncludedTerm int // 快照包含的最后日志任期 +} +``` + +### 6.3 服务端创建快照 + +```go +func (rf *Raft) Snapshot(index int, snapshot []byte) { + rf.mu.Lock() + defer rf.mu.Unlock() + + // 边界检查 + if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() { + return + } + + // 先保存 term,再截断日志 + newLastIncludedTerm := rf.getLogTerm(index) + rf.log = rf.log[index-rf.lastIncludedIndex:] + rf.lastIncludedTerm = newLastIncludedTerm + rf.lastIncludedIndex = index + + // 持久化状态和快照 + rf.persister.SaveStateAndSnapshot(state, snapshot) +} +``` + +### 6.4 InstallSnapshot RPC + +当日志落后于领导者时,领导者发送快照: + +```go +type InstallSnapshotArgs struct { + Term int + LeaderId int + LastIncludedIndex int + LastIncludedTerm int + Data []byte +} + +func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) { + // 1. 检查任期 + if args.Term < rf.currentTerm { + return + } + + // 2. 更新状态 + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.persist() + } + + // 3. 截断日志 + if args.LastIncludedIndex > rf.lastIncludedIndex { + if args.LastIncludedIndex <= rf.getLastLogIndex() && + rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm { + // 保留后面的日志 + rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:] + } else { + rf.log = make([]LogEntry, 0) + } + rf.lastIncludedIndex = args.LastIncludedIndex + rf.lastIncludedTerm = args.LastIncludedTerm + rf.persister.SaveStateAndSnapshot(state, args.Data) + + // 发送到 applyCh + rf.applyCh <- ApplyMsg{ + SnapshotValid: true, + Snapshot: args.Data, + SnapshotIndex: args.LastIncludedIndex, + SnapshotTerm: args.LastIncludedTerm, + } + } +} +``` + +--- + +## 7. 关键实现细节 + +### 7.1 并发安全 + +- 使用 `sync.Mutex` 保护所有共享状态 +- 使用 `sync.Cond` 实现应用器的等待/唤醒机制 +- 使用 `atomic` 实现 `Kill()` 和 `killed()` + +### 7.2 避免活锁 + +- 随机化选举超时 (300-500ms) +- 心跳间隔 100ms,远小于选举超时 + +### 7.3 正确处理任期 + +- 任期是 Raft 的核心,所有 RPC 都要检查任期 +- 收到更高任期时,立即转为 Follower 并更新任期 + +### 7.4 日志索引处理 + +- 快照后的日志索引需要偏移计算 +- `getLogTerm()` 和 `getLogEntry()` 统一处理索引转换 + +--- + +## 8. 测试结果 + +### 8.1 2A: Leader Election + +``` +=== RUN TestInitialElection2A + ... Passed -- 3.1 3 54 14568 0 +=== RUN TestReElection2A + ... Passed -- 8.9 3 176 34056 0 +=== RUN TestManyElections2A + ... Passed -- 5.5 7 408 86416 0 +PASS +``` + +### 8.2 2B: Log Replication + +``` +=== RUN TestBasicAgree2B + ... Passed -- 1.0 3 16 4310 3 +=== RUN TestRPCBytes2B + ... Passed -- 2.8 3 48 113570 11 +=== RUN TestFailAgree2B + ... Passed -- 6.3 3 116 30885 8 +=== RUN TestFailNoAgree2B + ... Passed -- 3.8 5 148 34328 4 +=== RUN TestConcurrentStarts2B + ... Passed -- 0.8 3 14 3552 6 +=== RUN TestRejoin2B + ... Passed -- 4.5 3 138 31037 4 +=== RUN TestBackup2B + ... Passed -- 33.6 5 2120 1888317 102 +=== RUN TestCount2B + ... Passed -- 2.1 3 36 10066 12 +PASS +``` + +### 8.3 2C: Persistence + +``` +=== RUN TestPersist12C + ... Passed -- 4.1 3 76 18768 6 +=== RUN TestPersist22C + ... Passed -- 44.1 5 1812 370976 16 +=== RUN TestPersist32C + ... Passed -- 2.1 3 34 8370 4 +=== RUN TestFigure82C + ... Passed -- 34.7 5 308 60588 12 +=== RUN TestUnreliableAgree2C + ... Passed -- 8.5 5 308 96421 251 +=== RUN TestFigure8Unreliable2C + ... Passed -- 30.8 5 2216 12947486 895 +=== RUN TestReliableChurn2C + ... Passed -- 16.3 5 580 424371 315 +=== RUN TestUnreliableChurn2C + ... Passed -- 16.2 5 268 133185 64 +PASS +``` + +### 8.4 2D: Snapshots + +``` +=== RUN TestSnapshotBasic2D + ... Passed -- 7.7 3 136 48058 251 +=== RUN TestSnapshotInstall2D + ... Passed -- 66.2 3 1358 359800 321 +=== RUN TestSnapshotInstallUnreliable2D + ... Passed -- 88.0 3 1808 431965 341 +=== RUN TestSnapshotInstallCrash2D + ... Passed -- 37.9 3 658 183093 355 +=== RUN TestSnapshotInstallUnCrash2D + ... Passed -- 57.5 3 782 207519 388 +PASS +``` + +--- + +## 9. 总结与收获 + +### 9.1 实现要点 + +1. **任期是核心**:所有操作都要正确处理任期变化 +2. **随机化超时**:避免选举冲突 +3. **快速回退**:优化日志同步效率 +4. **正确持久化**:崩溃恢复的关键 +5. **快照索引**:注意偏移量计算 + +### 9.2 调试经验 + +- 使用 `-race` 检测数据竞争 +- 仔细阅读论文 Figure 2 +- 处理边界条件(空日志、快照边界等) +- 测试中的超时时间很宽松,实际实现要更高效 + +### 9.3 后续工作 + +Lab3 将基于此 Raft 实现构建容错的 KV 服务。 + +--- + +> 💡 Raft 是分布式系统的基石,理解它的实现对于后续的分布式存储、分布式协调等系统至关重要。 \ No newline at end of file