commit 34a311648c
Author: Lily Tsai
Date:   2021-02-10 08:39:42 -08:00

63 changed files with 76273 additions and 0 deletions

src/kvraft/client.go (new file, 64 lines)

@@ -0,0 +1,64 @@
package kvraft
import "6.824/labrpc"
import "crypto/rand"
import "math/big"
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
}
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
return ck
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments, and reply must be passed as a pointer.
//
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
return ""
}
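// getFromAny is an illustrative sketch (not part of the lab skeleton above;
// the helper name is hypothetical). It shows the RPC pattern from the comment
// above applied once per server, returning the first usable reply. A real
// Get() keeps retrying forever and remembers the last known leader.
func (ck *Clerk) getFromAny(key string) (string, bool) {
    args := GetArgs{Key: key}
    for i := 0; i < len(ck.servers); i++ {
        var reply GetReply
        ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
        if ok && reply.Err == OK {
            return reply.Value, true
        }
        if ok && reply.Err == ErrNoKey {
            return "", true // key does not exist; Get() would return ""
        }
        // dropped RPC or ErrWrongLeader: fall through and try the next server
    }
    return "", false
}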
//
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments, and reply must be passed as a pointer.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
}
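// putAppendForever is an illustrative sketch (not part of the lab skeleton
// above; the helper name is hypothetical). It shows the keep-retrying-forever
// behavior described in the comment above: cycle through the servers until one
// accepts the request. A real implementation would also remember the last
// leader and add request IDs so that retried Appends are not applied twice.
func (ck *Clerk) putAppendForever(key string, value string, op string) {
    args := PutAppendArgs{Key: key, Value: value, Op: op}
    for i := 0; ; i = (i + 1) % len(ck.servers) {
        var reply PutAppendReply
        ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
        if ok && reply.Err == OK {
            return
        }
        // dropped RPC or ErrWrongLeader: try the next server
    }
}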
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}

src/kvraft/common.go (new file, 33 lines)

@@ -0,0 +1,33 @@
package kvraft
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
ErrWrongLeader = "ErrWrongLeader"
)
type Err string
// Put or Append
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
// You'll have to add definitions here.
}
type GetReply struct {
Err Err
Value string
}

src/kvraft/config.go (new file, 425 lines)

@@ -0,0 +1,425 @@
package kvraft
import "6.824/labrpc"
import "testing"
import "os"
// import "log"
import crand "crypto/rand"
import "math/big"
import "math/rand"
import "encoding/base64"
import "sync"
import "runtime"
import "6.824/raft"
import "fmt"
import "time"
import "sync/atomic"
func randstring(n int) string {
b := make([]byte, 2*n)
crand.Read(b)
s := base64.URLEncoding.EncodeToString(b)
return s[0:n]
}
func makeSeed() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := crand.Int(crand.Reader, max)
x := bigx.Int64()
return x
}
// Randomize server handles
func random_handles(kvh []*labrpc.ClientEnd) []*labrpc.ClientEnd {
sa := make([]*labrpc.ClientEnd, len(kvh))
copy(sa, kvh)
for i := range sa {
j := rand.Intn(i + 1)
sa[i], sa[j] = sa[j], sa[i]
}
return sa
}
type config struct {
mu sync.Mutex
t *testing.T
net *labrpc.Network
n int
kvservers []*KVServer
saved []*raft.Persister
endnames [][]string // names of each server's sending ClientEnds
clerks map[*Clerk][]string
nextClientId int
maxraftstate int
start time.Time // time at which make_config() was called
// begin()/end() statistics
t0 time.Time // time at which test_test.go called cfg.begin()
rpcs0 int // rpcTotal() at start of test
ops int32 // number of clerk get/put/append method calls
}
func (cfg *config) checkTimeout() {
// enforce a two minute real-time limit on each test
if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
cfg.t.Fatal("test took longer than 120 seconds")
}
}
func (cfg *config) cleanup() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
for i := 0; i < len(cfg.kvservers); i++ {
if cfg.kvservers[i] != nil {
cfg.kvservers[i].Kill()
}
}
cfg.net.Cleanup()
cfg.checkTimeout()
}
// Maximum log size across all servers
func (cfg *config) LogSize() int {
logsize := 0
for i := 0; i < cfg.n; i++ {
n := cfg.saved[i].RaftStateSize()
if n > logsize {
logsize = n
}
}
return logsize
}
// Maximum snapshot size across all servers
func (cfg *config) SnapshotSize() int {
snapshotsize := 0
for i := 0; i < cfg.n; i++ {
n := cfg.saved[i].SnapshotSize()
if n > snapshotsize {
snapshotsize = n
}
}
return snapshotsize
}
// attach server i to servers listed in to
// caller must hold cfg.mu
func (cfg *config) connectUnlocked(i int, to []int) {
// log.Printf("connect peer %d to %v\n", i, to)
// outgoing socket files
for j := 0; j < len(to); j++ {
endname := cfg.endnames[i][to[j]]
cfg.net.Enable(endname, true)
}
// incoming socket files
for j := 0; j < len(to); j++ {
endname := cfg.endnames[to[j]][i]
cfg.net.Enable(endname, true)
}
}
func (cfg *config) connect(i int, to []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.connectUnlocked(i, to)
}
// detach server i from the servers listed in from
// caller must hold cfg.mu
func (cfg *config) disconnectUnlocked(i int, from []int) {
// log.Printf("disconnect peer %d from %v\n", i, from)
// outgoing socket files
for j := 0; j < len(from); j++ {
if cfg.endnames[i] != nil {
endname := cfg.endnames[i][from[j]]
cfg.net.Enable(endname, false)
}
}
// incoming socket files
for j := 0; j < len(from); j++ {
if cfg.endnames[from[j]] != nil {
endname := cfg.endnames[from[j]][i]
cfg.net.Enable(endname, false)
}
}
}
func (cfg *config) disconnect(i int, from []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.disconnectUnlocked(i, from)
}
func (cfg *config) All() []int {
all := make([]int, cfg.n)
for i := 0; i < cfg.n; i++ {
all[i] = i
}
return all
}
func (cfg *config) ConnectAll() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
for i := 0; i < cfg.n; i++ {
cfg.connectUnlocked(i, cfg.All())
}
}
// Sets up 2 partitions with connectivity between servers in each partition.
func (cfg *config) partition(p1 []int, p2 []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
// log.Printf("partition servers into: %v %v\n", p1, p2)
for i := 0; i < len(p1); i++ {
cfg.disconnectUnlocked(p1[i], p2)
cfg.connectUnlocked(p1[i], p1)
}
for i := 0; i < len(p2); i++ {
cfg.disconnectUnlocked(p2[i], p1)
cfg.connectUnlocked(p2[i], p2)
}
}
// Create a clerk with clerk specific server names.
// Give it connections to all of the servers, but for
// now enable only connections to servers in to[].
func (cfg *config) makeClient(to []int) *Clerk {
cfg.mu.Lock()
defer cfg.mu.Unlock()
// a fresh set of ClientEnds.
ends := make([]*labrpc.ClientEnd, cfg.n)
endnames := make([]string, cfg.n)
for j := 0; j < cfg.n; j++ {
endnames[j] = randstring(20)
ends[j] = cfg.net.MakeEnd(endnames[j])
cfg.net.Connect(endnames[j], j)
}
ck := MakeClerk(random_handles(ends))
cfg.clerks[ck] = endnames
cfg.nextClientId++
cfg.ConnectClientUnlocked(ck, to)
return ck
}
func (cfg *config) deleteClient(ck *Clerk) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
v := cfg.clerks[ck]
for i := 0; i < len(v); i++ {
os.Remove(v[i])
}
delete(cfg.clerks, ck)
}
// caller should hold cfg.mu
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
// log.Printf("ConnectClient %v to %v\n", ck, to)
endnames := cfg.clerks[ck]
for j := 0; j < len(to); j++ {
s := endnames[to[j]]
cfg.net.Enable(s, true)
}
}
func (cfg *config) ConnectClient(ck *Clerk, to []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.ConnectClientUnlocked(ck, to)
}
// caller should hold cfg.mu
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
// log.Printf("DisconnectClient %v from %v\n", ck, from)
endnames := cfg.clerks[ck]
for j := 0; j < len(from); j++ {
s := endnames[from[j]]
cfg.net.Enable(s, false)
}
}
func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.DisconnectClientUnlocked(ck, from)
}
// Shutdown a server by isolating it
func (cfg *config) ShutdownServer(i int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.disconnectUnlocked(i, cfg.All())
// disable client connections to the server.
// it's important to do this before creating
// the new Persister in saved[i], to avoid
// the possibility of the server returning a
// positive reply to an Append but persisting
// the result in the superseded Persister.
cfg.net.DeleteServer(i)
// a fresh persister, in case old instance
// continues to update the Persister.
// but copy old persister's content so that we always
// pass Make() the last persisted state.
if cfg.saved[i] != nil {
cfg.saved[i] = cfg.saved[i].Copy()
}
kv := cfg.kvservers[i]
if kv != nil {
cfg.mu.Unlock()
kv.Kill()
cfg.mu.Lock()
cfg.kvservers[i] = nil
}
}
// To restart a server, call ShutdownServer() first, then StartServer().
func (cfg *config) StartServer(i int) {
cfg.mu.Lock()
// a fresh set of outgoing ClientEnd names.
cfg.endnames[i] = make([]string, cfg.n)
for j := 0; j < cfg.n; j++ {
cfg.endnames[i][j] = randstring(20)
}
// a fresh set of ClientEnds.
ends := make([]*labrpc.ClientEnd, cfg.n)
for j := 0; j < cfg.n; j++ {
ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
cfg.net.Connect(cfg.endnames[i][j], j)
}
// a fresh persister, so old instance doesn't overwrite
// new instance's persisted state.
// give the fresh persister a copy of the old persister's
// state, so that the spec is that we pass StartKVServer()
// the last persisted state.
if cfg.saved[i] != nil {
cfg.saved[i] = cfg.saved[i].Copy()
} else {
cfg.saved[i] = raft.MakePersister()
}
cfg.mu.Unlock()
cfg.kvservers[i] = StartKVServer(ends, i, cfg.saved[i], cfg.maxraftstate)
kvsvc := labrpc.MakeService(cfg.kvservers[i])
rfsvc := labrpc.MakeService(cfg.kvservers[i].rf)
srv := labrpc.MakeServer()
srv.AddService(kvsvc)
srv.AddService(rfsvc)
cfg.net.AddServer(i, srv)
}
func (cfg *config) Leader() (bool, int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
for i := 0; i < cfg.n; i++ {
_, is_leader := cfg.kvservers[i].rf.GetState()
if is_leader {
return true, i
}
}
return false, 0
}
// Partition servers into 2 groups and put current leader in minority
func (cfg *config) make_partition() ([]int, []int) {
_, l := cfg.Leader()
p1 := make([]int, cfg.n/2+1)
p2 := make([]int, cfg.n/2)
j := 0
for i := 0; i < cfg.n; i++ {
if i != l {
if j < len(p1) {
p1[j] = i
} else {
p2[j-len(p1)] = i
}
j++
}
}
p2[len(p2)-1] = l
return p1, p2
}
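// For example, with cfg.n = 5 and current leader l = 2, the loop above yields
// p1 = [0 1 3] (a majority of three servers without the leader) and
// p2 = [4 2] (a minority of two that includes the leader).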
var ncpu_once sync.Once
func make_config(t *testing.T, n int, unreliable bool, maxraftstate int) *config {
ncpu_once.Do(func() {
if runtime.NumCPU() < 2 {
fmt.Printf("warning: only one CPU, which may conceal locking bugs\n")
}
rand.Seed(makeSeed())
})
runtime.GOMAXPROCS(4)
cfg := &config{}
cfg.t = t
cfg.net = labrpc.MakeNetwork()
cfg.n = n
cfg.kvservers = make([]*KVServer, cfg.n)
cfg.saved = make([]*raft.Persister, cfg.n)
cfg.endnames = make([][]string, cfg.n)
cfg.clerks = make(map[*Clerk][]string)
cfg.nextClientId = cfg.n + 1000 // client ids start 1000 above the highest serverid
cfg.maxraftstate = maxraftstate
cfg.start = time.Now()
// create a full set of KV servers.
for i := 0; i < cfg.n; i++ {
cfg.StartServer(i)
}
cfg.ConnectAll()
cfg.net.Reliable(!unreliable)
return cfg
}
func (cfg *config) rpcTotal() int {
return cfg.net.GetTotalCount()
}
// start a Test.
// print the Test message.
// e.g. cfg.begin("Test (2B): RPC counts aren't too high")
func (cfg *config) begin(description string) {
fmt.Printf("%s ...\n", description)
cfg.t0 = time.Now()
cfg.rpcs0 = cfg.rpcTotal()
atomic.StoreInt32(&cfg.ops, 0)
}
func (cfg *config) op() {
atomic.AddInt32(&cfg.ops, 1)
}
// end a Test -- the fact that we got here means there
// was no failure.
// print the Passed message,
// and some performance numbers.
func (cfg *config) end() {
cfg.checkTimeout()
if cfg.t.Failed() == false {
t := time.Since(cfg.t0).Seconds() // real time
npeers := cfg.n // number of Raft peers
nrpc := cfg.rpcTotal() - cfg.rpcs0 // number of RPC sends
ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
fmt.Printf(" ... Passed --")
fmt.Printf(" %4.1f %d %5d %4d\n", t, npeers, nrpc, ops)
}
}

src/kvraft/server.go (new file, 101 lines)

@@ -0,0 +1,101 @@
package kvraft
import (
"6.824/labgob"
"6.824/labrpc"
"6.824/raft"
"log"
"sync"
"sync/atomic"
)
const Debug = false
func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug {
log.Printf(format, a...)
}
return
}
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
}
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
}
//
// the tester calls Kill() when a KVServer instance won't
// be needed again. for your convenience, we supply
// code to set kv.dead (without needing a lock),
// and a killed() method to test kv.dead in
// long-running loops. you can also add your own
// code to Kill(). you're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
//
func (kv *KVServer) Kill() {
atomic.StoreInt32(&kv.dead, 1)
kv.rf.Kill()
// Your code here, if desired.
}
func (kv *KVServer) killed() bool {
z := atomic.LoadInt32(&kv.dead)
return z == 1
}
//
// servers[] contains the ports of the set of
// servers that will cooperate via Raft to
// form the fault-tolerant key/value service.
// me is the index of the current server in servers[].
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
// the k/v server should snapshot when Raft's saved state exceeds maxraftstate bytes,
// in order to allow Raft to garbage-collect its log. if maxraftstate is -1,
// you don't need to snapshot.
// StartKVServer() must return quickly, so it should start goroutines
// for any long-running work.
//
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshal/unmarshal.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
return kv
}
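// applyLoop is a minimal sketch (not part of the skeleton above; the name and
// structure are hypothetical) of the kind of long-running goroutine that
// StartKVServer() is expected to launch, e.g. with `go kv.applyLoop()`. It
// drains applyCh and applies each committed command; the actual apply step
// depends on the fields you add to Op, so it is only indicated by a comment.
func (kv *KVServer) applyLoop() {
    for !kv.killed() {
        msg := <-kv.applyCh
        if !msg.CommandValid {
            continue // snapshot delivery (3B) would be handled here
        }
        op := msg.Command.(Op)
        kv.mu.Lock()
        // apply op to the in-memory key/value state, then notify the RPC
        // handler that submitted it via kv.rf.Start().
        _ = op
        kv.mu.Unlock()
    }
}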

src/kvraft/test_test.go (new file, 714 lines)

@@ -0,0 +1,714 @@
package kvraft
import "6.824/porcupine"
import "6.824/models"
import "testing"
import "strconv"
import "time"
import "math/rand"
import "strings"
import "sync"
import "sync/atomic"
import "fmt"
import "io/ioutil"
// The tester generously allows solutions to complete elections in one second
// (much more than the paper's range of timeouts).
const electionTimeout = 1 * time.Second
const linearizabilityCheckTimeout = 1 * time.Second
type OpLog struct {
operations []porcupine.Operation
sync.Mutex
}
func (log *OpLog) Append(op porcupine.Operation) {
log.Lock()
defer log.Unlock()
log.operations = append(log.operations, op)
}
func (log *OpLog) Read() []porcupine.Operation {
log.Lock()
defer log.Unlock()
ops := make([]porcupine.Operation, len(log.operations))
copy(ops, log.operations)
return ops
}
// get/put/putappend that keep counts
func Get(cfg *config, ck *Clerk, key string, log *OpLog, cli int) string {
start := time.Now().UnixNano()
v := ck.Get(key)
end := time.Now().UnixNano()
cfg.op()
if log != nil {
log.Append(porcupine.Operation{
Input: models.KvInput{Op: 0, Key: key},
Output: models.KvOutput{Value: v},
Call: start,
Return: end,
ClientId: cli,
})
}
return v
}
func Put(cfg *config, ck *Clerk, key string, value string, log *OpLog, cli int) {
start := time.Now().UnixNano()
ck.Put(key, value)
end := time.Now().UnixNano()
cfg.op()
if log != nil {
log.Append(porcupine.Operation{
Input: models.KvInput{Op: 1, Key: key, Value: value},
Output: models.KvOutput{},
Call: start,
Return: end,
ClientId: cli,
})
}
}
func Append(cfg *config, ck *Clerk, key string, value string, log *OpLog, cli int) {
start := time.Now().UnixNano()
ck.Append(key, value)
end := time.Now().UnixNano()
cfg.op()
if log != nil {
log.Append(porcupine.Operation{
Input: models.KvInput{Op: 2, Key: key, Value: value},
Output: models.KvOutput{},
Call: start,
Return: end,
ClientId: cli,
})
}
}
func check(cfg *config, t *testing.T, ck *Clerk, key string, value string) {
v := Get(cfg, ck, key, nil, -1)
if v != value {
t.Fatalf("Get(%v): expected:\n%v\nreceived:\n%v", key, value, v)
}
}
// a client runs the function fn and then signals it is done
func run_client(t *testing.T, cfg *config, me int, ca chan bool, fn func(me int, ck *Clerk, t *testing.T)) {
ok := false
defer func() { ca <- ok }()
ck := cfg.makeClient(cfg.All())
fn(me, ck, t)
ok = true
cfg.deleteClient(ck)
}
// spawn ncli clients and wait until they are all done
func spawn_clients_and_wait(t *testing.T, cfg *config, ncli int, fn func(me int, ck *Clerk, t *testing.T)) {
ca := make([]chan bool, ncli)
for cli := 0; cli < ncli; cli++ {
ca[cli] = make(chan bool)
go run_client(t, cfg, cli, ca[cli], fn)
}
// log.Printf("spawn_clients_and_wait: waiting for clients")
for cli := 0; cli < ncli; cli++ {
ok := <-ca[cli]
// log.Printf("spawn_clients_and_wait: client %d is done\n", cli)
if ok == false {
t.Fatalf("failure")
}
}
}
// predict effect of Append(k, val) if old value is prev.
func NextValue(prev string, val string) string {
return prev + val
}
// check that for a specific client all known appends are present in a value,
// and in order
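// (For example, if client 1 performed two appends, the value is expected to
// contain "x 1 0 y" followed later by "x 1 1 y", each appearing exactly once.)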
func checkClntAppends(t *testing.T, clnt int, v string, count int) {
lastoff := -1
for j := 0; j < count; j++ {
wanted := "x " + strconv.Itoa(clnt) + " " + strconv.Itoa(j) + " y"
off := strings.Index(v, wanted)
if off < 0 {
t.Fatalf("%v missing element %v in Append result %v", clnt, wanted, v)
}
off1 := strings.LastIndex(v, wanted)
if off1 != off {
t.Fatalf("duplicate element %v in Append result", wanted)
}
if off <= lastoff {
t.Fatalf("wrong order for element %v in Append result", wanted)
}
lastoff = off
}
}
// check that all known appends are present in a value,
// and are in order for each concurrent client.
func checkConcurrentAppends(t *testing.T, v string, counts []int) {
nclients := len(counts)
for i := 0; i < nclients; i++ {
lastoff := -1
for j := 0; j < counts[i]; j++ {
wanted := "x " + strconv.Itoa(i) + " " + strconv.Itoa(j) + " y"
off := strings.Index(v, wanted)
if off < 0 {
t.Fatalf("%v missing element %v in Append result %v", i, wanted, v)
}
off1 := strings.LastIndex(v, wanted)
if off1 != off {
t.Fatalf("duplicate element %v in Append result", wanted)
}
if off <= lastoff {
t.Fatalf("wrong order for element %v in Append result", wanted)
}
lastoff = off
}
}
}
// repartition the servers periodically
func partitioner(t *testing.T, cfg *config, ch chan bool, done *int32) {
defer func() { ch <- true }()
for atomic.LoadInt32(done) == 0 {
a := make([]int, cfg.n)
for i := 0; i < cfg.n; i++ {
a[i] = (rand.Int() % 2)
}
pa := make([][]int, 2)
for i := 0; i < 2; i++ {
pa[i] = make([]int, 0)
for j := 0; j < cfg.n; j++ {
if a[j] == i {
pa[i] = append(pa[i], j)
}
}
}
cfg.partition(pa[0], pa[1])
time.Sleep(electionTimeout + time.Duration(rand.Int63()%200)*time.Millisecond)
}
}
// The basic test is as follows: one or more clients submit Append/Get
// operations to a set of servers for some period of time. After the period is
// over, the test checks that all appended values are present and in order for
// a particular key. If unreliable is set, RPCs may fail. If crash is set, the
// servers crash after the period is over and restart. If partitions is set,
// the test repartitions the network concurrently with the clients and servers.
// If maxraftstate is a positive number, the size of the state for Raft (i.e.,
// log size) shouldn't exceed 8*maxraftstate. If maxraftstate is negative,
// snapshots shouldn't be used.
func GenericTest(t *testing.T, part string, nclients int, nservers int, unreliable bool, crash bool, partitions bool, maxraftstate int, randomkeys bool) {
title := "Test: "
if unreliable {
// the network drops RPC requests and replies.
title = title + "unreliable net, "
}
if crash {
// peers re-start, and thus persistence must work.
title = title + "restarts, "
}
if partitions {
// the network may partition
title = title + "partitions, "
}
if maxraftstate != -1 {
title = title + "snapshots, "
}
if randomkeys {
title = title + "random keys, "
}
if nclients > 1 {
title = title + "many clients"
} else {
title = title + "one client"
}
title = title + " (" + part + ")" // 3A or 3B
cfg := make_config(t, nservers, unreliable, maxraftstate)
defer cfg.cleanup()
cfg.begin(title)
opLog := &OpLog{}
ck := cfg.makeClient(cfg.All())
done_partitioner := int32(0)
done_clients := int32(0)
ch_partitioner := make(chan bool)
clnts := make([]chan int, nclients)
for i := 0; i < nclients; i++ {
clnts[i] = make(chan int)
}
for i := 0; i < 3; i++ {
// log.Printf("Iteration %v\n", i)
atomic.StoreInt32(&done_clients, 0)
atomic.StoreInt32(&done_partitioner, 0)
go spawn_clients_and_wait(t, cfg, nclients, func(cli int, myck *Clerk, t *testing.T) {
j := 0
defer func() {
clnts[cli] <- j
}()
last := "" // only used when not randomkeys
if !randomkeys {
Put(cfg, myck, strconv.Itoa(cli), last, opLog, cli)
}
for atomic.LoadInt32(&done_clients) == 0 {
var key string
if randomkeys {
key = strconv.Itoa(rand.Intn(nclients))
} else {
key = strconv.Itoa(cli)
}
nv := "x " + strconv.Itoa(cli) + " " + strconv.Itoa(j) + " y"
if (rand.Int() % 1000) < 500 {
// log.Printf("%d: client new append %v\n", cli, nv)
Append(cfg, myck, key, nv, opLog, cli)
if !randomkeys {
last = NextValue(last, nv)
}
j++
} else if randomkeys && (rand.Int()%1000) < 100 {
// we only do this when using random keys, because it would break the
// check done after Get() operations
Put(cfg, myck, key, nv, opLog, cli)
j++
} else {
// log.Printf("%d: client new get %v\n", cli, key)
v := Get(cfg, myck, key, opLog, cli)
// the following check only makes sense when we're not using random keys
if !randomkeys && v != last {
t.Fatalf("get wrong value, key %v, wanted:\n%v\n, got\n%v\n", key, last, v)
}
}
}
})
if partitions {
// Allow the clients to perform some operations without interruption
time.Sleep(1 * time.Second)
go partitioner(t, cfg, ch_partitioner, &done_partitioner)
}
time.Sleep(5 * time.Second)
atomic.StoreInt32(&done_clients, 1) // tell clients to quit
atomic.StoreInt32(&done_partitioner, 1) // tell partitioner to quit
if partitions {
// log.Printf("wait for partitioner\n")
<-ch_partitioner
// reconnect network and submit a request. A client may
// have submitted a request in a minority. That request
// won't return until that server discovers a new term
// has started.
cfg.ConnectAll()
// wait for a while so that we have a new term
time.Sleep(electionTimeout)
}
if crash {
// log.Printf("shutdown servers\n")
for i := 0; i < nservers; i++ {
cfg.ShutdownServer(i)
}
// Wait for a while for servers to shutdown, since
// shutdown isn't a real crash and isn't instantaneous
time.Sleep(electionTimeout)
// log.Printf("restart servers\n")
// crash and re-start all
for i := 0; i < nservers; i++ {
cfg.StartServer(i)
}
cfg.ConnectAll()
}
// log.Printf("wait for clients\n")
for i := 0; i < nclients; i++ {
// log.Printf("read from clients %d\n", i)
j := <-clnts[i]
// if j < 10 {
// log.Printf("Warning: client %d managed to perform only %d put operations in 1 sec?\n", i, j)
// }
key := strconv.Itoa(i)
// log.Printf("Check %v for client %d\n", j, i)
v := Get(cfg, ck, key, opLog, 0)
if !randomkeys {
checkClntAppends(t, i, v, j)
}
}
if maxraftstate > 0 {
// Check maximum after the servers have processed all client
// requests and had time to checkpoint.
sz := cfg.LogSize()
if sz > 8*maxraftstate {
t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
}
}
if maxraftstate < 0 {
// Check that snapshots are not used
ssz := cfg.SnapshotSize()
if ssz > 0 {
t.Fatalf("snapshot too large (%v), should not be used when maxraftstate = %d", ssz, maxraftstate)
}
}
}
res, info := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), linearizabilityCheckTimeout)
if res == porcupine.Illegal {
file, err := ioutil.TempFile("", "*.html")
if err != nil {
fmt.Printf("info: failed to create temp file for visualization")
} else {
err = porcupine.Visualize(models.KvModel, info, file)
if err != nil {
fmt.Printf("info: failed to write history visualization to %s\n", file.Name())
} else {
fmt.Printf("info: wrote history visualization to %s\n", file.Name())
}
}
t.Fatal("history is not linearizable")
} else if res == porcupine.Unknown {
fmt.Println("info: linearizability check timed out, assuming history is ok")
}
cfg.end()
}
// Check that ops are committed fast enough, better than 1 per heartbeat interval
func GenericTestSpeed(t *testing.T, part string, maxraftstate int) {
const nservers = 3
const numOps = 1000
cfg := make_config(t, nservers, false, maxraftstate)
defer cfg.cleanup()
ck := cfg.makeClient(cfg.All())
cfg.begin(fmt.Sprintf("Test: ops complete fast enough (%s)", part))
// wait until first op completes, so we know a leader is elected
// and KV servers are ready to process client requests
ck.Get("x")
start := time.Now()
for i := 0; i < numOps; i++ {
ck.Append("x", "x 0 "+strconv.Itoa(i)+" y")
}
dur := time.Since(start)
v := ck.Get("x")
checkClntAppends(t, 0, v, numOps)
// heartbeat interval should be ~ 100 ms; require at least 3 ops per heartbeat interval
const heartbeatInterval = 100 * time.Millisecond
const opsPerInterval = 3
const timePerOp = heartbeatInterval / opsPerInterval
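// with numOps = 1000 and timePerOp = 100ms/3 ≈ 33.3ms, the bound below allows
// roughly 33 seconds of wall-clock time for the 1000 Appends.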
if dur > numOps*timePerOp {
t.Fatalf("Operations completed too slowly %v/op > %v/op\n", dur/numOps, timePerOp)
}
}
func TestBasic3A(t *testing.T) {
// Test: one client (3A) ...
GenericTest(t, "3A", 1, 5, false, false, false, -1, false)
}
func TestSpeed3A(t *testing.T) {
GenericTestSpeed(t, "3A", -1)
}
func TestConcurrent3A(t *testing.T) {
// Test: many clients (3A) ...
GenericTest(t, "3A", 5, 5, false, false, false, -1, false)
}
func TestUnreliable3A(t *testing.T) {
// Test: unreliable net, many clients (3A) ...
GenericTest(t, "3A", 5, 5, true, false, false, -1, false)
}
func TestUnreliableOneKey3A(t *testing.T) {
const nservers = 3
cfg := make_config(t, nservers, true, -1)
defer cfg.cleanup()
ck := cfg.makeClient(cfg.All())
cfg.begin("Test: concurrent append to same key, unreliable (3A)")
Put(cfg, ck, "k", "", nil, -1)
const nclient = 5
const upto = 10
spawn_clients_and_wait(t, cfg, nclient, func(me int, myck *Clerk, t *testing.T) {
n := 0
for n < upto {
Append(cfg, myck, "k", "x "+strconv.Itoa(me)+" "+strconv.Itoa(n)+" y", nil, -1)
n++
}
})
var counts []int
for i := 0; i < nclient; i++ {
counts = append(counts, upto)
}
vx := Get(cfg, ck, "k", nil, -1)
checkConcurrentAppends(t, vx, counts)
cfg.end()
}
// Submit a request in the minority partition and check that the request
// doesn't go through until the partition heals. The leader in the original
// network ends up in the minority partition.
func TestOnePartition3A(t *testing.T) {
const nservers = 5
cfg := make_config(t, nservers, false, -1)
defer cfg.cleanup()
ck := cfg.makeClient(cfg.All())
Put(cfg, ck, "1", "13", nil, -1)
cfg.begin("Test: progress in majority (3A)")
p1, p2 := cfg.make_partition()
cfg.partition(p1, p2)
ckp1 := cfg.makeClient(p1) // connect ckp1 to p1
ckp2a := cfg.makeClient(p2) // connect ckp2a to p2
ckp2b := cfg.makeClient(p2) // connect ckp2b to p2
Put(cfg, ckp1, "1", "14", nil, -1)
check(cfg, t, ckp1, "1", "14")
cfg.end()
done0 := make(chan bool)
done1 := make(chan bool)
cfg.begin("Test: no progress in minority (3A)")
go func() {
Put(cfg, ckp2a, "1", "15", nil, -1)
done0 <- true
}()
go func() {
Get(cfg, ckp2b, "1", nil, -1) // different clerk in p2
done1 <- true
}()
select {
case <-done0:
t.Fatalf("Put in minority completed")
case <-done1:
t.Fatalf("Get in minority completed")
case <-time.After(time.Second):
}
check(cfg, t, ckp1, "1", "14")
Put(cfg, ckp1, "1", "16", nil, -1)
check(cfg, t, ckp1, "1", "16")
cfg.end()
cfg.begin("Test: completion after heal (3A)")
cfg.ConnectAll()
cfg.ConnectClient(ckp2a, cfg.All())
cfg.ConnectClient(ckp2b, cfg.All())
time.Sleep(electionTimeout)
select {
case <-done0:
case <-time.After(30 * 100 * time.Millisecond):
t.Fatalf("Put did not complete")
}
select {
case <-done1:
case <-time.After(30 * 100 * time.Millisecond):
t.Fatalf("Get did not complete")
default:
}
check(cfg, t, ck, "1", "15")
cfg.end()
}
func TestManyPartitionsOneClient3A(t *testing.T) {
// Test: partitions, one client (3A) ...
GenericTest(t, "3A", 1, 5, false, false, true, -1, false)
}
func TestManyPartitionsManyClients3A(t *testing.T) {
// Test: partitions, many clients (3A) ...
GenericTest(t, "3A", 5, 5, false, false, true, -1, false)
}
func TestPersistOneClient3A(t *testing.T) {
// Test: restarts, one client (3A) ...
GenericTest(t, "3A", 1, 5, false, true, false, -1, false)
}
func TestPersistConcurrent3A(t *testing.T) {
// Test: restarts, many clients (3A) ...
GenericTest(t, "3A", 5, 5, false, true, false, -1, false)
}
func TestPersistConcurrentUnreliable3A(t *testing.T) {
// Test: unreliable net, restarts, many clients (3A) ...
GenericTest(t, "3A", 5, 5, true, true, false, -1, false)
}
func TestPersistPartition3A(t *testing.T) {
// Test: restarts, partitions, many clients (3A) ...
GenericTest(t, "3A", 5, 5, false, true, true, -1, false)
}
func TestPersistPartitionUnreliable3A(t *testing.T) {
// Test: unreliable net, restarts, partitions, many clients (3A) ...
GenericTest(t, "3A", 5, 5, true, true, true, -1, false)
}
func TestPersistPartitionUnreliableLinearizable3A(t *testing.T) {
// Test: unreliable net, restarts, partitions, random keys, many clients (3A) ...
GenericTest(t, "3A", 15, 7, true, true, true, -1, true)
}
//
// if one server falls behind, then rejoins, does it
// recover by using the InstallSnapshot RPC?
// also checks that the majority discards committed log entries
// even if the minority doesn't respond.
//
func TestSnapshotRPC3B(t *testing.T) {
const nservers = 3
maxraftstate := 1000
cfg := make_config(t, nservers, false, maxraftstate)
defer cfg.cleanup()
ck := cfg.makeClient(cfg.All())
cfg.begin("Test: InstallSnapshot RPC (3B)")
Put(cfg, ck, "a", "A", nil, -1)
check(cfg, t, ck, "a", "A")
// a bunch of puts into the majority partition.
cfg.partition([]int{0, 1}, []int{2})
{
ck1 := cfg.makeClient([]int{0, 1})
for i := 0; i < 50; i++ {
Put(cfg, ck1, strconv.Itoa(i), strconv.Itoa(i), nil, -1)
}
time.Sleep(electionTimeout)
Put(cfg, ck1, "b", "B", nil, -1)
}
// check that the majority partition has thrown away
// most of its log entries.
sz := cfg.LogSize()
if sz > 8*maxraftstate {
t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
}
// now make group that requires participation of
// lagging server, so that it has to catch up.
cfg.partition([]int{0, 2}, []int{1})
{
ck1 := cfg.makeClient([]int{0, 2})
Put(cfg, ck1, "c", "C", nil, -1)
Put(cfg, ck1, "d", "D", nil, -1)
check(cfg, t, ck1, "a", "A")
check(cfg, t, ck1, "b", "B")
check(cfg, t, ck1, "1", "1")
check(cfg, t, ck1, "49", "49")
}
// now everybody
cfg.partition([]int{0, 1, 2}, []int{})
Put(cfg, ck, "e", "E", nil, -1)
check(cfg, t, ck, "c", "C")
check(cfg, t, ck, "e", "E")
check(cfg, t, ck, "1", "1")
cfg.end()
}
// are the snapshots not too huge? 500 bytes is a generous bound for the
// operations we're doing here.
func TestSnapshotSize3B(t *testing.T) {
const nservers = 3
maxraftstate := 1000
maxsnapshotstate := 500
cfg := make_config(t, nservers, false, maxraftstate)
defer cfg.cleanup()
ck := cfg.makeClient(cfg.All())
cfg.begin("Test: snapshot size is reasonable (3B)")
for i := 0; i < 200; i++ {
Put(cfg, ck, "x", "0", nil, -1)
check(cfg, t, ck, "x", "0")
Put(cfg, ck, "x", "1", nil, -1)
check(cfg, t, ck, "x", "1")
}
// check that servers have thrown away most of their log entries
sz := cfg.LogSize()
if sz > 8*maxraftstate {
t.Fatalf("logs were not trimmed (%v > 8*%v)", sz, maxraftstate)
}
// check that the snapshots are not unreasonably large
ssz := cfg.SnapshotSize()
if ssz > maxsnapshotstate {
t.Fatalf("snapshot too large (%v > %v)", ssz, maxsnapshotstate)
}
cfg.end()
}
func TestSpeed3B(t *testing.T) {
GenericTestSpeed(t, "3B", 1000)
}
func TestSnapshotRecover3B(t *testing.T) {
// Test: restarts, snapshots, one client (3B) ...
GenericTest(t, "3B", 1, 5, false, true, false, 1000, false)
}
func TestSnapshotRecoverManyClients3B(t *testing.T) {
// Test: restarts, snapshots, many clients (3B) ...
GenericTest(t, "3B", 20, 5, false, true, false, 1000, false)
}
func TestSnapshotUnreliable3B(t *testing.T) {
// Test: unreliable net, snapshots, many clients (3B) ...
GenericTest(t, "3B", 5, 5, true, false, false, 1000, false)
}
func TestSnapshotUnreliableRecover3B(t *testing.T) {
// Test: unreliable net, restarts, snapshots, many clients (3B) ...
GenericTest(t, "3B", 5, 5, true, true, false, 1000, false)
}
func TestSnapshotUnreliableRecoverConcurrentPartition3B(t *testing.T) {
// Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
GenericTest(t, "3B", 5, 5, true, true, true, 1000, false)
}
func TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable3B(t *testing.T) {
// Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (3B) ...
GenericTest(t, "3B", 15, 7, true, true, true, 1000, true)
}