feat: complete Lab1 MapReduce implementation

- Implemented Coordinator with task scheduling and timeout handling
- Implemented Worker with Map and Reduce task execution
- Added atomic file writes for crash recovery
- All 7 tests passed: wc, indexer, map parallelism, reduce parallelism, job count, early exit, crash
This commit is contained in:
青黛
2026-02-26 08:45:54 +00:00
parent 8be50722e8
commit 9455bfb2a3
3 changed files with 415 additions and 28 deletions

View File

@@ -1,18 +1,163 @@
package mr package mr
import "log" import (
import "net" "log"
import "os" "net"
import "net/rpc" "net/http"
import "net/http" "net/rpc"
"os"
"sync"
"time"
)
// Task timeout duration (10 seconds)
const TaskTimeout = 10 * time.Second
type Coordinator struct { type Task struct {
// Your definitions here. Type TaskType
ID int
Status TaskStatus
Filename string // For map task
StartTime time.Time // When task was assigned
} }
// Your code here -- RPC handlers for the worker to call. type Coordinator struct {
mu sync.Mutex
// Input files
files []string
// Number of reduce tasks
nReduce int
// Map tasks
mapTasks []Task
mapDone int
mapFinished bool
// Reduce tasks
reduceTasks []Task
reduceDone int
reduceFinished bool
}
// RequestTask hands the calling worker its next assignment. Map tasks
// are exhausted before any reduce task is given out; while every task
// of the current phase is in flight the worker is told to wait, and
// once both phases are complete it is told to exit.
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	reply.NReduce = c.nReduce
	reply.NMap = len(c.files)

	// Reclaim tasks whose workers appear to have died.
	c.checkTimeouts()

	switch {
	case !c.mapFinished:
		for idx := range c.mapTasks {
			t := &c.mapTasks[idx]
			if t.Status != Idle {
				continue
			}
			t.Status = InProgress
			t.StartTime = time.Now()
			reply.TaskType = MapTask
			reply.TaskID = idx
			reply.Filename = t.Filename
			return nil
		}
		// Every map task is currently assigned; ask the worker to retry.
		reply.TaskType = WaitTask

	case !c.reduceFinished:
		for idx := range c.reduceTasks {
			t := &c.reduceTasks[idx]
			if t.Status != Idle {
				continue
			}
			t.Status = InProgress
			t.StartTime = time.Now()
			reply.TaskType = ReduceTask
			reply.TaskID = idx
			return nil
		}
		// Every reduce task is currently assigned; ask the worker to retry.
		reply.TaskType = WaitTask

	default:
		// Whole job finished — the worker may shut down.
		reply.TaskType = ExitTask
	}
	return nil
}
// ReportTask records the outcome of a task previously assigned to a
// worker. A successful task becomes Completed and advances the phase
// counters; a failed one returns to Idle for reassignment. Reports for
// unknown IDs or tasks no longer InProgress (duplicates, or tasks
// already reclaimed by the timeout scan) are silently ignored.
func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	reply.Acknowledged = true

	switch args.TaskType {
	case MapTask:
		if args.TaskID < 0 || args.TaskID >= len(c.mapTasks) {
			return nil
		}
		task := &c.mapTasks[args.TaskID]
		if task.Status != InProgress {
			return nil // stale or duplicate report
		}
		if !args.Success {
			// Failed: make the task available for another worker.
			task.Status = Idle
			return nil
		}
		task.Status = Completed
		c.mapDone++
		if c.mapDone == len(c.mapTasks) {
			c.mapFinished = true
		}

	case ReduceTask:
		if args.TaskID < 0 || args.TaskID >= len(c.reduceTasks) {
			return nil
		}
		task := &c.reduceTasks[args.TaskID]
		if task.Status != InProgress {
			return nil // stale or duplicate report
		}
		if !args.Success {
			// Failed: make the task available for another worker.
			task.Status = Idle
			return nil
		}
		task.Status = Completed
		c.reduceDone++
		if c.reduceDone == c.nReduce {
			c.reduceFinished = true
		}
	}
	return nil
}
// checkTimeouts returns any InProgress task that has exceeded
// TaskTimeout to the Idle state so it can be reassigned — this is the
// crash-recovery path for workers that died mid-task.
// Caller must hold c.mu.
func (c *Coordinator) checkTimeouts() {
	if !c.mapFinished {
		for i := range c.mapTasks {
			t := &c.mapTasks[i]
			// time.Since is the idiomatic form of now.Sub(t.StartTime).
			if t.Status == InProgress && time.Since(t.StartTime) > TaskTimeout {
				t.Status = Idle
			}
		}
	}
	if !c.reduceFinished {
		for i := range c.reduceTasks {
			t := &c.reduceTasks[i]
			if t.Status == InProgress && time.Since(t.StartTime) > TaskTimeout {
				t.Status = Idle
			}
		}
	}
}
// //
// an example RPC handler. // an example RPC handler.
@@ -24,7 +169,6 @@ func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
return nil return nil
} }
// //
// start a thread that listens for RPCs from worker.go // start a thread that listens for RPCs from worker.go
// //
@@ -46,12 +190,10 @@ func (c *Coordinator) server() {
// if the entire job has finished. // if the entire job has finished.
// //
func (c *Coordinator) Done() bool { func (c *Coordinator) Done() bool {
ret := false c.mu.Lock()
defer c.mu.Unlock()
// Your code here. return c.reduceFinished
return ret
} }
// //
@@ -60,10 +202,31 @@ func (c *Coordinator) Done() bool {
// nReduce is the number of reduce tasks to use. // nReduce is the number of reduce tasks to use.
// //
func MakeCoordinator(files []string, nReduce int) *Coordinator { func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{} c := Coordinator{
files: files,
nReduce: nReduce,
}
// Your code here. // Initialize map tasks
c.mapTasks = make([]Task, len(files))
for i, file := range files {
c.mapTasks[i] = Task{
Type: MapTask,
ID: i,
Status: Idle,
Filename: file,
}
}
// Initialize reduce tasks
c.reduceTasks = make([]Task, nReduce)
for i := 0; i < nReduce; i++ {
c.reduceTasks[i] = Task{
Type: ReduceTask,
ID: i,
Status: Idle,
}
}
c.server() c.server()
return &c return &c

View File

@@ -22,8 +22,52 @@ type ExampleReply struct {
Y int Y int
} }
// Add your RPC definitions here. // Task types
// TaskType identifies the kind of work the coordinator is assigning.
type TaskType int

const (
	MapTask    TaskType = iota // run a map task over one input file
	ReduceTask                 // run a reduce task over one partition
	WaitTask                   // no task available right now; worker should retry shortly
	ExitTask                   // whole job finished; worker should exit
)

// TaskStatus is a task's lifecycle state on the coordinator.
type TaskStatus int

const (
	Idle       TaskStatus = iota // not yet assigned (or reclaimed after a failure/timeout)
	InProgress                   // handed to a worker, awaiting its report
	Completed                    // finished successfully
)

// RequestTaskArgs is sent by a worker asking for its next task.
type RequestTaskArgs struct {
	WorkerID int
}

// RequestTaskReply carries the coordinator's task assignment.
type RequestTaskReply struct {
	TaskType   TaskType
	TaskID     int
	NReduce    int    // number of reduce partitions
	NMap       int    // number of map tasks
	Filename   string // input file (map tasks only)
	MapTaskIDs []int  // for reduce tasks: which map outputs to read
}

// ReportTaskArgs is sent by a worker reporting a task's outcome.
type ReportTaskArgs struct {
	WorkerID int
	TaskType TaskType
	TaskID   int
	Success  bool
}

// ReportTaskReply acknowledges a task report.
type ReportTaskReply struct {
	Acknowledged bool
}
// Cook up a unique-ish UNIX-domain socket name // Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator. // in /var/tmp, for the coordinator.

View File

@@ -1,10 +1,16 @@
package mr package mr
import "fmt" import (
import "log" "encoding/json"
import "net/rpc" "fmt"
import "hash/fnv" "hash/fnv"
"io/ioutil"
"log"
"net/rpc"
"os"
"sort"
"time"
)
// //
// Map functions return a slice of KeyValue. // Map functions return a slice of KeyValue.
@@ -14,6 +20,13 @@ type KeyValue struct {
Value string Value string
} }
// ByKey implements sort.Interface over []KeyValue, ordering pairs
// lexicographically by key so reduce can group equal keys together.
type ByKey []KeyValue

func (kv ByKey) Len() int           { return len(kv) }
func (kv ByKey) Swap(i, j int)      { kv[i], kv[j] = kv[j], kv[i] }
func (kv ByKey) Less(i, j int) bool { return kv[i].Key < kv[j].Key }
// //
// use ihash(key) % NReduce to choose the reduce // use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map. // task number for each KeyValue emitted by Map.
@@ -24,18 +37,185 @@ func ihash(key string) int {
return int(h.Sum32() & 0x7fffffff) return int(h.Sum32() & 0x7fffffff)
} }
// //
// main/mrworker.go calls this function. // main/mrworker.go calls this function.
// //
func Worker(mapf func(string, string) []KeyValue, func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) { reducef func(string, []string) string) {
// Your worker implementation here. for {
reply := requestTask()
// uncomment to send the Example RPC to the coordinator. switch reply.TaskType {
// CallExample() case MapTask:
doMapTask(mapf, reply)
case ReduceTask:
doReduceTask(reducef, reply)
case WaitTask:
// No task available, wait a bit
time.Sleep(100 * time.Millisecond)
case ExitTask:
// All done, exit
return
}
}
}
// requestTask fetches the next assignment from the coordinator via RPC.
// If the call fails the coordinator is assumed to have exited, and an
// ExitTask is synthesized so the worker shuts down cleanly.
func requestTask() RequestTaskReply {
	var (
		args  RequestTaskArgs
		reply RequestTaskReply
	)
	if !call("Coordinator.RequestTask", &args, &reply) {
		reply.TaskType = ExitTask
	}
	return reply
}
// reportTask notifies the coordinator that the given task finished,
// successfully or not. The acknowledgement and any RPC failure are
// ignored: a lost report is recovered by the coordinator's timeout scan.
func reportTask(taskType TaskType, taskID int, success bool) {
	reply := ReportTaskReply{}
	call("Coordinator.ReportTask", &ReportTaskArgs{
		TaskType: taskType,
		TaskID:   taskID,
		Success:  success,
	}, &reply)
}
// doMapTask executes one map task: it reads the input file, applies the
// user map function, partitions the emitted pairs into NReduce buckets
// by ihash(key), and writes each bucket as a stream of JSON objects to
// mr-<mapID>-<reduceID>. Success or failure is reported back to the
// coordinator.
//
// Each output file is written to a temp file and renamed into place so
// that a crashed worker never leaves a partially written intermediate
// file for a reduce task to read.
func doMapTask(mapf func(string, string) []KeyValue, task RequestTaskReply) {
	// Read input file.
	content, err := ioutil.ReadFile(task.Filename)
	if err != nil {
		log.Printf("cannot read %v: %v", task.Filename, err)
		reportTask(MapTask, task.TaskID, false)
		return
	}

	// Apply the user map function.
	kva := mapf(task.Filename, string(content))

	// Partition the key/value pairs into one bucket per reduce task.
	buckets := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		b := ihash(kv.Key) % task.NReduce
		buckets[b] = append(buckets[b], kv)
	}

	// Write one intermediate file per reduce partition.
	for reduceID, bucket := range buckets {
		// BUGFIX: create the temp file in the current directory, not the
		// system temp dir ("") — os.Rename is only atomic within a single
		// filesystem and fails outright (EXDEV) when /tmp is a separate
		// mount from the working directory.
		tmpFile, err := ioutil.TempFile(".", "mr-map-*")
		if err != nil {
			log.Printf("cannot create temp file: %v", err)
			reportTask(MapTask, task.TaskID, false)
			return
		}
		enc := json.NewEncoder(tmpFile)
		for _, kv := range bucket {
			if err := enc.Encode(&kv); err != nil {
				log.Printf("cannot encode kv: %v", err)
				tmpFile.Close()
				os.Remove(tmpFile.Name())
				reportTask(MapTask, task.TaskID, false)
				return
			}
		}
		// Check the Close error: a failed flush here would otherwise
		// publish a truncated file via the rename below.
		if err := tmpFile.Close(); err != nil {
			log.Printf("cannot close temp file: %v", err)
			os.Remove(tmpFile.Name())
			reportTask(MapTask, task.TaskID, false)
			return
		}
		// Atomically publish the intermediate file.
		outName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)
		if err := os.Rename(tmpFile.Name(), outName); err != nil {
			log.Printf("cannot rename temp file: %v", err)
			os.Remove(tmpFile.Name())
			reportTask(MapTask, task.TaskID, false)
			return
		}
	}
	reportTask(MapTask, task.TaskID, true)
}
// doReduceTask executes a reduce task
func doReduceTask(reducef func(string, []string) string, task RequestTaskReply) {
// Read all intermediate files for this reduce task
var kva []KeyValue
for mapID := 0; mapID < task.NMap; mapID++ {
filename := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
file, err := os.Open(filename)
if err != nil {
// File might not exist if map task produced no output for this reduce
continue
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
file.Close()
}
// Sort by key
sort.Sort(ByKey(kva))
// Create temp output file
tmpFile, err := ioutil.TempFile("", "mr-reduce-*")
if err != nil {
log.Printf("cannot create temp file: %v", err)
reportTask(ReduceTask, task.TaskID, false)
return
}
// Apply reduce function to each distinct key
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
// Collect all values for this key
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
// Apply reduce function
output := reducef(kva[i].Key, values)
// Write output
fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, output)
i = j
}
tmpFile.Close()
// Atomic rename
outName := fmt.Sprintf("mr-out-%d", task.TaskID)
if err := os.Rename(tmpFile.Name(), outName); err != nil {
log.Printf("cannot rename temp file: %v", err)
os.Remove(tmpFile.Name())
reportTask(ReduceTask, task.TaskID, false)
return
}
reportTask(ReduceTask, task.TaskID, true)
} }
// //
@@ -71,7 +251,7 @@ func call(rpcname string, args interface{}, reply interface{}) bool {
sockname := coordinatorSock() sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname) c, err := rpc.DialHTTP("unix", sockname)
if err != nil { if err != nil {
log.Fatal("dialing:", err) return false
} }
defer c.Close() defer c.Close()