Compare commits
10 Commits
ff788722b7
...
7e5eb65220
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e5eb65220 | ||
|
|
04a0ed2d03 | ||
|
|
fcb8090623 | ||
|
|
9992edfa79 | ||
|
|
8947299250 | ||
|
|
e3d565ccb7 | ||
|
|
bc1e228980 | ||
|
|
e0e726d4a4 | ||
|
|
aa04d6cf34 | ||
|
|
b8053cc3f5 |
@@ -55,7 +55,7 @@ main() {
|
||||
|
||||
case $labnum in
|
||||
"lab1") check_lab1;;
|
||||
"lab2a"|"lab2b"|"lab2c") check_lab2;;
|
||||
"lab2a"|"lab2b"|"lab2c"|"lab2d") check_lab2;;
|
||||
"lab3a"|"lab3b") check_lab3;;
|
||||
"lab4a") check_lab4a;;
|
||||
"lab4b") check_lab4b;;
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
pkg/
|
||||
api.key
|
||||
.api.key.trimmed
|
||||
*-handin.tar.gz
|
||||
|
||||
12
Makefile
12
Makefile
@@ -1,15 +1,15 @@
|
||||
# This is the Makefile helping you submit the labs.
|
||||
# Just create 6.824/api.key with your API key in it,
|
||||
# and submit your lab with the following command:
|
||||
# $ make [lab1|lab2a|lab2b|lab2c|lab3a|lab3b|lab4a|lab4b]
|
||||
# $ make [lab1|lab2a|lab2b|lab2c|lab2d|lab3a|lab3b|lab4a|lab4b]
|
||||
|
||||
LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b "
|
||||
LABS=" lab1 lab2a lab2b lab2c lab2d lab3a lab3b lab4a lab4b "
|
||||
|
||||
%: check-%
|
||||
@echo "Preparing $@-handin.tar.gz"
|
||||
@if echo $(LABS) | grep -q " $@ " ; then \
|
||||
echo "Tarring up your submission..." ; \
|
||||
tar cvzf $@-handin.tar.gz \
|
||||
COPYFILE_DISABLE=1 tar cvzf $@-handin.tar.gz \
|
||||
"--exclude=src/main/pg-*.txt" \
|
||||
"--exclude=src/main/diskvd" \
|
||||
"--exclude=src/mapreduce/824-mrinput-*.txt" \
|
||||
@@ -28,10 +28,8 @@ LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b "
|
||||
read line; \
|
||||
if test "$$line" != "yes" ; then echo "Giving up submission"; exit; fi; \
|
||||
if test `stat -c "%s" "$@-handin.tar.gz" 2>/dev/null || stat -f "%z" "$@-handin.tar.gz"` -ge 20971520 ; then echo "File exceeds 20MB."; exit; fi; \
|
||||
mv api.key api.key.fix ; \
|
||||
cat api.key.fix | tr -d '\n' > api.key ; \
|
||||
rm api.key.fix ; \
|
||||
curl -F file=@$@-handin.tar.gz -F "key=<api.key" \
|
||||
cat api.key | tr -d '\n' > .api.key.trimmed ; \
|
||||
curl --silent --fail --show-error -F file=@$@-handin.tar.gz -F "key=<.api.key.trimmed" \
|
||||
https://6824.scripts.mit.edu/2021/handin.py/upload > /dev/null || { \
|
||||
echo ; \
|
||||
echo "Submit seems to have failed."; \
|
||||
|
||||
@@ -412,6 +412,8 @@ func GenericTestSpeed(t *testing.T, part string, maxraftstate int) {
|
||||
if dur > numOps*timePerOp {
|
||||
t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
|
||||
}
|
||||
|
||||
cfg.end()
|
||||
}
|
||||
|
||||
func TestBasic3A(t *testing.T) {
|
||||
|
||||
@@ -1,17 +1,21 @@
|
||||
#!/bin/sh
|
||||
#!/usr/bin/env bash
|
||||
|
||||
if [ $# -ne 1 ]; then
|
||||
echo "Usage: $0 numTrials"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
trap 'kill -INT -$pid; exit 1' INT
|
||||
|
||||
# Note: because the socketID is based on the current userID,
|
||||
# ./test-mr.sh cannot be run in parallel
|
||||
runs=$1
|
||||
chmod +x test-mr.sh
|
||||
|
||||
for i in $(seq 1 $runs); do
|
||||
if ! timeout -k 2s 900s ./test-mr.sh
|
||||
then
|
||||
timeout -k 2s 900s ./test-mr.sh &
|
||||
pid=$!
|
||||
if ! wait $pid; then
|
||||
echo '***' FAILED TESTS IN TRIAL $i
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
#
|
||||
# basic map-reduce test
|
||||
@@ -102,7 +102,7 @@ wait
|
||||
#########################################################
|
||||
echo '***' Starting map parallelism test.
|
||||
|
||||
rm -f mr-out* mr-worker*
|
||||
rm -f mr-*
|
||||
|
||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||
sleep 1
|
||||
@@ -133,7 +133,7 @@ wait
|
||||
#########################################################
|
||||
echo '***' Starting reduce parallelism test.
|
||||
|
||||
rm -f mr-out* mr-worker*
|
||||
rm -f mr-*
|
||||
|
||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||
sleep 1
|
||||
@@ -156,7 +156,7 @@ wait
|
||||
#########################################################
|
||||
echo '***' Starting job count test.
|
||||
|
||||
rm -f mr-out* mr-worker*
|
||||
rm -f mr-*
|
||||
|
||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||
sleep 1
|
||||
|
||||
@@ -37,7 +37,12 @@ func (a byTime) Swap(i, j int) {
|
||||
}
|
||||
|
||||
func (a byTime) Less(i, j int) bool {
|
||||
if a[i].time != a[j].time {
|
||||
return a[i].time < a[j].time
|
||||
}
|
||||
// if the timestamps are the same, we need to make sure we order calls
|
||||
// before returns
|
||||
return a[i].kind == callEntry && a[j].kind == returnEntry
|
||||
}
|
||||
|
||||
func makeEntries(history []Operation) []entry {
|
||||
|
||||
@@ -127,8 +127,9 @@ func (cfg *config) crash1(i int) {
|
||||
|
||||
if cfg.saved[i] != nil {
|
||||
raftlog := cfg.saved[i].ReadRaftState()
|
||||
snapshot := cfg.saved[i].ReadSnapshot()
|
||||
cfg.saved[i] = &Persister{}
|
||||
cfg.saved[i].SaveRaftState(raftlog)
|
||||
cfg.saved[i].SaveStateAndSnapshot(raftlog, snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,8 +179,11 @@ const SnapShotInterval = 10
|
||||
|
||||
// periodically snapshot raft state
|
||||
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
||||
lastApplied := 0
|
||||
for m := range applyCh {
|
||||
if m.SnapshotValid {
|
||||
//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
|
||||
cfg.mu.Lock()
|
||||
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
|
||||
m.SnapshotIndex, m.Snapshot) {
|
||||
cfg.logs[i] = make(map[int]interface{})
|
||||
@@ -190,8 +194,11 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
||||
log.Fatalf("decode error\n")
|
||||
}
|
||||
cfg.logs[i][m.SnapshotIndex] = v
|
||||
lastApplied = m.SnapshotIndex
|
||||
}
|
||||
} else if m.CommandValid {
|
||||
cfg.mu.Unlock()
|
||||
} else if m.CommandValid && m.CommandIndex > lastApplied {
|
||||
//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
|
||||
cfg.mu.Lock()
|
||||
err_msg, prevok := cfg.checkLogs(i, m)
|
||||
cfg.mu.Unlock()
|
||||
@@ -204,6 +211,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
||||
// keep reading after error so that Raft doesn't block
|
||||
// holding locks...
|
||||
}
|
||||
lastApplied = m.CommandIndex
|
||||
if (m.CommandIndex+1)%SnapShotInterval == 0 {
|
||||
w := new(bytes.Buffer)
|
||||
e := labgob.NewEncoder(w)
|
||||
@@ -212,7 +220,12 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
||||
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
|
||||
}
|
||||
} else {
|
||||
// ignore other types of ApplyMsg
|
||||
// Ignore other types of ApplyMsg or old
|
||||
// commands. Old command may never happen,
|
||||
// depending on the Raft implementation, but
|
||||
// just in case.
|
||||
// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -256,7 +269,6 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
|
||||
cfg.mu.Unlock()
|
||||
|
||||
applyCh := make(chan ApplyMsg)
|
||||
go applier(i, applyCh)
|
||||
|
||||
rf := Make(ends, i, cfg.saved[i], applyCh)
|
||||
|
||||
@@ -264,6 +276,8 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
|
||||
cfg.rafts[i] = rf
|
||||
cfg.mu.Unlock()
|
||||
|
||||
go applier(i, applyCh)
|
||||
|
||||
svc := labrpc.MakeService(rf)
|
||||
srv := labrpc.MakeServer()
|
||||
srv.AddService(svc)
|
||||
|
||||
@@ -17,13 +17,14 @@ package raft
|
||||
// in the same server.
|
||||
//
|
||||
|
||||
import "sync"
|
||||
import "sync/atomic"
|
||||
import "6.824/labrpc"
|
||||
|
||||
// import "bytes"
|
||||
// import "6.824/labgob"
|
||||
import (
|
||||
// "bytes"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
// "6.824/labgob"
|
||||
"6.824/labrpc"
|
||||
)
|
||||
|
||||
|
||||
//
|
||||
@@ -121,7 +122,7 @@ func (rf *Raft) readPersist(data []byte) {
|
||||
//
|
||||
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
|
||||
|
||||
// Your code here (2C).
|
||||
// Your code here (2D).
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -131,7 +132,7 @@ func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int,
|
||||
// service no longer needs the log through (and including)
|
||||
// that index. Raft should now trim its log as much as possible.
|
||||
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
||||
// Your code here (2C).
|
||||
// Your code here (2D).
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -301,11 +301,13 @@ func (cfg *config) Leader() (bool, int) {
|
||||
defer cfg.mu.Unlock()
|
||||
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
if cfg.servers[i] != nil {
|
||||
_, is_leader := cfg.servers[i].rf.GetState()
|
||||
if is_leader {
|
||||
return true, i
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, 0
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ func (sc *ShardCtrler) Raft() *raft.Raft {
|
||||
|
||||
//
|
||||
// servers[] contains the ports of the set of
|
||||
// servers that will cooperate via Paxos to
|
||||
// servers that will cooperate via Raft to
|
||||
// form the fault-tolerant shardctrler service.
|
||||
// me is the index of the current server in servers[].
|
||||
//
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package shardctrler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// import "time"
|
||||
import "fmt"
|
||||
|
||||
func check(t *testing.T, groups []int, ck *Clerk) {
|
||||
c := ck.Query(-1)
|
||||
@@ -377,4 +378,26 @@ func TestMulti(t *testing.T) {
|
||||
}
|
||||
|
||||
fmt.Printf(" ... Passed\n")
|
||||
|
||||
fmt.Printf("Test: Check Same config on servers ...\n")
|
||||
|
||||
isLeader, leader := cfg.Leader()
|
||||
if !isLeader {
|
||||
t.Fatalf("Leader not found")
|
||||
}
|
||||
c := ck.Query(-1) // Config leader claims
|
||||
|
||||
cfg.ShutdownServer(leader)
|
||||
|
||||
attempts := 0
|
||||
for isLeader, leader = cfg.Leader(); isLeader; time.Sleep(1 * time.Second) {
|
||||
if attempts++; attempts >= 3 {
|
||||
t.Fatalf("Leader not found")
|
||||
}
|
||||
}
|
||||
|
||||
c1 = ck.Query(-1)
|
||||
check_same_config(t, c, c1)
|
||||
|
||||
fmt.Printf(" ... Passed\n")
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package shardkv
|
||||
|
||||
//
|
||||
// Sharded key/value server.
|
||||
// Lots of replica groups, each running op-at-a-time paxos.
|
||||
// Lots of replica groups, each running Raft.
|
||||
// Shardctrler decides which group serves each shard.
|
||||
// Shardctrler may change shard assignment from time to time.
|
||||
//
|
||||
|
||||
@@ -453,6 +453,74 @@ func TestConcurrent2(t *testing.T) {
|
||||
fmt.Printf(" ... Passed\n")
|
||||
}
|
||||
|
||||
func TestConcurrent3(t *testing.T) {
|
||||
fmt.Printf("Test: concurrent configuration change and restart...\n")
|
||||
|
||||
cfg := make_config(t, 3, false, 300)
|
||||
defer cfg.cleanup()
|
||||
|
||||
ck := cfg.makeClient()
|
||||
|
||||
cfg.join(0)
|
||||
|
||||
n := 10
|
||||
ka := make([]string, n)
|
||||
va := make([]string, n)
|
||||
for i := 0; i < n; i++ {
|
||||
ka[i] = strconv.Itoa(i)
|
||||
va[i] = randstring(1)
|
||||
ck.Put(ka[i], va[i])
|
||||
}
|
||||
|
||||
var done int32
|
||||
ch := make(chan bool)
|
||||
|
||||
ff := func(i int, ck1 *Clerk) {
|
||||
defer func() { ch <- true }()
|
||||
for atomic.LoadInt32(&done) == 0 {
|
||||
x := randstring(1)
|
||||
ck1.Append(ka[i], x)
|
||||
va[i] += x
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
ck1 := cfg.makeClient()
|
||||
go ff(i, ck1)
|
||||
}
|
||||
|
||||
t0 := time.Now()
|
||||
for time.Since(t0) < 12*time.Second {
|
||||
cfg.join(2)
|
||||
cfg.join(1)
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
cfg.ShutdownGroup(0)
|
||||
cfg.ShutdownGroup(1)
|
||||
cfg.ShutdownGroup(2)
|
||||
cfg.StartGroup(0)
|
||||
cfg.StartGroup(1)
|
||||
cfg.StartGroup(2)
|
||||
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
cfg.leave(1)
|
||||
cfg.leave(2)
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
atomic.StoreInt32(&done, 1)
|
||||
for i := 0; i < n; i++ {
|
||||
<-ch
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
check(t, ck, ka[i], va[i])
|
||||
}
|
||||
|
||||
fmt.Printf(" ... Passed\n")
|
||||
}
|
||||
|
||||
func TestUnreliable1(t *testing.T) {
|
||||
fmt.Printf("Test: unreliable 1...\n")
|
||||
|
||||
@@ -748,74 +816,6 @@ func TestChallenge1Delete(t *testing.T) {
|
||||
fmt.Printf(" ... Passed\n")
|
||||
}
|
||||
|
||||
func TestChallenge1Concurrent(t *testing.T) {
|
||||
fmt.Printf("Test: concurrent configuration change and restart (challenge 1)...\n")
|
||||
|
||||
cfg := make_config(t, 3, false, 300)
|
||||
defer cfg.cleanup()
|
||||
|
||||
ck := cfg.makeClient()
|
||||
|
||||
cfg.join(0)
|
||||
|
||||
n := 10
|
||||
ka := make([]string, n)
|
||||
va := make([]string, n)
|
||||
for i := 0; i < n; i++ {
|
||||
ka[i] = strconv.Itoa(i)
|
||||
va[i] = randstring(1)
|
||||
ck.Put(ka[i], va[i])
|
||||
}
|
||||
|
||||
var done int32
|
||||
ch := make(chan bool)
|
||||
|
||||
ff := func(i int, ck1 *Clerk) {
|
||||
defer func() { ch <- true }()
|
||||
for atomic.LoadInt32(&done) == 0 {
|
||||
x := randstring(1)
|
||||
ck1.Append(ka[i], x)
|
||||
va[i] += x
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
ck1 := cfg.makeClient()
|
||||
go ff(i, ck1)
|
||||
}
|
||||
|
||||
t0 := time.Now()
|
||||
for time.Since(t0) < 12*time.Second {
|
||||
cfg.join(2)
|
||||
cfg.join(1)
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
cfg.ShutdownGroup(0)
|
||||
cfg.ShutdownGroup(1)
|
||||
cfg.ShutdownGroup(2)
|
||||
cfg.StartGroup(0)
|
||||
cfg.StartGroup(1)
|
||||
cfg.StartGroup(2)
|
||||
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
cfg.leave(1)
|
||||
cfg.leave(2)
|
||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
atomic.StoreInt32(&done, 1)
|
||||
for i := 0; i < n; i++ {
|
||||
<-ch
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
check(t, ck, ka[i], va[i])
|
||||
}
|
||||
|
||||
fmt.Printf(" ... Passed\n")
|
||||
}
|
||||
|
||||
//
|
||||
// optional test to see whether servers can handle
|
||||
// shards that are not affected by a config change
|
||||
|
||||
Reference in New Issue
Block a user