This commit is contained in:
Lily Tsai
2021-02-23 12:07:54 -08:00
parent e0e726d4a4
commit bc1e228980

View File

@@ -127,8 +127,9 @@ func (cfg *config) crash1(i int) {
if cfg.saved[i] != nil { if cfg.saved[i] != nil {
raftlog := cfg.saved[i].ReadRaftState() raftlog := cfg.saved[i].ReadRaftState()
snapshot := cfg.saved[i].ReadSnapshot()
cfg.saved[i] = &Persister{} cfg.saved[i] = &Persister{}
cfg.saved[i].SaveRaftState(raftlog) cfg.saved[i].SaveStateAndSnapshot(raftlog, snapshot)
} }
} }
@@ -178,8 +179,11 @@ const SnapShotInterval = 10
// periodically snapshot raft state // periodically snapshot raft state
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
lastApplied := 0
for m := range applyCh { for m := range applyCh {
if m.SnapshotValid { if m.SnapshotValid {
//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
cfg.mu.Lock()
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm, if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
m.SnapshotIndex, m.Snapshot) { m.SnapshotIndex, m.Snapshot) {
cfg.logs[i] = make(map[int]interface{}) cfg.logs[i] = make(map[int]interface{})
@@ -190,8 +194,11 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
log.Fatalf("decode error\n") log.Fatalf("decode error\n")
} }
cfg.logs[i][m.SnapshotIndex] = v cfg.logs[i][m.SnapshotIndex] = v
lastApplied = m.SnapshotIndex
} }
} else if m.CommandValid { cfg.mu.Unlock()
} else if m.CommandValid && m.CommandIndex > lastApplied {
//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
cfg.mu.Lock() cfg.mu.Lock()
err_msg, prevok := cfg.checkLogs(i, m) err_msg, prevok := cfg.checkLogs(i, m)
cfg.mu.Unlock() cfg.mu.Unlock()
@@ -204,6 +211,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
// keep reading after error so that Raft doesn't block // keep reading after error so that Raft doesn't block
// holding locks... // holding locks...
} }
lastApplied = m.CommandIndex
if (m.CommandIndex+1)%SnapShotInterval == 0 { if (m.CommandIndex+1)%SnapShotInterval == 0 {
w := new(bytes.Buffer) w := new(bytes.Buffer)
e := labgob.NewEncoder(w) e := labgob.NewEncoder(w)
@@ -212,7 +220,12 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes()) cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
} }
} else { } else {
// ignore other types of ApplyMsg // Ignore other types of ApplyMsg or old
// commands. Old command may never happen,
// depending on the Raft implementation, but
// just in case.
// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
} }
} }
} }