← 返回首页
🌐

Raft共识协议:日志复制与领导者选举

📂 architecture ⏱ 2 min 318 words

Raft共识协议:日志复制与领导者选举

Raft协议核心概念

Raft是一种管理日志复制的共识算法,通过领导者选举、日志复制和安全性三个核心机制保证分布式系统的一致性。相比Paxos,Raft更容易理解和实现。

节点状态:Leader | Follower | Candidate
日志结构:[Term:Index] -> Command
提交条件:大多数节点复制后提交

领导者选举机制

当领导者故障时,Follower超时未收到心跳会转变为Candidate发起选举。获得多数投票的Candidate成为新Leader。

// Raft节点状态
type RaftState int

const (
    Follower RaftState = iota
    Candidate
    Leader
)

type RaftNode struct {
    state        RaftState
    currentTerm  uint64
    votedFor     string
    log          []LogEntry
    commitIndex  uint64
    lastApplied  uint64
    nextIndex    map[string]uint64
    matchIndex   map[string]uint64
    peers        []string
    mu           sync.RWMutex
}

// 领导者选举
func (n *RaftNode) startElection() {
    n.mu.Lock()
    n.state = Candidate
    n.currentTerm++
    n.votedFor = n.id
    term := n.currentTerm
    lastLogIndex := uint64(len(n.log) - 1)
    lastLogTerm := n.log[lastLogIndex].Term
    n.mu.Unlock()
    
    votes := int32(1)
    var mu sync.Mutex
    
    for _, peer := range n.peers {
        go func(peer string) {
            reply := n.sendRequestVote(peer, term, n.id, lastLogIndex, lastLogTerm)
            if reply.VoteGranted {
                mu.Lock()
                votes++
                if votes > int32(len(n.peers)/2) {
                    n.becomeLeader()
                }
                mu.Unlock()
            }
        }(peer)
    }
}

func (n *RaftNode) becomeLeader() {
    n.mu.Lock()
    defer n.mu.Unlock()
    
    n.state = Leader
    for _, peer := range n.peers {
        n.nextIndex[peer] = uint64(len(n.log))
        n.matchIndex[peer] = 0
    }
    go n.heartbeatLoop()
}

日志复制机制

Leader将客户端请求追加到本地日志,然后并行发送给Follower。当多数节点确认后提交日志并应用到状态机。

// 日志复制
func (n *RaftNode) replicateLog(peer string) {
    n.mu.RLock()
    nextIdx := n.nextIndex[peer]
    prevLogIndex := nextIdx - 1
    prevLogTerm := n.log[prevLogIndex].Term
    entries := n.log[nextIdx:]
    leaderCommit := n.commitIndex
    term := n.currentTerm
    n.mu.RUnlock()
    
    reply := n.sendAppendEntries(peer, term, n.id, prevLogIndex, prevLogTerm, entries, leaderCommit)
    
    if reply.Success {
        n.mu.Lock()
        n.matchIndex[peer] = prevLogIndex + uint64(len(entries))
        n.nextIndex[peer] = n.matchIndex[peer] + 1
        n.tryAdvanceCommitIndex()
        n.mu.Unlock()
    } else if reply.Term > term {
        n.stepDown(reply.Term)
    } else {
        // 日志不匹配,回退nextIndex
        n.mu.Lock()
        if n.nextIndex[peer] > 1 {
            n.nextIndex[peer]--
        }
        n.mu.Unlock()
    }
}

// 尝试推进提交索引
func (n *RaftNode) tryAdvanceCommitIndex() {
    for idx := n.commitIndex + 1; idx < uint64(len(n.log)); idx++ {
        if n.log[idx].Term != n.currentTerm {
            continue
        }
        
        count := 1
        for _, peer := range n.peers {
            if n.matchIndex[peer] >= idx {
                count++
            }
        }
        
        if count > len(n.peers)/2 {
            n.commitIndex = idx
        }
    }
}

成员变更

Raft支持安全的集群成员变更,通过联合共识(Joint Consensus)或单节点变更确保安全性。新成员加入时先作为Follower同步日志,确认一致后再参与投票。

// 成员变更请求处理
func (n *RaftNode) AddPeer(newPeer string) error {
    n.mu.Lock()
    defer n.mu.Unlock()
    
    // 创建配置变更日志条目
    entry := LogEntry{
        Term:    n.currentTerm,
        Command: ConfigChangeCommand{
            Type:  AddPeer,
            Peer:  newPeer,
        },
    }
    
    n.log = append(n.log, entry)
    
    // 等待大多数节点确认配置变更
    return n.waitForCommit(uint64(len(n.log) - 1))
}

安全性保证

Raft通过以下机制保证安全性:选举限制(Candidate必须拥有最新的日志才能获得投票)、Leader只追加(Leader不会覆盖或删除自己的日志)、提交限制(Leader只能提交当前任期的日志条目)。