Compare commits
10 Commits
ff788722b7
...
7e5eb65220
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e5eb65220 | ||
|
|
04a0ed2d03 | ||
|
|
fcb8090623 | ||
|
|
9992edfa79 | ||
|
|
8947299250 | ||
|
|
e3d565ccb7 | ||
|
|
bc1e228980 | ||
|
|
e0e726d4a4 | ||
|
|
aa04d6cf34 | ||
|
|
b8053cc3f5 |
@@ -55,7 +55,7 @@ main() {
|
|||||||
|
|
||||||
case $labnum in
|
case $labnum in
|
||||||
"lab1") check_lab1;;
|
"lab1") check_lab1;;
|
||||||
"lab2a"|"lab2b"|"lab2c") check_lab2;;
|
"lab2a"|"lab2b"|"lab2c"|"lab2d") check_lab2;;
|
||||||
"lab3a"|"lab3b") check_lab3;;
|
"lab3a"|"lab3b") check_lab3;;
|
||||||
"lab4a") check_lab4a;;
|
"lab4a") check_lab4a;;
|
||||||
"lab4b") check_lab4b;;
|
"lab4b") check_lab4b;;
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
pkg/
|
pkg/
|
||||||
api.key
|
api.key
|
||||||
|
.api.key.trimmed
|
||||||
*-handin.tar.gz
|
*-handin.tar.gz
|
||||||
|
|||||||
12
Makefile
12
Makefile
@@ -1,15 +1,15 @@
|
|||||||
# This is the Makefile helping you submit the labs.
|
# This is the Makefile helping you submit the labs.
|
||||||
# Just create 6.824/api.key with your API key in it,
|
# Just create 6.824/api.key with your API key in it,
|
||||||
# and submit your lab with the following command:
|
# and submit your lab with the following command:
|
||||||
# $ make [lab1|lab2a|lab2b|lab2c|lab3a|lab3b|lab4a|lab4b]
|
# $ make [lab1|lab2a|lab2b|lab2c|lab2d|lab3a|lab3b|lab4a|lab4b]
|
||||||
|
|
||||||
LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b "
|
LABS=" lab1 lab2a lab2b lab2c lab2d lab3a lab3b lab4a lab4b "
|
||||||
|
|
||||||
%: check-%
|
%: check-%
|
||||||
@echo "Preparing $@-handin.tar.gz"
|
@echo "Preparing $@-handin.tar.gz"
|
||||||
@if echo $(LABS) | grep -q " $@ " ; then \
|
@if echo $(LABS) | grep -q " $@ " ; then \
|
||||||
echo "Tarring up your submission..." ; \
|
echo "Tarring up your submission..." ; \
|
||||||
tar cvzf $@-handin.tar.gz \
|
COPYFILE_DISABLE=1 tar cvzf $@-handin.tar.gz \
|
||||||
"--exclude=src/main/pg-*.txt" \
|
"--exclude=src/main/pg-*.txt" \
|
||||||
"--exclude=src/main/diskvd" \
|
"--exclude=src/main/diskvd" \
|
||||||
"--exclude=src/mapreduce/824-mrinput-*.txt" \
|
"--exclude=src/mapreduce/824-mrinput-*.txt" \
|
||||||
@@ -28,10 +28,8 @@ LABS=" lab1 lab2a lab2b lab2c lab3a lab3b lab4a lab4b "
|
|||||||
read line; \
|
read line; \
|
||||||
if test "$$line" != "yes" ; then echo "Giving up submission"; exit; fi; \
|
if test "$$line" != "yes" ; then echo "Giving up submission"; exit; fi; \
|
||||||
if test `stat -c "%s" "$@-handin.tar.gz" 2>/dev/null || stat -f "%z" "$@-handin.tar.gz"` -ge 20971520 ; then echo "File exceeds 20MB."; exit; fi; \
|
if test `stat -c "%s" "$@-handin.tar.gz" 2>/dev/null || stat -f "%z" "$@-handin.tar.gz"` -ge 20971520 ; then echo "File exceeds 20MB."; exit; fi; \
|
||||||
mv api.key api.key.fix ; \
|
cat api.key | tr -d '\n' > .api.key.trimmed ; \
|
||||||
cat api.key.fix | tr -d '\n' > api.key ; \
|
curl --silent --fail --show-error -F file=@$@-handin.tar.gz -F "key=<.api.key.trimmed" \
|
||||||
rm api.key.fix ; \
|
|
||||||
curl -F file=@$@-handin.tar.gz -F "key=<api.key" \
|
|
||||||
https://6824.scripts.mit.edu/2021/handin.py/upload > /dev/null || { \
|
https://6824.scripts.mit.edu/2021/handin.py/upload > /dev/null || { \
|
||||||
echo ; \
|
echo ; \
|
||||||
echo "Submit seems to have failed."; \
|
echo "Submit seems to have failed."; \
|
||||||
|
|||||||
@@ -412,6 +412,8 @@ func GenericTestSpeed(t *testing.T, part string, maxraftstate int) {
|
|||||||
if dur > numOps*timePerOp {
|
if dur > numOps*timePerOp {
|
||||||
t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
|
t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfg.end()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBasic3A(t *testing.T) {
|
func TestBasic3A(t *testing.T) {
|
||||||
|
|||||||
@@ -1,17 +1,21 @@
|
|||||||
#!/bin/sh
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $0 numTrials"
|
echo "Usage: $0 numTrials"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
trap 'kill -INT -$pid; exit 1' INT
|
||||||
|
|
||||||
# Note: because the socketID is based on the current userID,
|
# Note: because the socketID is based on the current userID,
|
||||||
# ./test-mr.sh cannot be run in parallel
|
# ./test-mr.sh cannot be run in parallel
|
||||||
runs=$1
|
runs=$1
|
||||||
chmod +x test-mr.sh
|
chmod +x test-mr.sh
|
||||||
|
|
||||||
for i in $(seq 1 $runs); do
|
for i in $(seq 1 $runs); do
|
||||||
if ! timeout -k 2s 900s ./test-mr.sh
|
timeout -k 2s 900s ./test-mr.sh &
|
||||||
then
|
pid=$!
|
||||||
|
if ! wait $pid; then
|
||||||
echo '***' FAILED TESTS IN TRIAL $i
|
echo '***' FAILED TESTS IN TRIAL $i
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
#!/bin/bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
#
|
#
|
||||||
# basic map-reduce test
|
# basic map-reduce test
|
||||||
@@ -102,7 +102,7 @@ wait
|
|||||||
#########################################################
|
#########################################################
|
||||||
echo '***' Starting map parallelism test.
|
echo '***' Starting map parallelism test.
|
||||||
|
|
||||||
rm -f mr-out* mr-worker*
|
rm -f mr-*
|
||||||
|
|
||||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||||
sleep 1
|
sleep 1
|
||||||
@@ -133,7 +133,7 @@ wait
|
|||||||
#########################################################
|
#########################################################
|
||||||
echo '***' Starting reduce parallelism test.
|
echo '***' Starting reduce parallelism test.
|
||||||
|
|
||||||
rm -f mr-out* mr-worker*
|
rm -f mr-*
|
||||||
|
|
||||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||||
sleep 1
|
sleep 1
|
||||||
@@ -156,7 +156,7 @@ wait
|
|||||||
#########################################################
|
#########################################################
|
||||||
echo '***' Starting job count test.
|
echo '***' Starting job count test.
|
||||||
|
|
||||||
rm -f mr-out* mr-worker*
|
rm -f mr-*
|
||||||
|
|
||||||
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
timeout -k 2s 180s ../mrcoordinator ../pg*txt &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|||||||
@@ -37,7 +37,12 @@ func (a byTime) Swap(i, j int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a byTime) Less(i, j int) bool {
|
func (a byTime) Less(i, j int) bool {
|
||||||
return a[i].time < a[j].time
|
if a[i].time != a[j].time {
|
||||||
|
return a[i].time < a[j].time
|
||||||
|
}
|
||||||
|
// if the timestamps are the same, we need to make sure we order calls
|
||||||
|
// before returns
|
||||||
|
return a[i].kind == callEntry && a[j].kind == returnEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeEntries(history []Operation) []entry {
|
func makeEntries(history []Operation) []entry {
|
||||||
|
|||||||
@@ -127,8 +127,9 @@ func (cfg *config) crash1(i int) {
|
|||||||
|
|
||||||
if cfg.saved[i] != nil {
|
if cfg.saved[i] != nil {
|
||||||
raftlog := cfg.saved[i].ReadRaftState()
|
raftlog := cfg.saved[i].ReadRaftState()
|
||||||
|
snapshot := cfg.saved[i].ReadSnapshot()
|
||||||
cfg.saved[i] = &Persister{}
|
cfg.saved[i] = &Persister{}
|
||||||
cfg.saved[i].SaveRaftState(raftlog)
|
cfg.saved[i].SaveStateAndSnapshot(raftlog, snapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -178,8 +179,11 @@ const SnapShotInterval = 10
|
|||||||
|
|
||||||
// periodically snapshot raft state
|
// periodically snapshot raft state
|
||||||
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
||||||
|
lastApplied := 0
|
||||||
for m := range applyCh {
|
for m := range applyCh {
|
||||||
if m.SnapshotValid {
|
if m.SnapshotValid {
|
||||||
|
//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
|
||||||
|
cfg.mu.Lock()
|
||||||
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
|
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
|
||||||
m.SnapshotIndex, m.Snapshot) {
|
m.SnapshotIndex, m.Snapshot) {
|
||||||
cfg.logs[i] = make(map[int]interface{})
|
cfg.logs[i] = make(map[int]interface{})
|
||||||
@@ -190,8 +194,11 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
|||||||
log.Fatalf("decode error\n")
|
log.Fatalf("decode error\n")
|
||||||
}
|
}
|
||||||
cfg.logs[i][m.SnapshotIndex] = v
|
cfg.logs[i][m.SnapshotIndex] = v
|
||||||
|
lastApplied = m.SnapshotIndex
|
||||||
}
|
}
|
||||||
} else if m.CommandValid {
|
cfg.mu.Unlock()
|
||||||
|
} else if m.CommandValid && m.CommandIndex > lastApplied {
|
||||||
|
//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
|
||||||
cfg.mu.Lock()
|
cfg.mu.Lock()
|
||||||
err_msg, prevok := cfg.checkLogs(i, m)
|
err_msg, prevok := cfg.checkLogs(i, m)
|
||||||
cfg.mu.Unlock()
|
cfg.mu.Unlock()
|
||||||
@@ -204,6 +211,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
|||||||
// keep reading after error so that Raft doesn't block
|
// keep reading after error so that Raft doesn't block
|
||||||
// holding locks...
|
// holding locks...
|
||||||
}
|
}
|
||||||
|
lastApplied = m.CommandIndex
|
||||||
if (m.CommandIndex+1)%SnapShotInterval == 0 {
|
if (m.CommandIndex+1)%SnapShotInterval == 0 {
|
||||||
w := new(bytes.Buffer)
|
w := new(bytes.Buffer)
|
||||||
e := labgob.NewEncoder(w)
|
e := labgob.NewEncoder(w)
|
||||||
@@ -212,7 +220,12 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
|
|||||||
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
|
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// ignore other types of ApplyMsg
|
// Ignore other types of ApplyMsg or old
|
||||||
|
// commands. Old command may never happen,
|
||||||
|
// depending on the Raft implementation, but
|
||||||
|
// just in case.
|
||||||
|
// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -256,7 +269,6 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
|
|||||||
cfg.mu.Unlock()
|
cfg.mu.Unlock()
|
||||||
|
|
||||||
applyCh := make(chan ApplyMsg)
|
applyCh := make(chan ApplyMsg)
|
||||||
go applier(i, applyCh)
|
|
||||||
|
|
||||||
rf := Make(ends, i, cfg.saved[i], applyCh)
|
rf := Make(ends, i, cfg.saved[i], applyCh)
|
||||||
|
|
||||||
@@ -264,6 +276,8 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
|
|||||||
cfg.rafts[i] = rf
|
cfg.rafts[i] = rf
|
||||||
cfg.mu.Unlock()
|
cfg.mu.Unlock()
|
||||||
|
|
||||||
|
go applier(i, applyCh)
|
||||||
|
|
||||||
svc := labrpc.MakeService(rf)
|
svc := labrpc.MakeService(rf)
|
||||||
srv := labrpc.MakeServer()
|
srv := labrpc.MakeServer()
|
||||||
srv.AddService(svc)
|
srv.AddService(svc)
|
||||||
|
|||||||
@@ -17,13 +17,14 @@ package raft
|
|||||||
// in the same server.
|
// in the same server.
|
||||||
//
|
//
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
import "sync/atomic"
|
// "bytes"
|
||||||
import "6.824/labrpc"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
// import "bytes"
|
|
||||||
// import "6.824/labgob"
|
|
||||||
|
|
||||||
|
// "6.824/labgob"
|
||||||
|
"6.824/labrpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -121,7 +122,7 @@ func (rf *Raft) readPersist(data []byte) {
|
|||||||
//
|
//
|
||||||
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
|
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
|
||||||
|
|
||||||
// Your code here (2C).
|
// Your code here (2D).
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -131,7 +132,7 @@ func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int,
|
|||||||
// service no longer needs the log through (and including)
|
// service no longer needs the log through (and including)
|
||||||
// that index. Raft should now trim its log as much as possible.
|
// that index. Raft should now trim its log as much as possible.
|
||||||
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
||||||
// Your code here (2C).
|
// Your code here (2D).
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -301,9 +301,11 @@ func (cfg *config) Leader() (bool, int) {
|
|||||||
defer cfg.mu.Unlock()
|
defer cfg.mu.Unlock()
|
||||||
|
|
||||||
for i := 0; i < cfg.n; i++ {
|
for i := 0; i < cfg.n; i++ {
|
||||||
_, is_leader := cfg.servers[i].rf.GetState()
|
if cfg.servers[i] != nil {
|
||||||
if is_leader {
|
_, is_leader := cfg.servers[i].rf.GetState()
|
||||||
return true, i
|
if is_leader {
|
||||||
|
return true, i
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false, 0
|
return false, 0
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ func (sc *ShardCtrler) Raft() *raft.Raft {
|
|||||||
|
|
||||||
//
|
//
|
||||||
// servers[] contains the ports of the set of
|
// servers[] contains the ports of the set of
|
||||||
// servers that will cooperate via Paxos to
|
// servers that will cooperate via Raft to
|
||||||
// form the fault-tolerant shardctrler service.
|
// form the fault-tolerant shardctrler service.
|
||||||
// me is the index of the current server in servers[].
|
// me is the index of the current server in servers[].
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -1,12 +1,13 @@
|
|||||||
package shardctrler
|
package shardctrler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// import "time"
|
// import "time"
|
||||||
import "fmt"
|
|
||||||
|
|
||||||
func check(t *testing.T, groups []int, ck *Clerk) {
|
func check(t *testing.T, groups []int, ck *Clerk) {
|
||||||
c := ck.Query(-1)
|
c := ck.Query(-1)
|
||||||
@@ -377,4 +378,26 @@ func TestMulti(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf(" ... Passed\n")
|
fmt.Printf(" ... Passed\n")
|
||||||
|
|
||||||
|
fmt.Printf("Test: Check Same config on servers ...\n")
|
||||||
|
|
||||||
|
isLeader, leader := cfg.Leader()
|
||||||
|
if !isLeader {
|
||||||
|
t.Fatalf("Leader not found")
|
||||||
|
}
|
||||||
|
c := ck.Query(-1) // Config leader claims
|
||||||
|
|
||||||
|
cfg.ShutdownServer(leader)
|
||||||
|
|
||||||
|
attempts := 0
|
||||||
|
for isLeader, leader = cfg.Leader(); isLeader; time.Sleep(1 * time.Second) {
|
||||||
|
if attempts++; attempts >= 3 {
|
||||||
|
t.Fatalf("Leader not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c1 = ck.Query(-1)
|
||||||
|
check_same_config(t, c, c1)
|
||||||
|
|
||||||
|
fmt.Printf(" ... Passed\n")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package shardkv
|
|||||||
|
|
||||||
//
|
//
|
||||||
// Sharded key/value server.
|
// Sharded key/value server.
|
||||||
// Lots of replica groups, each running op-at-a-time paxos.
|
// Lots of replica groups, each running Raft.
|
||||||
// Shardctrler decides which group serves each shard.
|
// Shardctrler decides which group serves each shard.
|
||||||
// Shardctrler may change shard assignment from time to time.
|
// Shardctrler may change shard assignment from time to time.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -453,6 +453,74 @@ func TestConcurrent2(t *testing.T) {
|
|||||||
fmt.Printf(" ... Passed\n")
|
fmt.Printf(" ... Passed\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrent3(t *testing.T) {
|
||||||
|
fmt.Printf("Test: concurrent configuration change and restart...\n")
|
||||||
|
|
||||||
|
cfg := make_config(t, 3, false, 300)
|
||||||
|
defer cfg.cleanup()
|
||||||
|
|
||||||
|
ck := cfg.makeClient()
|
||||||
|
|
||||||
|
cfg.join(0)
|
||||||
|
|
||||||
|
n := 10
|
||||||
|
ka := make([]string, n)
|
||||||
|
va := make([]string, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
ka[i] = strconv.Itoa(i)
|
||||||
|
va[i] = randstring(1)
|
||||||
|
ck.Put(ka[i], va[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
var done int32
|
||||||
|
ch := make(chan bool)
|
||||||
|
|
||||||
|
ff := func(i int, ck1 *Clerk) {
|
||||||
|
defer func() { ch <- true }()
|
||||||
|
for atomic.LoadInt32(&done) == 0 {
|
||||||
|
x := randstring(1)
|
||||||
|
ck1.Append(ka[i], x)
|
||||||
|
va[i] += x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
ck1 := cfg.makeClient()
|
||||||
|
go ff(i, ck1)
|
||||||
|
}
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
|
for time.Since(t0) < 12*time.Second {
|
||||||
|
cfg.join(2)
|
||||||
|
cfg.join(1)
|
||||||
|
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||||
|
cfg.ShutdownGroup(0)
|
||||||
|
cfg.ShutdownGroup(1)
|
||||||
|
cfg.ShutdownGroup(2)
|
||||||
|
cfg.StartGroup(0)
|
||||||
|
cfg.StartGroup(1)
|
||||||
|
cfg.StartGroup(2)
|
||||||
|
|
||||||
|
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||||
|
cfg.leave(1)
|
||||||
|
cfg.leave(2)
|
||||||
|
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
atomic.StoreInt32(&done, 1)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
check(t, ck, ka[i], va[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" ... Passed\n")
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnreliable1(t *testing.T) {
|
func TestUnreliable1(t *testing.T) {
|
||||||
fmt.Printf("Test: unreliable 1...\n")
|
fmt.Printf("Test: unreliable 1...\n")
|
||||||
|
|
||||||
@@ -748,74 +816,6 @@ func TestChallenge1Delete(t *testing.T) {
|
|||||||
fmt.Printf(" ... Passed\n")
|
fmt.Printf(" ... Passed\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestChallenge1Concurrent(t *testing.T) {
|
|
||||||
fmt.Printf("Test: concurrent configuration change and restart (challenge 1)...\n")
|
|
||||||
|
|
||||||
cfg := make_config(t, 3, false, 300)
|
|
||||||
defer cfg.cleanup()
|
|
||||||
|
|
||||||
ck := cfg.makeClient()
|
|
||||||
|
|
||||||
cfg.join(0)
|
|
||||||
|
|
||||||
n := 10
|
|
||||||
ka := make([]string, n)
|
|
||||||
va := make([]string, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
ka[i] = strconv.Itoa(i)
|
|
||||||
va[i] = randstring(1)
|
|
||||||
ck.Put(ka[i], va[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
var done int32
|
|
||||||
ch := make(chan bool)
|
|
||||||
|
|
||||||
ff := func(i int, ck1 *Clerk) {
|
|
||||||
defer func() { ch <- true }()
|
|
||||||
for atomic.LoadInt32(&done) == 0 {
|
|
||||||
x := randstring(1)
|
|
||||||
ck1.Append(ka[i], x)
|
|
||||||
va[i] += x
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
ck1 := cfg.makeClient()
|
|
||||||
go ff(i, ck1)
|
|
||||||
}
|
|
||||||
|
|
||||||
t0 := time.Now()
|
|
||||||
for time.Since(t0) < 12*time.Second {
|
|
||||||
cfg.join(2)
|
|
||||||
cfg.join(1)
|
|
||||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
|
||||||
cfg.ShutdownGroup(0)
|
|
||||||
cfg.ShutdownGroup(1)
|
|
||||||
cfg.ShutdownGroup(2)
|
|
||||||
cfg.StartGroup(0)
|
|
||||||
cfg.StartGroup(1)
|
|
||||||
cfg.StartGroup(2)
|
|
||||||
|
|
||||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
|
||||||
cfg.leave(1)
|
|
||||||
cfg.leave(2)
|
|
||||||
time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
atomic.StoreInt32(&done, 1)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
<-ch
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
check(t, ck, ka[i], va[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf(" ... Passed\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// optional test to see whether servers can handle
|
// optional test to see whether servers can handle
|
||||||
// shards that are not affected by a config change
|
// shards that are not affected by a config change
|
||||||
|
|||||||
Reference in New Issue
Block a user