This commit is contained in:
Jose Javier
2021-04-06 19:50:34 -04:00
parent fcb8090623
commit 04a0ed2d03
6 changed files with 100 additions and 75 deletions

View File

@@ -9,7 +9,7 @@ LABS=" lab1 lab2a lab2b lab2c lab2d lab3a lab3b lab4a lab4b "
@echo "Preparing $@-handin.tar.gz"
@if echo $(LABS) | grep -q " $@ " ; then \
echo "Tarring up your submission..." ; \
tar cvzf $@-handin.tar.gz \
COPYFILE_DISABLE=1 tar cvzf $@-handin.tar.gz \
"--exclude=src/main/pg-*.txt" \
"--exclude=src/main/diskvd" \
"--exclude=src/mapreduce/824-mrinput-*.txt" \

View File

@@ -301,9 +301,11 @@ func (cfg *config) Leader() (bool, int) {
defer cfg.mu.Unlock()
for i := 0; i < cfg.n; i++ {
_, is_leader := cfg.servers[i].rf.GetState()
if is_leader {
return true, i
if cfg.servers[i] != nil {
_, is_leader := cfg.servers[i].rf.GetState()
if is_leader {
return true, i
}
}
}
return false, 0

View File

@@ -59,7 +59,7 @@ func (sc *ShardCtrler) Raft() *raft.Raft {
//
// servers[] contains the ports of the set of
// servers that will cooperate via Paxos to
// servers that will cooperate via Raft to
// form the fault-tolerant shardctrler service.
// me is the index of the current server in servers[].
//

View File

@@ -1,12 +1,13 @@
package shardctrler
import (
"fmt"
"sync"
"testing"
"time"
)
// import "time"
import "fmt"
func check(t *testing.T, groups []int, ck *Clerk) {
c := ck.Query(-1)
@@ -377,4 +378,26 @@ func TestMulti(t *testing.T) {
}
fmt.Printf(" ... Passed\n")
fmt.Printf("Test: Check Same config on servers ...\n")
isLeader, leader := cfg.Leader()
if !isLeader {
t.Fatalf("Leader not found")
}
c := ck.Query(-1) // Config leader claims
cfg.ShutdownServer(leader)
attempts := 0
for isLeader, leader = cfg.Leader(); isLeader; time.Sleep(1 * time.Second) {
if attempts++; attempts >= 3 {
t.Fatalf("Leader not found")
}
}
c1 = ck.Query(-1)
check_same_config(t, c, c1)
fmt.Printf(" ... Passed\n")
}

View File

@@ -2,7 +2,7 @@ package shardkv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Lots of replica groups, each running Raft.
// Shardctrler decides which group serves each shard.
// Shardctrler may change shard assignment from time to time.
//

View File

@@ -453,6 +453,74 @@ func TestConcurrent2(t *testing.T) {
fmt.Printf(" ... Passed\n")
}
// TestConcurrent3 stress-tests the sharded KV service: while n client
// goroutines continuously append to their own keys, the test repeatedly
// joins/leaves replica groups and restarts all of them, then verifies
// that no append was lost or duplicated.
func TestConcurrent3(t *testing.T) {
	fmt.Printf("Test: concurrent configuration change and restart...\n")

	cfg := make_config(t, 3, false, 300)
	defer cfg.cleanup()

	ck := cfg.makeClient()
	cfg.join(0)

	const n = 10
	keys := make([]string, n)
	vals := make([]string, n)
	for i := range keys {
		keys[i] = strconv.Itoa(i)
		vals[i] = randstring(1)
		ck.Put(keys[i], vals[i])
	}

	// Each worker appends random data to its own key until told to stop,
	// mirroring every append into vals[i] for the final check. Only
	// worker i writes vals[i]; the sends on finished order those writes
	// before the reads at the bottom of the test.
	var stop int32
	finished := make(chan bool)
	worker := func(i int, clerk *Clerk) {
		defer func() { finished <- true }()
		for atomic.LoadInt32(&stop) == 0 {
			s := randstring(1)
			clerk.Append(keys[i], s)
			vals[i] += s
		}
	}
	for i := 0; i < n; i++ {
		go worker(i, cfg.makeClient())
	}

	// Churn the configuration for 12 seconds: add groups, crash and
	// restart all three groups, then remove groups, with random pauses
	// of up to 900ms in between.
	nap := func() {
		time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
	}
	deadline := time.Now().Add(12 * time.Second)
	for time.Now().Before(deadline) {
		cfg.join(2)
		cfg.join(1)
		nap()
		cfg.ShutdownGroup(0)
		cfg.ShutdownGroup(1)
		cfg.ShutdownGroup(2)
		cfg.StartGroup(0)
		cfg.StartGroup(1)
		cfg.StartGroup(2)
		nap()
		cfg.leave(1)
		cfg.leave(2)
		nap()
	}

	// Let the system settle, then stop the workers and wait for each one.
	time.Sleep(2 * time.Second)
	atomic.StoreInt32(&stop, 1)
	for i := 0; i < n; i++ {
		<-finished
	}

	// Every key must hold exactly the concatenation of its appends.
	for i := 0; i < n; i++ {
		check(t, ck, keys[i], vals[i])
	}

	fmt.Printf(" ... Passed\n")
}
func TestUnreliable1(t *testing.T) {
fmt.Printf("Test: unreliable 1...\n")
@@ -748,74 +816,6 @@ func TestChallenge1Delete(t *testing.T) {
fmt.Printf(" ... Passed\n")
}
// TestChallenge1Concurrent is the challenge-1 variant of the concurrent
// churn test: n clients append to their own keys nonstop while replica
// groups are joined, left, and restarted; afterwards every key must
// contain exactly the appends that were acknowledged.
func TestChallenge1Concurrent(t *testing.T) {
	fmt.Printf("Test: concurrent configuration change and restart (challenge 1)...\n")

	cfg := make_config(t, 3, false, 300)
	defer cfg.cleanup()

	ck := cfg.makeClient()
	cfg.join(0)

	const n = 10
	keys := make([]string, n)
	vals := make([]string, n)
	for i := range keys {
		keys[i] = strconv.Itoa(i)
		vals[i] = randstring(1)
		ck.Put(keys[i], vals[i])
	}

	// Per-key appender: keeps appending random data and recording the
	// expected value in vals[i] until stop is raised. vals[i] is written
	// only by goroutine i; the channel sends below order those writes
	// before the final verification reads.
	var stop int32
	finished := make(chan bool)
	worker := func(i int, clerk *Clerk) {
		defer func() { finished <- true }()
		for atomic.LoadInt32(&stop) == 0 {
			s := randstring(1)
			clerk.Append(keys[i], s)
			vals[i] += s
		}
	}
	for i := 0; i < n; i++ {
		go worker(i, cfg.makeClient())
	}

	// For 12 seconds, churn membership and crash/restart every group,
	// sleeping a random interval of up to 900ms between phases.
	nap := func() {
		time.Sleep(time.Duration(rand.Int()%900) * time.Millisecond)
	}
	deadline := time.Now().Add(12 * time.Second)
	for time.Now().Before(deadline) {
		cfg.join(2)
		cfg.join(1)
		nap()
		cfg.ShutdownGroup(0)
		cfg.ShutdownGroup(1)
		cfg.ShutdownGroup(2)
		cfg.StartGroup(0)
		cfg.StartGroup(1)
		cfg.StartGroup(2)
		nap()
		cfg.leave(1)
		cfg.leave(2)
		nap()
	}

	// Quiesce, signal the workers to stop, and wait for all of them.
	time.Sleep(2 * time.Second)
	atomic.StoreInt32(&stop, 1)
	for i := 0; i < n; i++ {
		<-finished
	}

	// Verify no acknowledged append was lost or duplicated.
	for i := 0; i < n; i++ {
		check(t, ck, keys[i], vals[i])
	}

	fmt.Printf(" ... Passed\n")
}
//
// optional test to see whether servers can handle
// shards that are not affected by a config change