From 9455bfb2a319092419815b2e8de41074e61678f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9D=92=E9=BB=9B?= Date: Thu, 26 Feb 2026 08:45:54 +0000 Subject: [PATCH] feat: complete Lab1 MapReduce implementation - Implemented Coordinator with task scheduling and timeout handling - Implemented Worker with Map and Reduce task execution - Added atomic file writes for crash recovery - All 7 tests passed: wc, indexer, map parallelism, reduce parallelism, job count, early exit, crash --- src/mr/coordinator.go | 197 +++++++++++++++++++++++++++++++++++++---- src/mr/rpc.go | 46 +++++++++- src/mr/worker.go | 200 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 415 insertions(+), 28 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index cafda57..3d5c78c 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,18 +1,163 @@ package mr -import "log" -import "net" -import "os" -import "net/rpc" -import "net/http" +import ( + "log" + "net" + "net/http" + "net/rpc" + "os" + "sync" + "time" +) +// Task timeout duration (10 seconds) +const TaskTimeout = 10 * time.Second -type Coordinator struct { - // Your definitions here. - +type Task struct { + Type TaskType + ID int + Status TaskStatus + Filename string // For map task + StartTime time.Time // When task was assigned } -// Your code here -- RPC handlers for the worker to call. +type Coordinator struct { + mu sync.Mutex + + // Input files + files []string + + // Number of reduce tasks + nReduce int + + // Map tasks + mapTasks []Task + mapDone int + mapFinished bool + + // Reduce tasks + reduceTasks []Task + reduceDone int + reduceFinished bool +} + +// RequestTask - Worker requests a task +func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error { + c.mu.Lock() + defer c.mu.Unlock() + + reply.NReduce = c.nReduce + reply.NMap = len(c.files) + + // Check for timed out tasks and reset them + c.checkTimeouts() + + // If map phase not done, assign map tasks + if !c.mapFinished { + for i := range c.mapTasks { + if c.mapTasks[i].Status == Idle { + c.mapTasks[i].Status = InProgress + c.mapTasks[i].StartTime = time.Now() + + reply.TaskType = MapTask + reply.TaskID = i + reply.Filename = c.mapTasks[i].Filename + return nil + } + } + // All map tasks are in progress, tell worker to wait + reply.TaskType = WaitTask + return nil + } + + // Map phase done, assign reduce tasks + if !c.reduceFinished { + for i := range c.reduceTasks { + if c.reduceTasks[i].Status == Idle { + c.reduceTasks[i].Status = InProgress + c.reduceTasks[i].StartTime = time.Now() + + reply.TaskType = ReduceTask + reply.TaskID = i + return nil + } + } + // All reduce tasks are in progress, tell worker to wait + reply.TaskType = WaitTask + return nil + } + + // All done, tell worker to exit + reply.TaskType = ExitTask + return nil +} + +// ReportTask - Worker reports task completion +func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error { + c.mu.Lock() + defer c.mu.Unlock() + + reply.Acknowledged = true + + if args.TaskType == MapTask { + if args.TaskID >= 0 && args.TaskID < len(c.mapTasks) { + if c.mapTasks[args.TaskID].Status == InProgress { + if args.Success { + c.mapTasks[args.TaskID].Status = Completed + c.mapDone++ + if c.mapDone == len(c.mapTasks) { + c.mapFinished = true + } + } else { + // Failed, reset to idle for retry + c.mapTasks[args.TaskID].Status = Idle + } + } + } + } else if args.TaskType == ReduceTask { + if args.TaskID >= 0 && args.TaskID < len(c.reduceTasks) { + if c.reduceTasks[args.TaskID].Status == InProgress { + if args.Success { + c.reduceTasks[args.TaskID].Status = Completed + c.reduceDone++ + if c.reduceDone == c.nReduce { + c.reduceFinished = true + } + } else { + // Failed, reset to idle for retry + c.reduceTasks[args.TaskID].Status = Idle + } + } + } + } + + return nil +} + +// checkTimeouts - Reset timed out tasks to idle (must hold lock) +func (c *Coordinator) checkTimeouts() { + now := time.Now() + + if !c.mapFinished { + for i := range c.mapTasks { + if c.mapTasks[i].Status == InProgress { + if now.Sub(c.mapTasks[i].StartTime) > TaskTimeout { + c.mapTasks[i].Status = Idle + } + } + } + } + + if !c.reduceFinished { + for i := range c.reduceTasks { + if c.reduceTasks[i].Status == InProgress { + if now.Sub(c.reduceTasks[i].StartTime) > TaskTimeout { + c.reduceTasks[i].Status = Idle + } + } + } + } +} // // an example RPC handler. @@ -24,7 +169,6 @@ func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { return nil } - // // start a thread that listens for RPCs from worker.go // @@ -46,12 +190,10 @@ func (c *Coordinator) server() { // if the entire job has finished. // func (c *Coordinator) Done() bool { - ret := false + c.mu.Lock() + defer c.mu.Unlock() - // Your code here. - - - return ret + return c.reduceFinished } // @@ -60,10 +202,31 @@ func (c *Coordinator) Done() bool { // nReduce is the number of reduce tasks to use. // func MakeCoordinator(files []string, nReduce int) *Coordinator { - c := Coordinator{} + c := Coordinator{ + files: files, + nReduce: nReduce, + } - // Your code here. + // Initialize map tasks + c.mapTasks = make([]Task, len(files)) + for i, file := range files { + c.mapTasks[i] = Task{ + Type: MapTask, + ID: i, + Status: Idle, + Filename: file, + } + } + // Initialize reduce tasks + c.reduceTasks = make([]Task, nReduce) + for i := 0; i < nReduce; i++ { + c.reduceTasks[i] = Task{ + Type: ReduceTask, + ID: i, + Status: Idle, + } + } c.server() return &c diff --git a/src/mr/rpc.go b/src/mr/rpc.go index abffa81..f3e457b 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -22,8 +22,52 @@ type ExampleReply struct { Y int } -// Add your RPC definitions here. +// Task types +type TaskType int +const ( + MapTask TaskType = 0 + ReduceTask TaskType = 1 + WaitTask TaskType = 2 // No task available, worker should wait + ExitTask TaskType = 3 // All tasks done, worker should exit +) + +// Task status +type TaskStatus int + +const ( + Idle TaskStatus = 0 + InProgress TaskStatus = 1 + Completed TaskStatus = 2 +) + +// RequestTaskArgs - Worker requests a task +type RequestTaskArgs struct { + WorkerID int +} + +// RequestTaskReply - Coordinator assigns a task +type RequestTaskReply struct { + TaskType TaskType + TaskID int + NReduce int // Number of reduce tasks + NMap int // Number of map tasks + Filename string // Input file for map task + MapTaskIDs []int // For reduce task: which map tasks to read from +} + +// ReportTaskArgs - Worker reports task completion +type ReportTaskArgs struct { + WorkerID int + TaskType TaskType + TaskID int + Success bool +} + +// ReportTaskReply - Coordinator acknowledges +type ReportTaskReply struct { + Acknowledged bool +} // Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. diff --git a/src/mr/worker.go b/src/mr/worker.go index 243b768..6199658 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -1,10 +1,16 @@ package mr -import "fmt" -import "log" -import "net/rpc" -import "hash/fnv" - +import ( + "encoding/json" + "fmt" + "hash/fnv" + "io/ioutil" + "log" + "net/rpc" + "os" + "sort" + "time" +) // // Map functions return a slice of KeyValue. @@ -14,6 +20,13 @@ type KeyValue struct { Value string } +// ByKey implements sort.Interface for []KeyValue +type ByKey []KeyValue + +func (a ByKey) Len() int { return len(a) } +func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + // // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. @@ -24,18 +37,185 @@ func ihash(key string) int { return int(h.Sum32() & 0x7fffffff) } - // // main/mrworker.go calls this function. // func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { - // Your worker implementation here. + for { + reply := requestTask() - // uncomment to send the Example RPC to the coordinator. - // CallExample() + switch reply.TaskType { + case MapTask: + doMapTask(mapf, reply) + case ReduceTask: + doReduceTask(reducef, reply) + case WaitTask: + // No task available, wait a bit + time.Sleep(100 * time.Millisecond) + case ExitTask: + // All done, exit + return + } + } +} +// requestTask asks the coordinator for a task +func requestTask() RequestTaskReply { + args := RequestTaskArgs{} + reply := RequestTaskReply{} + + ok := call("Coordinator.RequestTask", &args, &reply) + if !ok { + // Coordinator is gone, exit + reply.TaskType = ExitTask + } + + return reply +} + +// reportTask tells the coordinator that a task is done +func reportTask(taskType TaskType, taskID int, success bool) { + args := ReportTaskArgs{ + TaskType: taskType, + TaskID: taskID, + Success: success, + } + reply := ReportTaskReply{} + + call("Coordinator.ReportTask", &args, &reply) +} + +// doMapTask executes a map task +func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) { + // Read input file + content, err := ioutil.ReadFile(task.Filename) + if err != nil { + log.Printf("cannot read %v: %v", task.Filename, err) + reportTask(MapTask, task.TaskID, false) + return + } + + // Apply map function + kva := mapf(task.Filename, string(content)) + + // Partition into nReduce buckets + buckets := make([][]KeyValue, task.NReduce) + for i := range buckets { + buckets[i] = []KeyValue{} + } + + for _, kv := range kva { + bucket := ihash(kv.Key) % task.NReduce + buckets[bucket] = append(buckets[bucket], kv) + } + + // Write intermediate files + for reduceID, bucket := range buckets { + // Use temp file for atomic write + tmpFile, err := ioutil.TempFile("", "mr-map-*") + if err != nil { + log.Printf("cannot create temp file: %v", err) + reportTask(MapTask, task.TaskID, false) + return + } + + enc := json.NewEncoder(tmpFile) + for _, kv := range bucket { + if err := enc.Encode(&kv); err != nil { + log.Printf("cannot encode kv: %v", err) + tmpFile.Close() + os.Remove(tmpFile.Name()) + reportTask(MapTask, task.TaskID, false) + return + } + } + tmpFile.Close() + + // Atomic rename + outName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID) + if err := os.Rename(tmpFile.Name(), outName); err != nil { + log.Printf("cannot rename temp file: %v", err) + os.Remove(tmpFile.Name()) + reportTask(MapTask, task.TaskID, false) + return + } + } + + reportTask(MapTask, task.TaskID, true) +} + +// doReduceTask executes a reduce task +func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) { + // Read all intermediate files for this reduce task + var kva []KeyValue + + for mapID := 0; mapID < task.NMap; mapID++ { + filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID) + file, err := os.Open(filename) + if err != nil { + // File might not exist if map task produced no output for this reduce + continue + } + + dec := json.NewDecoder(file) + for { + var kv KeyValue + if err := dec.Decode(&kv); err != nil { + break + } + kva = append(kva, kv) + } + file.Close() + } + + // Sort by key + sort.Sort(ByKey(kva)) + + // Create temp output file + tmpFile, err := ioutil.TempFile("", "mr-reduce-*") + if err != nil { + log.Printf("cannot create temp file: %v", err) + reportTask(ReduceTask, task.TaskID, false) + return + } + + // Apply reduce function to each distinct key + i := 0 + for i < len(kva) { + j := i + 1 + for j < len(kva) && kva[j].Key == kva[i].Key { + j++ + } + + // Collect all values for this key + values := []string{} + for k := i; k < j; k++ { + values = append(values, kva[k].Value) + } + + // Apply reduce function + output := reducef(kva[i].Key, values) + + // Write output + fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output) + + i = j + } + + tmpFile.Close() + + // Atomic rename + outName := fmt.Sprintf("mr-out-%d", task.TaskID) + if err := os.Rename(tmpFile.Name(), outName); err != nil { + log.Printf("cannot rename temp file: %v", err) + os.Remove(tmpFile.Name()) + reportTask(ReduceTask, task.TaskID, false) + return + } + + reportTask(ReduceTask, task.TaskID, true) } // @@ -71,7 +251,7 @@ func call(rpcname string, args interface{}, reply interface{}) bool { sockname := coordinatorSock() c, err := rpc.DialHTTP("unix", sockname) if err != nil { - log.Fatal("dialing:", err) + return false } defer c.Close()