Files
6.824-golabs-2021-6.824/docs/answers/lab2-raft.md
2026-02-26 10:51:53 +00:00

14 KiB
Raw Permalink Blame History

MIT 6.824 Lab2: Raft 实验报告

📅 日期2026-02-26 🔗 代码仓库:https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824 🌿 分支answer/20260226


1. 实验概述

本实验要求实现 Raft 共识算法这是分布式系统中实现容错的关键组件。Raft 将共识问题分解为三个相对独立的子问题:

  1. 领导者选举 (Leader Election) - 2A
  2. 日志复制 (Log Replication) - 2B
  3. 持久化 (Persistence) - 2C
  4. 快照 (Snapshots) - 2D

2. 核心数据结构

2.1 服务器状态

type Raft struct {
    mu        sync.Mutex          // 保护共享状态的锁
    peers     []*labrpc.ClientEnd // 所有节点的 RPC 端点
    persister *Persister          // 持久化存储
    me        int                 // 当前节点索引
    dead      int32               // 是否被 Kill

    // 持久状态 (Figure 2)
    currentTerm int         // 当前任期
    votedFor    int         // 投票给谁
    log         []LogEntry  // 日志条目

    // 易失状态 - 所有节点
    commitIndex int  // 已提交的最高日志索引
    lastApplied int  // 已应用的最高日志索引

    // 易失状态 - 仅领导者
    nextIndex  []int // 对每个节点,下一个要发送的日志索引
    matchIndex []int // 对每个节点,已复制的最高日志索引

    // 服务器状态
    state         int        // Follower/Candidate/Leader
    lastHeartbeat time.Time  // 上次收到心跳的时间
}

2.2 日志条目

type LogEntry struct {
    Term    int
    Command interface{}
}

3. 2A: Leader Election

3.1 状态转换

┌─────────────┐                    ┌─────────────┐
│  Follower   │───超时无心跳──────→│  Candidate  │
└─────────────┘                    └─────────────┘
       ↑                                  │
       │                                  │
       │         ┌─────────────────────────┼─────────────────────────┐
       │         │                         │                         │
       │         ↓                         ↓                         ↓
       │  ┌─────────────┐           ┌─────────────┐          收到更高任期
       └──│   Leader    │←──获得多数票──│  Candidate  │──────────────────┘
          └─────────────┘           └─────────────┘

3.2 选举超时

const (
    HeartbeatInterval  = 100 * time.Millisecond
    ElectionTimeoutMin = 300  // ms
    ElectionTimeoutMax = 500  // ms
)

随机化选举超时 (300-500ms) 避免同时选举。

3.3 选举流程

func (rf *Raft) startElection() {
    // 1. 转为 Candidate增加任期投自己一票
    rf.state = Candidate
    rf.currentTerm++
    rf.votedFor = rf.me
    rf.persist()

    // 2. 并行向所有节点发送 RequestVote RPC
    for i := range rf.peers {
        go func(server int) {
            args := RequestVoteArgs{
                Term:         term,
                CandidateId:  rf.me,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
            }
            reply := RequestVoteReply{}
            ok := rf.sendRequestVote(server, &args, &reply)
            // 处理投票响应...
        }(i)
    }

    // 3. 如果获得多数票,成为 Leader
    if votes > len(rf.peers)/2 {
        rf.state = Leader
        // 初始化 nextIndex 和 matchIndex
    }
}

3.4 RequestVote RPC 处理

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // 1. 拒绝过期任期的请求
    if args.Term < rf.currentTerm {
        return
    }

    // 2. 更新任期(如果请求方任期更高)
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.state = Follower
        rf.persist()
    }

    // 3. 检查投票条件
    // - 没投给别人,或投的就是这个候选人
    // - 候选人日志至少和自己一样新
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
       (args.LastLogTerm > lastLogTerm ||
        (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
        reply.VoteGranted = true
        rf.votedFor = args.CandidateId
        rf.lastHeartbeat = time.Now()  // 重置选举超时
        rf.persist()
    }
}

4. 2B: Log Replication

4.1 AppendEntries RPC

领导者通过心跳发送 AppendEntries RPC

type AppendEntriesArgs struct {
    Term         int         // 领导者任期
    LeaderId     int         // 领导者 ID
    PrevLogIndex int         // 前一个日志索引
    PrevLogTerm  int         // 前一个日志任期
    Entries      []LogEntry  // 要追加的日志条目
    LeaderCommit int         // 领导者的 commitIndex
}

4.2 日志一致性检查

Follower 收到 AppendEntries 后:

  1. 检查 PrevLogIndex 处的日志任期是否匹配 PrevLogTerm
  2. 如果匹配,追加新条目(删除冲突条目)
  3. 更新 commitIndex

4.3 快速回退优化 (Fast Backup)

传统方法:逐个递减 nextIndex,效率低。

优化方法Follower 在拒绝时返回额外信息:

type AppendEntriesReply struct {
    Term    int
    Success bool
    XTerm  int // 冲突条目的任期
    XIndex int // 该任期的第一个索引
    XLen   int // 日志长度
}

领导者根据这些信息快速调整 nextIndex

if reply.XTerm != 0 {
    // 在自己日志中找 XTerm
    found := false
    for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- {
        if rf.getLogTerm(i) == reply.XTerm {
            rf.nextIndex[server] = i + 1
            found = true
            break
        }
    }
    if !found {
        rf.nextIndex[server] = reply.XIndex  // 跳到 XTerm 第一个索引
    }
} else {
    rf.nextIndex[server] = reply.XLen  // Follower 日志太短
}

4.4 提交规则

关键规则:只能提交当前任期的日志条目!

func (rf *Raft) updateCommitIndex() {
    for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- {
        if rf.getLogTerm(n) != rf.currentTerm {
            continue  // 只提交当前任期的日志
        }
        count := 1  // 自己
        for i := range rf.peers {
            if i != rf.me && rf.matchIndex[i] >= n {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            break
        }
    }
}

5. 2C: Persistence

5.1 需要持久化的状态

根据 Raft 论文 Figure 2

  • currentTerm - 当前任期
  • votedFor - 投票给谁
  • log[] - 日志条目

5.2 持久化实现

func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    e.Encode(rf.lastIncludedIndex)  // 2D 新增
    e.Encode(rf.lastIncludedTerm)   // 2D 新增
    data := w.Bytes()
    rf.persister.SaveRaftState(data)
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 {
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor, lastIncludedIndex, lastIncludedTerm int
    var log []LogEntry
    if d.Decode(&currentTerm) != nil ||
       d.Decode(&votedFor) != nil ||
       d.Decode(&log) != nil ||
       d.Decode(&lastIncludedIndex) != nil ||
       d.Decode(&lastIncludedTerm) != nil {
        // 解码失败
    } else {
        rf.currentTerm = currentTerm
        rf.votedFor = votedFor
        rf.log = log
        rf.lastIncludedIndex = lastIncludedIndex
        rf.lastIncludedTerm = lastIncludedTerm
    }
}

5.3 持久化时机

每次修改 currentTermvotedForlog 后都要调用 persist()


6. 2D: Snapshots

6.1 为什么需要快照?

随着日志增长,内存和磁盘占用会无限增长。快照机制可以:

  • 截断已应用的日志
  • 减少内存占用
  • 加速新节点同步

6.2 快照数据结构

type Raft struct {
    // ... 其他字段
    lastIncludedIndex int  // 快照包含的最后日志索引
    lastIncludedTerm  int  // 快照包含的最后日志任期
}

6.3 服务端创建快照

func (rf *Raft) Snapshot(index int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // 边界检查
    if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() {
        return
    }

    // 先保存 term再截断日志
    newLastIncludedTerm := rf.getLogTerm(index)
    rf.log = rf.log[index-rf.lastIncludedIndex:]
    rf.lastIncludedTerm = newLastIncludedTerm
    rf.lastIncludedIndex = index

    // 持久化状态和快照
    rf.persister.SaveStateAndSnapshot(state, snapshot)
}

6.4 InstallSnapshot RPC

当日志落后于领导者时,领导者发送快照:

type InstallSnapshotArgs struct {
    Term              int
    LeaderId          int
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    // 1. 检查任期
    if args.Term < rf.currentTerm {
        return
    }

    // 2. 更新状态
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.persist()
    }

    // 3. 截断日志
    if args.LastIncludedIndex > rf.lastIncludedIndex {
        if args.LastIncludedIndex <= rf.getLastLogIndex() &&
           rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm {
            // 保留后面的日志
            rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]
        } else {
            rf.log = make([]LogEntry, 0)
        }
        rf.lastIncludedIndex = args.LastIncludedIndex
        rf.lastIncludedTerm = args.LastIncludedTerm
        rf.persister.SaveStateAndSnapshot(state, args.Data)

        // 发送到 applyCh
        rf.applyCh <- ApplyMsg{
            SnapshotValid: true,
            Snapshot:      args.Data,
            SnapshotIndex: args.LastIncludedIndex,
            SnapshotTerm:  args.LastIncludedTerm,
        }
    }
}

7. 关键实现细节

7.1 并发安全

  • 使用 sync.Mutex 保护所有共享状态
  • 使用 sync.Cond 实现应用器的等待/唤醒机制
  • 使用 atomic 实现 Kill()killed()

7.2 避免活锁

  • 随机化选举超时 (300-500ms)
  • 心跳间隔 100ms远小于选举超时

7.3 正确处理任期

  • 任期是 Raft 的核心,所有 RPC 都要检查任期
  • 收到更高任期时,立即转为 Follower 并更新任期

7.4 日志索引处理

  • 快照后的日志索引需要偏移计算
  • getLogTerm()getLogEntry() 统一处理索引转换

8. 测试结果

8.1 2A: Leader Election

=== RUN   TestInitialElection2A
  ... Passed --   3.1  3   54   14568    0
=== RUN   TestReElection2A
  ... Passed --   8.9  3  176   34056    0
=== RUN   TestManyElections2A
  ... Passed --   5.5  7  408   86416    0
PASS

8.2 2B: Log Replication

=== RUN   TestBasicAgree2B
  ... Passed --   1.0  3   16    4310    3
=== RUN   TestRPCBytes2B
  ... Passed --   2.8  3   48  113570   11
=== RUN   TestFailAgree2B
  ... Passed --   6.3  3  116   30885    8
=== RUN   TestFailNoAgree2B
  ... Passed --   3.8  5  148   34328    4
=== RUN   TestConcurrentStarts2B
  ... Passed --   0.8  3   14    3552    6
=== RUN   TestRejoin2B
  ... Passed --   4.5  3  138   31037    4
=== RUN   TestBackup2B
  ... Passed --  33.6  5 2120 1888317  102
=== RUN   TestCount2B
  ... Passed --   2.1  3   36   10066   12
PASS

8.3 2C: Persistence

=== RUN   TestPersist12C
  ... Passed --   4.1  3   76   18768    6
=== RUN   TestPersist22C
  ... Passed --  44.1  5 1812  370976   16
=== RUN   TestPersist32C
  ... Passed --   2.1  3   34    8370    4
=== RUN   TestFigure82C
  ... Passed --  34.7  5  308   60588   12
=== RUN   TestUnreliableAgree2C
  ... Passed --   8.5  5  308   96421  251
=== RUN   TestFigure8Unreliable2C
  ... Passed --  30.8  5 2216 12947486  895
=== RUN   TestReliableChurn2C
  ... Passed --  16.3  5  580  424371  315
=== RUN   TestUnreliableChurn2C
  ... Passed --  16.2  5  268  133185   64
PASS

8.4 2D: Snapshots

=== RUN   TestSnapshotBasic2D
  ... Passed --   7.7  3  136   48058  251
=== RUN   TestSnapshotInstall2D
  ... Passed --  66.2  3 1358  359800  321
=== RUN   TestSnapshotInstallUnreliable2D
  ... Passed --  88.0  3 1808  431965  341
=== RUN   TestSnapshotInstallCrash2D
  ... Passed --  37.9  3  658  183093  355
=== RUN   TestSnapshotInstallUnCrash2D
  ... Passed --  57.5  3  782  207519  388
PASS

9. 总结与收获

9.1 实现要点

  1. 任期是核心:所有操作都要正确处理任期变化
  2. 随机化超时:避免选举冲突
  3. 快速回退:优化日志同步效率
  4. 正确持久化:崩溃恢复的关键
  5. 快照索引:注意偏移量计算

9.2 调试经验

  • 使用 -race 检测数据竞争
  • 仔细阅读论文 Figure 2
  • 处理边界条件(空日志、快照边界等)
  • 测试中的超时时间很宽松,实际实现要更高效

9.3 后续工作

Lab3 将基于此 Raft 实现构建容错的 KV 服务。


💡 Raft 是分布式系统的基石,理解它的实现对于后续的分布式存储、分布式协调等系统至关重要。