Compare commits

..

10 Commits

Author SHA1 Message Date
Robert Morris
7e5eb65220 update 2022-01-10 15:13:53 -05:00
Jose Javier
04a0ed2d03 update 2021-04-06 19:50:34 -04:00
Jose Javier
fcb8090623 update 2021-03-21 13:54:17 -04:00
Cel A. Skeggs
9992edfa79 update 2021-03-16 14:47:36 -04:00
Jose Javier
8947299250 update 2021-03-01 15:06:47 -05:00
Cel A. Skeggs
e3d565ccb7 update 2021-02-25 14:34:00 -05:00
Lily Tsai
bc1e228980 update 2021-02-23 12:07:54 -08:00
Cel A. Skeggs
e0e726d4a4 update 2021-02-17 16:52:03 -05:00
Cel A. Skeggs
aa04d6cf34 update 2021-02-15 17:58:11 -05:00
Lily Tsai
b8053cc3f5 update 2021-02-15 14:06:34 -08:00
14 changed files with 155 additions and 105 deletions

View File

@@ -55,7 +55,7 @@ main() {
case $labnum in case $labnum in
"lab1") check_lab1;; "lab1") check_lab1;;
"lab2a"|"lab2b"|"lab2c") check_lab2;; "lab2a"|"lab2b"|"lab2c"|"lab2d") check_lab2;;
"lab3a"|"lab3b") check_lab3;; "lab3a"|"lab3b") check_lab3;;
"lab4a") check_lab4a;; "lab4a") check_lab4a;;
"lab4b") check_lab4b;; "lab4b") check_lab4b;;

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
pkg/ pkg/
api.key api.key
.api.key.trimmed
*-handin.tar.gz *-handin.tar.gz

View File

@@ -1,15 +1,15 @@
# This is the Makefile helping you submit the labs. # This is the Makefile helping you submit the labs.
# Just create 6.824/api.key with your API key in it, # Just create 6.824/api.key with your API key in it,
# and submit your lab with the following command: # and submit your lab with the following command:
# $ make [lab1|lab2a|lab2b|lab2c|lab3a|lab3b|lab4a|lab4b] # $ make [lab1|lab2a|lab2b|lab2c|lab2d|lab3a|lab3b|lab4a|lab4b]
LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b " LABS=" lab1 lab2a lab2b lab2c lab2d lab3a lab3b lab4a lab4b "
%: check-% %: check-%
@echo "Preparing $@-handin.tar.gz" @echo "Preparing $@-handin.tar.gz"
@if echo $(LABS) | grep -q " $@ " ; then \ @if echo $(LABS) | grep -q " $@ " ; then \
echo "Tarring up your submission..." ; \ echo "Tarring up your submission..." ; \
tar cvzf $@-handin.tar.gz \ COPYFILE_DISABLE=1 tar cvzf $@-handin.tar.gz \
"--exclude=src/main/pg-*.txt" \ "--exclude=src/main/pg-*.txt" \
"--exclude=src/main/diskvd" \ "--exclude=src/main/diskvd" \
"--exclude=src/mapreduce/824-mrinput-*.txt" \ "--exclude=src/mapreduce/824-mrinput-*.txt" \
@@ -28,10 +28,8 @@ LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b "
read line; \ read line; \
if test "$$line" != "yes" ; then echo "Giving up submission"; exit; fi; \ if test "$$line" != "yes" ; then echo "Giving up submission"; exit; fi; \
if test `stat -c "%s" "$@-handin.tar.gz" 2>/dev/null || stat -f "%z" "$@-handin.tar.gz"` -ge 20971520 ; then echo "File exceeds 20MB."; exit; fi; \ if test `stat -c "%s" "$@-handin.tar.gz" 2>/dev/null || stat -f "%z" "$@-handin.tar.gz"` -ge 20971520 ; then echo "File exceeds 20MB."; exit; fi; \
mv api.key api.key.fix ; \ cat api.key | tr -d '\n' > .api.key.trimmed ; \
cat api.key.fix | tr -d '\n' > api.key ; \ curl --silent --fail --show-error -F file=@$@-handin.tar.gz -F "key=<.api.key.trimmed" \
rm api.key.fix ; \
curl -F file=@$@-handin.tar.gz -F "key=<api.key" \
https://6824.scripts.mit.edu/2021/handin.py/upload > /dev/null || { \ https://6824.scripts.mit.edu/2021/handin.py/upload > /dev/null || { \
echo ; \ echo ; \
echo "Submit seems to have failed."; \ echo "Submit seems to have failed."; \

View File

@@ -412,6 +412,8 @@ func GenericTestSpeed(t *testing.T, part string, maxraftstate int) {
if dur > numOps*timePerOp { if dur > numOps*timePerOp {
t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp) t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
} }
cfg.end()
} }
func TestBasic3A(t *testing.T) { func TestBasic3A(t *testing.T) {

View File

@@ -1,17 +1,21 @@
#!/bin/sh #!/usr/bin/env bash
if [ $# -ne 1 ]; then if [ $# -ne 1 ]; then
echo "Usage: $0 numTrials" echo "Usage: $0 numTrials"
exit 1 exit 1
fi fi
trap 'kill -INT -$pid; exit 1' INT
# Note: because the socketID is based on the current userID, # Note: because the socketID is based on the current userID,
# ./test-mr.sh cannot be run in parallel # ./test-mr.sh cannot be run in parallel
runs=$1 runs=$1
chmod +x test-mr.sh chmod +x test-mr.sh
for i in $(seq 1 $runs); do for i in $(seq 1 $runs); do
if ! timeout -k 2s 900s ./test-mr.sh timeout -k 2s 900s ./test-mr.sh &
then pid=$!
if ! wait $pid; then
echo '***' FAILED TESTS IN TRIAL $i echo '***' FAILED TESTS IN TRIAL $i
exit 1 exit 1
fi fi

View File

@@ -1,4 +1,4 @@
#!/bin/bash #!/usr/bin/env bash
# #
# basic map-reduce test # basic map-reduce test
@@ -102,7 +102,7 @@ wait
######################################################### #########################################################
echo '***' Starting map parallelism test. echo '***' Starting map parallelism test.
rm -f mr-out* mr-worker* rm -f mr-*
timeout -k 2s 180s ../mrcoordinator ../pg*txt & timeout -k 2s 180s ../mrcoordinator ../pg*txt &
sleep 1 sleep 1
@@ -133,7 +133,7 @@ wait
######################################################### #########################################################
echo '***' Starting reduce parallelism test. echo '***' Starting reduce parallelism test.
rm -f mr-out* mr-worker* rm -f mr-*
timeout -k 2s 180s ../mrcoordinator ../pg*txt & timeout -k 2s 180s ../mrcoordinator ../pg*txt &
sleep 1 sleep 1
@@ -156,7 +156,7 @@ wait
######################################################### #########################################################
echo '***' Starting job count test. echo '***' Starting job count test.
rm -f mr-out* mr-worker* rm -f mr-*
timeout -k 2s 180s ../mrcoordinator ../pg*txt & timeout -k 2s 180s ../mrcoordinator ../pg*txt &
sleep 1 sleep 1

View File

@@ -37,7 +37,12 @@ func (a byTime) Swap(i, j int) {
} }
func (a byTime) Less(i, j int) bool { func (a byTime) Less(i, j int) bool {
return a[i].time < a[j].time if a[i].time != a[j].time {
return a[i].time < a[j].time
}
// if the timestamps are the same, we need to make sure we order calls
// before returns
return a[i].kind == callEntry && a[j].kind == returnEntry
} }
func makeEntries(history []Operation) []entry { func makeEntries(history []Operation) []entry {

View File

@@ -127,8 +127,9 @@ func (cfg *config) crash1(i int) {
if cfg.saved[i] != nil { if cfg.saved[i] != nil {
raftlog := cfg.saved[i].ReadRaftState() raftlog := cfg.saved[i].ReadRaftState()
snapshot := cfg.saved[i].ReadSnapshot()
cfg.saved[i] = &Persister{} cfg.saved[i] = &Persister{}
cfg.saved[i].SaveRaftState(raftlog) cfg.saved[i].SaveStateAndSnapshot(raftlog, snapshot)
} }
} }
@@ -178,8 +179,11 @@ const SnapShotInterval = 10
// periodically snapshot raft state // periodically snapshot raft state
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
lastApplied := 0
for m := range applyCh { for m := range applyCh {
if m.SnapshotValid { if m.SnapshotValid {
//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
cfg.mu.Lock()
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm, if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
m.SnapshotIndex, m.Snapshot) { m.SnapshotIndex, m.Snapshot) {
cfg.logs[i] = make(map[int]interface{}) cfg.logs[i] = make(map[int]interface{})
@@ -190,8 +194,11 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
log.Fatalf("decode error\n") log.Fatalf("decode error\n")
} }
cfg.logs[i][m.SnapshotIndex] = v cfg.logs[i][m.SnapshotIndex] = v
lastApplied = m.SnapshotIndex
} }
} else if m.CommandValid { cfg.mu.Unlock()
} else if m.CommandValid && m.CommandIndex > lastApplied {
//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
cfg.mu.Lock() cfg.mu.Lock()
err_msg, prevok := cfg.checkLogs(i, m) err_msg, prevok := cfg.checkLogs(i, m)
cfg.mu.Unlock() cfg.mu.Unlock()
@@ -204,6 +211,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
// keep reading after error so that Raft doesn't block // keep reading after error so that Raft doesn't block
// holding locks... // holding locks...
} }
lastApplied = m.CommandIndex
if (m.CommandIndex+1)%SnapShotInterval == 0 { if (m.CommandIndex+1)%SnapShotInterval == 0 {
w := new(bytes.Buffer) w := new(bytes.Buffer)
e := labgob.NewEncoder(w) e := labgob.NewEncoder(w)
@@ -212,7 +220,12 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes()) cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
} }
} else { } else {
// ignore other types of ApplyMsg // Ignore other types of ApplyMsg or old
// commands. Old command may never happen,
// depending on the Raft implementation, but
// just in case.
// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
} }
} }
} }
@@ -256,7 +269,6 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
cfg.mu.Unlock() cfg.mu.Unlock()
applyCh := make(chan ApplyMsg) applyCh := make(chan ApplyMsg)
go applier(i, applyCh)
rf := Make(ends, i, cfg.saved[i], applyCh) rf := Make(ends, i, cfg.saved[i], applyCh)
@@ -264,6 +276,8 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
cfg.rafts[i] = rf cfg.rafts[i] = rf
cfg.mu.Unlock() cfg.mu.Unlock()
go applier(i, applyCh)
svc := labrpc.MakeService(rf) svc := labrpc.MakeService(rf)
srv := labrpc.MakeServer() srv := labrpc.MakeServer()
srv.AddService(svc) srv.AddService(svc)

View File

@@ -17,13 +17,14 @@ package raft
// in the same server. // in the same server.
// //
import "sync" import (
import "sync/atomic" // "bytes"
import "6.824/labrpc" "sync"
"sync/atomic"
// import "bytes"
// import "6.824/labgob"
// "6.824/labgob"
"6.824/labrpc"
)
// //
@@ -121,7 +122,7 @@ func (rf *Raft) readPersist(data []byte) {
// //
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool { func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Your code here (2C). // Your code here (2D).
return true return true
} }
@@ -131,7 +132,7 @@ func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int,
// service no longer needs the log through (and including) // service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible. // that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) { func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2C). // Your code here (2D).
} }

View File

@@ -301,9 +301,11 @@ func (cfg *config) Leader() (bool, int) {
defer cfg.mu.Unlock() defer cfg.mu.Unlock()
for i := 0; i < cfg.n; i++ { for i := 0; i < cfg.n; i++ {
_, is_leader := cfg.servers[i].rf.GetState() if cfg.servers[i] != nil {
if is_leader { _, is_leader := cfg.servers[i].rf.GetState()
return true, i if is_leader {
return true, i
}
} }
} }
return false, 0 return false, 0

View File

@@ -59,7 +59,7 @@ func (sc *ShardCtrler) Raft() *raft.Raft {
// //
// servers[] contains the ports of the set of // servers[] contains the ports of the set of
// servers that will cooperate via Paxos to // servers that will cooperate via Raft to
// form the fault-tolerant shardctrler service. // form the fault-tolerant shardctrler service.
// me is the index of the current server in servers[]. // me is the index of the current server in servers[].
// //

View File

@@ -1,12 +1,13 @@
package shardctrler package shardctrler
import ( import (
"fmt"
"sync" "sync"
"testing" "testing"
"time"
) )
// import "time" // import "time"
import "fmt"
func check(t *testing.T, groups []int, ck *Clerk) { func check(t *testing.T, groups []int, ck *Clerk) {
c := ck.Query(-1) c := ck.Query(-1)
@@ -377,4 +378,26 @@ func TestMulti(t *testing.T) {
} }
fmt.Printf(" ... Passed\n") fmt.Printf(" ... Passed\n")
fmt.Printf("Test: Check Same config on servers ...\n")
isLeader, leader := cfg.Leader()
if !isLeader {
t.Fatalf("Leader not found")
}
c := ck.Query(-1) // Config leader claims
cfg.ShutdownServer(leader)
attempts := 0
for isLeader, leader = cfg.Leader(); isLeader; time.Sleep(1 * time.Second) {
if attempts++; attempts >= 3 {
t.Fatalf("Leader not found")
}
}
c1 = ck.Query(-1)
check_same_config(t, c, c1)
fmt.Printf(" ... Passed\n")
} }

View File

@@ -2,7 +2,7 @@ package shardkv
// //
// Sharded key/value server. // Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos. // Lots of replica groups, each running Raft.
// Shardctrler decides which group serves each shard. // Shardctrler decides which group serves each shard.
// Shardctrler may change shard assignment from time to time. // Shardctrler may change shard assignment from time to time.
// //

View File

@@ -453,6 +453,74 @@ func TestConcurrent2(t *testing.T) {
fmt.Printf(" ... Passed\n") fmt.Printf(" ... Passed\n")
} }
func TestConcurrent3(t *testing.T) {
fmt.Printf("Test: concurrent configuration change and restart...\n")
cfg := make_config(t, 3, false, 300)
defer cfg.cleanup()
ck := cfg.makeClient()
cfg.join(0)
n := 10
ka := make([]string, n)
va := make([]string, n)
for i := 0; i < n; i++ {
ka[i] = strconv.Itoa(i)
va[i] = randstring(1)
ck.Put(ka[i], va[i])
}
var done int32
ch := make(chan bool)
ff := func(i int, ck1 *Clerk) {
defer func() { ch <- true }()
for atomic.LoadInt32(&done) == 0 {
x := randstring(1)
ck1.Append(ka[i], x)
va[i] += x
}
}
for i := 0; i < n; i++ {
ck1 := cfg.makeClient()
go ff(i, ck1)
}
t0 := time.Now()
for time.Since(t0) < 12*time.Second {
cfg.join(2)
cfg.join(1)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
cfg.ShutdownGroup(0)
cfg.ShutdownGroup(1)
cfg.ShutdownGroup(2)
cfg.StartGroup(0)
cfg.StartGroup(1)
cfg.StartGroup(2)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
cfg.leave(1)
cfg.leave(2)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
}
time.Sleep(2 * time.Second)
atomic.StoreInt32(&done, 1)
for i := 0; i < n; i++ {
<-ch
}
for i := 0; i < n; i++ {
check(t, ck, ka[i], va[i])
}
fmt.Printf(" ... Passed\n")
}
func TestUnreliable1(t *testing.T) { func TestUnreliable1(t *testing.T) {
fmt.Printf("Test: unreliable 1...\n") fmt.Printf("Test: unreliable 1...\n")
@@ -748,74 +816,6 @@ func TestChallenge1Delete(t *testing.T) {
fmt.Printf(" ... Passed\n") fmt.Printf(" ... Passed\n")
} }
func TestChallenge1Concurrent(t *testing.T) {
fmt.Printf("Test: concurrent configuration change and restart (challenge 1)...\n")
cfg := make_config(t, 3, false, 300)
defer cfg.cleanup()
ck := cfg.makeClient()
cfg.join(0)
n := 10
ka := make([]string, n)
va := make([]string, n)
for i := 0; i < n; i++ {
ka[i] = strconv.Itoa(i)
va[i] = randstring(1)
ck.Put(ka[i], va[i])
}
var done int32
ch := make(chan bool)
ff := func(i int, ck1 *Clerk) {
defer func() { ch <- true }()
for atomic.LoadInt32(&done) == 0 {
x := randstring(1)
ck1.Append(ka[i], x)
va[i] += x
}
}
for i := 0; i < n; i++ {
ck1 := cfg.makeClient()
go ff(i, ck1)
}
t0 := time.Now()
for time.Since(t0) < 12*time.Second {
cfg.join(2)
cfg.join(1)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
cfg.ShutdownGroup(0)
cfg.ShutdownGroup(1)
cfg.ShutdownGroup(2)
cfg.StartGroup(0)
cfg.StartGroup(1)
cfg.StartGroup(2)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
cfg.leave(1)
cfg.leave(2)
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
}
time.Sleep(2 * time.Second)
atomic.StoreInt32(&done, 1)
for i := 0; i < n; i++ {
<-ch
}
for i := 0; i < n; i++ {
check(t, ck, ka[i], va[i])
}
fmt.Printf(" ... Passed\n")
}
// //
// optional test to see whether servers can handle // optional test to see whether servers can handle
// shards that are not affected by a config change // shards that are not affected by a config change