diff --git a/src/shardctrler/client.go b/src/shardctrler/client.go
new file mode 100644
index 0000000..1a90443
--- /dev/null
+++ b/src/shardctrler/client.go
@@ -0,0 +1,101 @@
+package shardctrler
+
+//
+// Shardctrler clerk.
+//
+
+import "6.824/labrpc"
+import "time"
+import "crypto/rand"
+import "math/big"
+
+type Clerk struct {
+	servers []*labrpc.ClientEnd
+	// Your data here.
+}
+
+func nrand() int64 {
+	max := big.NewInt(int64(1) << 62)
+	bigx, _ := rand.Int(rand.Reader, max)
+	x := bigx.Int64()
+	return x
+}
+
+func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
+	ck := new(Clerk)
+	ck.servers = servers
+	// Your code here.
+	return ck
+}
+
+func (ck *Clerk) Query(num int) Config {
+	args := &QueryArgs{}
+	// Your code here.
+	args.Num = num
+	for {
+		// try each known server.
+		for _, srv := range ck.servers {
+			var reply QueryReply
+			ok := srv.Call("ShardCtrler.Query", args, &reply)
+			if ok && reply.WrongLeader == false {
+				return reply.Config
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
+func (ck *Clerk) Join(servers map[int][]string) {
+	args := &JoinArgs{}
+	// Your code here.
+	args.Servers = servers
+
+	for {
+		// try each known server.
+		for _, srv := range ck.servers {
+			var reply JoinReply
+			ok := srv.Call("ShardCtrler.Join", args, &reply)
+			if ok && reply.WrongLeader == false {
+				return
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
+func (ck *Clerk) Leave(gids []int) {
+	args := &LeaveArgs{}
+	// Your code here.
+	args.GIDs = gids
+
+	for {
+		// try each known server.
+		for _, srv := range ck.servers {
+			var reply LeaveReply
+			ok := srv.Call("ShardCtrler.Leave", args, &reply)
+			if ok && reply.WrongLeader == false {
+				return
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
+func (ck *Clerk) Move(shard int, gid int) {
+	args := &MoveArgs{}
+	// Your code here.
+	args.Shard = shard
+	args.GID = gid
+
+	for {
+		// try each known server.
+		for _, srv := range ck.servers {
+			var reply MoveReply
+			ok := srv.Call("ShardCtrler.Move", args, &reply)
+			if ok && reply.WrongLeader == false {
+				return
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
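For orientation, here is a minimal sketch of how a caller drives this Clerk. It is illustrative only: the ends slice would come from a labrpc test harness (makeClient in config.go below builds exactly that), and the GID and server names are made up.

package shardctrler

import "fmt"
import "6.824/labrpc"

// exampleClerkUsage is a hypothetical driver, not part of the lab skeleton.
func exampleClerkUsage(ends []*labrpc.ClientEnd) {
	ck := MakeClerk(ends)

	// Add replica group 1 with three made-up server names, then read back
	// the latest configuration. Query(-1) loops over the servers and
	// retries until one answers without WrongLeader set.
	ck.Join(map[int][]string{1: {"s1a", "s1b", "s1c"}})
	cfg := ck.Query(-1)
	fmt.Printf("config %d: shard -> gid assignment %v\n", cfg.Num, cfg.Shards)
}
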
diff --git a/src/shardctrler/common.go b/src/shardctrler/common.go
new file mode 100644
index 0000000..031806a
--- /dev/null
+++ b/src/shardctrler/common.go
@@ -0,0 +1,73 @@
+package shardctrler
+
+//
+// Shard controller: assigns shards to replication groups.
+//
+// RPC interface:
+// Join(servers) -- add a set of groups (gid -> server-list mapping).
+// Leave(gids) -- delete a set of groups.
+// Move(shard, gid) -- hand off one shard from current owner to gid.
+// Query(num) -> fetch Config # num, or latest config if num==-1.
+//
+// A Config (configuration) describes a set of replica groups, and the
+// replica group responsible for each shard. Configs are numbered. Config
+// #0 is the initial configuration, with no groups and all shards
+// assigned to group 0 (the invalid group).
+//
+// You will need to add fields to the RPC argument structs.
+//
+
+// The number of shards.
+const NShards = 10
+
+// A configuration -- an assignment of shards to groups.
+// Please don't change this.
+type Config struct {
+	Num    int              // config number
+	Shards [NShards]int     // shard -> gid
+	Groups map[int][]string // gid -> servers[]
+}
+
+const (
+	OK = "OK"
+)
+
+type Err string
+
+type JoinArgs struct {
+	Servers map[int][]string // new GID -> servers mappings
+}
+
+type JoinReply struct {
+	WrongLeader bool
+	Err         Err
+}
+
+type LeaveArgs struct {
+	GIDs []int
+}
+
+type LeaveReply struct {
+	WrongLeader bool
+	Err         Err
+}
+
+type MoveArgs struct {
+	Shard int
+	GID   int
+}
+
+type MoveReply struct {
+	WrongLeader bool
+	Err         Err
+}
+
+type QueryArgs struct {
+	Num int // desired config number
+}
+
+type QueryReply struct {
+	WrongLeader bool
+	Err         Err
+	Config      Config
+}
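The comment above pins down what Join and Leave must produce but not how: the tests in test_test.go require that shard counts across live groups differ by at most one, that as few shards move as possible, and (implicitly) that every replica computes the identical new Config. The sketch below is one way to do that rebalancing step; the rebalance helper is invented here, not part of the skeleton. The sorted GID slice matters because Go randomizes map iteration order, so ranging over cfg.Groups directly could let replicas diverge.

package shardctrler

import "sort"

// rebalance mutates cfg.Shards so that shard counts across the groups in
// cfg.Groups differ by at most one, moving as few shards as possible.
// Deterministic: GIDs are visited in sorted order, shards in index order.
func rebalance(cfg *Config) {
	if len(cfg.Groups) == 0 {
		for i := range cfg.Shards {
			cfg.Shards[i] = 0 // no groups: everything back to the invalid group
		}
		return
	}
	gids := make([]int, 0, len(cfg.Groups))
	for gid := range cfg.Groups {
		gids = append(gids, gid)
	}
	sort.Ints(gids)

	// Count current ownership; shards held by departed groups (or by
	// group 0) are orphans that must be reassigned.
	counts := make(map[int]int)
	orphans := []int{}
	for shard, gid := range cfg.Shards {
		if _, ok := cfg.Groups[gid]; ok {
			counts[gid]++
		} else {
			orphans = append(orphans, shard)
		}
	}

	// Repeatedly hand a shard (an orphan, or one taken from the most
	// loaded group) to the least loaded group, until balanced.
	for {
		minGid, maxGid := gids[0], gids[0]
		for _, gid := range gids {
			if counts[gid] < counts[minGid] {
				minGid = gid
			}
			if counts[gid] > counts[maxGid] {
				maxGid = gid
			}
		}
		if len(orphans) > 0 {
			cfg.Shards[orphans[0]] = minGid
			orphans = orphans[1:]
			counts[minGid]++
			continue
		}
		if counts[maxGid]-counts[minGid] <= 1 {
			return // balanced: max-min <= 1, matching check() in test_test.go
		}
		for shard, gid := range cfg.Shards {
			if gid == maxGid {
				cfg.Shards[shard] = minGid
				counts[maxGid]--
				counts[minGid]++
				break
			}
		}
	}
}

A Join handler would copy the previous Config, splice in the new groups, and call rebalance on the copy; a Leave handler would delete the departed groups (their shards become the orphans above) and do the same.
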
diff --git a/src/shardctrler/config.go b/src/shardctrler/config.go
new file mode 100644
index 0000000..ee984ec
--- /dev/null
+++ b/src/shardctrler/config.go
@@ -0,0 +1,355 @@
+package shardctrler
+
+import "6.824/labrpc"
+import "6.824/raft"
+import "testing"
+import "os"
+
+// import "log"
+import crand "crypto/rand"
+import "math/rand"
+import "encoding/base64"
+import "sync"
+import "runtime"
+import "time"
+
+func randstring(n int) string {
+	b := make([]byte, 2*n)
+	crand.Read(b)
+	s := base64.URLEncoding.EncodeToString(b)
+	return s[0:n]
+}
+
+// Randomize server handles
+func random_handles(kvh []*labrpc.ClientEnd) []*labrpc.ClientEnd {
+	sa := make([]*labrpc.ClientEnd, len(kvh))
+	copy(sa, kvh)
+	for i := range sa {
+		j := rand.Intn(i + 1)
+		sa[i], sa[j] = sa[j], sa[i]
+	}
+	return sa
+}
+
+type config struct {
+	mu           sync.Mutex
+	t            *testing.T
+	net          *labrpc.Network
+	n            int
+	servers      []*ShardCtrler
+	saved        []*raft.Persister
+	endnames     [][]string // names of each server's sending ClientEnds
+	clerks       map[*Clerk][]string
+	nextClientId int
+	start        time.Time // time at which make_config() was called
+}
+
+func (cfg *config) checkTimeout() {
+	// enforce a two minute real-time limit on each test
+	if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
+		cfg.t.Fatal("test took longer than 120 seconds")
+	}
+}
+
+func (cfg *config) cleanup() {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	for i := 0; i < len(cfg.servers); i++ {
+		if cfg.servers[i] != nil {
+			cfg.servers[i].Kill()
+		}
+	}
+	cfg.net.Cleanup()
+	cfg.checkTimeout()
+}
+
+// Maximum log size across all servers
+func (cfg *config) LogSize() int {
+	logsize := 0
+	for i := 0; i < cfg.n; i++ {
+		n := cfg.saved[i].RaftStateSize()
+		if n > logsize {
+			logsize = n
+		}
+	}
+	return logsize
+}
+
+// attach server i to servers listed in to
+// caller must hold cfg.mu
+func (cfg *config) connectUnlocked(i int, to []int) {
+	// log.Printf("connect peer %d to %v\n", i, to)
+
+	// outgoing socket files
+	for j := 0; j < len(to); j++ {
+		endname := cfg.endnames[i][to[j]]
+		cfg.net.Enable(endname, true)
+	}
+
+	// incoming socket files
+	for j := 0; j < len(to); j++ {
+		endname := cfg.endnames[to[j]][i]
+		cfg.net.Enable(endname, true)
+	}
+}
+
+func (cfg *config) connect(i int, to []int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	cfg.connectUnlocked(i, to)
+}
+
+// detach server i from the servers listed in from
+// caller must hold cfg.mu
+func (cfg *config) disconnectUnlocked(i int, from []int) {
+	// log.Printf("disconnect peer %d from %v\n", i, from)
+
+	// outgoing socket files
+	for j := 0; j < len(from); j++ {
+		if cfg.endnames[i] != nil {
+			endname := cfg.endnames[i][from[j]]
+			cfg.net.Enable(endname, false)
+		}
+	}
+
+	// incoming socket files
+	for j := 0; j < len(from); j++ {
+		if cfg.endnames[from[j]] != nil {
+			endname := cfg.endnames[from[j]][i]
+			cfg.net.Enable(endname, false)
+		}
+	}
+}
+
+func (cfg *config) disconnect(i int, from []int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	cfg.disconnectUnlocked(i, from)
+}
+
+func (cfg *config) All() []int {
+	all := make([]int, cfg.n)
+	for i := 0; i < cfg.n; i++ {
+		all[i] = i
+	}
+	return all
+}
+
+func (cfg *config) ConnectAll() {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	for i := 0; i < cfg.n; i++ {
+		cfg.connectUnlocked(i, cfg.All())
+	}
+}
+
+// Sets up 2 partitions with connectivity between servers in each partition.
+func (cfg *config) partition(p1 []int, p2 []int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	// log.Printf("partition servers into: %v %v\n", p1, p2)
+	for i := 0; i < len(p1); i++ {
+		cfg.disconnectUnlocked(p1[i], p2)
+		cfg.connectUnlocked(p1[i], p1)
+	}
+	for i := 0; i < len(p2); i++ {
+		cfg.disconnectUnlocked(p2[i], p1)
+		cfg.connectUnlocked(p2[i], p2)
+	}
+}
+
+// Create a clerk with clerk specific server names.
+// Give it connections to all of the servers, but for
+// now enable only connections to servers in to[].
+func (cfg *config) makeClient(to []int) *Clerk {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+
+	// a fresh set of ClientEnds.
+	ends := make([]*labrpc.ClientEnd, cfg.n)
+	endnames := make([]string, cfg.n)
+	for j := 0; j < cfg.n; j++ {
+		endnames[j] = randstring(20)
+		ends[j] = cfg.net.MakeEnd(endnames[j])
+		cfg.net.Connect(endnames[j], j)
+	}
+
+	ck := MakeClerk(random_handles(ends))
+	cfg.clerks[ck] = endnames
+	cfg.nextClientId++
+	cfg.ConnectClientUnlocked(ck, to)
+	return ck
+}
+
+func (cfg *config) deleteClient(ck *Clerk) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+
+	v := cfg.clerks[ck]
+	for i := 0; i < len(v); i++ {
+		os.Remove(v[i])
+	}
+	delete(cfg.clerks, ck)
+}
+
+// caller should hold cfg.mu
+func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
+	// log.Printf("ConnectClient %v to %v\n", ck, to)
+	endnames := cfg.clerks[ck]
+	for j := 0; j < len(to); j++ {
+		s := endnames[to[j]]
+		cfg.net.Enable(s, true)
+	}
+}
+
+func (cfg *config) ConnectClient(ck *Clerk, to []int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	cfg.ConnectClientUnlocked(ck, to)
+}
+
+// caller should hold cfg.mu
+func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
+	// log.Printf("DisconnectClient %v from %v\n", ck, from)
+	endnames := cfg.clerks[ck]
+	for j := 0; j < len(from); j++ {
+		s := endnames[from[j]]
+		cfg.net.Enable(s, false)
+	}
+}
+
+func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+	cfg.DisconnectClientUnlocked(ck, from)
+}
+
+// Shutdown a server by isolating it
+func (cfg *config) ShutdownServer(i int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+
+	cfg.disconnectUnlocked(i, cfg.All())
+
+	// disable client connections to the server.
+	// it's important to do this before creating
+	// the new Persister in saved[i], to avoid
+	// the possibility of the server returning a
+	// positive reply to an RPC but persisting
+	// the result in the superseded Persister.
+	cfg.net.DeleteServer(i)
+
+	// a fresh persister, in case the old instance
+	// continues to update the Persister.
+	// but copy the old persister's content so that we always
+	// pass Make() the last persisted state.
+	if cfg.saved[i] != nil {
+		cfg.saved[i] = cfg.saved[i].Copy()
+	}
+
+	kv := cfg.servers[i]
+	if kv != nil {
+		cfg.mu.Unlock()
+		kv.Kill()
+		cfg.mu.Lock()
+		cfg.servers[i] = nil
+	}
+}
+
+// To restart a server, first call ShutdownServer
+func (cfg *config) StartServer(i int) {
+	cfg.mu.Lock()
+
+	// a fresh set of outgoing ClientEnd names.
+	cfg.endnames[i] = make([]string, cfg.n)
+	for j := 0; j < cfg.n; j++ {
+		cfg.endnames[i][j] = randstring(20)
+	}
+
+	// a fresh set of ClientEnds.
+	ends := make([]*labrpc.ClientEnd, cfg.n)
+	for j := 0; j < cfg.n; j++ {
+		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
+		cfg.net.Connect(cfg.endnames[i][j], j)
+	}
+
+	// a fresh persister, so old instance doesn't overwrite
+	// new instance's persisted state.
+	// give the fresh persister a copy of the old persister's
+	// state, so that the spec is that we pass StartKVServer()
+	// the last persisted state.
+	if cfg.saved[i] != nil {
+		cfg.saved[i] = cfg.saved[i].Copy()
+	} else {
+		cfg.saved[i] = raft.MakePersister()
+	}
+
+	cfg.mu.Unlock()
+
+	cfg.servers[i] = StartServer(ends, i, cfg.saved[i])
+
+	kvsvc := labrpc.MakeService(cfg.servers[i])
+	rfsvc := labrpc.MakeService(cfg.servers[i].rf)
+	srv := labrpc.MakeServer()
+	srv.AddService(kvsvc)
+	srv.AddService(rfsvc)
+	cfg.net.AddServer(i, srv)
+}
+
+func (cfg *config) Leader() (bool, int) {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+
+	for i := 0; i < cfg.n; i++ {
+		_, is_leader := cfg.servers[i].rf.GetState()
+		if is_leader {
+			return true, i
+		}
+	}
+	return false, 0
+}
+
+// Partition servers into 2 groups and put current leader in minority
+func (cfg *config) make_partition() ([]int, []int) {
+	_, l := cfg.Leader()
+	p1 := make([]int, cfg.n/2+1)
+	p2 := make([]int, cfg.n/2)
+	j := 0
+	for i := 0; i < cfg.n; i++ {
+		if i != l {
+			if j < len(p1) {
+				p1[j] = i
+			} else {
+				p2[j-len(p1)] = i
+			}
+			j++
+		}
+	}
+	p2[len(p2)-1] = l
+	return p1, p2
+}
+
+func make_config(t *testing.T, n int, unreliable bool) *config {
+	runtime.GOMAXPROCS(4)
+	cfg := &config{}
+	cfg.t = t
+	cfg.net = labrpc.MakeNetwork()
+	cfg.n = n
+	cfg.servers = make([]*ShardCtrler, cfg.n)
+	cfg.saved = make([]*raft.Persister, cfg.n)
+	cfg.endnames = make([][]string, cfg.n)
+	cfg.clerks = make(map[*Clerk][]string)
+	cfg.nextClientId = cfg.n + 1000 // client ids start 1000 above the highest serverid
+	cfg.start = time.Now()
+
+	// create a full set of KV servers.
+	for i := 0; i < cfg.n; i++ {
+		cfg.StartServer(i)
+	}
+
+	cfg.ConnectAll()
+
+	cfg.net.Reliable(!unreliable)
+
+	return cfg
+}
diff --git a/src/shardctrler/server.go b/src/shardctrler/server.go
new file mode 100644
index 0000000..87a8e98
--- /dev/null
+++ b/src/shardctrler/server.go
@@ -0,0 +1,80 @@
+package shardctrler
+
+
+import "6.824/raft"
+import "6.824/labrpc"
+import "sync"
+import "6.824/labgob"
+
+
+type ShardCtrler struct {
+	mu      sync.Mutex
+	me      int
+	rf      *raft.Raft
+	applyCh chan raft.ApplyMsg
+
+	// Your data here.
+
+	configs []Config // indexed by config num
+}
+
+
+type Op struct {
+	// Your data here.
+}
+
+
+func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
+	// Your code here.
+}
+
+func (sc *ShardCtrler) Leave(args *LeaveArgs, reply *LeaveReply) {
+	// Your code here.
+}
+
+func (sc *ShardCtrler) Move(args *MoveArgs, reply *MoveReply) {
+	// Your code here.
+}
+
+func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
+	// Your code here.
+}
+
+
+//
+// the tester calls Kill() when a ShardCtrler instance won't
+// be needed again. you are not required to do anything
+// in Kill(), but it might be convenient to (for example)
+// turn off debug output from this instance.
+//
+func (sc *ShardCtrler) Kill() {
+	sc.rf.Kill()
+	// Your code here, if desired.
+}
+
+// needed by shardkv tester
+func (sc *ShardCtrler) Raft() *raft.Raft {
+	return sc.rf
+}
+
+//
+// servers[] contains the ports of the set of
+// servers that will cooperate via Raft to
+// form the fault-tolerant shardctrler service.
+// me is the index of the current server in servers[].
+//
+func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardCtrler {
+	sc := new(ShardCtrler)
+	sc.me = me
+
+	sc.configs = make([]Config, 1)
+	sc.configs[0].Groups = map[int][]string{}
+
+	labgob.Register(Op{})
+	sc.applyCh = make(chan raft.ApplyMsg)
+	sc.rf = raft.Make(servers, me, persister, sc.applyCh)
+
+	// Your code here.
+
+	return sc
+}
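The skeleton leaves Op and the four handlers empty. One common pattern, sketched here with every name invented for illustration (nothing below is prescribed by the skeleton), is to submit each RPC as a single Op through sc.rf.Start() and to run one goroutine that drains sc.applyCh, applying each committed op exactly once; a per-client sequence number filters the duplicate commands that Clerk retries produce.

package shardctrler

import "6.824/raft"

// opSketch is one possible shape for Op: a single struct covering all four
// RPCs plus duplicate-detection fields. All field names are assumptions.
type opSketch struct {
	Kind     string           // "Join", "Leave", "Move", or "Query"
	Servers  map[int][]string // Join
	GIDs     []int            // Leave
	Shard    int              // Move
	GID      int              // Move
	Num      int              // Query
	ClientId int64            // e.g. chosen with nrand() in MakeClerk
	SeqNum   int64            // per-client, monotonically increasing
}

// applyLoopSketch drains the apply channel and applies each committed op
// exactly once, dropping ops whose SeqNum this server has already applied
// for that client (the Clerk resends after timeouts, so duplicates arrive).
func applyLoopSketch(applyCh chan raft.ApplyMsg, apply func(opSketch)) {
	lastSeq := make(map[int64]int64) // highest SeqNum applied, per client
	for msg := range applyCh {
		if !msg.CommandValid {
			continue
		}
		op, ok := msg.Command.(opSketch)
		if !ok {
			continue
		}
		if op.SeqNum <= lastSeq[op.ClientId] {
			continue // duplicate of a command we already applied
		}
		lastSeq[op.ClientId] = op.SeqNum
		apply(op)
		// ...then wake the RPC handler waiting on msg.CommandIndex so it
		// can fill in its reply.
	}
}

An RPC handler would call sc.rf.Start(op), answer WrongLeader immediately if Start reports this server is not the leader, and otherwise wait (with a timeout) for the apply loop to report what was committed at the returned index; if a different op took that slot, leadership changed, and the handler also answers WrongLeader so the Clerk retries elsewhere.
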
diff --git a/src/shardctrler/test_test.go b/src/shardctrler/test_test.go
new file mode 100644
index 0000000..5c95d56
--- /dev/null
+++ b/src/shardctrler/test_test.go
@@ -0,0 +1,380 @@
+package shardctrler
+
+import (
+	"sync"
+	"testing"
+)
+
+// import "time"
+import "fmt"
+
+func check(t *testing.T, groups []int, ck *Clerk) {
+	c := ck.Query(-1)
+	if len(c.Groups) != len(groups) {
+		t.Fatalf("wanted %v groups, got %v", len(groups), len(c.Groups))
+	}
+
+	// are the groups as expected?
+	for _, g := range groups {
+		_, ok := c.Groups[g]
+		if ok != true {
+			t.Fatalf("missing group %v", g)
+		}
+	}
+
+	// any un-allocated shards?
+	if len(groups) > 0 {
+		for s, g := range c.Shards {
+			_, ok := c.Groups[g]
+			if ok == false {
+				t.Fatalf("shard %v -> invalid group %v", s, g)
+			}
+		}
+	}
+
+	// more or less balanced sharding?
+	counts := map[int]int{}
+	for _, g := range c.Shards {
+		counts[g] += 1
+	}
+	min := 257
+	max := 0
+	for g, _ := range c.Groups {
+		if counts[g] > max {
+			max = counts[g]
+		}
+		if counts[g] < min {
+			min = counts[g]
+		}
+	}
+	if max > min+1 {
+		t.Fatalf("max %v too much larger than min %v", max, min)
+	}
+}
+
+func check_same_config(t *testing.T, c1 Config, c2 Config) {
+	if c1.Num != c2.Num {
+		t.Fatalf("Num wrong")
+	}
+	if c1.Shards != c2.Shards {
+		t.Fatalf("Shards wrong")
+	}
+	if len(c1.Groups) != len(c2.Groups) {
+		t.Fatalf("number of Groups is wrong")
+	}
+	for gid, sa := range c1.Groups {
+		sa1, ok := c2.Groups[gid]
+		if ok == false || len(sa1) != len(sa) {
+			t.Fatalf("len(Groups) wrong")
+		}
+		if ok && len(sa1) == len(sa) {
+			for j := 0; j < len(sa); j++ {
+				if sa[j] != sa1[j] {
+					t.Fatalf("Groups wrong")
+				}
+			}
+		}
+	}
+}
+
+func TestBasic(t *testing.T) {
+	const nservers = 3
+	cfg := make_config(t, nservers, false)
+	defer cfg.cleanup()
+
+	ck := cfg.makeClient(cfg.All())
+
+	fmt.Printf("Test: Basic leave/join ...\n")
+
+	cfa := make([]Config, 6)
+	cfa[0] = ck.Query(-1)
+
+	check(t, []int{}, ck)
+
+	var gid1 int = 1
+	ck.Join(map[int][]string{gid1: []string{"x", "y", "z"}})
+	check(t, []int{gid1}, ck)
+	cfa[1] = ck.Query(-1)
+
+	var gid2 int = 2
+	ck.Join(map[int][]string{gid2: []string{"a", "b", "c"}})
+	check(t, []int{gid1, gid2}, ck)
+	cfa[2] = ck.Query(-1)
+
+	cfx := ck.Query(-1)
+	sa1 := cfx.Groups[gid1]
+	if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" {
+		t.Fatalf("wrong servers for gid %v: %v\n", gid1, sa1)
+	}
+	sa2 := cfx.Groups[gid2]
+	if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" {
+		t.Fatalf("wrong servers for gid %v: %v\n", gid2, sa2)
+	}
+
+	ck.Leave([]int{gid1})
+	check(t, []int{gid2}, ck)
+	cfa[4] = ck.Query(-1)
+
+	ck.Leave([]int{gid2})
+	cfa[5] = ck.Query(-1)
+
+	fmt.Printf("  ... Passed\n")
+
+	fmt.Printf("Test: Historical queries ...\n")
+
+	for s := 0; s < nservers; s++ {
+		cfg.ShutdownServer(s)
+		for i := 0; i < len(cfa); i++ {
+			c := ck.Query(cfa[i].Num)
+			check_same_config(t, c, cfa[i])
+		}
+		cfg.StartServer(s)
+		cfg.ConnectAll()
+	}
+
+	fmt.Printf("  ... Passed\n")
+
+	fmt.Printf("Test: Move ...\n")
+	{
+		var gid3 int = 503
+		ck.Join(map[int][]string{gid3: []string{"3a", "3b", "3c"}})
+		var gid4 int = 504
+		ck.Join(map[int][]string{gid4: []string{"4a", "4b", "4c"}})
+		for i := 0; i < NShards; i++ {
+			cf := ck.Query(-1)
+			if i < NShards/2 {
+				ck.Move(i, gid3)
+				if cf.Shards[i] != gid3 {
+					cf1 := ck.Query(-1)
+					if cf1.Num <= cf.Num {
+						t.Fatalf("Move should increase Config.Num")
+					}
+				}
+			} else {
+				ck.Move(i, gid4)
+				if cf.Shards[i] != gid4 {
+					cf1 := ck.Query(-1)
+					if cf1.Num <= cf.Num {
+						t.Fatalf("Move should increase Config.Num")
+					}
+				}
+			}
+		}
+		cf2 := ck.Query(-1)
+		for i := 0; i < NShards; i++ {
+			if i < NShards/2 {
+				if cf2.Shards[i] != gid3 {
+					t.Fatalf("expected shard %v on gid %v actually %v",
+						i, gid3, cf2.Shards[i])
+				}
+			} else {
+				if cf2.Shards[i] != gid4 {
+					t.Fatalf("expected shard %v on gid %v actually %v",
+						i, gid4, cf2.Shards[i])
+				}
+			}
+		}
+		ck.Leave([]int{gid3})
+		ck.Leave([]int{gid4})
+	}
+	fmt.Printf("  ... Passed\n")
+
+	fmt.Printf("Test: Concurrent leave/join ...\n")
+
+	const npara = 10
+	var cka [npara]*Clerk
+	for i := 0; i < len(cka); i++ {
+		cka[i] = cfg.makeClient(cfg.All())
+	}
+	gids := make([]int, npara)
+	ch := make(chan bool)
+	for xi := 0; xi < npara; xi++ {
+		gids[xi] = int((xi * 10) + 100)
+		go func(i int) {
+			defer func() { ch <- true }()
+			var gid int = gids[i]
+			var sid1 = fmt.Sprintf("s%da", gid)
+			var sid2 = fmt.Sprintf("s%db", gid)
+			cka[i].Join(map[int][]string{gid + 1000: []string{sid1}})
+			cka[i].Join(map[int][]string{gid: []string{sid2}})
+			cka[i].Leave([]int{gid + 1000})
+		}(xi)
+	}
+	for i := 0; i < npara; i++ {
+		<-ch
+	}
+	check(t, gids, ck)
+
+	fmt.Printf("  ... Passed\n")
+
+	fmt.Printf("Test: Minimal transfers after joins ...\n")
+
+	c1 := ck.Query(-1)
+	for i := 0; i < 5; i++ {
+		var gid = int(npara + 1 + i)
+		ck.Join(map[int][]string{gid: []string{
+			fmt.Sprintf("%da", gid),
+			fmt.Sprintf("%db", gid),
+			fmt.Sprintf("%dc", gid)}})
+	}
+	c2 := ck.Query(-1)
+	for i := int(1); i <= npara; i++ {
+		for j := 0; j < len(c1.Shards); j++ {
+			if c2.Shards[j] == i {
+				if c1.Shards[j] != i {
+					t.Fatalf("non-minimal transfer after Join()s")
+				}
+			}
+		}
+	}
+
+	fmt.Printf("  ... Passed\n")
+
+	fmt.Printf("Test: Minimal transfers after leaves ...\n")
+
+	for i := 0; i < 5; i++ {
+		ck.Leave([]int{int(npara + 1 + i)})
+	}
+	c3 := ck.Query(-1)
+	for i := int(1); i <= npara; i++ {
+		for j := 0; j < len(c1.Shards); j++ {
+			if c2.Shards[j] == i {
+				if c3.Shards[j] != i {
+					t.Fatalf("non-minimal transfer after Leave()s")
+				}
+			}
+		}
+	}
+
+	fmt.Printf("  ... Passed\n")
+}
Passed\n") +} + +func TestMulti(t *testing.T) { + const nservers = 3 + cfg := make_config(t, nservers, false) + defer cfg.cleanup() + + ck := cfg.makeClient(cfg.All()) + + fmt.Printf("Test: Multi-group join/leave ...\n") + + cfa := make([]Config, 6) + cfa[0] = ck.Query(-1) + + check(t, []int{}, ck) + + var gid1 int = 1 + var gid2 int = 2 + ck.Join(map[int][]string{ + gid1: []string{"x", "y", "z"}, + gid2: []string{"a", "b", "c"}, + }) + check(t, []int{gid1, gid2}, ck) + cfa[1] = ck.Query(-1) + + var gid3 int = 3 + ck.Join(map[int][]string{gid3: []string{"j", "k", "l"}}) + check(t, []int{gid1, gid2, gid3}, ck) + cfa[2] = ck.Query(-1) + + cfx := ck.Query(-1) + sa1 := cfx.Groups[gid1] + if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" { + t.Fatalf("wrong servers for gid %v: %v\n", gid1, sa1) + } + sa2 := cfx.Groups[gid2] + if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" { + t.Fatalf("wrong servers for gid %v: %v\n", gid2, sa2) + } + sa3 := cfx.Groups[gid3] + if len(sa3) != 3 || sa3[0] != "j" || sa3[1] != "k" || sa3[2] != "l" { + t.Fatalf("wrong servers for gid %v: %v\n", gid3, sa3) + } + + ck.Leave([]int{gid1, gid3}) + check(t, []int{gid2}, ck) + cfa[3] = ck.Query(-1) + + cfx = ck.Query(-1) + sa2 = cfx.Groups[gid2] + if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" { + t.Fatalf("wrong servers for gid %v: %v\n", gid2, sa2) + } + + ck.Leave([]int{gid2}) + + fmt.Printf(" ... Passed\n") + + fmt.Printf("Test: Concurrent multi leave/join ...\n") + + const npara = 10 + var cka [npara]*Clerk + for i := 0; i < len(cka); i++ { + cka[i] = cfg.makeClient(cfg.All()) + } + gids := make([]int, npara) + var wg sync.WaitGroup + for xi := 0; xi < npara; xi++ { + wg.Add(1) + gids[xi] = int(xi + 1000) + go func(i int) { + defer wg.Done() + var gid int = gids[i] + cka[i].Join(map[int][]string{ + gid: []string{ + fmt.Sprintf("%da", gid), + fmt.Sprintf("%db", gid), + fmt.Sprintf("%dc", gid)}, + gid + 1000: []string{fmt.Sprintf("%da", gid+1000)}, + gid + 2000: []string{fmt.Sprintf("%da", gid+2000)}, + }) + cka[i].Leave([]int{gid + 1000, gid + 2000}) + }(xi) + } + wg.Wait() + check(t, gids, ck) + + fmt.Printf(" ... Passed\n") + + fmt.Printf("Test: Minimal transfers after multijoins ...\n") + + c1 := ck.Query(-1) + m := make(map[int][]string) + for i := 0; i < 5; i++ { + var gid = npara + 1 + i + m[gid] = []string{fmt.Sprintf("%da", gid), fmt.Sprintf("%db", gid)} + } + ck.Join(m) + c2 := ck.Query(-1) + for i := int(1); i <= npara; i++ { + for j := 0; j < len(c1.Shards); j++ { + if c2.Shards[j] == i { + if c1.Shards[j] != i { + t.Fatalf("non-minimal transfer after Join()s") + } + } + } + } + + fmt.Printf(" ... Passed\n") + + fmt.Printf("Test: Minimal transfers after multileaves ...\n") + + var l []int + for i := 0; i < 5; i++ { + l = append(l, npara+1+i) + } + ck.Leave(l) + c3 := ck.Query(-1) + for i := int(1); i <= npara; i++ { + for j := 0; j < len(c1.Shards); j++ { + if c2.Shards[j] == i { + if c3.Shards[j] != i { + t.Fatalf("non-minimal transfer after Leave()s") + } + } + } + } + + fmt.Printf(" ... 
Passed\n") +} diff --git a/src/shardmaster/mkclass.sh b/src/shardmaster/mkclass.sh deleted file mode 100644 index 6bf5f4e..0000000 --- a/src/shardmaster/mkclass.sh +++ /dev/null @@ -1,82 +0,0 @@ -#!/bin/sh - -# -# create or update 6.824-golabs-YEAR -# -# if a directory has a file mkclass.ignore, ignore the files listed in that file - -umask 2 - -# update this repository: -CLASSREPO=git@g.csail.mit.edu:6.824-golabs-2021 - -# include only these src/ directories (e.g. not paxos) -SRCS="mr mrapps main labrpc raft kvraft shardkv shardmaster labgob porcupine models" - -SD=$(cd $(dirname $0)/.. && /bin/pwd) -CD=/tmp/golabs.$$ -git clone $CLASSREPO $CD || exit 1 - -mkdir -p $CD/src/ - -cp $SD/mygo/Makefile $CD/Makefile -(cd $CD/ && git add Makefile 2> /dev/null ) - -cp $SD/mygo/.check-build $CD/.check-build -(cd $CD/ && git add .check-build 2> /dev/null ) - -cp $SD/mygo/.gitignore $CD/.gitignore -(cd $CD/ && git add .gitignore 2> /dev/null ) - -cp $SD/mygo/src/.gitignore $CD/src/.gitignore || exit 1 -(cd $CD/src && git add .gitignore) - -cp $SD/mygo/src/go.mod $CD/src/go.mod -(cd $CD/ && git add src/go.mod 2> /dev/null ) - -cp $SD/mygo/src/go.sum $CD/src/go.sum -(cd $CD/ && git add src/go.sum 2> /dev/null ) - -# for D in `(cd $SD/mygo/src ; ls)` -for D in `echo $SRCS` -do - mkdir -p $CD/src/$D || exit 1 - (cd $SD/mygo/src/$D - for F in `ls` - do - if [ -s mkclass.ignore ] - then - if grep -q $F mkclass.ignore - then - I=1 - else - I=0 - fi - else - I=0 - fi - if [ "$F" = "out" ] - then - I=1 - fi - if [ "$F" = "mkclass.ignore" ] - then - I=1 - fi - if echo $F | grep -q '~$' - then - I=1 - fi - if [ $I -eq 1 ] - then - echo "ignore $F" - else - $SD/bin/mklab.pl $CD/src/$D $F - (cd $CD/src/$D && git add $F 2> /dev/null ) - fi - done) -done - -(cd $CD ; git commit -am 'update') - -echo "Now, examine and push the repo in $CD" diff --git a/src/shardmaster/mklab.pl b/src/shardmaster/mklab.pl deleted file mode 100644 index 7e74792..0000000 --- a/src/shardmaster/mklab.pl +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/perl -w - -# -# ./mklab output-directory files... -# -# strips out lines from -# // SOL -# to -# // END -# -# eliminates DELETE -# e.g. func DELETELock(...) -# - -use strict; -use File::Copy qw(copy); - -sub usage { - print STDERR "Usage: mklab.pl output-directory files...\n"; - exit(1); -} - -sub onefile { - my($infile, $outfile) = @_; - - if ($infile !~ /\.go$/){ - print STDERR "copying non-Go $infile without SOL parsing\n"; - die "cannot copy $infile > $outfile" if !copy($infile, $outfile); - return; - } - - die "cannot read $infile" if !open(IF, $infile); - my $out = ""; - - my $omit = 0; - while(){ - s/DELETE//g; - if(/ SOL$/){ - if($omit == 0){ - $omit = 1; - } else { - die "nested SOL"; - } - } elsif(/SOL/){ - die "malformed SOL"; - } elsif(/ END$/){ - if($omit == 1){ - $omit = 0; - } else { - die "extra END in $infile"; - } - } elsif(/END/){ - die "malformed END"; - } elsif($omit == 0){ - $out .= $_; - } - } - close(IF); - - if($omit){ - die "SOL without END"; - } - - die "cannot write $outfile" if !open(OF, ">$outfile"); - print OF $out; - close(OF); -} - -sub main { - if($#ARGV < 1){ - usage(); - } - - my $dir = $ARGV[0]; - - system("mkdir -p $dir"); - if(! -d $dir){ - print STDERR "cannot create directory $dir\n"; - exit(1) - } - - for(my $i = 1; $i <= $#ARGV; $i++){ - my $f = $ARGV[$i]; - onefile($f, $dir . "/" . $f); - } -} - -main(); -exit(0);