update
This commit is contained in:
355
src/shardctrler/config.go
Normal file
355
src/shardctrler/config.go
Normal file
@@ -0,0 +1,355 @@
|
||||
package shardctrler
|
||||
|
||||
import "6.824/labrpc"
|
||||
import "6.824/raft"
|
||||
import "testing"
|
||||
import "os"
|
||||
|
||||
// import "log"
|
||||
import crand "crypto/rand"
|
||||
import "math/rand"
|
||||
import "encoding/base64"
|
||||
import "sync"
|
||||
import "runtime"
|
||||
import "time"
|
||||
|
||||
func randstring(n int) string {
|
||||
b := make([]byte, 2*n)
|
||||
crand.Read(b)
|
||||
s := base64.URLEncoding.EncodeToString(b)
|
||||
return s[0:n]
|
||||
}
|
||||
|
||||
// Randomize server handles
|
||||
func random_handles(kvh []*labrpc.ClientEnd) []*labrpc.ClientEnd {
|
||||
sa := make([]*labrpc.ClientEnd, len(kvh))
|
||||
copy(sa, kvh)
|
||||
for i := range sa {
|
||||
j := rand.Intn(i + 1)
|
||||
sa[i], sa[j] = sa[j], sa[i]
|
||||
}
|
||||
return sa
|
||||
}
|
||||
|
||||
type config struct {
|
||||
mu sync.Mutex
|
||||
t *testing.T
|
||||
net *labrpc.Network
|
||||
n int
|
||||
servers []*ShardCtrler
|
||||
saved []*raft.Persister
|
||||
endnames [][]string // names of each server's sending ClientEnds
|
||||
clerks map[*Clerk][]string
|
||||
nextClientId int
|
||||
start time.Time // time at which make_config() was called
|
||||
}
|
||||
|
||||
func (cfg *config) checkTimeout() {
|
||||
// enforce a two minute real-time limit on each test
|
||||
if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
|
||||
cfg.t.Fatal("test took longer than 120 seconds")
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) cleanup() {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
for i := 0; i < len(cfg.servers); i++ {
|
||||
if cfg.servers[i] != nil {
|
||||
cfg.servers[i].Kill()
|
||||
}
|
||||
}
|
||||
cfg.net.Cleanup()
|
||||
cfg.checkTimeout()
|
||||
}
|
||||
|
||||
// Maximum log size across all servers
|
||||
func (cfg *config) LogSize() int {
|
||||
logsize := 0
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
n := cfg.saved[i].RaftStateSize()
|
||||
if n > logsize {
|
||||
logsize = n
|
||||
}
|
||||
}
|
||||
return logsize
|
||||
}
|
||||
|
||||
// attach server i to servers listed in to
|
||||
// caller must hold cfg.mu
|
||||
func (cfg *config) connectUnlocked(i int, to []int) {
|
||||
// log.Printf("connect peer %d to %v\n", i, to)
|
||||
|
||||
// outgoing socket files
|
||||
for j := 0; j < len(to); j++ {
|
||||
endname := cfg.endnames[i][to[j]]
|
||||
cfg.net.Enable(endname, true)
|
||||
}
|
||||
|
||||
// incoming socket files
|
||||
for j := 0; j < len(to); j++ {
|
||||
endname := cfg.endnames[to[j]][i]
|
||||
cfg.net.Enable(endname, true)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) connect(i int, to []int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
cfg.connectUnlocked(i, to)
|
||||
}
|
||||
|
||||
// detach server i from the servers listed in from
|
||||
// caller must hold cfg.mu
|
||||
func (cfg *config) disconnectUnlocked(i int, from []int) {
|
||||
// log.Printf("disconnect peer %d from %v\n", i, from)
|
||||
|
||||
// outgoing socket files
|
||||
for j := 0; j < len(from); j++ {
|
||||
if cfg.endnames[i] != nil {
|
||||
endname := cfg.endnames[i][from[j]]
|
||||
cfg.net.Enable(endname, false)
|
||||
}
|
||||
}
|
||||
|
||||
// incoming socket files
|
||||
for j := 0; j < len(from); j++ {
|
||||
if cfg.endnames[j] != nil {
|
||||
endname := cfg.endnames[from[j]][i]
|
||||
cfg.net.Enable(endname, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) disconnect(i int, from []int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
cfg.disconnectUnlocked(i, from)
|
||||
}
|
||||
|
||||
func (cfg *config) All() []int {
|
||||
all := make([]int, cfg.n)
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
all[i] = i
|
||||
}
|
||||
return all
|
||||
}
|
||||
|
||||
func (cfg *config) ConnectAll() {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
cfg.connectUnlocked(i, cfg.All())
|
||||
}
|
||||
}
|
||||
|
||||
// Sets up 2 partitions with connectivity between servers in each partition.
|
||||
func (cfg *config) partition(p1 []int, p2 []int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
// log.Printf("partition servers into: %v %v\n", p1, p2)
|
||||
for i := 0; i < len(p1); i++ {
|
||||
cfg.disconnectUnlocked(p1[i], p2)
|
||||
cfg.connectUnlocked(p1[i], p1)
|
||||
}
|
||||
for i := 0; i < len(p2); i++ {
|
||||
cfg.disconnectUnlocked(p2[i], p1)
|
||||
cfg.connectUnlocked(p2[i], p2)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a clerk with clerk specific server names.
|
||||
// Give it connections to all of the servers, but for
|
||||
// now enable only connections to servers in to[].
|
||||
func (cfg *config) makeClient(to []int) *Clerk {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
|
||||
// a fresh set of ClientEnds.
|
||||
ends := make([]*labrpc.ClientEnd, cfg.n)
|
||||
endnames := make([]string, cfg.n)
|
||||
for j := 0; j < cfg.n; j++ {
|
||||
endnames[j] = randstring(20)
|
||||
ends[j] = cfg.net.MakeEnd(endnames[j])
|
||||
cfg.net.Connect(endnames[j], j)
|
||||
}
|
||||
|
||||
ck := MakeClerk(random_handles(ends))
|
||||
cfg.clerks[ck] = endnames
|
||||
cfg.nextClientId++
|
||||
cfg.ConnectClientUnlocked(ck, to)
|
||||
return ck
|
||||
}
|
||||
|
||||
func (cfg *config) deleteClient(ck *Clerk) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
|
||||
v := cfg.clerks[ck]
|
||||
for i := 0; i < len(v); i++ {
|
||||
os.Remove(v[i])
|
||||
}
|
||||
delete(cfg.clerks, ck)
|
||||
}
|
||||
|
||||
// caller should hold cfg.mu
|
||||
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
|
||||
// log.Printf("ConnectClient %v to %v\n", ck, to)
|
||||
endnames := cfg.clerks[ck]
|
||||
for j := 0; j < len(to); j++ {
|
||||
s := endnames[to[j]]
|
||||
cfg.net.Enable(s, true)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) ConnectClient(ck *Clerk, to []int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
cfg.ConnectClientUnlocked(ck, to)
|
||||
}
|
||||
|
||||
// caller should hold cfg.mu
|
||||
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
|
||||
// log.Printf("DisconnectClient %v from %v\n", ck, from)
|
||||
endnames := cfg.clerks[ck]
|
||||
for j := 0; j < len(from); j++ {
|
||||
s := endnames[from[j]]
|
||||
cfg.net.Enable(s, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
cfg.DisconnectClientUnlocked(ck, from)
|
||||
}
|
||||
|
||||
// Shutdown a server by isolating it
|
||||
func (cfg *config) ShutdownServer(i int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
|
||||
cfg.disconnectUnlocked(i, cfg.All())
|
||||
|
||||
// disable client connections to the server.
|
||||
// it's important to do this before creating
|
||||
// the new Persister in saved[i], to avoid
|
||||
// the possibility of the server returning a
|
||||
// positive reply to an Append but persisting
|
||||
// the result in the superseded Persister.
|
||||
cfg.net.DeleteServer(i)
|
||||
|
||||
// a fresh persister, in case old instance
|
||||
// continues to update the Persister.
|
||||
// but copy old persister's content so that we always
|
||||
// pass Make() the last persisted state.
|
||||
if cfg.saved[i] != nil {
|
||||
cfg.saved[i] = cfg.saved[i].Copy()
|
||||
}
|
||||
|
||||
kv := cfg.servers[i]
|
||||
if kv != nil {
|
||||
cfg.mu.Unlock()
|
||||
kv.Kill()
|
||||
cfg.mu.Lock()
|
||||
cfg.servers[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// If restart servers, first call ShutdownServer
|
||||
func (cfg *config) StartServer(i int) {
|
||||
cfg.mu.Lock()
|
||||
|
||||
// a fresh set of outgoing ClientEnd names.
|
||||
cfg.endnames[i] = make([]string, cfg.n)
|
||||
for j := 0; j < cfg.n; j++ {
|
||||
cfg.endnames[i][j] = randstring(20)
|
||||
}
|
||||
|
||||
// a fresh set of ClientEnds.
|
||||
ends := make([]*labrpc.ClientEnd, cfg.n)
|
||||
for j := 0; j < cfg.n; j++ {
|
||||
ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
|
||||
cfg.net.Connect(cfg.endnames[i][j], j)
|
||||
}
|
||||
|
||||
// a fresh persister, so old instance doesn't overwrite
|
||||
// new instance's persisted state.
|
||||
// give the fresh persister a copy of the old persister's
|
||||
// state, so that the spec is that we pass StartKVServer()
|
||||
// the last persisted state.
|
||||
if cfg.saved[i] != nil {
|
||||
cfg.saved[i] = cfg.saved[i].Copy()
|
||||
} else {
|
||||
cfg.saved[i] = raft.MakePersister()
|
||||
}
|
||||
|
||||
cfg.mu.Unlock()
|
||||
|
||||
cfg.servers[i] = StartServer(ends, i, cfg.saved[i])
|
||||
|
||||
kvsvc := labrpc.MakeService(cfg.servers[i])
|
||||
rfsvc := labrpc.MakeService(cfg.servers[i].rf)
|
||||
srv := labrpc.MakeServer()
|
||||
srv.AddService(kvsvc)
|
||||
srv.AddService(rfsvc)
|
||||
cfg.net.AddServer(i, srv)
|
||||
}
|
||||
|
||||
func (cfg *config) Leader() (bool, int) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
_, is_leader := cfg.servers[i].rf.GetState()
|
||||
if is_leader {
|
||||
return true, i
|
||||
}
|
||||
}
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// Partition servers into 2 groups and put current leader in minority
|
||||
func (cfg *config) make_partition() ([]int, []int) {
|
||||
_, l := cfg.Leader()
|
||||
p1 := make([]int, cfg.n/2+1)
|
||||
p2 := make([]int, cfg.n/2)
|
||||
j := 0
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
if i != l {
|
||||
if j < len(p1) {
|
||||
p1[j] = i
|
||||
} else {
|
||||
p2[j-len(p1)] = i
|
||||
}
|
||||
j++
|
||||
}
|
||||
}
|
||||
p2[len(p2)-1] = l
|
||||
return p1, p2
|
||||
}
|
||||
|
||||
func make_config(t *testing.T, n int, unreliable bool) *config {
|
||||
runtime.GOMAXPROCS(4)
|
||||
cfg := &config{}
|
||||
cfg.t = t
|
||||
cfg.net = labrpc.MakeNetwork()
|
||||
cfg.n = n
|
||||
cfg.servers = make([]*ShardCtrler, cfg.n)
|
||||
cfg.saved = make([]*raft.Persister, cfg.n)
|
||||
cfg.endnames = make([][]string, cfg.n)
|
||||
cfg.clerks = make(map[*Clerk][]string)
|
||||
cfg.nextClientId = cfg.n + 1000 // client ids start 1000 above the highest serverid
|
||||
cfg.start = time.Now()
|
||||
|
||||
// create a full set of KV servers.
|
||||
for i := 0; i < cfg.n; i++ {
|
||||
cfg.StartServer(i)
|
||||
}
|
||||
|
||||
cfg.ConnectAll()
|
||||
|
||||
cfg.net.Reliable(!unreliable)
|
||||
|
||||
return cfg
|
||||
}
|
||||
Reference in New Issue
Block a user