Raft共识协议:日志复制与领导者选举
Raft共识协议:日志复制与领导者选举
Raft协议核心概念
Raft是一种管理日志复制的共识算法,通过领导者选举、日志复制和安全性三个核心机制保证分布式系统的一致性。相比Paxos,Raft更容易理解和实现。
节点状态:Leader | Follower | Candidate
日志结构:[Term:Index] -> Command
提交条件:大多数节点复制后提交
领导者选举机制
当领导者故障时,Follower超时未收到心跳会转变为Candidate发起选举。获得多数投票的Candidate成为新Leader。
// Raft节点状态
type RaftState int
const (
Follower RaftState = iota
Candidate
Leader
)
type RaftNode struct {
state RaftState
currentTerm uint64
votedFor string
log []LogEntry
commitIndex uint64
lastApplied uint64
nextIndex map[string]uint64
matchIndex map[string]uint64
peers []string
mu sync.RWMutex
}
// 领导者选举
func (n *RaftNode) startElection() {
n.mu.Lock()
n.state = Candidate
n.currentTerm++
n.votedFor = n.id
term := n.currentTerm
lastLogIndex := uint64(len(n.log) - 1)
lastLogTerm := n.log[lastLogIndex].Term
n.mu.Unlock()
votes := int32(1)
var mu sync.Mutex
for _, peer := range n.peers {
go func(peer string) {
reply := n.sendRequestVote(peer, term, n.id, lastLogIndex, lastLogTerm)
if reply.VoteGranted {
mu.Lock()
votes++
if votes > int32(len(n.peers)/2) {
n.becomeLeader()
}
mu.Unlock()
}
}(peer)
}
}
func (n *RaftNode) becomeLeader() {
n.mu.Lock()
defer n.mu.Unlock()
n.state = Leader
for _, peer := range n.peers {
n.nextIndex[peer] = uint64(len(n.log))
n.matchIndex[peer] = 0
}
go n.heartbeatLoop()
}
日志复制机制
Leader将客户端请求追加到本地日志,然后并行发送给Follower。当多数节点确认后提交日志并应用到状态机。
// 日志复制
func (n *RaftNode) replicateLog(peer string) {
n.mu.RLock()
nextIdx := n.nextIndex[peer]
prevLogIndex := nextIdx - 1
prevLogTerm := n.log[prevLogIndex].Term
entries := n.log[nextIdx:]
leaderCommit := n.commitIndex
term := n.currentTerm
n.mu.RUnlock()
reply := n.sendAppendEntries(peer, term, n.id, prevLogIndex, prevLogTerm, entries, leaderCommit)
if reply.Success {
n.mu.Lock()
n.matchIndex[peer] = prevLogIndex + uint64(len(entries))
n.nextIndex[peer] = n.matchIndex[peer] + 1
n.tryAdvanceCommitIndex()
n.mu.Unlock()
} else if reply.Term > term {
n.stepDown(reply.Term)
} else {
// 日志不匹配,回退nextIndex
n.mu.Lock()
if n.nextIndex[peer] > 1 {
n.nextIndex[peer]--
}
n.mu.Unlock()
}
}
// 尝试推进提交索引
func (n *RaftNode) tryAdvanceCommitIndex() {
for idx := n.commitIndex + 1; idx < uint64(len(n.log)); idx++ {
if n.log[idx].Term != n.currentTerm {
continue
}
count := 1
for _, peer := range n.peers {
if n.matchIndex[peer] >= idx {
count++
}
}
if count > len(n.peers)/2 {
n.commitIndex = idx
}
}
}
成员变更
Raft支持安全的集群成员变更,通过联合共识(Joint Consensus)或单节点变更确保安全性。新成员加入时先作为Follower同步日志,确认一致后再参与投票。
// 成员变更请求处理
func (n *RaftNode) AddPeer(newPeer string) error {
n.mu.Lock()
defer n.mu.Unlock()
// 创建配置变更日志条目
entry := LogEntry{
Term: n.currentTerm,
Command: ConfigChangeCommand{
Type: AddPeer,
Peer: newPeer,
},
}
n.log = append(n.log, entry)
// 等待大多数节点确认配置变更
return n.waitForCommit(uint64(len(n.log) - 1))
}
安全性保证
Raft通过以下机制保证安全性:选举限制(Candidate必须拥有最新的日志才能获得投票)、Leader只追加(Leader不会覆盖或删除自己的日志)、提交限制(Leader只能提交当前任期的日志条目)。