1 Commit

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| 青黛 | 25a0f4e0a4 | Test commit by 青黛 | 2026-02-26 22:55:32 +08:00 |
7 changed files with 190 additions and 1922 deletions

View File

@@ -27,4 +27,4 @@ Prerequisites: 6.1910 (6.004) and one of 6.1800 (6.033) or 6.1810, or equivalent
[7. Lab 5: Sharded Key-Value Service](./docs/6.5840%3A%20Distributed%20System/7.%20Lab%205%3A%20Sharded%20Key-Value%20Service.md)
---
*From: [6.5840: Distributed Systems](https://pdos.csail.mit.edu/6.824/index.html)*
*From: [6.5840: Distributed Systems](https://pdos.csail.mit.edu/6.824/index.html)*# Test by 青黛

View File

@@ -1,277 +0,0 @@
# MIT 6.824 Lab 1: MapReduce Report
> 📅 Date: 2026-02-26
> 🔗 Repository: https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824
> 🌿 Branch: answer/20260226
---
## 1. Overview
This lab implements a distributed MapReduce system built from two core components, a Coordinator and a Worker. The system must:
- assign Map and Reduce tasks to multiple Workers for parallel execution
- tolerate Worker crashes (fault tolerance)
- write output files atomically
## 2. Architecture
```
┌──────────────────────────────────────────────────────────┐
│ Coordinator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │
│ │ Map Tasks │ │ Reduce Tasks│ │ Timeout Checker │ │
│ │ (8 files) │ │ (10 tasks) │ │ (10 seconds) │ │
│ └─────────────┘ └─────────────┘ └─────────────────┘ │
└──────────────────────────────────────────────────────────┘
│ RequestTask/ReportTask (RPC)
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker1 │ │ Worker2 │ │ Worker3 │
└─────────┘ └─────────┘ └─────────┘
```
## 3. Core Implementation
### 3.1 RPC Definitions (`rpc.go`)
The communication protocol between the Worker and the Coordinator:
```go
// Task types
type TaskType int

const (
    MapTask    TaskType = 0 // map task
    ReduceTask TaskType = 1 // reduce task
    WaitTask   TaskType = 2 // wait (no task available)
    ExitTask   TaskType = 3 // exit (everything is done)
)

// Task status
type TaskStatus int

const (
    Idle       TaskStatus = 0
    InProgress TaskStatus = 1
    Completed  TaskStatus = 2
)
```
### 3.2 Coordinator (`coordinator.go`)
The Coordinator schedules tasks and tracks their state:
#### Data structures
```go
type Coordinator struct {
    mu      sync.Mutex // guards concurrent access
    files   []string   // input file list
    nReduce int        // number of reduce tasks

    mapTasks    []Task // map task table
    mapDone     int    // completed map tasks
    mapFinished bool   // map phase finished?

    reduceTasks    []Task // reduce task table
    reduceDone     int    // completed reduce tasks
    reduceFinished bool   // reduce phase finished?
}
```
#### Task assignment strategy
1. **Two phases**: all Map tasks must finish before any Reduce task starts
2. **Timeout and retry**: a task not completed within 10 seconds of assignment is handed out again (handles Worker crashes)
3. **State tracking**: each task moves through Idle → InProgress → Completed
```go
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    // Reset any timed-out tasks first.
    c.checkTimeouts()
    // Prefer map tasks while the map phase is running.
    if !c.mapFinished {
        for i := range c.mapTasks {
            if c.mapTasks[i].Status == Idle {
                // Assign the task and record its start time.
                c.mapTasks[i].Status = InProgress
                c.mapTasks[i].StartTime = time.Now()
                reply.TaskType = MapTask
                reply.TaskID = i
                reply.Filename = c.mapTasks[i].Filename
                return nil
            }
        }
        reply.TaskType = WaitTask // every map task is in progress
        return nil
    }
    // Once the map phase is done, assign reduce tasks.
    // ...
}
```
### 3.3 Worker (`worker.go`)
The Worker executes the actual Map and Reduce tasks:
#### Main loop
```go
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    for {
        reply := requestTask()
        switch reply.TaskType {
        case MapTask:
            doMapTask(mapf, reply)
        case ReduceTask:
            doReduceTask(reducef, reply)
        case WaitTask:
            time.Sleep(100 * time.Millisecond)
        case ExitTask:
            return
        }
    }
}
```
#### Executing a Map task
```go
func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) {
    // 1. Read the input file.
    content, _ := ioutil.ReadFile(task.Filename)
    // 2. Apply the map function.
    kva := mapf(task.Filename, string(content))
    // 3. Partition the pairs into buckets by key hash.
    buckets := make([][]KeyValue, task.NReduce)
    for _, kv := range kva {
        bucket := ihash(kv.Key) % task.NReduce
        buckets[bucket] = append(buckets[bucket], kv)
    }
    // 4. Write intermediate files mr-X-Y (X = mapID, Y = reduceID).
    for reduceID, bucket := range buckets {
        tmpFile, _ := ioutil.TempFile("", "mr-map-*")
        enc := json.NewEncoder(tmpFile)
        for _, kv := range bucket {
            enc.Encode(&kv)
        }
        tmpFile.Close()
        // Atomic rename.
        os.Rename(tmpFile.Name(), fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID))
    }
    // 5. Report completion.
    reportTask(MapTask, task.TaskID, true)
}
```
#### Executing a Reduce task
```go
func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) {
    // 1. Read every intermediate file mr-*-Y for this reduce task.
    var kva []KeyValue
    for mapID := 0; mapID < task.NMap; mapID++ {
        filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
        // read filename and JSON-decode its records into kva
    }
    // 2. Sort by key.
    sort.Sort(ByKey(kva))
    // 3. Call the reduce function once per distinct key.
    i := 0
    for i < len(kva) {
        // Find the end of the run of entries sharing kva[i].Key
        // and collect their values.
        j := i + 1
        for j < len(kva) && kva[j].Key == kva[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, kva[k].Value)
        }
        output := reducef(kva[i].Key, values)
        fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output)
        i = j
    }
    // 4. Atomically rename the output to mr-out-Y.
    os.Rename(tmpFile.Name(), fmt.Sprintf("mr-out-%d", task.TaskID))
    // 5. Report completion.
    reportTask(ReduceTask, task.TaskID, true)
}
```
## 4. Key Design Decisions
### 4.1 Fault tolerance
- **Timeouts**: a task that has not completed 10 seconds after assignment is reset to Idle
- **Atomic writes**: temp file + rename ensures a crash never leaves a partially written file
- **Idempotence**: the same task may execute more than once and still produce the same final result
### 4.2 Concurrency control
- A `sync.Mutex` guards the Coordinator's shared state
- Workers are stateless, so they can crash and restart at any time
### 4.3 Intermediate file format
- Filename: `mr-X-Y`, where X is the Map task ID and Y is the Reduce task ID
- Encoding: JSON, for easy debugging and compatibility
## 5. Test Results
```
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS ✅
```
### Test descriptions
| Test | What it checks |
|---------|---------|
| wc | basic word count |
| indexer | inverted index construction |
| map parallelism | map tasks run in parallel |
| reduce parallelism | reduce tasks run in parallel |
| job count | each map task executes exactly once |
| early exit | workers do not exit before the whole job finishes |
| crash | recovery when workers crash |
## 6. Summary
### Key points
1. **Phase barrier**: the Map phase must finish completely before the Reduce phase starts
2. **Timeout and retry**: the core mechanism for surviving Worker crashes
3. **Atomic operations**: keep output files consistent
### Lessons learned
- Task scheduling in distributed systems
- RPC communication
- Fault-tolerance design patterns
- Concurrent programming in Go
---
> 💡 This is the first lab of MIT 6.824 (Distributed Systems); it lays the groundwork for the later Raft and distributed KV storage labs.

View File

@@ -1,519 +0,0 @@
# MIT 6.824 Lab 2: Raft Report
> 📅 Date: 2026-02-26
> 🔗 Repository: https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824
> 🌿 Branch: answer/20260226
---
## 1. Overview
This lab implements the Raft consensus algorithm, the key building block for fault tolerance in distributed systems. Raft breaks consensus into relatively independent subproblems:
1. **Leader Election** - 2A
2. **Log Replication** - 2B
3. **Persistence** - 2C
4. **Snapshots** - 2D
---
## 2. Core Data Structures
### 2.1 Server state
```go
type Raft struct {
    mu        sync.Mutex          // guards shared state
    peers     []*labrpc.ClientEnd // RPC endpoints of all peers
    persister *Persister          // persistent storage
    me        int                 // this peer's index
    dead      int32               // set by Kill()

    // Persistent state (Figure 2)
    currentTerm int        // latest term this server has seen
    votedFor    int        // candidate voted for in the current term
    log         []LogEntry // log entries

    // Volatile state - all servers
    commitIndex int // highest log index known to be committed
    lastApplied int // highest log index applied to the state machine

    // Volatile state - leaders only
    nextIndex  []int // per peer, index of the next entry to send
    matchIndex []int // per peer, highest index known to be replicated

    // Server state
    state         int       // Follower/Candidate/Leader
    lastHeartbeat time.Time // when the last heartbeat arrived
}
```
### 2.2 Log entries
```go
type LogEntry struct {
Term int
Command interface{}
}
```
---
## 3. 2A: Leader Election
### 3.1 State transitions
```
                no heartbeat before timeout
┌──────────┐ ─────────────────────────────→ ┌───────────┐
│ Follower │                                │ Candidate │
└──────────┘                                └───────────┘
     ↑                                           │
     │ sees a higher term                        │ wins a majority of votes
     │                                           ↓
     │                                     ┌───────────┐
     └─────────────────────────────────────│  Leader   │
                                           └───────────┘
```
### 3.2 Election timeout
```go
const (
    HeartbeatInterval  = 100 * time.Millisecond
    ElectionTimeoutMin = 300 // ms
    ElectionTimeoutMax = 500 // ms
)
```
Randomized election timeouts (300-500 ms) keep peers from starting elections simultaneously.
### 3.3 Election flow
```go
func (rf *Raft) startElection() {
    // 1. Become a candidate: bump the term and vote for ourselves.
    rf.state = Candidate
    rf.currentTerm++
    rf.votedFor = rf.me
    rf.persist()
    // 2. Send RequestVote RPCs to all peers in parallel.
    for i := range rf.peers {
        go func(server int) {
            args := RequestVoteArgs{
                Term:         term,
                CandidateId:  rf.me,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
            }
            reply := RequestVoteReply{}
            ok := rf.sendRequestVote(server, &args, &reply)
            // handle the vote reply...
        }(i)
    }
    // 3. With a majority of votes, become leader.
    if votes > len(rf.peers)/2 {
        rf.state = Leader
        // initialize nextIndex and matchIndex
    }
}
```
### 3.4 RequestVote RPC handler
```go
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // 1. Reject requests from stale terms.
    if args.Term < rf.currentTerm {
        return
    }
    // 2. Adopt the higher term if the caller is ahead of us.
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.state = Follower
        rf.persist()
    }
    // 3. Grant the vote only if:
    //    - we have not voted for anyone else this term, and
    //    - the candidate's log is at least as up to date as ours.
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
        (args.LastLogTerm > lastLogTerm ||
            (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
        reply.VoteGranted = true
        rf.votedFor = args.CandidateId
        rf.lastHeartbeat = time.Now() // reset the election timer
        rf.persist()
    }
}
```
---
## 4. 2B: Log Replication
### 4.1 AppendEntries RPC
The leader sends AppendEntries RPCs as heartbeats:
```go
type AppendEntriesArgs struct {
    Term         int        // leader's term
    LeaderId     int        // leader's ID
    PrevLogIndex int        // index of the entry preceding the new ones
    PrevLogTerm  int        // term of that entry
    Entries      []LogEntry // entries to append
    LeaderCommit int        // leader's commitIndex
}
```
### 4.2 Log consistency check
On receiving AppendEntries, a follower:
1. checks that the term of its entry at `PrevLogIndex` matches `PrevLogTerm`
2. if it matches, appends the new entries (removing conflicting ones)
3. updates `commitIndex`
### 4.3 Fast backup
The naive approach decrements `nextIndex` one entry at a time, which is slow over long divergent logs.
The optimization: when rejecting an AppendEntries RPC, the follower returns extra hints:
```go
type AppendEntriesReply struct {
    Term    int
    Success bool
    XTerm   int // term of the conflicting entry
    XIndex  int // first index of that term
    XLen    int // length of the follower's log
}
```
The leader uses these hints to move `nextIndex` back in one step:
```go
if reply.XTerm != 0 {
    // Look for XTerm in our own log.
    found := false
    for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- {
        if rf.getLogTerm(i) == reply.XTerm {
            rf.nextIndex[server] = i + 1
            found = true
            break
        }
    }
    if !found {
        rf.nextIndex[server] = reply.XIndex // jump to the first index of XTerm
    }
} else {
    rf.nextIndex[server] = reply.XLen // the follower's log is too short
}
```
### 4.4 Commit rule
**Key rule**: the leader may only commit log entries from its current term!
```go
func (rf *Raft) updateCommitIndex() {
    for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- {
        if rf.getLogTerm(n) != rf.currentTerm {
            continue // only entries from the current term may be committed directly
        }
        count := 1 // ourselves
        for i := range rf.peers {
            if i != rf.me && rf.matchIndex[i] >= n {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            break
        }
    }
}
```
---
## 5. 2C: Persistence
### 5.1 What must be persisted
Per Figure 2 of the Raft paper:
- `currentTerm` - the latest term
- `votedFor` - who we voted for
- `log[]` - the log entries
### 5.2 Implementation
```go
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    e.Encode(rf.lastIncludedIndex) // added in 2D
    e.Encode(rf.lastIncludedTerm)  // added in 2D
    data := w.Bytes()
    rf.persister.SaveRaftState(data)
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 {
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor, lastIncludedIndex, lastIncludedTerm int
    var log []LogEntry
    if d.Decode(&currentTerm) != nil ||
        d.Decode(&votedFor) != nil ||
        d.Decode(&log) != nil ||
        d.Decode(&lastIncludedIndex) != nil ||
        d.Decode(&lastIncludedTerm) != nil {
        // decode failed
    } else {
        rf.currentTerm = currentTerm
        rf.votedFor = votedFor
        rf.log = log
        rf.lastIncludedIndex = lastIncludedIndex
        rf.lastIncludedTerm = lastIncludedTerm
    }
}
```
### 5.3 When to persist
Call `persist()` after every modification of `currentTerm`, `votedFor`, or `log`.
---
## 6. 2D: Snapshots
### 6.1 Why snapshots?
Without them, the log grows without bound, and so do memory and disk usage. Snapshots:
- truncate log entries that have already been applied
- reduce memory usage
- speed up catch-up for new or lagging peers
### 6.2 Snapshot state
```go
type Raft struct {
    // ... other fields
    lastIncludedIndex int // last log index covered by the snapshot
    lastIncludedTerm  int // term of that last entry
}
```
### 6.3 Creating a snapshot (service side)
```go
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // Bounds check.
    if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() {
        return
    }
    // Save the term BEFORE truncating the log.
    newLastIncludedTerm := rf.getLogTerm(index)
    rf.log = rf.log[index-rf.lastIncludedIndex:]
    rf.lastIncludedTerm = newLastIncludedTerm
    rf.lastIncludedIndex = index
    // Persist both the Raft state and the snapshot.
    rf.persister.SaveStateAndSnapshot(state, snapshot)
}
```
```
### 6.4 InstallSnapshot RPC
When a follower's log falls too far behind, the leader ships its snapshot:
```go
type InstallSnapshotArgs struct {
    Term              int
    LeaderId          int
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    // 1. Check the term.
    if args.Term < rf.currentTerm {
        return
    }
    // 2. Adopt a higher term.
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.persist()
    }
    // 3. Truncate the log.
    if args.LastIncludedIndex > rf.lastIncludedIndex {
        if args.LastIncludedIndex <= rf.getLastLogIndex() &&
            rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm {
            // Keep the entries that follow the snapshot.
            rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]
        } else {
            rf.log = make([]LogEntry, 0)
        }
        rf.lastIncludedIndex = args.LastIncludedIndex
        rf.lastIncludedTerm = args.LastIncludedTerm
        rf.persister.SaveStateAndSnapshot(state, args.Data)
        // Hand the snapshot to the service via applyCh.
        rf.applyCh <- ApplyMsg{
            SnapshotValid: true,
            Snapshot:      args.Data,
            SnapshotIndex: args.LastIncludedIndex,
            SnapshotTerm:  args.LastIncludedTerm,
        }
    }
}
```
---
## 7. Key Implementation Details
### 7.1 Concurrency safety
- `sync.Mutex` guards all shared state
- `sync.Cond` implements the applier's wait/wake-up
- `atomic` implements `Kill()` / `killed()`
### 7.2 Avoiding livelock
- Randomized election timeouts (300-500 ms)
- 100 ms heartbeat interval, well below the election timeout
### 7.3 Handling terms correctly
- Terms are the core of Raft; every RPC must check them
- On seeing a higher term, immediately step down to Follower and adopt it
### 7.4 Log index translation
- After a snapshot, log indices need an offset adjustment
- `getLogTerm()` / `getLogEntry()` centralize the index translation
---
## 8. Test Results
### 8.1 2A: Leader Election
```
=== RUN TestInitialElection2A
... Passed -- 3.1 3 54 14568 0
=== RUN TestReElection2A
... Passed -- 8.9 3 176 34056 0
=== RUN TestManyElections2A
... Passed -- 5.5 7 408 86416 0
PASS
```
### 8.2 2B: Log Replication
```
=== RUN TestBasicAgree2B
... Passed -- 1.0 3 16 4310 3
=== RUN TestRPCBytes2B
... Passed -- 2.8 3 48 113570 11
=== RUN TestFailAgree2B
... Passed -- 6.3 3 116 30885 8
=== RUN TestFailNoAgree2B
... Passed -- 3.8 5 148 34328 4
=== RUN TestConcurrentStarts2B
... Passed -- 0.8 3 14 3552 6
=== RUN TestRejoin2B
... Passed -- 4.5 3 138 31037 4
=== RUN TestBackup2B
... Passed -- 33.6 5 2120 1888317 102
=== RUN TestCount2B
... Passed -- 2.1 3 36 10066 12
PASS
```
### 8.3 2C: Persistence
```
=== RUN TestPersist12C
... Passed -- 4.1 3 76 18768 6
=== RUN TestPersist22C
... Passed -- 44.1 5 1812 370976 16
=== RUN TestPersist32C
... Passed -- 2.1 3 34 8370 4
=== RUN TestFigure82C
... Passed -- 34.7 5 308 60588 12
=== RUN TestUnreliableAgree2C
... Passed -- 8.5 5 308 96421 251
=== RUN TestFigure8Unreliable2C
... Passed -- 30.8 5 2216 12947486 895
=== RUN TestReliableChurn2C
... Passed -- 16.3 5 580 424371 315
=== RUN TestUnreliableChurn2C
... Passed -- 16.2 5 268 133185 64
PASS
```
### 8.4 2D: Snapshots
```
=== RUN TestSnapshotBasic2D
... Passed -- 7.7 3 136 48058 251
=== RUN TestSnapshotInstall2D
... Passed -- 66.2 3 1358 359800 321
=== RUN TestSnapshotInstallUnreliable2D
... Passed -- 88.0 3 1808 431965 341
=== RUN TestSnapshotInstallCrash2D
... Passed -- 37.9 3 658 183093 355
=== RUN TestSnapshotInstallUnCrash2D
... Passed -- 57.5 3 782 207519 388
PASS
```
---
## 9. Summary
### 9.1 Key points
1. **Terms are central**: every operation must handle term changes correctly
2. **Randomized timeouts**: avoid election conflicts
3. **Fast backup**: speeds up log synchronization
4. **Correct persistence**: the key to crash recovery
5. **Snapshot indexing**: mind the offset arithmetic
### 9.2 Debugging experience
- Run with `-race` to catch data races
- Read the paper's Figure 2 carefully
- Handle the edge cases (empty log, snapshot boundaries, etc.)
- The test timeouts are generous; a real implementation should be faster
### 9.3 Next steps
Lab 3 builds a fault-tolerant KV service on top of this Raft implementation.
---
> 💡 Raft is a cornerstone of distributed systems; understanding its implementation is essential for the distributed storage and coordination systems that follow.

View File

@@ -1,163 +1,18 @@
package mr
import (
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)
import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
// Task timeout duration (10 seconds)
const TaskTimeout = 10 * time.Second
type Task struct {
Type TaskType
ID int
Status TaskStatus
Filename string // For map task
StartTime time.Time // When task was assigned
}
type Coordinator struct {
mu sync.Mutex
// Your definitions here.
// Input files
files []string
// Number of reduce tasks
nReduce int
// Map tasks
mapTasks []Task
mapDone int
mapFinished bool
// Reduce tasks
reduceTasks []Task
reduceDone int
reduceFinished bool
}
// RequestTask - Worker requests a task
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
reply.NReduce = c.nReduce
reply.NMap = len(c.files)
// Check for timed out tasks and reset them
c.checkTimeouts()
// If map phase not done, assign map tasks
if !c.mapFinished {
for i := range c.mapTasks {
if c.mapTasks[i].Status == Idle {
c.mapTasks[i].Status = InProgress
c.mapTasks[i].StartTime = time.Now()
reply.TaskType = MapTask
reply.TaskID = i
reply.Filename = c.mapTasks[i].Filename
return nil
}
}
// All map tasks are in progress, tell worker to wait
reply.TaskType = WaitTask
return nil
}
// Map phase done, assign reduce tasks
if !c.reduceFinished {
for i := range c.reduceTasks {
if c.reduceTasks[i].Status == Idle {
c.reduceTasks[i].Status = InProgress
c.reduceTasks[i].StartTime = time.Now()
reply.TaskType = ReduceTask
reply.TaskID = i
return nil
}
}
// All reduce tasks are in progress, tell worker to wait
reply.TaskType = WaitTask
return nil
}
// All done, tell worker to exit
reply.TaskType = ExitTask
return nil
}
// ReportTask - Worker reports task completion
func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
reply.Acknowledged = true
if args.TaskType == MapTask {
if args.TaskID >= 0 && args.TaskID < len(c.mapTasks) {
if c.mapTasks[args.TaskID].Status == InProgress {
if args.Success {
c.mapTasks[args.TaskID].Status = Completed
c.mapDone++
if c.mapDone == len(c.mapTasks) {
c.mapFinished = true
}
} else {
// Failed, reset to idle for retry
c.mapTasks[args.TaskID].Status = Idle
}
}
}
} else if args.TaskType == ReduceTask {
if args.TaskID >= 0 && args.TaskID < len(c.reduceTasks) {
if c.reduceTasks[args.TaskID].Status == InProgress {
if args.Success {
c.reduceTasks[args.TaskID].Status = Completed
c.reduceDone++
if c.reduceDone == c.nReduce {
c.reduceFinished = true
}
} else {
// Failed, reset to idle for retry
c.reduceTasks[args.TaskID].Status = Idle
}
}
}
}
return nil
}
// checkTimeouts - Reset timed out tasks to idle (must hold lock)
func (c *Coordinator) checkTimeouts() {
now := time.Now()
if !c.mapFinished {
for i := range c.mapTasks {
if c.mapTasks[i].Status == InProgress {
if now.Sub(c.mapTasks[i].StartTime) > TaskTimeout {
c.mapTasks[i].Status = Idle
}
}
}
}
if !c.reduceFinished {
for i := range c.reduceTasks {
if c.reduceTasks[i].Status == InProgress {
if now.Sub(c.reduceTasks[i].StartTime) > TaskTimeout {
c.reduceTasks[i].Status = Idle
}
}
}
}
}
// Your code here -- RPC handlers for the worker to call.
//
// an example RPC handler.
@@ -169,6 +24,7 @@ func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
return nil
}
//
// start a thread that listens for RPCs from worker.go
//
@@ -190,10 +46,12 @@ func (c *Coordinator) server() {
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
c.mu.Lock()
defer c.mu.Unlock()
ret := false
return c.reduceFinished
// Your code here.
return ret
}
//
@@ -202,31 +60,10 @@ func (c *Coordinator) Done() bool {
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files,
nReduce: nReduce,
}
c := Coordinator{}
// Initialize map tasks
c.mapTasks = make([]Task, len(files))
for i, file := range files {
c.mapTasks[i] = Task{
Type: MapTask,
ID: i,
Status: Idle,
Filename: file,
}
}
// Your code here.
// Initialize reduce tasks
c.reduceTasks = make([]Task, nReduce)
for i := 0; i < nReduce; i++ {
c.reduceTasks[i] = Task{
Type: ReduceTask,
ID: i,
Status: Idle,
}
}
c.server()
return &c

View File

@@ -22,52 +22,8 @@ type ExampleReply struct {
Y int
}
// Task types
type TaskType int
// Add your RPC definitions here.
const (
MapTask TaskType = 0
ReduceTask TaskType = 1
WaitTask TaskType = 2 // No task available, worker should wait
ExitTask TaskType = 3 // All tasks done, worker should exit
)
// Task status
type TaskStatus int
const (
Idle TaskStatus = 0
InProgress TaskStatus = 1
Completed TaskStatus = 2
)
// RequestTaskArgs - Worker requests a task
type RequestTaskArgs struct {
WorkerID int
}
// RequestTaskReply - Coordinator assigns a task
type RequestTaskReply struct {
TaskType TaskType
TaskID int
NReduce int // Number of reduce tasks
NMap int // Number of map tasks
Filename string // Input file for map task
MapTaskIDs []int // For reduce task: which map tasks to read from
}
// ReportTaskArgs - Worker reports task completion
type ReportTaskArgs struct {
WorkerID int
TaskType TaskType
TaskID int
Success bool
}
// ReportTaskReply - Coordinator acknowledges
type ReportTaskReply struct {
Acknowledged bool
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.

View File

@@ -1,16 +1,10 @@
package mr
import (
"encoding/json"
"fmt"
"hash/fnv"
"io/ioutil"
"log"
"net/rpc"
"os"
"sort"
"time"
)
import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
//
// Map functions return a slice of KeyValue.
@@ -20,13 +14,6 @@ type KeyValue struct {
Value string
}
// ByKey implements sort.Interface for []KeyValue
type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
@@ -37,185 +24,18 @@ func ihash(key string) int {
return int(h.Sum32() & 0x7fffffff)
}
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := requestTask()
// Your worker implementation here.
switch reply.TaskType {
case MapTask:
doMapTask(mapf, reply)
case ReduceTask:
doReduceTask(reducef, reply)
case WaitTask:
// No task available, wait a bit
time.Sleep(100 * time.Millisecond)
case ExitTask:
// All done, exit
return
}
}
}
// uncomment to send the Example RPC to the coordinator.
// CallExample()
// requestTask asks the coordinator for a task
func requestTask() RequestTaskReply {
args := RequestTaskArgs{}
reply := RequestTaskReply{}
ok := call("Coordinator.RequestTask", &args, &reply)
if !ok {
// Coordinator is gone, exit
reply.TaskType = ExitTask
}
return reply
}
// reportTask tells the coordinator that a task is done
func reportTask(taskType TaskType, taskID int, success bool) {
args := ReportTaskArgs{
TaskType: taskType,
TaskID: taskID,
Success: success,
}
reply := ReportTaskReply{}
call("Coordinator.ReportTask", &args, &reply)
}
// doMapTask executes a map task
func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) {
// Read input file
content, err := ioutil.ReadFile(task.Filename)
if err != nil {
log.Printf("cannot read %v: %v", task.Filename, err)
reportTask(MapTask, task.TaskID, false)
return
}
// Apply map function
kva := mapf(task.Filename, string(content))
// Partition into nReduce buckets
buckets := make([][]KeyValue, task.NReduce)
for i := range buckets {
buckets[i] = []KeyValue{}
}
for _, kv := range kva {
bucket := ihash(kv.Key) % task.NReduce
buckets[bucket] = append(buckets[bucket], kv)
}
// Write intermediate files
for reduceID, bucket := range buckets {
// Use temp file for atomic write
tmpFile, err := ioutil.TempFile("", "mr-map-*")
if err != nil {
log.Printf("cannot create temp file: %v", err)
reportTask(MapTask, task.TaskID, false)
return
}
enc := json.NewEncoder(tmpFile)
for _, kv := range bucket {
if err := enc.Encode(&kv); err != nil {
log.Printf("cannot encode kv: %v", err)
tmpFile.Close()
os.Remove(tmpFile.Name())
reportTask(MapTask, task.TaskID, false)
return
}
}
tmpFile.Close()
// Atomic rename
outName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)
if err := os.Rename(tmpFile.Name(), outName); err != nil {
log.Printf("cannot rename temp file: %v", err)
os.Remove(tmpFile.Name())
reportTask(MapTask, task.TaskID, false)
return
}
}
reportTask(MapTask, task.TaskID, true)
}
// doReduceTask executes a reduce task
func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) {
// Read all intermediate files for this reduce task
var kva []KeyValue
for mapID := 0; mapID < task.NMap; mapID++ {
filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
file, err := os.Open(filename)
if err != nil {
// File might not exist if map task produced no output for this reduce
continue
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
file.Close()
}
// Sort by key
sort.Sort(ByKey(kva))
// Create temp output file
tmpFile, err := ioutil.TempFile("", "mr-reduce-*")
if err != nil {
log.Printf("cannot create temp file: %v", err)
reportTask(ReduceTask, task.TaskID, false)
return
}
// Apply reduce function to each distinct key
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
// Collect all values for this key
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
// Apply reduce function
output := reducef(kva[i].Key, values)
// Write output
fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output)
i = j
}
tmpFile.Close()
// Atomic rename
outName := fmt.Sprintf("mr-out-%d", task.TaskID)
if err := os.Rename(tmpFile.Name(), outName); err != nil {
log.Printf("cannot rename temp file: %v", err)
os.Remove(tmpFile.Name())
reportTask(ReduceTask, task.TaskID, false)
return
}
reportTask(ReduceTask, task.TaskID, true)
}
//
@@ -251,7 +71,7 @@ func call(rpcname string, args interface{}, reply interface{}) bool {
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
return false
log.Fatal("dialing:", err)
}
defer c.Close()

View File

@@ -18,32 +18,26 @@ package raft
//
import (
"bytes"
"math/rand"
// "bytes"
"sync"
"sync/atomic"
"time"
"6.824/labgob"
// "6.824/labgob"
"6.824/labrpc"
)
// Server states
const (
Follower = 0
Candidate = 1
Leader = 2
)
// Timing constants
const (
HeartbeatInterval = 100 * time.Millisecond
ElectionTimeoutMin = 300
ElectionTimeoutMax = 500
)
// ApplyMsg - as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or tester)
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
@@ -56,13 +50,9 @@ type ApplyMsg struct {
SnapshotIndex int
}
// LogEntry represents a log entry
type LogEntry struct {
Term int
Command interface{}
}
// Raft - A Go object implementing a single Raft peer.
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
@@ -70,401 +60,180 @@ type Raft struct {
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Persistent state on all servers (Figure 2)
currentTerm int
votedFor int
log []LogEntry
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// Volatile state on all servers
commitIndex int
lastApplied int
// Volatile state on leaders
nextIndex []int
matchIndex []int
// Additional state
state int
lastHeartbeat time.Time
applyCh chan ApplyMsg
applyCond *sync.Cond
// Snapshot state (2D)
lastIncludedIndex int
lastIncludedTerm int
}
// GetState returns currentTerm and whether this server believes it is the leader.
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.state == Leader
var term int
var isleader bool
// Your code here (2A).
return term, isleader
}
// getLastLogIndex returns the index of the last log entry
func (rf *Raft) getLastLogIndex() int {
return rf.lastIncludedIndex + len(rf.log)
}
// getLastLogTerm returns the term of the last log entry
func (rf *Raft) getLastLogTerm() int {
if len(rf.log) == 0 {
return rf.lastIncludedTerm
}
return rf.log[len(rf.log)-1].Term
}
// getLogEntry returns the log entry at the given index
func (rf *Raft) getLogEntry(index int) LogEntry {
return rf.log[index-rf.lastIncludedIndex-1]
}
// getLogTerm returns the term of the log entry at the given index
func (rf *Raft) getLogTerm(index int) int {
if index == rf.lastIncludedIndex {
return rf.lastIncludedTerm
}
return rf.log[index-rf.lastIncludedIndex-1].Term
}
// persist - save Raft's persistent state to stable storage
//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
data := w.Bytes()
rf.persister.SaveRaftState(data)
// Your code here (2C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// data := w.Bytes()
// rf.persister.SaveRaftState(data)
}
// readPersist - restore previously persisted state
//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var votedFor int
var log []LogEntry
var lastIncludedIndex int
var lastIncludedTerm int
if d.Decode(&currentTerm) != nil ||
d.Decode(&votedFor) != nil ||
d.Decode(&log) != nil ||
d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&lastIncludedTerm) != nil {
return
}
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.log = log
rf.lastIncludedIndex = lastIncludedIndex
rf.lastIncludedTerm = lastIncludedTerm
rf.lastApplied = lastIncludedIndex
rf.commitIndex = lastIncludedIndex
// Your code here (2C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
}
// CondInstallSnapshot - A service wants to switch to snapshot
//
// A service wants to switch to snapshot. Only do so if Raft hasn't
// have more recent info since it communicate the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Deprecated in newer version, always return true
// Your code here (2D).
return true
}
// Snapshot - the service says it has created a snapshot
// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()
// Your code here (2D).
if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() {
return
}
// Get term BEFORE trimming log
newLastIncludedTerm := rf.getLogTerm(index)
// Trim log - keep entries from index+1 onwards
rf.log = rf.log[index-rf.lastIncludedIndex:]
rf.lastIncludedTerm = newLastIncludedTerm
rf.lastIncludedIndex = index
// Persist state and snapshot
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
rf.persister.SaveStateAndSnapshot(w.Bytes(), snapshot)
}
// InstallSnapshot RPC arguments
type InstallSnapshotArgs struct {
Term int
LeaderId int
LastIncludedIndex int
LastIncludedTerm int
Data []byte
}
// InstallSnapshot RPC reply
type InstallSnapshotReply struct {
Term int
}
// InstallSnapshot RPC handler
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = Follower
rf.persist()
}
rf.lastHeartbeat = time.Now()
if args.LastIncludedIndex <= rf.lastIncludedIndex {
return
}
// If existing log entry has same index and term as snapshot's last included entry,
// retain log entries following it
if args.LastIncludedIndex <= rf.getLastLogIndex() &&
rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm {
rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]
} else {
rf.log = make([]LogEntry, 0)
}
rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
if rf.commitIndex < args.LastIncludedIndex {
rf.commitIndex = args.LastIncludedIndex
}
if rf.lastApplied < args.LastIncludedIndex {
rf.lastApplied = args.LastIncludedIndex
}
// Save state and snapshot
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
rf.persister.SaveStateAndSnapshot(w.Bytes(), args.Data)
// Deliver the snapshot on applyCh outside the lock: the service goroutine
// reading applyCh may call back into Raft (e.g. Snapshot), which also
// acquires rf.mu, so sending inline while holding the lock can deadlock.
msg := ApplyMsg{
SnapshotValid: true,
Snapshot:      args.Data,
SnapshotTerm:  args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
go func() { rf.applyCh <- msg }()
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}
// RequestVote RPC arguments structure
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
// Your data here (2A, 2B).
}
// RequestVote RPC reply structure
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
Term int
VoteGranted bool
// Your data here (2A).
}
// RequestVote RPC handler
//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.VoteGranted = false
// Reply false if term < currentTerm
if args.Term < rf.currentTerm {
return
}
// If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = Follower
rf.persist()
}
reply.Term = rf.currentTerm
// If votedFor is null or candidateId, and candidate's log is at least as up-to-date
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
// Check if candidate's log is at least as up-to-date as receiver's log
lastLogTerm := rf.getLastLogTerm()
lastLogIndex := rf.getLastLogIndex()
if args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.lastHeartbeat = time.Now()
rf.persist()
}
}
// Your code here (2A, 2B).
}
//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return. Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
// AppendEntries RPC arguments
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
// AppendEntries RPC reply
type AppendEntriesReply struct {
Term int
Success bool
// For fast backup
XTerm int // Term in the conflicting entry (if any)
XIndex int // Index of first entry with XTerm
XLen int // Log length
}
// AppendEntries RPC handler
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.Success = false
// Reply false if term < currentTerm
if args.Term < rf.currentTerm {
return
}
// If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.persist()
}
rf.state = Follower
rf.lastHeartbeat = time.Now()
// Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
if args.PrevLogIndex < rf.lastIncludedIndex {
reply.XLen = rf.getLastLogIndex() + 1
return
}
if args.PrevLogIndex > rf.getLastLogIndex() {
reply.XLen = rf.getLastLogIndex() + 1
return
}
if args.PrevLogIndex > rf.lastIncludedIndex && rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
reply.XTerm = rf.getLogTerm(args.PrevLogIndex)
// Find first index with XTerm
for i := rf.lastIncludedIndex + 1; i <= args.PrevLogIndex; i++ {
if rf.getLogTerm(i) == reply.XTerm {
reply.XIndex = i
break
}
}
return
}
// Append any new entries not already in the log
for i, entry := range args.Entries {
index := args.PrevLogIndex + 1 + i
if index <= rf.lastIncludedIndex {
continue
}
if index <= rf.getLastLogIndex() {
if rf.getLogTerm(index) != entry.Term {
// Conflict: delete the existing entry and all that follow it
rf.log = rf.log[:index-rf.lastIncludedIndex-1]
rf.log = append(rf.log, entry)
}
} else {
rf.log = append(rf.log, entry)
}
}
rf.persist()
reply.Success = true
// If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
lastNewEntry := args.PrevLogIndex + len(args.Entries)
if args.LeaderCommit < lastNewEntry {
rf.commitIndex = args.LeaderCommit
} else {
rf.commitIndex = lastNewEntry
}
rf.applyCond.Signal()
}
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
// Start - the service using Raft wants to start agreement on the next command
//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
// Your code here (2B).
if rf.state != Leader {
return -1, -1, false
}
index := rf.getLastLogIndex() + 1
term := rf.currentTerm
rf.log = append(rf.log, LogEntry{Term: term, Command: command})
rf.persist()
return index, term, true
}
// Kill - the tester doesn't halt goroutines created by Raft after each test
//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
// ticker - starts a new election if this peer hasn't received heartbeats recently
// The ticker goroutine starts a new election if this peer hasn't received
// heartbeats recently.
func (rf *Raft) ticker() {
for rf.killed() == false {
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
switch state {
case Follower, Candidate:
rf.checkElectionTimeout()
case Leader:
rf.sendHeartbeats()
}
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
time.Sleep(10 * time.Millisecond)
}
}
// checkElectionTimeout checks if election timeout has elapsed
func (rf *Raft) checkElectionTimeout() {
timeout := time.Duration(ElectionTimeoutMin+rand.Intn(ElectionTimeoutMax-ElectionTimeoutMin)) * time.Millisecond
rf.mu.Lock()
elapsed := time.Since(rf.lastHeartbeat)
rf.mu.Unlock()
if elapsed >= timeout {
rf.startElection()
}
}
// startElection starts a new election
func (rf *Raft) startElection() {
rf.mu.Lock()
rf.state = Candidate
rf.currentTerm++
rf.votedFor = rf.me
rf.lastHeartbeat = time.Now()
rf.persist()
term := rf.currentTerm
lastLogIndex := rf.getLastLogIndex()
lastLogTerm := rf.getLastLogTerm()
rf.mu.Unlock()
votes := 1
finished := 1
var mu sync.Mutex
cond := sync.NewCond(&mu)
for i := range rf.peers {
if i == rf.me {
continue
}
go func(server int) {
args := RequestVoteArgs{
Term: term,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
reply := RequestVoteReply{}
ok := rf.sendRequestVote(server, &args, &reply)
mu.Lock()
defer mu.Unlock()
if ok {
rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = Follower
rf.votedFor = -1
rf.persist()
}
rf.mu.Unlock()
if reply.VoteGranted {
votes++
}
}
finished++
cond.Broadcast()
}(i)
}
mu.Lock()
for votes <= len(rf.peers)/2 && finished < len(rf.peers) {
cond.Wait()
}
mu.Unlock()
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != Candidate || rf.currentTerm != term {
return
}
if votes > len(rf.peers)/2 {
rf.state = Leader
// Initialize nextIndex and matchIndex
for i := range rf.peers {
rf.nextIndex[i] = rf.getLastLogIndex() + 1
rf.matchIndex[i] = 0
}
rf.matchIndex[rf.me] = rf.getLastLogIndex()
}
}
// sendHeartbeats sends heartbeats to all peers
func (rf *Raft) sendHeartbeats() {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}
rf.mu.Unlock()
for i := range rf.peers {
if i == rf.me {
continue
}
go rf.sendAppendEntriesToPeer(i)
}
time.Sleep(HeartbeatInterval)
}
// sendAppendEntriesToPeer sends AppendEntries RPC to a peer
func (rf *Raft) sendAppendEntriesToPeer(server int) {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}
// Check if we need to send snapshot
if rf.nextIndex[server] <= rf.lastIncludedIndex {
rf.mu.Unlock()
rf.sendSnapshotToPeer(server)
return
}
prevLogIndex := rf.nextIndex[server] - 1
prevLogTerm := 0
if prevLogIndex == rf.lastIncludedIndex {
prevLogTerm = rf.lastIncludedTerm
} else if prevLogIndex > rf.lastIncludedIndex {
prevLogTerm = rf.getLogTerm(prevLogIndex)
}
entries := make([]LogEntry, 0)
if rf.nextIndex[server] <= rf.getLastLogIndex() {
entries = append(entries, rf.log[rf.nextIndex[server]-rf.lastIncludedIndex-1:]...)
}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: rf.commitIndex,
}
term := rf.currentTerm
rf.mu.Unlock()
reply := AppendEntriesReply{}
ok := rf.sendAppendEntries(server, &args, &reply)
if !ok {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != Leader || rf.currentTerm != term {
return
}
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = Follower
rf.votedFor = -1
rf.persist()
return
}
if reply.Success {
newMatchIndex := args.PrevLogIndex + len(args.Entries)
if newMatchIndex > rf.matchIndex[server] {
rf.matchIndex[server] = newMatchIndex
}
rf.nextIndex[server] = rf.matchIndex[server] + 1
rf.updateCommitIndex()
} else {
// Fast backup
if reply.XTerm != 0 {
// Find last entry with XTerm
found := false
for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- {
if rf.getLogTerm(i) == reply.XTerm {
rf.nextIndex[server] = i + 1
found = true
break
}
}
if !found {
rf.nextIndex[server] = reply.XIndex
}
} else {
rf.nextIndex[server] = reply.XLen
}
if rf.nextIndex[server] < 1 {
rf.nextIndex[server] = 1
}
}
}
// sendSnapshotToPeer sends InstallSnapshot RPC to a peer
func (rf *Raft) sendSnapshotToPeer(server int) {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludedIndex: rf.lastIncludedIndex,
LastIncludedTerm: rf.lastIncludedTerm,
Data: rf.persister.ReadSnapshot(),
}
term := rf.currentTerm
rf.mu.Unlock()
reply := InstallSnapshotReply{}
ok := rf.sendInstallSnapshot(server, &args, &reply)
if !ok {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != Leader || rf.currentTerm != term {
return
}
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = Follower
rf.votedFor = -1
rf.persist()
return
}
rf.nextIndex[server] = args.LastIncludedIndex + 1
rf.matchIndex[server] = args.LastIncludedIndex
}
// updateCommitIndex updates commitIndex based on matchIndex
func (rf *Raft) updateCommitIndex() {
// Find N such that N > commitIndex, a majority of matchIndex[i] >= N,
// and log[N].term == currentTerm
for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- {
if rf.getLogTerm(n) != rf.currentTerm {
continue
}
count := 1
for i := range rf.peers {
if i != rf.me && rf.matchIndex[i] >= n {
count++
}
}
if count > len(rf.peers)/2 {
rf.commitIndex = n
rf.applyCond.Signal()
break
}
}
}
// applier applies committed log entries to the state machine
func (rf *Raft) applier() {
for rf.killed() == false {
rf.mu.Lock()
for rf.lastApplied >= rf.commitIndex {
rf.applyCond.Wait()
}
commitIndex := rf.commitIndex
// Apply starting just past the snapshot boundary: entries at or below
// lastIncludedIndex are covered by the snapshot, not the log, and
// CommandIndex must match the entry actually delivered.
startIndex := rf.lastApplied + 1
if startIndex <= rf.lastIncludedIndex {
startIndex = rf.lastIncludedIndex + 1
}
entries := make([]LogEntry, 0)
for i := startIndex; i <= commitIndex; i++ {
entries = append(entries, rf.getLogEntry(i))
}
rf.mu.Unlock()
for i, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command:      entry.Command,
CommandIndex: startIndex + i,
}
}
rf.mu.Lock()
if rf.lastApplied < commitIndex {
rf.lastApplied = commitIndex
}
rf.mu.Unlock()
}
}
// Make - the service or tester wants to create a Raft server
//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Initialize state
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]LogEntry, 0)
// Your initialization code here (2A, 2B, 2C).
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))
rf.state = Follower
rf.lastHeartbeat = time.Now()
rf.applyCh = applyCh
rf.applyCond = sync.NewCond(&rf.mu)
rf.lastIncludedIndex = 0
rf.lastIncludedTerm = 0
// Initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// Start ticker goroutine to start elections
go rf.ticker()
// Start applier goroutine
go rf.applier()
return rf
}