# MIT 6.824 Lab1: MapReduce 实验报告 > 📅 日期:2026-02-26 > 🔗 代码仓库:https://git.rc707blog.top/rose_cat707/6.824-golabs-2021-6.824 > 🌿 分支:answer/20260226 --- ## 1. 实验概述 本实验要求实现一个分布式 MapReduce 系统,包括 Coordinator(协调者)和 Worker(工作者)两个核心组件。系统需要能够: - 将 Map 和 Reduce 任务分配给多个 Worker 并行执行 - 处理 Worker 崩溃的情况(故障容错) - 确保输出文件的原子性写入 ## 2. 系统架构 ``` ┌──────────────────────────────────────────────────────────┐ │ Coordinator │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ │ Map Tasks │ │ Reduce Tasks│ │ Timeout Checker │ │ │ │ (8 files) │ │ (10 tasks) │ │ (10 seconds) │ │ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ └──────────────────────────────────────────────────────────┘ │ RequestTask/ReportTask (RPC) ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Worker1 │ │ Worker2 │ │ Worker3 │ └─────────┘ └─────────┘ └─────────┘ ``` ## 3. 核心实现 ### 3.1 RPC 定义 (`rpc.go`) 定义了 Worker 和 Coordinator 之间的通信协议: ```go // 任务类型 type TaskType int const ( MapTask TaskType = 0 // Map 任务 ReduceTask TaskType = 1 // Reduce 任务 WaitTask TaskType = 2 // 等待(无可用任务) ExitTask TaskType = 3 // 退出(全部完成) ) // 任务状态 type TaskStatus int const ( Idle TaskStatus = 0 // 空闲 InProgress TaskStatus = 1 // 进行中 Completed TaskStatus = 2 // 已完成 ) ``` ### 3.2 Coordinator (`coordinator.go`) Coordinator 负责任务调度和状态管理: #### 数据结构 ```go type Coordinator struct { mu sync.Mutex // 保护并发访问 files []string // 输入文件列表 nReduce int // Reduce 任务数量 mapTasks []Task // Map 任务列表 mapDone int // 已完成的 Map 任务数 mapFinished bool // Map 阶段是否完成 reduceTasks []Task // Reduce 任务列表 reduceDone int // 已完成的 Reduce 任务数 reduceFinished bool // Reduce 阶段是否完成 } ``` #### 任务分配策略 1. **两阶段执行**:先完成所有 Map 任务,再开始 Reduce 任务 2. **超时重试**:任务分配 10 秒后未完成则重新分配(处理 Worker 崩溃) 3. **状态追踪**:每个任务有 Idle → InProgress → Completed 的状态转换 ```go func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error { c.mu.Lock() defer c.mu.Unlock() // 检查超时任务 c.checkTimeouts() // 优先分配 Map 任务 if !c.mapFinished { for i := range c.mapTasks { if c.mapTasks[i].Status == Idle { // 分配任务并记录开始时间 c.mapTasks[i].Status = InProgress c.mapTasks[i].StartTime = time.Now() reply.TaskType = MapTask reply.TaskID = i reply.Filename = c.mapTasks[i].Filename return nil } } reply.TaskType = WaitTask // 所有任务都在进行中 return nil } // Map 完成后分配 Reduce 任务 // ... } ``` ### 3.3 Worker (`worker.go`) Worker 负责执行具体的 Map 和 Reduce 任务: #### 主循环 ```go func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { for { reply := requestTask() switch reply.TaskType { case MapTask: doMapTask(mapf, reply) case ReduceTask: doReduceTask(reducef, reply) case WaitTask: time.Sleep(100 * time.Millisecond) case ExitTask: return } } } ``` #### Map 任务执行 ```go func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) { // 1. 读取输入文件 content, _ := ioutil.ReadFile(task.Filename) // 2. 调用 Map 函数 kva := mapf(task.Filename, string(content)) // 3. 按 key 的 hash 值分桶 buckets := make([][]KeyValue, task.NReduce) for _, kv := range kva { bucket := ihash(kv.Key) % task.NReduce buckets[bucket] = append(buckets[bucket], kv) } // 4. 写入中间文件 mr-X-Y (X=mapID, Y=reduceID) for reduceID, bucket := range buckets { tmpFile, _ := ioutil.TempFile("", "mr-map-*") enc := json.NewEncoder(tmpFile) for _, kv := range bucket { enc.Encode(&kv) } tmpFile.Close() // 原子重命名 os.Rename(tmpFile.Name(), fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)) } // 5. 报告完成 reportTask(MapTask, task.TaskID, true) } ``` #### Reduce 任务执行 ```go func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) { // 1. 读取所有相关的中间文件 mr-*-Y var kva []KeyValue for mapID := 0; mapID < task.NMap; mapID++ { filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID) // 读取并解码 JSON } // 2. 按 key 排序 sort.Sort(ByKey(kva)) // 3. 对每个 key 调用 Reduce 函数 for i < len(kva) { // 收集相同 key 的所有 value values := collectValues(kva, i, &j) output := reducef(kva[i].Key, values) fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output) i = j } // 4. 原子重命名为 mr-out-Y os.Rename(tmpFile.Name(), fmt.Sprintf("mr-out-%d", task.TaskID)) // 5. 报告完成 reportTask(ReduceTask, task.TaskID, true) } ``` ## 4. 关键设计决策 ### 4.1 故障容错 - **超时机制**:任务分配后 10 秒未完成,自动重置为 Idle 状态 - **原子写入**:使用临时文件 + rename 确保崩溃时不产生部分写入的文件 - **幂等性**:相同任务可以被多次执行,最终结果一致 ### 4.2 并发控制 - 使用 `sync.Mutex` 保护 Coordinator 的共享状态 - Worker 是无状态的,可以随时崩溃和重启 ### 4.3 中间文件格式 - 文件名:`mr-X-Y`(X = Map 任务 ID,Y = Reduce 任务 ID) - 编码:JSON(便于调试和兼容性) ## 5. 测试结果 ``` *** Starting wc test. --- wc test: PASS *** Starting indexer test. --- indexer test: PASS *** Starting map parallelism test. --- map parallelism test: PASS *** Starting reduce parallelism test. --- reduce parallelism test: PASS *** Starting job count test. --- job count test: PASS *** Starting early exit test. --- early exit test: PASS *** Starting crash test. --- crash test: PASS *** PASSED ALL TESTS ✅ ``` ### 测试说明 | 测试名称 | 测试内容 | |---------|---------| | wc | 基本的 word count 功能 | | indexer | 倒排索引功能 | | map parallelism | Map 任务并行执行 | | reduce parallelism | Reduce 任务并行执行 | | job count | 确保每个 Map 任务只执行一次 | | early exit | Worker 在任务完成前不会退出 | | crash | Worker 崩溃时的容错恢复 | ## 6. 总结与收获 ### 实现要点 1. **两阶段同步**:Map 阶段必须全部完成才能开始 Reduce 阶段 2. **超时重试**:处理 Worker 崩溃的核心机制 3. **原子操作**:确保输出文件的一致性 ### 学到的知识 - 分布式系统中的任务调度 - RPC 通信机制 - 故障容错设计模式 - Go 语言的并发编程 --- > 💡 这是 MIT 6.824 分布式系统课程的第一个 Lab,为后续的 Raft 和分布式 KV 存储奠定了基础。