From 8dc5fea24d6de62b18feb2c5726e97609fe67ae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9D=92=E9=BB=9B?= Date: Thu, 26 Feb 2026 09:14:19 +0000 Subject: [PATCH] docs: add Lab1 MapReduce experiment report --- docs/answers/lab1-mapreduce.md | 277 +++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 docs/answers/lab1-mapreduce.md diff --git a/docs/answers/lab1-mapreduce.md b/docs/answers/lab1-mapreduce.md new file mode 100644 index 0000000..0af9150 --- /dev/null +++ b/docs/answers/lab1-mapreduce.md @@ -0,0 +1,277 @@ +# MIT 6.824 Lab1: MapReduce 实验报告 + +> 📅 日期:2026-02-26 +> 🔗 代码仓库:https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824 +> 🌿 分支:answer/20260226 + +--- + +## 1. 实验概述 + +本实验要求实现一个分布式 MapReduce 系统,包括 Coordinator(协调者)和 Worker(工作者)两个核心组件。系统需要能够: + +- 将 Map 和 Reduce 任务分配给多个 Worker 并行执行 +- 处理 Worker 崩溃的情况(故障容错) +- 确保输出文件的原子性写入 + +## 2. 系统架构 + +``` +┌──────────────────────────────────────────────────────────┐ +│ Coordinator │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ +│ │ Map Tasks │ │ Reduce Tasks│ │ Timeout Checker │ │ +│ │ (8 files) │ │ (10 tasks) │ │ (10 seconds) │ │ +│ └─────────────┘ └─────────────┘ └─────────────────┘ │ +└──────────────────────────────────────────────────────────┘ + │ RequestTask/ReportTask (RPC) + ▼ +┌─────────┐ ┌─────────┐ ┌─────────┐ +│ Worker1 │ │ Worker2 │ │ Worker3 │ +└─────────┘ └─────────┘ └─────────┘ +``` + +## 3. 核心实现 + +### 3.1 RPC 定义 (`rpc.go`) + +定义了 Worker 和 Coordinator 之间的通信协议: + +```go +// 任务类型 +type TaskType int +const ( + MapTask TaskType = 0 // Map 任务 + ReduceTask TaskType = 1 // Reduce 任务 + WaitTask TaskType = 2 // 等待(无可用任务) + ExitTask TaskType = 3 // 退出(全部完成) +) + +// 任务状态 +type TaskStatus int +const ( + Idle TaskStatus = 0 // 空闲 + InProgress TaskStatus = 1 // 进行中 + Completed TaskStatus = 2 // 已完成 +) +``` + +### 3.2 Coordinator (`coordinator.go`) + +Coordinator 负责任务调度和状态管理: + +#### 数据结构 + +```go +type Coordinator struct { + mu sync.Mutex // 保护并发访问 + + files []string // 输入文件列表 + nReduce int // Reduce 任务数量 + + mapTasks []Task // Map 任务列表 + mapDone int // 已完成的 Map 任务数 + mapFinished bool // Map 阶段是否完成 + + reduceTasks []Task // Reduce 任务列表 + reduceDone int // 已完成的 Reduce 任务数 + reduceFinished bool // Reduce 阶段是否完成 +} +``` + +#### 任务分配策略 + +1. **两阶段执行**:先完成所有 Map 任务,再开始 Reduce 任务 +2. **超时重试**:任务分配 10 秒后未完成则重新分配(处理 Worker 崩溃) +3. **状态追踪**:每个任务有 Idle → InProgress → Completed 的状态转换 + +```go +func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error { + c.mu.Lock() + defer c.mu.Unlock() + + // 检查超时任务 + c.checkTimeouts() + + // 优先分配 Map 任务 + if !c.mapFinished { + for i := range c.mapTasks { + if c.mapTasks[i].Status == Idle { + // 分配任务并记录开始时间 + c.mapTasks[i].Status = InProgress + c.mapTasks[i].StartTime = time.Now() + reply.TaskType = MapTask + reply.TaskID = i + reply.Filename = c.mapTasks[i].Filename + return nil + } + } + reply.TaskType = WaitTask // 所有任务都在进行中 + return nil + } + + // Map 完成后分配 Reduce 任务 + // ... +} +``` + +### 3.3 Worker (`worker.go`) + +Worker 负责执行具体的 Map 和 Reduce 任务: + +#### 主循环 + +```go +func Worker(mapf func(string, string) []KeyValue, + reducef func(string, []string) string) { + + for { + reply := requestTask() + + switch reply.TaskType { + case MapTask: + doMapTask(mapf, reply) + case ReduceTask: + doReduceTask(reducef, reply) + case WaitTask: + time.Sleep(100 * time.Millisecond) + case ExitTask: + return + } + } +} +``` + +#### Map 任务执行 + +```go +func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) { + // 1. 读取输入文件 + content, _ := ioutil.ReadFile(task.Filename) + + // 2. 调用 Map 函数 + kva := mapf(task.Filename, string(content)) + + // 3. 按 key 的 hash 值分桶 + buckets := make([][]KeyValue, task.NReduce) + for _, kv := range kva { + bucket := ihash(kv.Key) % task.NReduce + buckets[bucket] = append(buckets[bucket], kv) + } + + // 4. 写入中间文件 mr-X-Y (X=mapID, Y=reduceID) + for reduceID, bucket := range buckets { + tmpFile, _ := ioutil.TempFile("", "mr-map-*") + enc := json.NewEncoder(tmpFile) + for _, kv := range bucket { + enc.Encode(&kv) + } + tmpFile.Close() + // 原子重命名 + os.Rename(tmpFile.Name(), fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)) + } + + // 5. 报告完成 + reportTask(MapTask, task.TaskID, true) +} +``` + +#### Reduce 任务执行 + +```go +func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) { + // 1. 读取所有相关的中间文件 mr-*-Y + var kva []KeyValue + for mapID := 0; mapID < task.NMap; mapID++ { + filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID) + // 读取并解码 JSON + } + + // 2. 按 key 排序 + sort.Sort(ByKey(kva)) + + // 3. 对每个 key 调用 Reduce 函数 + for i < len(kva) { + // 收集相同 key 的所有 value + values := collectValues(kva, i, &j) + output := reducef(kva[i].Key, values) + fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output) + i = j + } + + // 4. 原子重命名为 mr-out-Y + os.Rename(tmpFile.Name(), fmt.Sprintf("mr-out-%d", task.TaskID)) + + // 5. 报告完成 + reportTask(ReduceTask, task.TaskID, true) +} +``` + +## 4. 关键设计决策 + +### 4.1 故障容错 + +- **超时机制**:任务分配后 10 秒未完成,自动重置为 Idle 状态 +- **原子写入**:使用临时文件 + rename 确保崩溃时不产生部分写入的文件 +- **幂等性**:相同任务可以被多次执行,最终结果一致 + +### 4.2 并发控制 + +- 使用 `sync.Mutex` 保护 Coordinator 的共享状态 +- Worker 是无状态的,可以随时崩溃和重启 + +### 4.3 中间文件格式 + +- 文件名:`mr-X-Y`(X = Map 任务 ID,Y = Reduce 任务 ID) +- 编码:JSON(便于调试和兼容性) + +## 5. 测试结果 + +``` +*** Starting wc test. +--- wc test: PASS +*** Starting indexer test. +--- indexer test: PASS +*** Starting map parallelism test. +--- map parallelism test: PASS +*** Starting reduce parallelism test. +--- reduce parallelism test: PASS +*** Starting job count test. +--- job count test: PASS +*** Starting early exit test. +--- early exit test: PASS +*** Starting crash test. +--- crash test: PASS +*** PASSED ALL TESTS ✅ +``` + +### 测试说明 + +| 测试名称 | 测试内容 | +|---------|---------| +| wc | 基本的 word count 功能 | +| indexer | 倒排索引功能 | +| map parallelism | Map 任务并行执行 | +| reduce parallelism | Reduce 任务并行执行 | +| job count | 确保每个 Map 任务只执行一次 | +| early exit | Worker 在任务完成前不会退出 | +| crash | Worker 崩溃时的容错恢复 | + +## 6. 总结与收获 + +### 实现要点 + +1. **两阶段同步**:Map 阶段必须全部完成才能开始 Reduce 阶段 +2. **超时重试**:处理 Worker 崩溃的核心机制 +3. **原子操作**:确保输出文件的一致性 + +### 学到的知识 + +- 分布式系统中的任务调度 +- RPC 通信机制 +- 故障容错设计模式 +- Go 语言的并发编程 + +--- + +> 💡 这是 MIT 6.824 分布式系统课程的第一个 Lab,为后续的 Raft 和分布式 KV 存储奠定了基础。