feat: complete Lab3 Raft implementation
- 2A: Leader Election with randomized timeout - 2B: Log Replication with fast backup optimization - 2C: Persistence for crash recovery - 2D: Snapshots for log compaction - All tests passed (2A, 2B, 2C, 2D)
This commit is contained in:
885
src/raft/raft.go
885
src/raft/raft.go
@@ -18,26 +18,32 @@ package raft
|
|||||||
//
|
//
|
||||||
|
|
||||||
import (
|
import (
|
||||||
// "bytes"
|
"bytes"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
// "6.824/labgob"
|
"6.824/labgob"
|
||||||
"6.824/labrpc"
|
"6.824/labrpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Roles a Raft peer can hold; the current role is kept in Raft.state.
const (
	Follower = iota
	Candidate
	Leader
)
|
||||||
|
|
||||||
//
|
// Timing constants
|
||||||
// as each Raft peer becomes aware that successive log entries are
|
const (
|
||||||
// committed, the peer should send an ApplyMsg to the service (or
|
HeartbeatInterval = 100 * time.Millisecond
|
||||||
// tester) on the same server, via the applyCh passed to Make(). set
|
ElectionTimeoutMin = 300
|
||||||
// CommandValid to true to indicate that the ApplyMsg contains a newly
|
ElectionTimeoutMax = 500
|
||||||
// committed log entry.
|
)
|
||||||
//
|
|
||||||
// in part 2D you'll want to send other kinds of messages (e.g.,
|
// ApplyMsg - as each Raft peer becomes aware that successive log entries are
|
||||||
// snapshots) on the applyCh, but set CommandValid to false for these
|
// committed, the peer should send an ApplyMsg to the service (or tester)
|
||||||
// other uses.
|
|
||||||
//
|
|
||||||
type ApplyMsg struct {
|
type ApplyMsg struct {
|
||||||
CommandValid bool
|
CommandValid bool
|
||||||
Command interface{}
|
Command interface{}
|
||||||
@@ -50,9 +56,13 @@ type ApplyMsg struct {
|
|||||||
SnapshotIndex int
|
SnapshotIndex int
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// LogEntry is one slot of the replicated log.
type LogEntry struct {
	// Term is the leader's term at the time the entry was appended by Start.
	Term int
	// Command is the opaque value passed to Start and later delivered on applyCh.
	Command interface{}
}
|
||||||
|
|
||||||
|
// Raft - A Go object implementing a single Raft peer.
|
||||||
type Raft struct {
|
type Raft struct {
|
||||||
mu sync.Mutex // Lock to protect shared access to this peer's state
|
mu sync.Mutex // Lock to protect shared access to this peer's state
|
||||||
peers []*labrpc.ClientEnd // RPC end points of all peers
|
peers []*labrpc.ClientEnd // RPC end points of all peers
|
||||||
@@ -60,180 +70,401 @@ type Raft struct {
|
|||||||
me int // this peer's index into peers[]
|
me int // this peer's index into peers[]
|
||||||
dead int32 // set by Kill()
|
dead int32 // set by Kill()
|
||||||
|
|
||||||
// Your data here (2A, 2B, 2C).
|
// Persistent state on all servers (Figure 2)
|
||||||
// Look at the paper's Figure 2 for a description of what
|
currentTerm int
|
||||||
// state a Raft server must maintain.
|
votedFor int
|
||||||
|
log []LogEntry
|
||||||
|
|
||||||
|
// Volatile state on all servers
|
||||||
|
commitIndex int
|
||||||
|
lastApplied int
|
||||||
|
|
||||||
|
// Volatile state on leaders
|
||||||
|
nextIndex []int
|
||||||
|
matchIndex []int
|
||||||
|
|
||||||
|
// Additional state
|
||||||
|
state int
|
||||||
|
lastHeartbeat time.Time
|
||||||
|
applyCh chan ApplyMsg
|
||||||
|
applyCond *sync.Cond
|
||||||
|
|
||||||
|
// Snapshot state (2D)
|
||||||
|
lastIncludedIndex int
|
||||||
|
lastIncludedTerm int
|
||||||
}
|
}
|
||||||
|
|
||||||
// return currentTerm and whether this server
|
// GetState returns currentTerm and whether this server believes it is the leader.
|
||||||
// believes it is the leader.
|
|
||||||
func (rf *Raft) GetState() (int, bool) {
|
func (rf *Raft) GetState() (int, bool) {
|
||||||
|
rf.mu.Lock()
|
||||||
var term int
|
defer rf.mu.Unlock()
|
||||||
var isleader bool
|
return rf.currentTerm, rf.state == Leader
|
||||||
// Your code here (2A).
|
|
||||||
return term, isleader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// getLastLogIndex returns the index of the last log entry
|
||||||
// save Raft's persistent state to stable storage,
|
func (rf *Raft) getLastLogIndex() int {
|
||||||
// where it can later be retrieved after a crash and restart.
|
return rf.lastIncludedIndex + len(rf.log)
|
||||||
// see paper's Figure 2 for a description of what should be persistent.
|
}
|
||||||
//
|
|
||||||
|
// getLastLogTerm returns the term of the last log entry
|
||||||
|
func (rf *Raft) getLastLogTerm() int {
|
||||||
|
if len(rf.log) == 0 {
|
||||||
|
return rf.lastIncludedTerm
|
||||||
|
}
|
||||||
|
return rf.log[len(rf.log)-1].Term
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLogEntry returns the log entry at the given index
|
||||||
|
func (rf *Raft) getLogEntry(index int) LogEntry {
|
||||||
|
return rf.log[index-rf.lastIncludedIndex-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLogTerm returns the term of the log entry at the given index
|
||||||
|
func (rf *Raft) getLogTerm(index int) int {
|
||||||
|
if index == rf.lastIncludedIndex {
|
||||||
|
return rf.lastIncludedTerm
|
||||||
|
}
|
||||||
|
return rf.log[index-rf.lastIncludedIndex-1].Term
|
||||||
|
}
|
||||||
|
|
||||||
|
// persist - save Raft's persistent state to stable storage
|
||||||
func (rf *Raft) persist() {
|
func (rf *Raft) persist() {
|
||||||
// Your code here (2C).
|
w := new(bytes.Buffer)
|
||||||
// Example:
|
e := labgob.NewEncoder(w)
|
||||||
// w := new(bytes.Buffer)
|
e.Encode(rf.currentTerm)
|
||||||
// e := labgob.NewEncoder(w)
|
e.Encode(rf.votedFor)
|
||||||
// e.Encode(rf.xxx)
|
e.Encode(rf.log)
|
||||||
// e.Encode(rf.yyy)
|
e.Encode(rf.lastIncludedIndex)
|
||||||
// data := w.Bytes()
|
e.Encode(rf.lastIncludedTerm)
|
||||||
// rf.persister.SaveRaftState(data)
|
data := w.Bytes()
|
||||||
|
rf.persister.SaveRaftState(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readPersist - restore previously persisted state
|
||||||
//
|
|
||||||
// restore previously persisted state.
|
|
||||||
//
|
|
||||||
func (rf *Raft) readPersist(data []byte) {
|
func (rf *Raft) readPersist(data []byte) {
|
||||||
if data == nil || len(data) < 1 { // bootstrap without any state?
|
if data == nil || len(data) < 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Your code here (2C).
|
r := bytes.NewBuffer(data)
|
||||||
// Example:
|
d := labgob.NewDecoder(r)
|
||||||
// r := bytes.NewBuffer(data)
|
var currentTerm int
|
||||||
// d := labgob.NewDecoder(r)
|
var votedFor int
|
||||||
// var xxx
|
var log []LogEntry
|
||||||
// var yyy
|
var lastIncludedIndex int
|
||||||
// if d.Decode(&xxx) != nil ||
|
var lastIncludedTerm int
|
||||||
// d.Decode(&yyy) != nil {
|
if d.Decode(¤tTerm) != nil ||
|
||||||
// error...
|
d.Decode(&votedFor) != nil ||
|
||||||
// } else {
|
d.Decode(&log) != nil ||
|
||||||
// rf.xxx = xxx
|
d.Decode(&lastIncludedIndex) != nil ||
|
||||||
// rf.yyy = yyy
|
d.Decode(&lastIncludedTerm) != nil {
|
||||||
// }
|
return
|
||||||
|
}
|
||||||
|
rf.currentTerm = currentTerm
|
||||||
|
rf.votedFor = votedFor
|
||||||
|
rf.log = log
|
||||||
|
rf.lastIncludedIndex = lastIncludedIndex
|
||||||
|
rf.lastIncludedTerm = lastIncludedTerm
|
||||||
|
rf.lastApplied = lastIncludedIndex
|
||||||
|
rf.commitIndex = lastIncludedIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CondInstallSnapshot - A service wants to switch to snapshot
|
||||||
//
|
|
||||||
// A service wants to switch to snapshot. Only do so if Raft hasn't
|
|
||||||
// have more recent info since it communicate the snapshot on applyCh.
|
|
||||||
//
|
|
||||||
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
|
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
|
||||||
|
// Deprecated in newer version, always return true
|
||||||
// Your code here (2D).
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// the service says it has created a snapshot that has
|
// Snapshot - the service says it has created a snapshot
|
||||||
// all info up to and including index. this means the
|
|
||||||
// service no longer needs the log through (and including)
|
|
||||||
// that index. Raft should now trim its log as much as possible.
|
|
||||||
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
||||||
// Your code here (2D).
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
if index <= rf.lastIncludedIndex || index > rf.getLastLogIndex() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get term BEFORE trimming log
|
||||||
|
newLastIncludedTerm := rf.getLogTerm(index)
|
||||||
|
|
||||||
|
// Trim log - keep entries from index+1 onwards
|
||||||
|
rf.log = rf.log[index-rf.lastIncludedIndex:]
|
||||||
|
rf.lastIncludedTerm = newLastIncludedTerm
|
||||||
|
rf.lastIncludedIndex = index
|
||||||
|
|
||||||
|
// Persist state and snapshot
|
||||||
|
w := new(bytes.Buffer)
|
||||||
|
e := labgob.NewEncoder(w)
|
||||||
|
e.Encode(rf.currentTerm)
|
||||||
|
e.Encode(rf.votedFor)
|
||||||
|
e.Encode(rf.log)
|
||||||
|
e.Encode(rf.lastIncludedIndex)
|
||||||
|
e.Encode(rf.lastIncludedTerm)
|
||||||
|
rf.persister.SaveStateAndSnapshot(w.Bytes(), snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InstallSnapshotArgs is the request body of the InstallSnapshot RPC.
type InstallSnapshotArgs struct {
	Term              int    // sender's currentTerm
	LeaderId          int    // sender's index in peers[]
	LastIncludedIndex int    // last log index covered by the snapshot
	LastIncludedTerm  int    // term of the entry at LastIncludedIndex
	Data              []byte // snapshot bytes as read from the sender's persister
}
|
||||||
|
|
||||||
//
|
// InstallSnapshotReply is the response body of the InstallSnapshot RPC.
type InstallSnapshotReply struct {
	// Term is the receiver's currentTerm as read on entry to the handler;
	// the sender steps down if it is larger than its own.
	Term int
}
|
||||||
|
|
||||||
|
// InstallSnapshot RPC handler
|
||||||
|
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
reply.Term = rf.currentTerm
|
||||||
|
|
||||||
|
if args.Term < rf.currentTerm {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if args.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = args.Term
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.state = Follower
|
||||||
|
rf.persist()
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.lastHeartbeat = time.Now()
|
||||||
|
|
||||||
|
if args.LastIncludedIndex <= rf.lastIncludedIndex {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If existing log entry has same index and term as snapshot's last included entry,
|
||||||
|
// retain log entries following it
|
||||||
|
if args.LastIncludedIndex <= rf.getLastLogIndex() &&
|
||||||
|
rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm {
|
||||||
|
rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]
|
||||||
|
} else {
|
||||||
|
rf.log = make([]LogEntry, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.lastIncludedIndex = args.LastIncludedIndex
|
||||||
|
rf.lastIncludedTerm = args.LastIncludedTerm
|
||||||
|
|
||||||
|
if rf.commitIndex < args.LastIncludedIndex {
|
||||||
|
rf.commitIndex = args.LastIncludedIndex
|
||||||
|
}
|
||||||
|
if rf.lastApplied < args.LastIncludedIndex {
|
||||||
|
rf.lastApplied = args.LastIncludedIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save state and snapshot
|
||||||
|
w := new(bytes.Buffer)
|
||||||
|
e := labgob.NewEncoder(w)
|
||||||
|
e.Encode(rf.currentTerm)
|
||||||
|
e.Encode(rf.votedFor)
|
||||||
|
e.Encode(rf.log)
|
||||||
|
e.Encode(rf.lastIncludedIndex)
|
||||||
|
e.Encode(rf.lastIncludedTerm)
|
||||||
|
rf.persister.SaveStateAndSnapshot(w.Bytes(), args.Data)
|
||||||
|
|
||||||
|
// Send snapshot to applyCh
|
||||||
|
rf.applyCh <- ApplyMsg{
|
||||||
|
SnapshotValid: true,
|
||||||
|
Snapshot: args.Data,
|
||||||
|
SnapshotTerm: args.LastIncludedTerm,
|
||||||
|
SnapshotIndex: args.LastIncludedIndex,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
|
||||||
|
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestVoteArgs is the request body of the RequestVote RPC.
// Field names are capitalized so they are exported over RPC.
type RequestVoteArgs struct {
	Term         int // candidate's term
	CandidateId  int // candidate's index in peers[]
	LastLogIndex int // index of the candidate's last log entry
	LastLogTerm  int // term of the candidate's last log entry
}
|
||||||
|
|
||||||
//
|
// RequestVoteReply is the response body of the RequestVote RPC.
type RequestVoteReply struct {
	Term        int  // voter's current term, so a stale candidate can step down
	VoteGranted bool // true when the voter gave its vote to the candidate
}
|
||||||
|
|
||||||
//
|
// RequestVote RPC handler
|
||||||
// example RequestVote RPC handler.
|
|
||||||
//
|
|
||||||
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
|
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
|
||||||
// Your code here (2A, 2B).
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
reply.Term = rf.currentTerm
|
||||||
|
reply.VoteGranted = false
|
||||||
|
|
||||||
|
// Reply false if term < currentTerm
|
||||||
|
if args.Term < rf.currentTerm {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower
|
||||||
|
if args.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = args.Term
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.state = Follower
|
||||||
|
rf.persist()
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Term = rf.currentTerm
|
||||||
|
|
||||||
|
// If votedFor is null or candidateId, and candidate's log is at least as up-to-date
|
||||||
|
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
|
||||||
|
// Check if candidate's log is at least as up-to-date as receiver's log
|
||||||
|
lastLogTerm := rf.getLastLogTerm()
|
||||||
|
lastLogIndex := rf.getLastLogIndex()
|
||||||
|
|
||||||
|
if args.LastLogTerm > lastLogTerm ||
|
||||||
|
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) {
|
||||||
|
reply.VoteGranted = true
|
||||||
|
rf.votedFor = args.CandidateId
|
||||||
|
rf.lastHeartbeat = time.Now()
|
||||||
|
rf.persist()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// example code to send a RequestVote RPC to a server.
|
|
||||||
// server is the index of the target server in rf.peers[].
|
|
||||||
// expects RPC arguments in args.
|
|
||||||
// fills in *reply with RPC reply, so caller should
|
|
||||||
// pass &reply.
|
|
||||||
// the types of the args and reply passed to Call() must be
|
|
||||||
// the same as the types of the arguments declared in the
|
|
||||||
// handler function (including whether they are pointers).
|
|
||||||
//
|
|
||||||
// The labrpc package simulates a lossy network, in which servers
|
|
||||||
// may be unreachable, and in which requests and replies may be lost.
|
|
||||||
// Call() sends a request and waits for a reply. If a reply arrives
|
|
||||||
// within a timeout interval, Call() returns true; otherwise
|
|
||||||
// Call() returns false. Thus Call() may not return for a while.
|
|
||||||
// A false return can be caused by a dead server, a live server that
|
|
||||||
// can't be reached, a lost request, or a lost reply.
|
|
||||||
//
|
|
||||||
// Call() is guaranteed to return (perhaps after a delay) *except* if the
|
|
||||||
// handler function on the server side does not return. Thus there
|
|
||||||
// is no need to implement your own timeouts around Call().
|
|
||||||
//
|
|
||||||
// look at the comments in ../labrpc/labrpc.go for more details.
|
|
||||||
//
|
|
||||||
// if you're having trouble getting RPC to work, check that you've
|
|
||||||
// capitalized all field names in structs passed over RPC, and
|
|
||||||
// that the caller passes the address of the reply struct with &, not
|
|
||||||
// the struct itself.
|
|
||||||
//
|
|
||||||
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
|
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
|
||||||
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
|
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppendEntries RPC arguments
|
||||||
//
|
type AppendEntriesArgs struct {
|
||||||
// the service using Raft (e.g. a k/v server) wants to start
|
Term int
|
||||||
// agreement on the next command to be appended to Raft's log. if this
|
LeaderId int
|
||||||
// server isn't the leader, returns false. otherwise start the
|
PrevLogIndex int
|
||||||
// agreement and return immediately. there is no guarantee that this
|
PrevLogTerm int
|
||||||
// command will ever be committed to the Raft log, since the leader
|
Entries []LogEntry
|
||||||
// may fail or lose an election. even if the Raft instance has been killed,
|
LeaderCommit int
|
||||||
// this function should return gracefully.
|
|
||||||
//
|
|
||||||
// the first return value is the index that the command will appear at
|
|
||||||
// if it's ever committed. the second return value is the current
|
|
||||||
// term. the third return value is true if this server believes it is
|
|
||||||
// the leader.
|
|
||||||
//
|
|
||||||
func (rf *Raft) Start(command interface{}) (int, int, bool) {
|
|
||||||
index := -1
|
|
||||||
term := -1
|
|
||||||
isLeader := true
|
|
||||||
|
|
||||||
// Your code here (2B).
|
|
||||||
|
|
||||||
|
|
||||||
return index, term, isLeader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// AppendEntriesReply is the response body of the AppendEntries RPC.
// The X* fields are back-up hints filled in only on failure.
type AppendEntriesReply struct {
	Term    int  // follower's term as read on entry to the handler
	Success bool // true when the follower's log matched and Entries were accepted

	XTerm  int // term of the follower's conflicting entry at PrevLogIndex; 0 when not set
	XIndex int // first index in the follower's retained log that holds XTerm
	XLen   int // retry index suggested by the follower; the leader uses it only when XTerm is 0
}
|
||||||
// should call killed() to check whether it should stop.
|
|
||||||
//
|
// AppendEntries RPC handler
|
||||||
|
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
reply.Term = rf.currentTerm
|
||||||
|
reply.Success = false
|
||||||
|
|
||||||
|
// Reply false if term < currentTerm
|
||||||
|
if args.Term < rf.currentTerm {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If RPC request contains term T > currentTerm: set currentTerm = T, convert to follower
|
||||||
|
if args.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = args.Term
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.persist()
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.state = Follower
|
||||||
|
rf.lastHeartbeat = time.Now()
|
||||||
|
|
||||||
|
// Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
|
||||||
|
if args.PrevLogIndex < rf.lastIncludedIndex {
|
||||||
|
reply.XLen = rf.getLastLogIndex() + 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if args.PrevLogIndex > rf.getLastLogIndex() {
|
||||||
|
reply.XLen = rf.getLastLogIndex() + 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if args.PrevLogIndex > rf.lastIncludedIndex && rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
|
||||||
|
reply.XTerm = rf.getLogTerm(args.PrevLogIndex)
|
||||||
|
// Find first index with XTerm
|
||||||
|
for i := rf.lastIncludedIndex + 1; i <= args.PrevLogIndex; i++ {
|
||||||
|
if rf.getLogTerm(i) == reply.XTerm {
|
||||||
|
reply.XIndex = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append any new entries not already in the log
|
||||||
|
for i, entry := range args.Entries {
|
||||||
|
index := args.PrevLogIndex + 1 + i
|
||||||
|
if index <= rf.lastIncludedIndex {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if index <= rf.getLastLogIndex() {
|
||||||
|
if rf.getLogTerm(index) != entry.Term {
|
||||||
|
// Conflict: delete the existing entry and all that follow it
|
||||||
|
rf.log = rf.log[:index-rf.lastIncludedIndex-1]
|
||||||
|
rf.log = append(rf.log, entry)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rf.log = append(rf.log, entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rf.persist()
|
||||||
|
|
||||||
|
reply.Success = true
|
||||||
|
|
||||||
|
// If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
|
||||||
|
if args.LeaderCommit > rf.commitIndex {
|
||||||
|
lastNewEntry := args.PrevLogIndex + len(args.Entries)
|
||||||
|
if args.LeaderCommit < lastNewEntry {
|
||||||
|
rf.commitIndex = args.LeaderCommit
|
||||||
|
} else {
|
||||||
|
rf.commitIndex = lastNewEntry
|
||||||
|
}
|
||||||
|
rf.applyCond.Signal()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
|
||||||
|
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start - the service using Raft wants to start agreement on the next command
|
||||||
|
func (rf *Raft) Start(command interface{}) (int, int, bool) {
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
if rf.state != Leader {
|
||||||
|
return -1, -1, false
|
||||||
|
}
|
||||||
|
|
||||||
|
index := rf.getLastLogIndex() + 1
|
||||||
|
term := rf.currentTerm
|
||||||
|
|
||||||
|
rf.log = append(rf.log, LogEntry{Term: term, Command: command})
|
||||||
|
rf.persist()
|
||||||
|
|
||||||
|
return index, term, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kill - the tester doesn't halt goroutines created by Raft after each test
|
||||||
func (rf *Raft) Kill() {
|
func (rf *Raft) Kill() {
|
||||||
atomic.StoreInt32(&rf.dead, 1)
|
atomic.StoreInt32(&rf.dead, 1)
|
||||||
// Your code here, if desired.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rf *Raft) killed() bool {
|
func (rf *Raft) killed() bool {
|
||||||
@@ -241,29 +472,328 @@ func (rf *Raft) killed() bool {
|
|||||||
return z == 1
|
return z == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// The ticker go routine starts a new election if this peer hasn't received
|
// ticker - starts a new election if this peer hasn't received heartbeats recently
|
||||||
// heartsbeats recently.
|
|
||||||
func (rf *Raft) ticker() {
|
func (rf *Raft) ticker() {
|
||||||
for rf.killed() == false {
|
for rf.killed() == false {
|
||||||
|
rf.mu.Lock()
|
||||||
|
state := rf.state
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
// Your code here to check if a leader election should
|
switch state {
|
||||||
// be started and to randomize sleeping time using
|
case Follower, Candidate:
|
||||||
// time.Sleep().
|
rf.checkElectionTimeout()
|
||||||
|
case Leader:
|
||||||
|
rf.sendHeartbeats()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// checkElectionTimeout checks if election timeout has elapsed
|
||||||
// the service or tester wants to create a Raft server. the ports
|
func (rf *Raft) checkElectionTimeout() {
|
||||||
// of all the Raft servers (including this one) are in peers[]. this
|
timeout := time.Duration(ElectionTimeoutMin+rand.Intn(ElectionTimeoutMax-ElectionTimeoutMin)) * time.Millisecond
|
||||||
// server's port is peers[me]. all the servers' peers[] arrays
|
rf.mu.Lock()
|
||||||
// have the same order. persister is a place for this server to
|
elapsed := time.Since(rf.lastHeartbeat)
|
||||||
// save its persistent state, and also initially holds the most
|
rf.mu.Unlock()
|
||||||
// recent saved state, if any. applyCh is a channel on which the
|
|
||||||
// tester or service expects Raft to send ApplyMsg messages.
|
if elapsed >= timeout {
|
||||||
// Make() must return quickly, so it should start goroutines
|
rf.startElection()
|
||||||
// for any long-running work.
|
}
|
||||||
//
|
}
|
||||||
|
|
||||||
|
// startElection starts a new election
|
||||||
|
func (rf *Raft) startElection() {
|
||||||
|
rf.mu.Lock()
|
||||||
|
rf.state = Candidate
|
||||||
|
rf.currentTerm++
|
||||||
|
rf.votedFor = rf.me
|
||||||
|
rf.lastHeartbeat = time.Now()
|
||||||
|
rf.persist()
|
||||||
|
|
||||||
|
term := rf.currentTerm
|
||||||
|
lastLogIndex := rf.getLastLogIndex()
|
||||||
|
lastLogTerm := rf.getLastLogTerm()
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
votes := 1
|
||||||
|
finished := 1
|
||||||
|
var mu sync.Mutex
|
||||||
|
cond := sync.NewCond(&mu)
|
||||||
|
|
||||||
|
for i := range rf.peers {
|
||||||
|
if i == rf.me {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
go func(server int) {
|
||||||
|
args := RequestVoteArgs{
|
||||||
|
Term: term,
|
||||||
|
CandidateId: rf.me,
|
||||||
|
LastLogIndex: lastLogIndex,
|
||||||
|
LastLogTerm: lastLogTerm,
|
||||||
|
}
|
||||||
|
reply := RequestVoteReply{}
|
||||||
|
|
||||||
|
ok := rf.sendRequestVote(server, &args, &reply)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
rf.mu.Lock()
|
||||||
|
if reply.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = reply.Term
|
||||||
|
rf.state = Follower
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.persist()
|
||||||
|
}
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
if reply.VoteGranted {
|
||||||
|
votes++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finished++
|
||||||
|
cond.Broadcast()
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
for votes <= len(rf.peers)/2 && finished < len(rf.peers) {
|
||||||
|
cond.Wait()
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
if rf.state != Candidate || rf.currentTerm != term {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if votes > len(rf.peers)/2 {
|
||||||
|
rf.state = Leader
|
||||||
|
// Initialize nextIndex and matchIndex
|
||||||
|
for i := range rf.peers {
|
||||||
|
rf.nextIndex[i] = rf.getLastLogIndex() + 1
|
||||||
|
rf.matchIndex[i] = 0
|
||||||
|
}
|
||||||
|
rf.matchIndex[rf.me] = rf.getLastLogIndex()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendHeartbeats sends heartbeats to all peers
|
||||||
|
func (rf *Raft) sendHeartbeats() {
|
||||||
|
rf.mu.Lock()
|
||||||
|
if rf.state != Leader {
|
||||||
|
rf.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
for i := range rf.peers {
|
||||||
|
if i == rf.me {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
go rf.sendAppendEntriesToPeer(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(HeartbeatInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendAppendEntriesToPeer sends AppendEntries RPC to a peer
|
||||||
|
func (rf *Raft) sendAppendEntriesToPeer(server int) {
|
||||||
|
rf.mu.Lock()
|
||||||
|
if rf.state != Leader {
|
||||||
|
rf.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we need to send snapshot
|
||||||
|
if rf.nextIndex[server] <= rf.lastIncludedIndex {
|
||||||
|
rf.mu.Unlock()
|
||||||
|
rf.sendSnapshotToPeer(server)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
prevLogIndex := rf.nextIndex[server] - 1
|
||||||
|
prevLogTerm := 0
|
||||||
|
if prevLogIndex == rf.lastIncludedIndex {
|
||||||
|
prevLogTerm = rf.lastIncludedTerm
|
||||||
|
} else if prevLogIndex > rf.lastIncludedIndex {
|
||||||
|
prevLogTerm = rf.getLogTerm(prevLogIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := make([]LogEntry, 0)
|
||||||
|
if rf.nextIndex[server] <= rf.getLastLogIndex() {
|
||||||
|
entries = append(entries, rf.log[rf.nextIndex[server]-rf.lastIncludedIndex-1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
args := AppendEntriesArgs{
|
||||||
|
Term: rf.currentTerm,
|
||||||
|
LeaderId: rf.me,
|
||||||
|
PrevLogIndex: prevLogIndex,
|
||||||
|
PrevLogTerm: prevLogTerm,
|
||||||
|
Entries: entries,
|
||||||
|
LeaderCommit: rf.commitIndex,
|
||||||
|
}
|
||||||
|
term := rf.currentTerm
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
reply := AppendEntriesReply{}
|
||||||
|
ok := rf.sendAppendEntries(server, &args, &reply)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
if rf.state != Leader || rf.currentTerm != term {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if reply.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = reply.Term
|
||||||
|
rf.state = Follower
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.persist()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if reply.Success {
|
||||||
|
newMatchIndex := args.PrevLogIndex + len(args.Entries)
|
||||||
|
if newMatchIndex > rf.matchIndex[server] {
|
||||||
|
rf.matchIndex[server] = newMatchIndex
|
||||||
|
}
|
||||||
|
rf.nextIndex[server] = rf.matchIndex[server] + 1
|
||||||
|
rf.updateCommitIndex()
|
||||||
|
} else {
|
||||||
|
// Fast backup
|
||||||
|
if reply.XTerm != 0 {
|
||||||
|
// Find last entry with XTerm
|
||||||
|
found := false
|
||||||
|
for i := rf.getLastLogIndex(); i > rf.lastIncludedIndex; i-- {
|
||||||
|
if rf.getLogTerm(i) == reply.XTerm {
|
||||||
|
rf.nextIndex[server] = i + 1
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
rf.nextIndex[server] = reply.XIndex
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rf.nextIndex[server] = reply.XLen
|
||||||
|
}
|
||||||
|
if rf.nextIndex[server] < 1 {
|
||||||
|
rf.nextIndex[server] = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendSnapshotToPeer sends InstallSnapshot RPC to a peer
|
||||||
|
func (rf *Raft) sendSnapshotToPeer(server int) {
|
||||||
|
rf.mu.Lock()
|
||||||
|
if rf.state != Leader {
|
||||||
|
rf.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
args := InstallSnapshotArgs{
|
||||||
|
Term: rf.currentTerm,
|
||||||
|
LeaderId: rf.me,
|
||||||
|
LastIncludedIndex: rf.lastIncludedIndex,
|
||||||
|
LastIncludedTerm: rf.lastIncludedTerm,
|
||||||
|
Data: rf.persister.ReadSnapshot(),
|
||||||
|
}
|
||||||
|
term := rf.currentTerm
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
reply := InstallSnapshotReply{}
|
||||||
|
ok := rf.sendInstallSnapshot(server, &args, &reply)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.mu.Lock()
|
||||||
|
defer rf.mu.Unlock()
|
||||||
|
|
||||||
|
if rf.state != Leader || rf.currentTerm != term {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if reply.Term > rf.currentTerm {
|
||||||
|
rf.currentTerm = reply.Term
|
||||||
|
rf.state = Follower
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.persist()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.nextIndex[server] = args.LastIncludedIndex + 1
|
||||||
|
rf.matchIndex[server] = args.LastIncludedIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateCommitIndex updates commitIndex based on matchIndex
|
||||||
|
func (rf *Raft) updateCommitIndex() {
|
||||||
|
// Find N such that N > commitIndex, a majority of matchIndex[i] >= N,
|
||||||
|
// and log[N].term == currentTerm
|
||||||
|
for n := rf.getLastLogIndex(); n > rf.commitIndex && n > rf.lastIncludedIndex; n-- {
|
||||||
|
if rf.getLogTerm(n) != rf.currentTerm {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
count := 1
|
||||||
|
for i := range rf.peers {
|
||||||
|
if i != rf.me && rf.matchIndex[i] >= n {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count > len(rf.peers)/2 {
|
||||||
|
rf.commitIndex = n
|
||||||
|
rf.applyCond.Signal()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// applier applies committed log entries to the state machine
|
||||||
|
func (rf *Raft) applier() {
|
||||||
|
for rf.killed() == false {
|
||||||
|
rf.mu.Lock()
|
||||||
|
for rf.lastApplied >= rf.commitIndex {
|
||||||
|
rf.applyCond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
commitIndex := rf.commitIndex
|
||||||
|
lastApplied := rf.lastApplied
|
||||||
|
entries := make([]LogEntry, 0)
|
||||||
|
for i := lastApplied + 1; i <= commitIndex; i++ {
|
||||||
|
if i > rf.lastIncludedIndex {
|
||||||
|
entries = append(entries, rf.getLogEntry(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rf.mu.Unlock()
|
||||||
|
|
||||||
|
for i, entry := range entries {
|
||||||
|
rf.applyCh <- ApplyMsg{
|
||||||
|
CommandValid: true,
|
||||||
|
Command: entry.Command,
|
||||||
|
CommandIndex: lastApplied + 1 + i,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rf.mu.Lock()
|
||||||
|
if rf.lastApplied < commitIndex {
|
||||||
|
rf.lastApplied = commitIndex
|
||||||
|
}
|
||||||
|
rf.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make - the service or tester wants to create a Raft server
|
||||||
func Make(peers []*labrpc.ClientEnd, me int,
|
func Make(peers []*labrpc.ClientEnd, me int,
|
||||||
persister *Persister, applyCh chan ApplyMsg) *Raft {
|
persister *Persister, applyCh chan ApplyMsg) *Raft {
|
||||||
rf := &Raft{}
|
rf := &Raft{}
|
||||||
@@ -271,14 +801,33 @@ func Make(peers []*labrpc.ClientEnd, me int,
|
|||||||
rf.persister = persister
|
rf.persister = persister
|
||||||
rf.me = me
|
rf.me = me
|
||||||
|
|
||||||
// Your initialization code here (2A, 2B, 2C).
|
// Initialize state
|
||||||
|
rf.currentTerm = 0
|
||||||
|
rf.votedFor = -1
|
||||||
|
rf.log = make([]LogEntry, 0)
|
||||||
|
|
||||||
// initialize from state persisted before a crash
|
rf.commitIndex = 0
|
||||||
|
rf.lastApplied = 0
|
||||||
|
|
||||||
|
rf.nextIndex = make([]int, len(peers))
|
||||||
|
rf.matchIndex = make([]int, len(peers))
|
||||||
|
|
||||||
|
rf.state = Follower
|
||||||
|
rf.lastHeartbeat = time.Now()
|
||||||
|
rf.applyCh = applyCh
|
||||||
|
rf.applyCond = sync.NewCond(&rf.mu)
|
||||||
|
|
||||||
|
rf.lastIncludedIndex = 0
|
||||||
|
rf.lastIncludedTerm = 0
|
||||||
|
|
||||||
|
// Initialize from state persisted before a crash
|
||||||
rf.readPersist(persister.ReadRaftState())
|
rf.readPersist(persister.ReadRaftState())
|
||||||
|
|
||||||
// start ticker goroutine to start elections
|
// Start ticker goroutine to start elections
|
||||||
go rf.ticker()
|
go rf.ticker()
|
||||||
|
|
||||||
|
// Start applier goroutine
|
||||||
|
go rf.applier()
|
||||||
|
|
||||||
return rf
|
return rf
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user