diff --git a/src/raft/raft.go b/src/raft/raft.go index 29ea521..17ab1a3 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -18,26 +18,32 @@ package raft // import ( -// "bytes" + "bytes" + "math/rand" "sync" "sync/atomic" + "time" -// "6.824/labgob" + "6.824/labgob" "6.824/labrpc" ) +// Server states +const ( + Follower = 0 + Candidate = 1 + Leader = 2 +) -// -// as each Raft peer becomes aware that successive log entries are -// committed, the peer should send an ApplyMsg to the service (or -// tester) on the same server, via the applyCh passed to Make(). set -// CommandValid to true to indicate that the ApplyMsg contains a newly -// committed log entry. -// -// in part 2D you'll want to send other kinds of messages (e.g., -// snapshots) on the applyCh, but set CommandValid to false for these -// other uses. -// +// Timing constants +const ( + HeartbeatInterval = 100 * time.Millisecond + ElectionTimeoutMin = 300 + ElectionTimeoutMax = 500 +) + +// ApplyMsg - as each Raft peer becomes aware that successive log entries are +// committed, the peer should send an ApplyMsg to the service (or tester) type ApplyMsg struct { CommandValid bool Command interface{} @@ -50,9 +56,13 @@ type ApplyMsg struct { SnapshotIndex int } -// -// A Go object implementing a single Raft peer. -// +// LogEntry represents a log entry +type LogEntry struct { + Term int + Command interface{} +} + +// Raft - A Go object implementing a single Raft peer. type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers @@ -60,180 +70,401 @@ type Raft struct { me int // this peer's index into peers[] dead int32 // set by Kill() - // Your data here (2A, 2B, 2C). - // Look at the paper's Figure 2 for a description of what - // state a Raft server must maintain. 
+ // Persistent state on all servers (Figure 2) + currentTerm int + votedFor int + log []LogEntry + // Volatile state on all servers + commitIndex int + lastApplied int + + // Volatile state on leaders + nextIndex []int + matchIndex []int + + // Additional state + state int + lastHeartbeat time.Time + applyCh chan ApplyMsg + applyCond *sync.Cond + + // Snapshot state (2D) + lastIncludedIndex int + lastIncludedTerm int } -// return currentTerm and whether this server -// believes it is the leader. +// GetState returns currentTerm and whether this server believes it is the leader. func (rf *Raft) GetState() (int, bool) { - - var term int - var isleader bool - // Your code here (2A). - return term, isleader + rf.mu.Lock() + defer rf.mu.Unlock() + return rf.currentTerm, rf.state == Leader } -// -// save Raft's persistent state to stable storage, -// where it can later be retrieved after a crash and restart. -// see paper's Figure 2 for a description of what should be persistent. -// +// getLastLogIndex returns the index of the last log entry +func (rf *Raft) getLastLogIndex() int { + return rf.lastIncludedIndex + len(rf.log) +} + +// getLastLogTerm returns the term of the last log entry +func (rf *Raft) getLastLogTerm() int { + if len(rf.log) == 0 { + return rf.lastIncludedTerm + } + return rf.log[len(rf.log)-1].Term +} + +// getLogEntry returns the log entry at the given index +func (rf *Raft) getLogEntry(index int) LogEntry { + return rf.log[index-rf.lastIncludedIndex-1] +} + +// getLogTerm returns the term of the log entry at the given index +func (rf *Raft) getLogTerm(index int) int { + if index == rf.lastIncludedIndex { + return rf.lastIncludedTerm + } + return rf.log[index-rf.lastIncludedIndex-1].Term +} + +// persist - save Raft's persistent state to stable storage func (rf *Raft) persist() { - // Your code here (2C). 
- // Example: - // w := new(bytes.Buffer) - // e := labgob.NewEncoder(w) - // e.Encode(rf.xxx) - // e.Encode(rf.yyy) - // data := w.Bytes() - // rf.persister.SaveRaftState(data) + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + e.Encode(rf.currentTerm) + e.Encode(rf.votedFor) + e.Encode(rf.log) + e.Encode(rf.lastIncludedIndex) + e.Encode(rf.lastIncludedTerm) + data := w.Bytes() + rf.persister.SaveRaftState(data) } - -// -// restore previously persisted state. -// +// readPersist - restore previously persisted state func (rf *Raft) readPersist(data []byte) { - if data == nil || len(data) < 1 { // bootstrap without any state? + if data == nil || len(data) < 1 { return } - // Your code here (2C). - // Example: - // r := bytes.NewBuffer(data) - // d := labgob.NewDecoder(r) - // var xxx - // var yyy - // if d.Decode(&xxx) != nil || - // d.Decode(&yyy) != nil { - // error... - // } else { - // rf.xxx = xxx - // rf.yyy = yyy - // } + r := bytes.NewBuffer(data) + d := labgob.NewDecoder(r) + var currentTerm int + var votedFor int + var log []LogEntry + var lastIncludedIndex int + var lastIncludedTerm int + if d.Decode(&currentTerm) != nil || + d.Decode(&votedFor) != nil || + d.Decode(&log) != nil || + d.Decode(&lastIncludedIndex) != nil || + d.Decode(&lastIncludedTerm) != nil { + return + } + rf.currentTerm = currentTerm + rf.votedFor = votedFor + rf.log = log + rf.lastIncludedIndex = lastIncludedIndex + rf.lastIncludedTerm = lastIncludedTerm + rf.lastApplied = lastIncludedIndex + rf.commitIndex = lastIncludedIndex } - -// -// A service wants to switch to snapshot. Only do so if Raft hasn't -// have more recent info since it communicate the snapshot on applyCh. -// +// CondInstallSnapshot - A service wants to switch to snapshot func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool { - - // Your code here (2D). 
- + // Deprecated in newer version, always return true return true } -// the service says it has created a snapshot that has -// all info up to and including index. this means the -// service no longer needs the log through (and including) -// that index. Raft should now trim its log as much as possible. +// Snapshot - the service says it has created a snapshot func (rf *Raft) Snapshot(index int, snapshot []byte) { - // Your code here (2D). + rf.mu.Lock() + defer rf.mu.Unlock() + if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() { + return + } + + // Get term BEFORE trimming log + newLastIncludedTerm := rf.getLogTerm(index) + + // Trim log - keep entries from index+1 onwards + rf.log = rf.log[index-rf.lastIncludedIndex:] + rf.lastIncludedTerm = newLastIncludedTerm + rf.lastIncludedIndex = index + + // Persist state and snapshot + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + e.Encode(rf.currentTerm) + e.Encode(rf.votedFor) + e.Encode(rf.log) + e.Encode(rf.lastIncludedIndex) + e.Encode(rf.lastIncludedTerm) + rf.persister.SaveStateAndSnapshot(w.Bytes(), snapshot) } +// InstallSnapshot RPC arguments +type InstallSnapshotArgs struct { + Term int + LeaderId int + LastIncludedIndex int + LastIncludedTerm int + Data []byte +} -// -// example RequestVote RPC arguments structure. -// field names must start with capital letters! 
-// +// InstallSnapshot RPC reply +type InstallSnapshotReply struct { + Term int +} + +// InstallSnapshot RPC handler +func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + + reply.Term = rf.currentTerm + + if args.Term < rf.currentTerm { + return + } + + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.state = Follower + rf.persist() + } + + rf.lastHeartbeat = time.Now() + + if args.LastIncludedIndex <= rf.lastIncludedIndex { + return + } + + // If existing log entry has same index and term as snapshot's last included entry, + // retain log entries following it + if args.LastIncludedIndex <= rf.getLastLogIndex() && + rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm { + rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:] + } else { + rf.log = make([]LogEntry, 0) + } + + rf.lastIncludedIndex = args.LastIncludedIndex + rf.lastIncludedTerm = args.LastIncludedTerm + + if rf.commitIndex < args.LastIncludedIndex { + rf.commitIndex = args.LastIncludedIndex + } + if rf.lastApplied < args.LastIncludedIndex { + rf.lastApplied = args.LastIncludedIndex + } + + // Save state and snapshot + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + e.Encode(rf.currentTerm) + e.Encode(rf.votedFor) + e.Encode(rf.log) + e.Encode(rf.lastIncludedIndex) + e.Encode(rf.lastIncludedTerm) + rf.persister.SaveStateAndSnapshot(w.Bytes(), args.Data) + + // Send snapshot to applyCh + rf.applyCh <- ApplyMsg{ + SnapshotValid: true, + Snapshot: args.Data, + SnapshotTerm: args.LastIncludedTerm, + SnapshotIndex: args.LastIncludedIndex, + } +} + +func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool { + ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply) + return ok +} + +// RequestVote RPC arguments structure type RequestVoteArgs struct { - // Your data here (2A, 2B). 
+ Term int + CandidateId int + LastLogIndex int + LastLogTerm int } -// -// example RequestVote RPC reply structure. -// field names must start with capital letters! -// +// RequestVote RPC reply structure type RequestVoteReply struct { - // Your data here (2A). + Term int + VoteGranted bool } -// -// example RequestVote RPC handler. -// +// RequestVote RPC handler func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { - // Your code here (2A, 2B). + rf.mu.Lock() + defer rf.mu.Unlock() + + reply.Term = rf.currentTerm + reply.VoteGranted = false + + // Reply false if term < currentTerm + if args.Term < rf.currentTerm { + return + } + + // If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.state = Follower + rf.persist() + } + + reply.Term = rf.currentTerm + + // If votedFor is null or candidateId, and candidate's log is at least as up-to-date + if rf.votedFor == -1 || rf.votedFor == args.CandidateId { + // Check if candidate's log is at least as up-to-date as receiver's log + lastLogTerm := rf.getLastLogTerm() + lastLogIndex := rf.getLastLogIndex() + + if args.LastLogTerm > lastLogTerm || + (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) { + reply.VoteGranted = true + rf.votedFor = args.CandidateId + rf.lastHeartbeat = time.Now() + rf.persist() + } + } } -// -// example code to send a RequestVote RPC to a server. -// server is the index of the target server in rf.peers[]. -// expects RPC arguments in args. -// fills in *reply with RPC reply, so caller should -// pass &reply. -// the types of the args and reply passed to Call() must be -// the same as the types of the arguments declared in the -// handler function (including whether they are pointers). -// -// The labrpc package simulates a lossy network, in which servers -// may be unreachable, and in which requests and replies may be lost. 
-// Call() sends a request and waits for a reply. If a reply arrives -// within a timeout interval, Call() returns true; otherwise -// Call() returns false. Thus Call() may not return for a while. -// A false return can be caused by a dead server, a live server that -// can't be reached, a lost request, or a lost reply. -// -// Call() is guaranteed to return (perhaps after a delay) *except* if the -// handler function on the server side does not return. Thus there -// is no need to implement your own timeouts around Call(). -// -// look at the comments in ../labrpc/labrpc.go for more details. -// -// if you're having trouble getting RPC to work, check that you've -// capitalized all field names in structs passed over RPC, and -// that the caller passes the address of the reply struct with &, not -// the struct itself. -// func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) return ok } - -// -// the service using Raft (e.g. a k/v server) wants to start -// agreement on the next command to be appended to Raft's log. if this -// server isn't the leader, returns false. otherwise start the -// agreement and return immediately. there is no guarantee that this -// command will ever be committed to the Raft log, since the leader -// may fail or lose an election. even if the Raft instance has been killed, -// this function should return gracefully. -// -// the first return value is the index that the command will appear at -// if it's ever committed. the second return value is the current -// term. the third return value is true if this server believes it is -// the leader. -// -func (rf *Raft) Start(command interface{}) (int, int, bool) { - index := -1 - term := -1 - isLeader := true - - // Your code here (2B). 
- - - return index, term, isLeader +// AppendEntries RPC arguments +type AppendEntriesArgs struct { + Term int + LeaderId int + PrevLogIndex int + PrevLogTerm int + Entries []LogEntry + LeaderCommit int } -// -// the tester doesn't halt goroutines created by Raft after each test, -// but it does call the Kill() method. your code can use killed() to -// check whether Kill() has been called. the use of atomic avoids the -// need for a lock. -// -// the issue is that long-running goroutines use memory and may chew -// up CPU time, perhaps causing later tests to fail and generating -// confusing debug output. any goroutine with a long-running loop -// should call killed() to check whether it should stop. -// +// AppendEntries RPC reply +type AppendEntriesReply struct { + Term int + Success bool + // For fast backup + XTerm int // Term in the conflicting entry (if any) + XIndex int // Index of first entry with XTerm + XLen int // Log length +} + +// AppendEntries RPC handler +func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + + reply.Term = rf.currentTerm + reply.Success = false + + // Reply false if term < currentTerm + if args.Term < rf.currentTerm { + return + } + + // If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.persist() + } + + rf.state = Follower + rf.lastHeartbeat = time.Now() + + // Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm + if args.PrevLogIndex < rf.lastIncludedIndex { + reply.XLen = rf.getLastLogIndex() + 1 + return + } + + if args.PrevLogIndex > rf.getLastLogIndex() { + reply.XLen = rf.getLastLogIndex() + 1 + return + } + + if args.PrevLogIndex > rf.lastIncludedIndex && rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm { + reply.XTerm = rf.getLogTerm(args.PrevLogIndex) + // Find first index with XTerm + for 
i := rf.lastIncludedIndex + 1; i <= args.PrevLogIndex; i++ { + if rf.getLogTerm(i) == reply.XTerm { + reply.XIndex = i + break + } + } + return + } + + // Append any new entries not already in the log + for i, entry := range args.Entries { + index := args.PrevLogIndex + 1 + i + if index <= rf.lastIncludedIndex { + continue + } + if index <= rf.getLastLogIndex() { + if rf.getLogTerm(index) != entry.Term { + // Conflict: delete the existing entry and all that follow it + rf.log = rf.log[:index-rf.lastIncludedIndex-1] + rf.log = append(rf.log, entry) + } + } else { + rf.log = append(rf.log, entry) + } + } + rf.persist() + + reply.Success = true + + // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) + if args.LeaderCommit > rf.commitIndex { + lastNewEntry := args.PrevLogIndex + len(args.Entries) + if args.LeaderCommit < lastNewEntry { + rf.commitIndex = args.LeaderCommit + } else { + rf.commitIndex = lastNewEntry + } + rf.applyCond.Signal() + } +} + +func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { + ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) + return ok +} + +// Start - the service using Raft wants to start agreement on the next command +func (rf *Raft) Start(command interface{}) (int, int, bool) { + rf.mu.Lock() + defer rf.mu.Unlock() + + if rf.state != Leader { + return -1, -1, false + } + + index := rf.getLastLogIndex() + 1 + term := rf.currentTerm + + rf.log = append(rf.log, LogEntry{Term: term, Command: command}) + rf.persist() + + return index, term, true +} + +// Kill - the tester doesn't halt goroutines created by Raft after each test func (rf *Raft) Kill() { atomic.StoreInt32(&rf.dead, 1) - // Your code here, if desired. } func (rf *Raft) killed() bool { @@ -241,29 +472,328 @@ func (rf *Raft) killed() bool { return z == 1 } -// The ticker go routine starts a new election if this peer hasn't received -// heartsbeats recently. 
+// ticker - starts a new election if this peer hasn't received heartbeats recently func (rf *Raft) ticker() { for rf.killed() == false { + rf.mu.Lock() + state := rf.state + rf.mu.Unlock() - // Your code here to check if a leader election should - // be started and to randomize sleeping time using - // time.Sleep(). + switch state { + case Follower, Candidate: + rf.checkElectionTimeout() + case Leader: + rf.sendHeartbeats() + } + time.Sleep(10 * time.Millisecond) } } -// -// the service or tester wants to create a Raft server. the ports -// of all the Raft servers (including this one) are in peers[]. this -// server's port is peers[me]. all the servers' peers[] arrays -// have the same order. persister is a place for this server to -// save its persistent state, and also initially holds the most -// recent saved state, if any. applyCh is a channel on which the -// tester or service expects Raft to send ApplyMsg messages. -// Make() must return quickly, so it should start goroutines -// for any long-running work. 
-// +// checkElectionTimeout checks if election timeout has elapsed +func (rf *Raft) checkElectionTimeout() { + timeout := time.Duration(ElectionTimeoutMin+rand.Intn(ElectionTimeoutMax-ElectionTimeoutMin)) * time.Millisecond + rf.mu.Lock() + elapsed := time.Since(rf.lastHeartbeat) + rf.mu.Unlock() + + if elapsed >= timeout { + rf.startElection() + } +} + +// startElection starts a new election +func (rf *Raft) startElection() { + rf.mu.Lock() + rf.state = Candidate + rf.currentTerm++ + rf.votedFor = rf.me + rf.lastHeartbeat = time.Now() + rf.persist() + + term := rf.currentTerm + lastLogIndex := rf.getLastLogIndex() + lastLogTerm := rf.getLastLogTerm() + rf.mu.Unlock() + + votes := 1 + finished := 1 + var mu sync.Mutex + cond := sync.NewCond(&mu) + + for i := range rf.peers { + if i == rf.me { + continue + } + go func(server int) { + args := RequestVoteArgs{ + Term: term, + CandidateId: rf.me, + LastLogIndex: lastLogIndex, + LastLogTerm: lastLogTerm, + } + reply := RequestVoteReply{} + + ok := rf.sendRequestVote(server, &args, &reply) + + mu.Lock() + defer mu.Unlock() + + if ok { + rf.mu.Lock() + if reply.Term > rf.currentTerm { + rf.currentTerm = reply.Term + rf.state = Follower + rf.votedFor = -1 + rf.persist() + } + rf.mu.Unlock() + + if reply.VoteGranted { + votes++ + } + } + finished++ + cond.Broadcast() + }(i) + } + + mu.Lock() + for votes <= len(rf.peers)/2 && finished < len(rf.peers) { + cond.Wait() + } + mu.Unlock() + + rf.mu.Lock() + defer rf.mu.Unlock() + + if rf.state != Candidate || rf.currentTerm != term { + return + } + + if votes > len(rf.peers)/2 { + rf.state = Leader + // Initialize nextIndex and matchIndex + for i := range rf.peers { + rf.nextIndex[i] = rf.getLastLogIndex() + 1 + rf.matchIndex[i] = 0 + } + rf.matchIndex[rf.me] = rf.getLastLogIndex() + } +} + +// sendHeartbeats sends heartbeats to all peers +func (rf *Raft) sendHeartbeats() { + rf.mu.Lock() + if rf.state != Leader { + rf.mu.Unlock() + return + } + rf.mu.Unlock() + + for i := range 
rf.peers { + if i == rf.me { + continue + } + go rf.sendAppendEntriesToPeer(i) + } + + time.Sleep(HeartbeatInterval) +} + +// sendAppendEntriesToPeer sends AppendEntries RPC to a peer +func (rf *Raft) sendAppendEntriesToPeer(server int) { + rf.mu.Lock() + if rf.state != Leader { + rf.mu.Unlock() + return + } + + // Check if we need to send snapshot + if rf.nextIndex[server] <= rf.lastIncludedIndex { + rf.mu.Unlock() + rf.sendSnapshotToPeer(server) + return + } + + prevLogIndex := rf.nextIndex[server] - 1 + prevLogTerm := 0 + if prevLogIndex == rf.lastIncludedIndex { + prevLogTerm = rf.lastIncludedTerm + } else if prevLogIndex > rf.lastIncludedIndex { + prevLogTerm = rf.getLogTerm(prevLogIndex) + } + + entries := make([]LogEntry, 0) + if rf.nextIndex[server] <= rf.getLastLogIndex() { + entries = append(entries, rf.log[rf.nextIndex[server]-rf.lastIncludedIndex-1:]...) + } + + args := AppendEntriesArgs{ + Term: rf.currentTerm, + LeaderId: rf.me, + PrevLogIndex: prevLogIndex, + PrevLogTerm: prevLogTerm, + Entries: entries, + LeaderCommit: rf.commitIndex, + } + term := rf.currentTerm + rf.mu.Unlock() + + reply := AppendEntriesReply{} + ok := rf.sendAppendEntries(server, &args, &reply) + + if !ok { + return + } + + rf.mu.Lock() + defer rf.mu.Unlock() + + if rf.state != Leader || rf.currentTerm != term { + return + } + + if reply.Term > rf.currentTerm { + rf.currentTerm = reply.Term + rf.state = Follower + rf.votedFor = -1 + rf.persist() + return + } + + if reply.Success { + newMatchIndex := args.PrevLogIndex + len(args.Entries) + if newMatchIndex > rf.matchIndex[server] { + rf.matchIndex[server] = newMatchIndex + } + rf.nextIndex[server] = rf.matchIndex[server] + 1 + rf.updateCommitIndex() + } else { + // Fast backup + if reply.XTerm != 0 { + // Find last entry with XTerm + found := false + for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- { + if rf.getLogTerm(i) == reply.XTerm { + rf.nextIndex[server] = i + 1 + found = true + break + } + } + if !found { + 
rf.nextIndex[server] = reply.XIndex + } + } else { + rf.nextIndex[server] = reply.XLen + } + if rf.nextIndex[server] < 1 { + rf.nextIndex[server] = 1 + } + } +} + +// sendSnapshotToPeer sends InstallSnapshot RPC to a peer +func (rf *Raft) sendSnapshotToPeer(server int) { + rf.mu.Lock() + if rf.state != Leader { + rf.mu.Unlock() + return + } + + args := InstallSnapshotArgs{ + Term: rf.currentTerm, + LeaderId: rf.me, + LastIncludedIndex: rf.lastIncludedIndex, + LastIncludedTerm: rf.lastIncludedTerm, + Data: rf.persister.ReadSnapshot(), + } + term := rf.currentTerm + rf.mu.Unlock() + + reply := InstallSnapshotReply{} + ok := rf.sendInstallSnapshot(server, &args, &reply) + + if !ok { + return + } + + rf.mu.Lock() + defer rf.mu.Unlock() + + if rf.state != Leader || rf.currentTerm != term { + return + } + + if reply.Term > rf.currentTerm { + rf.currentTerm = reply.Term + rf.state = Follower + rf.votedFor = -1 + rf.persist() + return + } + + rf.nextIndex[server] = args.LastIncludedIndex + 1 + rf.matchIndex[server] = args.LastIncludedIndex +} + +// updateCommitIndex updates commitIndex based on matchIndex +func (rf *Raft) updateCommitIndex() { + // Find N such that N > commitIndex, a majority of matchIndex[i] >= N, + // and log[N].term == currentTerm + for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- { + if rf.getLogTerm(n) != rf.currentTerm { + continue + } + count := 1 + for i := range rf.peers { + if i != rf.me && rf.matchIndex[i] >= n { + count++ + } + } + if count > len(rf.peers)/2 { + rf.commitIndex = n + rf.applyCond.Signal() + break + } + } +} + +// applier applies committed log entries to the state machine +func (rf *Raft) applier() { + for rf.killed() == false { + rf.mu.Lock() + for rf.lastApplied >= rf.commitIndex { + rf.applyCond.Wait() + } + + commitIndex := rf.commitIndex + lastApplied := rf.lastApplied + entries := make([]LogEntry, 0) + for i := lastApplied + 1; i <= commitIndex; i++ { + if i > rf.lastIncludedIndex { + 
entries = append(entries, rf.getLogEntry(i)) + } + } + rf.mu.Unlock() + + for i, entry := range entries { + rf.applyCh <- ApplyMsg{ + CommandValid: true, + Command: entry.Command, + CommandIndex: lastApplied + 1 + i, + } + } + + rf.mu.Lock() + if rf.lastApplied < commitIndex { + rf.lastApplied = commitIndex + } + rf.mu.Unlock() + } +} + +// Make - the service or tester wants to create a Raft server func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} @@ -271,14 +801,33 @@ func Make(peers []*labrpc.ClientEnd, me int, rf.persister = persister rf.me = me - // Your initialization code here (2A, 2B, 2C). + // Initialize state + rf.currentTerm = 0 + rf.votedFor = -1 + rf.log = make([]LogEntry, 0) - // initialize from state persisted before a crash + rf.commitIndex = 0 + rf.lastApplied = 0 + + rf.nextIndex = make([]int, len(peers)) + rf.matchIndex = make([]int, len(peers)) + + rf.state = Follower + rf.lastHeartbeat = time.Now() + rf.applyCh = applyCh + rf.applyCond = sync.NewCond(&rf.mu) + + rf.lastIncludedIndex = 0 + rf.lastIncludedTerm = 0 + + // Initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) - // start ticker goroutine to start elections + // Start ticker goroutine to start elections go rf.ticker() + // Start applier goroutine + go rf.applier() return rf }