← 返回首页
🌐

Raft/Paxos共识算法

📂 architecture ⏱ 5 min 913 words

Raft/Paxos共识算法

什么是共识算法

共识算法是分布式系统中用于在多个节点之间达成一致的算法。在分布式系统中,由于网络延迟、节点故障等原因,节点之间可能无法立即达成一致。共识算法确保即使在部分节点故障的情况下,系统仍能正常工作。

Raft算法

Raft是一种易于理解的共识算法,它将共识问题分解为三个相对独立的子问题:领导者选举、日志复制和安全性。

节点状态

Raft节点有三种状态:

// Raft节点实现
public class RaftNode {
    
    private enum State {
        FOLLOWER, CANDIDATE, LEADER
    }
    
    private State state;
    private int currentTerm;
    private int votedFor;
    private List<LogEntry> log;
    private int commitIndex;
    private int lastApplied;
    
    // 领导者状态
    private int[] nextIndex;
    private int[] matchIndex;
    
    private final String nodeId;
    private final List<String> peers;
    private final Scheduler scheduler;
    
    public RaftNode(String nodeId, List<String> peers) {
        this.nodeId = nodeId;
        this.peers = peers;
        this.state = State.FOLLOWER;
        this.currentTerm = 0;
        this.votedFor = -1;
        this.log = new ArrayList<>();
        this.commitIndex = 0;
        this.lastApplied = 0;
        this.scheduler = Schedulers.newSingleThreadScheduledExecutor();
        
        // 启动心跳超时定时器
        resetElectionTimeout();
    }
}

领导者选举

当跟随者在一定时间内没有收到领导者的心跳时,它会发起选举。

// 选举逻辑
public class ElectionManager {
    
    private final RaftNode node;
    private final ScheduledFuture<?> electionTimeout;
    
    public ElectionManager(RaftNode node) {
        this.node = node;
        this.electionTimeout = startElectionTimeout();
    }
    
    private ScheduledFuture<?> startElectionTimeout() {
        return node.getScheduler().scheduleAtFixedRate(
            this::checkElectionTimeout,
            150,  // 初始延迟
            300,  // 随机化超时时间
            TimeUnit.MILLISECONDS
        );
    }
    
    private void checkElectionTimeout() {
        if (node.getState() == State.FOLLOWER && 
            System.currentTimeMillis() - node.getLastHeartbeatTime() > getElectionTimeout()) {
            startElection();
        }
    }
    
    private void startElection() {
        // 转换为候选人状态
        node.setState(State.CANDIDATE);
        node.setCurrentTerm(node.getCurrentTerm() + 1);
        node.setVotedFor(node.getNodeId());
        
        // 重置选举超时
        resetElectionTimeout();
        
        // 向其他节点请求投票
        int votesReceived = 1; // 投票给自己
        
        for (String peer : node.getPeers()) {
            CompletableFuture<Boolean> voteFuture = requestVote(peer);
            voteFuture.thenAccept(granted -> {
                if (granted) {
                    votesReceived++;
                    if (votesReceived > node.getPeers().size() / 2) {
                        becomeLeader();
                    }
                }
            });
        }
    }
    
    private CompletableFuture<Boolean> requestVote(String peer) {
        RequestVoteRequest request = new RequestVoteRequest(
            node.getCurrentTerm(),
            node.getNodeId(),
            node.getLastLogIndex(),
            node.getLastLogTerm()
        );
        
        return node.getRpcClient().sendRequestVote(peer, request);
    }
    
    private void becomeLeader() {
        node.setState(State.LEADER);
        
        // 初始化领导者状态
        int peerCount = node.getPeers().size();
        node.setNextIndex(new int[peerCount]);
        node.setMatchIndex(new int[peerCount]);
        
        for (int i = 0; i < peerCount; i++) {
            node.getNextIndex()[i] = node.getLog().size() + 1;
            node.getMatchIndex()[i] = 0;
        }
        
        // 发送心跳
        sendHeartbeats();
        
        // 停止选举超时
        electionTimeout.cancel(false);
    }
}

日志复制

领导者接收客户端请求,将日志条目复制到跟随者,并提交日志。

// 日志复制逻辑
public class LogManager {
    
    private final RaftNode node;
    
    public void replicateLog() {
        if (node.getState() != State.LEADER) {
            return;
        }
        
        for (int i = 0; i < node.getPeers().size(); i++) {
            String peer = node.getPeers().get(i);
            int nextIndex = node.getNextIndex()[i];
            
            if (nextIndex <= node.getLog().size()) {
                // 需要复制日志
                replicateToPeer(peer, i, nextIndex);
            }
        }
    }
    
    private void replicateToPeer(String peer, int peerIndex, int nextIndex) {
        List<LogEntry> entries = node.getLog().subList(
            nextIndex - 1, 
            node.getLog().size()
        );
        
        AppendEntriesRequest request = new AppendEntriesRequest(
            node.getCurrentTerm(),
            node.getNodeId(),
            nextIndex - 1,
            getLogTerm(nextIndex - 1),
            entries,
            node.getCommitIndex()
        );
        
        node.getRpcClient().sendAppendEntries(peer, request)
            .thenAccept(response -> {
                if (response.isSuccess()) {
                    // 更新nextIndex和matchIndex
                    node.getNextIndex()[peerIndex] = node.getLog().size() + 1;
                    node.getMatchIndex()[peerIndex] = node.getLog().size();
                    
                    // 尝试提交日志
                    tryCommit();
                } else {
                    // 日志不一致,减少nextIndex重试
                    node.getNextIndex()[peerIndex] = Math.max(1, 
                        node.getNextIndex()[peerIndex] - 1);
                }
            });
    }
    
    private void tryCommit() {
        // 找到可以提交的最大索引
        for (int n = node.getLog().size(); n > node.getCommitIndex(); n--) {
            if (getLogTerm(n) == node.getCurrentTerm()) {
                int replicationCount = 1; // 领导者自己
                for (int i = 0; i < node.getPeers().size(); i++) {
                    if (node.getMatchIndex()[i] >= n) {
                        replicationCount++;
                    }
                }
                
                if (replicationCount > node.getPeers().size() / 2) {
                    node.setCommitIndex(n);
                    applyCommittedEntries();
                    break;
                }
            }
        }
    }
}

安全性保证

Raft通过以下机制保证安全性:

  1. 选举限制:只有拥有最新日志的节点才能成为领导者
  2. 日志匹配:如果两个日志在某个索引处的术语相同,则该索引之前的所有条目都相同
  3. 领导者完全性:如果一个日志条目在某个术语中被提交,那么该条目将出现在所有更高术语的领导者日志中
// 选举限制检查
public boolean checkElectionRestriction(RequestVoteRequest request) {
    // 检查候选人的日志是否至少和自己一样新
    int lastLogIndex = node.getLog().size();
    int lastLogTerm = getLogTerm(lastLogIndex);
    
    if (request.getLastLogTerm() > lastLogTerm) {
        return true; // 候选人的日志更新
    } else if (request.getLastLogTerm() == lastLogTerm) {
        return request.getLastLogIndex() >= lastLogIndex; // 术语相同,检查索引
    } else {
        return false; // 候选人的日志较旧
    }
}

// 日志匹配检查
public boolean checkLogMatch(AppendEntriesRequest request) {
    // 检查领导者发送的日志条目是否与本地日志匹配
    if (request.getPrevLogIndex() > node.getLog().size()) {
        return false; // 日志太短
    }
    
    if (request.getPrevLogIndex() > 0) {
        int prevLogTerm = getLogTerm(request.getPrevLogIndex());
        if (prevLogTerm != request.getPrevLogTerm()) {
            return false; // 术语不匹配
        }
    }
    
    return true;
}

Paxos算法

Paxos是一种更通用的共识算法,但比Raft更难理解。它分为两个阶段:准备阶段和接受阶段。

基本Paxos

// 基本Paxos实现
public class BasicPaxos {
    
    private int promisedProposalNumber = -1;
    private int acceptedProposalNumber = -1;
    private Object acceptedValue = null;
    
    // 准备阶段
    public PrepareResponse handlePrepare(PrepareRequest request) {
        if (request.getProposalNumber() > promisedProposalNumber) {
            promisedProposalNumber = request.getProposalNumber();
            return new PrepareResponse(true, acceptedProposalNumber, acceptedValue);
        } else {
            return new PrepareResponse(false, -1, null);
        }
    }
    
    // 接受阶段
    public AcceptResponse handleAccept(AcceptRequest request) {
        if (request.getProposalNumber() >= promisedProposalNumber) {
            promisedProposalNumber = request.getProposalNumber();
            acceptedProposalNumber = request.getProposalNumber();
            acceptedValue = request.getValue();
            return new AcceptResponse(true);
        } else {
            return new AcceptResponse(false);
        }
    }
}

Multi-Paxos

Multi-Paxos通过选举一个稳定的领导者来优化性能,减少Prepare阶段的开销。

// Multi-Paxos实现
public class MultiPaxos {
    
    private boolean isLeader = false;
    private int leaderProposalNumber = -1;
    private Map<Integer, LogEntry> log = new HashMap<>();
    
    // 选举领导者
    public void electLeader() {
        // 使用Basic Paxos选举领导者
        PrepareRequest prepare = new PrepareRequest(nextProposalNumber());
        broadcastPrepare(prepare);
        
        if (receiveMajorityPromises()) {
            isLeader = true;
            leaderProposalNumber = prepare.getProposalNumber();
        }
    }
    
    // 提议日志条目
    public void propose(int index, Object value) {
        if (isLeader) {
            // 领导者可以直接接受
            accept(index, value);
        } else {
            // 跟随者需要通过Prepare和Accept阶段
            prepare(index, value);
        }
    }
    
    private void prepare(int index, Object value) {
        PrepareRequest prepare = new PrepareRequest(nextProposalNumber());
        broadcastPrepare(prepare);
        
        if (receiveMajorityPromises()) {
            AcceptRequest accept = new AcceptRequest(
                prepare.getProposalNumber(), 
                index, 
                value
            );
            broadcastAccept(accept);
        }
    }
    
    private void accept(int index, Object value) {
        AcceptRequest accept = new AcceptRequest(
            leaderProposalNumber, 
            index, 
            value
        );
        broadcastAccept(accept);
    }
}

Raft vs Paxos对比

特性 Raft Paxos
易理解性
实现复杂度
性能 中等
领导者选举 显式 隐式
日志复制 连续 可跳跃
实际应用 etcd, Consul Chubby, Spanner

实际应用

etcd中的Raft

// etcd Raft配置
raftConfig := &raft.Config{
    ID:              uint64(member.ID),
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         raftStorage,
    MaxSizePerMsg:   1024 * 1024,
    MaxInflightMsgs: 256,
}

// 启动Raft
raftNode, err := raft.NewRawNode(raftConfig)
if err != nil {
    log.Fatal(err)
}

// 主循环
for {
    select {
    case <-ticker.C:
        raftNode.Tick()
    case rd := <-raftNode.Ready():
        // 处理Ready消息
        handleReady(rd)
    }
}

ZooKeeper中的ZAB

// ZooKeeper ZAB协议
public class ZabProtocol {
    
    private enum State {
        LOOKING, FOLLOWING, LEADING
    }
    
    private State state;
    private long currentEpoch;
    private long lastLoggedZxid;
    
    // 选举过程
    public void election() {
        // 发送投票
        Vote vote = new Vote(myId, lastLoggedZxid, currentEpoch);
        broadcastVote(vote);
        
        // 接收投票
        Vote bestVote = receiveVotes();
        
        // 检查是否获得多数票
        if (hasMajority(bestVote)) {
            // 成为领导者
            becomeLeader();
        } else {
            // 成为跟随者
            becomeFollower(bestVote.getId());
        }
    }
}

实践建议

  1. 选择合适的算法:根据系统需求选择Raft或Paxos
  2. 正确配置参数:合理设置超时时间、日志大小等参数
  3. 监控状态:监控节点状态、日志进度等指标
  4. 处理故障:设计故障恢复机制,确保系统可用性
  5. 性能优化:根据实际场景优化算法实现