Raft/Paxos共识算法
Raft/Paxos共识算法
什么是共识算法
共识算法是分布式系统中用于在多个节点之间达成一致的算法。在分布式系统中,由于网络延迟、节点故障等原因,节点之间可能无法立即达成一致。共识算法确保即使在部分节点故障的情况下,系统仍能正常工作。
Raft算法
Raft是一种易于理解的共识算法,它将共识问题分解为三个相对独立的子问题:领导者选举、日志复制和安全性。
节点状态
Raft节点有三种状态:
- 领导者(Leader):处理所有客户端请求,管理日志复制
- 跟随者(Follower):接收领导者的心跳和日志条目
- 候选人(Candidate):在选举过程中临时状态
// Raft节点实现
public class RaftNode {
private enum State {
FOLLOWER, CANDIDATE, LEADER
}
private State state;
private int currentTerm;
private int votedFor;
private List<LogEntry> log;
private int commitIndex;
private int lastApplied;
// 领导者状态
private int[] nextIndex;
private int[] matchIndex;
private final String nodeId;
private final List<String> peers;
private final Scheduler scheduler;
public RaftNode(String nodeId, List<String> peers) {
this.nodeId = nodeId;
this.peers = peers;
this.state = State.FOLLOWER;
this.currentTerm = 0;
this.votedFor = -1;
this.log = new ArrayList<>();
this.commitIndex = 0;
this.lastApplied = 0;
this.scheduler = Schedulers.newSingleThreadScheduledExecutor();
// 启动心跳超时定时器
resetElectionTimeout();
}
}
领导者选举
当跟随者在一定时间内没有收到领导者的心跳时,它会发起选举。
// 选举逻辑
public class ElectionManager {
private final RaftNode node;
private final ScheduledFuture<?> electionTimeout;
public ElectionManager(RaftNode node) {
this.node = node;
this.electionTimeout = startElectionTimeout();
}
private ScheduledFuture<?> startElectionTimeout() {
return node.getScheduler().scheduleAtFixedRate(
this::checkElectionTimeout,
150, // 初始延迟
300, // 随机化超时时间
TimeUnit.MILLISECONDS
);
}
private void checkElectionTimeout() {
if (node.getState() == State.FOLLOWER &&
System.currentTimeMillis() - node.getLastHeartbeatTime() > getElectionTimeout()) {
startElection();
}
}
private void startElection() {
// 转换为候选人状态
node.setState(State.CANDIDATE);
node.setCurrentTerm(node.getCurrentTerm() + 1);
node.setVotedFor(node.getNodeId());
// 重置选举超时
resetElectionTimeout();
// 向其他节点请求投票
int votesReceived = 1; // 投票给自己
for (String peer : node.getPeers()) {
CompletableFuture<Boolean> voteFuture = requestVote(peer);
voteFuture.thenAccept(granted -> {
if (granted) {
votesReceived++;
if (votesReceived > node.getPeers().size() / 2) {
becomeLeader();
}
}
});
}
}
private CompletableFuture<Boolean> requestVote(String peer) {
RequestVoteRequest request = new RequestVoteRequest(
node.getCurrentTerm(),
node.getNodeId(),
node.getLastLogIndex(),
node.getLastLogTerm()
);
return node.getRpcClient().sendRequestVote(peer, request);
}
private void becomeLeader() {
node.setState(State.LEADER);
// 初始化领导者状态
int peerCount = node.getPeers().size();
node.setNextIndex(new int[peerCount]);
node.setMatchIndex(new int[peerCount]);
for (int i = 0; i < peerCount; i++) {
node.getNextIndex()[i] = node.getLog().size() + 1;
node.getMatchIndex()[i] = 0;
}
// 发送心跳
sendHeartbeats();
// 停止选举超时
electionTimeout.cancel(false);
}
}
日志复制
领导者接收客户端请求,将日志条目复制到跟随者,并提交日志。
// 日志复制逻辑
public class LogManager {
private final RaftNode node;
public void replicateLog() {
if (node.getState() != State.LEADER) {
return;
}
for (int i = 0; i < node.getPeers().size(); i++) {
String peer = node.getPeers().get(i);
int nextIndex = node.getNextIndex()[i];
if (nextIndex <= node.getLog().size()) {
// 需要复制日志
replicateToPeer(peer, i, nextIndex);
}
}
}
private void replicateToPeer(String peer, int peerIndex, int nextIndex) {
List<LogEntry> entries = node.getLog().subList(
nextIndex - 1,
node.getLog().size()
);
AppendEntriesRequest request = new AppendEntriesRequest(
node.getCurrentTerm(),
node.getNodeId(),
nextIndex - 1,
getLogTerm(nextIndex - 1),
entries,
node.getCommitIndex()
);
node.getRpcClient().sendAppendEntries(peer, request)
.thenAccept(response -> {
if (response.isSuccess()) {
// 更新nextIndex和matchIndex
node.getNextIndex()[peerIndex] = node.getLog().size() + 1;
node.getMatchIndex()[peerIndex] = node.getLog().size();
// 尝试提交日志
tryCommit();
} else {
// 日志不一致,减少nextIndex重试
node.getNextIndex()[peerIndex] = Math.max(1,
node.getNextIndex()[peerIndex] - 1);
}
});
}
private void tryCommit() {
// 找到可以提交的最大索引
for (int n = node.getLog().size(); n > node.getCommitIndex(); n--) {
if (getLogTerm(n) == node.getCurrentTerm()) {
int replicationCount = 1; // 领导者自己
for (int i = 0; i < node.getPeers().size(); i++) {
if (node.getMatchIndex()[i] >= n) {
replicationCount++;
}
}
if (replicationCount > node.getPeers().size() / 2) {
node.setCommitIndex(n);
applyCommittedEntries();
break;
}
}
}
}
}
安全性保证
Raft通过以下机制保证安全性:
- 选举限制:只有拥有最新日志的节点才能成为领导者
- 日志匹配:如果两个日志在某个索引处的术语相同,则该索引之前的所有条目都相同
- 领导者完全性:如果一个日志条目在某个术语中被提交,那么该条目将出现在所有更高术语的领导者日志中
// 选举限制检查
public boolean checkElectionRestriction(RequestVoteRequest request) {
// 检查候选人的日志是否至少和自己一样新
int lastLogIndex = node.getLog().size();
int lastLogTerm = getLogTerm(lastLogIndex);
if (request.getLastLogTerm() > lastLogTerm) {
return true; // 候选人的日志更新
} else if (request.getLastLogTerm() == lastLogTerm) {
return request.getLastLogIndex() >= lastLogIndex; // 术语相同,检查索引
} else {
return false; // 候选人的日志较旧
}
}
// 日志匹配检查
public boolean checkLogMatch(AppendEntriesRequest request) {
// 检查领导者发送的日志条目是否与本地日志匹配
if (request.getPrevLogIndex() > node.getLog().size()) {
return false; // 日志太短
}
if (request.getPrevLogIndex() > 0) {
int prevLogTerm = getLogTerm(request.getPrevLogIndex());
if (prevLogTerm != request.getPrevLogTerm()) {
return false; // 术语不匹配
}
}
return true;
}
Paxos算法
Paxos是一种更通用的共识算法,但比Raft更难理解。它分为两个阶段:准备阶段和接受阶段。
基本Paxos
// 基本Paxos实现
public class BasicPaxos {
private int promisedProposalNumber = -1;
private int acceptedProposalNumber = -1;
private Object acceptedValue = null;
// 准备阶段
public PrepareResponse handlePrepare(PrepareRequest request) {
if (request.getProposalNumber() > promisedProposalNumber) {
promisedProposalNumber = request.getProposalNumber();
return new PrepareResponse(true, acceptedProposalNumber, acceptedValue);
} else {
return new PrepareResponse(false, -1, null);
}
}
// 接受阶段
public AcceptResponse handleAccept(AcceptRequest request) {
if (request.getProposalNumber() >= promisedProposalNumber) {
promisedProposalNumber = request.getProposalNumber();
acceptedProposalNumber = request.getProposalNumber();
acceptedValue = request.getValue();
return new AcceptResponse(true);
} else {
return new AcceptResponse(false);
}
}
}
Multi-Paxos
Multi-Paxos通过选举一个稳定的领导者来优化性能,减少Prepare阶段的开销。
// Multi-Paxos实现
public class MultiPaxos {
private boolean isLeader = false;
private int leaderProposalNumber = -1;
private Map<Integer, LogEntry> log = new HashMap<>();
// 选举领导者
public void electLeader() {
// 使用Basic Paxos选举领导者
PrepareRequest prepare = new PrepareRequest(nextProposalNumber());
broadcastPrepare(prepare);
if (receiveMajorityPromises()) {
isLeader = true;
leaderProposalNumber = prepare.getProposalNumber();
}
}
// 提议日志条目
public void propose(int index, Object value) {
if (isLeader) {
// 领导者可以直接接受
accept(index, value);
} else {
// 跟随者需要通过Prepare和Accept阶段
prepare(index, value);
}
}
private void prepare(int index, Object value) {
PrepareRequest prepare = new PrepareRequest(nextProposalNumber());
broadcastPrepare(prepare);
if (receiveMajorityPromises()) {
AcceptRequest accept = new AcceptRequest(
prepare.getProposalNumber(),
index,
value
);
broadcastAccept(accept);
}
}
private void accept(int index, Object value) {
AcceptRequest accept = new AcceptRequest(
leaderProposalNumber,
index,
value
);
broadcastAccept(accept);
}
}
Raft vs Paxos对比
| 特性 | Raft | Paxos |
|---|---|---|
| 易理解性 | 高 | 低 |
| 实现复杂度 | 低 | 高 |
| 性能 | 中等 | 高 |
| 领导者选举 | 显式 | 隐式 |
| 日志复制 | 连续 | 可跳跃 |
| 实际应用 | etcd, Consul | Chubby, Spanner |
实际应用
etcd中的Raft
// etcd Raft配置
raftConfig := &raft.Config{
ID: uint64(member.ID),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
}
// 启动Raft
raftNode, err := raft.NewRawNode(raftConfig)
if err != nil {
log.Fatal(err)
}
// 主循环
for {
select {
case <-ticker.C:
raftNode.Tick()
case rd := <-raftNode.Ready():
// 处理Ready消息
handleReady(rd)
}
}
ZooKeeper中的ZAB
// ZooKeeper ZAB协议
public class ZabProtocol {
private enum State {
LOOKING, FOLLOWING, LEADING
}
private State state;
private long currentEpoch;
private long lastLoggedZxid;
// 选举过程
public void election() {
// 发送投票
Vote vote = new Vote(myId, lastLoggedZxid, currentEpoch);
broadcastVote(vote);
// 接收投票
Vote bestVote = receiveVotes();
// 检查是否获得多数票
if (hasMajority(bestVote)) {
// 成为领导者
becomeLeader();
} else {
// 成为跟随者
becomeFollower(bestVote.getId());
}
}
}
实践建议
- 选择合适的算法:根据系统需求选择Raft或Paxos
- 正确配置参数:合理设置超时时间、日志大小等参数
- 监控状态:监控节点状态、日志进度等指标
- 处理故障:设计故障恢复机制,确保系统可用性
- 性能优化:根据实际场景优化算法实现