# 6.5840 Lab 1: MapReduce ## 简介 在本实验中你将实现一个 MapReduce 系统。你需要实现:调用应用 Map 和 Reduce 函数并负责读写文件的 worker 进程,以及向 worker 分配任务并应对 worker 失败的 coordinator 进程。你将实现与 [MapReduce 论文](../papers/mapreduce-cn.md) 中类似的内容。(注:本实验使用 "coordinator" 代替论文中的 "master"。) ## 起步 你需要先安装并配置 Go 才能完成实验。 用 git 获取初始实验代码。进一步了解 git 可参阅 [Pro Git 书](https://git-scm.com/book/en/v2) 或 [git 用户手册](https://git-scm.com/docs/user-manual)。 ```bash $ git clone git://g.csail.mit.edu/6.5840-golabs-2026 6.5840 $ cd 6.5840 $ ls Makefile src $ ``` 我们在 `src/main/mrsequential.go` 中提供了一个简单的单进程顺序 MapReduce 实现,在一个进程内逐个执行 map 和 reduce。我们还提供了若干 MapReduce 应用:`mrapps/wc.go` 中的词频统计,以及 `mrapps/indexer.go` 中的文本索引。可以按如下方式顺序运行词频统计: ```bash $ cd ~/6.5840 $ cd src/main $ go build -buildmode=plugin ../mrapps/wc.go $ rm mr-out* $ go run mrsequential.go wc.so pg*.txt $ sort mr-out-0 A 509 ABOUT 2 ACT 8 ACTRESS 1 ... ``` (若希望 sort 产生上述输出,可能需设置环境变量 `LC_COLLATE=C`:`LC_COLLATE=C sort mr-out-0`) `mrsequential.go` 将输出写入文件 `mr-out-0`。输入来自名为 `pg-xxx.txt` 的文本文件。 可以复用 `mrsequential.go` 中的代码。也可查看 `mrapps/wc.go` 了解 MapReduce 应用代码的形式。 对本实验及后续实验,我们可能会对提供的代码进行更新。为便于用 `git pull` 获取并合并更新,建议保留我们提供的代码在原始文件中。你可以按实验说明在现有代码上增补,但不要移动它们。可以把你自己新增的函数放在新文件中。 ## 你的任务 你的任务是实现一个分布式 MapReduce,包含两个程序:**coordinator** 和 **worker**。只有一个 coordinator 进程,以及一个或多个并行运行的 worker 进程。在实际系统中 worker 会运行在多台机器上,本实验中将全部在同一台机器上运行。Worker 通过 RPC 与 coordinator 通信。每个 worker 进程循环向 coordinator 请求任务,从若干文件中读取该任务的输入,执行任务,将输出写入若干文件,然后再次向 coordinator 请求新任务。Coordinator 应在合理时间内(本实验为十秒)检测 worker 是否未完成任务,并将同一任务交给其他 worker。 我们已提供少量起步代码。Coordinator 和 worker 的 "main" 例程在 `main/mrcoordinator.go` 和 `main/mrworker.go` 中;**不要修改这两个文件**。你的实现应放在 `mr/coordinator.go`、`mr/worker.go` 和 `mr/rpc.go` 中。 在词频统计 MapReduce 应用上运行你的代码的步骤。首先构建词频统计插件: ```bash $ cd main $ go build -buildmode=plugin ../mrapps/wc.go ``` 在一个终端中运行 coordinator: ```bash $ rm mr-out* $ go run mrcoordinator.go sock123 pg-*.txt ``` 参数 `sock123` 指定 coordinator 接收 worker RPC 的 socket。传给 `mrcoordinator.go` 的 `pg-*.txt` 是输入文件;每个文件对应一个 "split",即一个 Map 任务的输入。 在另一个或多个终端中运行若干 worker: ```bash $ go run mrworker.go wc.so sock123 ``` 当 worker 和 coordinator 都结束后,查看 `mr-out-*` 中的输出。完成实验后,对所有输出文件排序后的并集应与顺序实现的输出一致,例如: ```bash $ cat mr-out-* | sort | more A 509 ABOUT 2 ACT 8 ACTRESS 1 ... ``` 我们提供了批改时将使用的全部测试。测试源码在 `mr/mr_test.go`。可在 src 目录下运行测试: ```bash $ cd src $ make mr ... ``` 测试会检查:在给定 pg-xxx.txt 作为输入时,wc 和 indexer 两个 MapReduce 应用是否产生正确输出;你的实现是否并行执行 Map 和 Reduce 任务;以及是否能在运行任务的 worker 崩溃后恢复。 若现在运行测试,会在第一个测试中卡住: ```bash $ cd ~/6.5840/src $ make mr ... cd mr; go test -v -race === RUN TestWc ... ``` 你可以把 `mr/coordinator.go` 中 `Done` 函数里的 `ret := false` 改为 `true`,这样 coordinator 会立即退出。然后: ```bash $ make mr ... === RUN TestWc 2026/01/22 14:56:24 reduce created no mr-out-X output files! exit status 1 FAIL 6.5840/mr 4.516s make: *** [Makefile:44: mr] Error 1 $ ``` 测试期望看到名为 `mr-out-X` 的输出文件,每个 reduce 任务一个。`mr/coordinator.go` 和 `mr/worker.go` 的空实现不会生成这些文件(也几乎不做别的事),因此测试会失败。 完成后,测试输出应类似: ```bash $ make mr ... === RUN TestWc --- PASS: TestWc (8.64s) === RUN TestIndexer --- PASS: TestIndexer (5.90s) === RUN TestMapParallel --- PASS: TestMapParallel (7.05s) === RUN TestReduceParallel --- PASS: TestReduceParallel (8.05s) === RUN TestJobCount --- PASS: TestJobCount (10.04s) === RUN TestEarlyExit --- PASS: TestEarlyExit (6.05s) === RUN TestCrashWorker 2026/01/22 14:58:14 *re*-starting map ../../main/pg-tom_sawyer.txt 0 2026/01/22 14:58:14 *re*-starting map ../../main/pg-metamorphosis.txt 2 2026/01/22 14:58:39 *re*-starting map ../../main/pg-metamorphosis.txt 2 2026/01/22 14:58:40 map 2 already done 2026/01/22 14:58:45 *re*-starting reduce 0 --- PASS: TestCrashWorker (40.18s) PASS ok 6.5840/mr 86.932s $ ``` 根据你终止 worker 进程的方式,可能会看到类似错误: ``` 2026/02/11 16:21:32 dialing:dial unix /var/tmp/5840-mr-501: connect: connection refused ``` 每个测试中出现少量这类消息是可以的;它们出现在 coordinator 已退出后 worker 无法联系到 coordinator 的 RPC 服务时。 ## 若干规则 - Map 阶段应将中间 key 划分到 **nReduce** 个 reduce 任务的桶中,其中 nReduce 是 reduce 任务数量,即 `main/mrcoordinator.go` 传给 `MakeCoordinator()` 的参数。每个 mapper 应为 reduce 任务创建 nReduce 个中间文件。 - Worker 实现应把第 X 个 reduce 任务的输出放在文件 **mr-out-X** 中。 - **mr-out-X** 文件应包含 Reduce 函数输出的每一行。该行应由 Go 的 `"%v %v"` 格式生成,传入 key 和 value。可参考 `main/mrsequential.go` 中注释为 "this is the correct format" 的那行。若你的实现与该格式偏差过大,测试会失败。 - 可以修改 `mr/worker.go`、`mr/coordinator.go` 和 `mr/rpc.go`。可以临时修改其他文件做测试,但须确保在原始版本下你的代码能正确运行;我们会用原始版本测试。 - Worker 应将 Map 的中间输出放在当前目录的文件中,以便之后在 Reduce 任务中读取。 - `main/mrcoordinator.go` 期望 `mr/coordinator.go` 实现 **Done()** 方法,在 MapReduce 作业完全结束时返回 true;此时 mrcoordinator.go 会退出。 - 当作业完全结束时,worker 进程应退出。一种简单做法是利用 `call()` 的返回值:若 worker 无法联系到 coordinator,可认为 coordinator 因作业结束已退出,于是 worker 也可终止。根据你的设计,也可以让 coordinator 给 worker 一个 "please exit" 的伪任务。 ## 提示 - [Guidance](./2.%20Lab%20Guidance-cn.md) 页有一些开发和调试建议。 - 一种起步方式是修改 `mr/worker.go` 的 `Worker()`,向 coordinator 发 RPC 请求任务。然后修改 coordinator,用尚未开始的 map 任务的文件名回复。再修改 worker 读取该文件并调用应用的 Map 函数,如 `mrsequential.go` 中所示。 - 应用的 Map 和 Reduce 函数在运行时通过 Go 的 plugin 包从以 `.so` 结尾的文件加载。 - 若修改了 `mr/` 目录下的任何内容,很可能需要重新构建所用 MapReduce 插件,例如 `go build -buildmode=plugin ../mrapps/wc.go`。`make mr` 会为你构建插件。可用 `make RUN="-run Wc" mr` 运行单个测试,该命令会把 `-run Wc` 传给 go test,只运行 `mr/mr_test.go` 中匹配 Wc 的测试。 - 本实验依赖 worker 共享文件系统。所有 worker 在同一台机器上时很简单;若 worker 在不同机器上,则需要 GFS 之类的全局文件系统。 - 中间文件的合理命名是 **mr-X-Y**,其中 X 为 Map 任务编号,Y 为 reduce 任务编号。 - Worker 的 map 任务代码需要一种方式将中间 key/value 对写入文件,以便在 reduce 任务中正确读回。一种做法是使用 Go 的 `encoding/json` 包。将 key/value 对以 JSON 格式写入已打开的文件: ```go enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv) } ``` 读回该文件: ```go dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } ``` - Worker 的 map 部分可使用 **ihash(key)** 函数(在 worker.go 中)为给定 key 选择对应的 reduce 任务。 - 可从 `mrsequential.go` 借鉴读取 Map 输入文件、在 Map 和 Reduce 之间排序中间 key/value 对、以及将 Reduce 输出写入文件的代码。 - Coordinator 作为 RPC 服务器是并发的;别忘了**对共享数据加锁**。 - Worker 有时需要等待,例如 reduce 须等最后一个 map 完成才能开始。一种做法是 worker 周期性地向 coordinator 请求工作,每次请求之间用 `time.Sleep()` 休眠。另一种做法是 coordinator 中相应的 RPC 处理函数里用循环等待,可用 `time.Sleep()` 或 `sync.Cond`。Go 为每个 RPC 在独立线程中运行处理函数,因此一个处理函数在等待不会阻止 coordinator 处理其他 RPC。 - Coordinator 无法可靠区分崩溃的 worker、存活但卡住的 worker、以及执行过慢的 worker。能做的是让 coordinator 等待一段时间后放弃,并把任务重新发给其他 worker。本实验中请让 coordinator 等待**十秒**;之后应假定该 worker 已死(当然也可能没死)。 - 若选择实现 Backup Tasks(论文 3.6 节),请注意我们测试在 worker 不崩溃时你的代码不会调度多余任务。Backup tasks 应只在相对较长时间(例如 10 秒)后才调度。 - 测试崩溃恢复可使用 **mrapps/crash.go** 应用插件,它会在 Map 和 Reduce 函数中随机退出。 - 为确保在崩溃情况下无人看到未写完的文件,MapReduce 论文提到使用临时文件并在完全写完后原子重命名的技巧。可用 `ioutil.TempFile`(或 Go 1.17 及以上的 `os.CreateTemp`)创建临时文件,用 `os.Rename` 原子重命名。 - Go RPC 只发送**首字母大写**的 struct 字段名。子结构体的字段名也须大写。 - 调用 RPC 的 `call()` 时,reply 结构体应包含全部默认值。RPC 调用应类似: ```go reply := SomeType{} call(..., &reply) ``` 在 call 之前不要设置 reply 的任何字段。若传入的 reply 结构体含有非默认字段,RPC 系统可能静默返回错误值。 ## 不计分挑战 - 实现你自己的 MapReduce 应用(参考 `mrapps/*` 中的示例),例如分布式 Grep(MapReduce 论文 2.3 节)。 - 让 MapReduce coordinator 和 worker 在不同机器上运行,与实际部署一致。需要将 RPC 改为通过 TCP/IP 而非 Unix socket 通信(参见 `Coordinator.server()` 中的注释行),并通过共享文件系统读写文件。例如可 ssh 到 MIT 的多台 Athena 集群机器,它们使用 AFS 共享文件;或租用几台 AWS 实例并用 S3 存储。 --- *来源: [6.5840 Lab 1: MapReduce](https://pdos.csail.mit.edu/6.824/labs/lab-mr.html)*