← 返回首页
🌐

分布式系统基础CAP/BASE

📂 architecture ⏱ 5 min 826 words

分布式系统基础CAP/BASE

分布式系统概述

分布式系统是由多个独立计算机组成的系统,这些计算机通过网络连接并协调工作,对外表现为一个统一的系统。分布式系统面临的主要挑战包括网络延迟、节点故障、数据一致性等问题。

CAP定理

CAP定理指出,一个分布式系统不可能同时满足以下三个属性:

  1. 一致性(Consistency):所有节点在同一时间看到的数据是一致的
  2. 可用性(Availability):每个请求都能在合理的时间内得到响应
  3. 分区容错性(Partition Tolerance):系统在网络分区时仍能继续运行

由于网络分区是不可避免的,系统必须在一致性和可用性之间做出权衡。

// CAP定理示例
// 强一致性系统 - 牺牲可用性
public class StrongConsistencySystem {
    
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    public void write(String key, String value) {
        lock.writeLock().lock();
        try {
            data.put(key, value);
            // 等待所有节点同步完成
            syncToAllNodes(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public String read(String key) {
        lock.readLock().lock();
        try {
            // 确保读取最新数据
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
}

// 高可用性系统 - 牺牲强一致性
public class HighAvailabilitySystem {
    
    private final Map<String, String> localData = new ConcurrentHashMap<>();
    private final Queue<SyncTask> syncQueue = new ConcurrentLinkedQueue<>();
    
    public void write(String key, String value) {
        // 立即写入本地
        localData.put(key, value);
        
        // 异步同步到其他节点
        syncQueue.offer(new SyncTask(key, value));
    }
    
    public String read(String key) {
        // 读取本地数据,可能不是最新的
        return localData.get(key);
    }
}

CAP选择策略

策略 一致性 可用性 适用场景
CP系统 强一致性 低可用性 金融交易、库存管理
AP系统 最终一致性 高可用性 社交网络、内容分发
CA系统 强一致性 高可用性 单机数据库(无网络分区)

BASE理论

BASE理论是CAP定理的延伸,它提出了一种在保证最终一致性的前提下,提高系统可用性的方案:

  1. 基本可用(Basically Available):系统在出现故障时,仍然能保证核心功能可用
  2. 软状态(Soft State):系统中的状态可以存在中间状态,不要求强一致性
  3. 最终一致(Eventually Consistent):系统中的数据最终会达到一致状态
// BASE理论实现示例
public class EventuallyConsistentSystem {
    
    private final Map<String, VersionedData> dataStore = new ConcurrentHashMap<>();
    private final Queue<UpdateEvent> eventQueue = new ConcurrentLinkedQueue<>();
    
    // 写入操作 - 本地立即成功,异步同步
    public void write(String key, String value) {
        VersionedData newData = new VersionedData(value, System.currentTimeMillis());
        dataStore.put(key, newData);
        
        // 发布更新事件
        eventQueue.offer(new UpdateEvent(key, newData));
    }
    
    // 读取操作 - 可能读取到旧数据
    public String read(String key) {
        VersionedData data = dataStore.get(key);
        return data != null ? data.getValue() : null;
    }
    
    // 后台同步任务
    @Scheduled(fixedDelay = 1000)
    public void syncData() {
        UpdateEvent event;
        while ((event = eventQueue.poll()) != null) {
            try {
                // 同步到其他节点
                syncToOtherNodes(event.getKey(), event.getData());
            } catch (Exception e) {
                // 失败后重新入队
                eventQueue.offer(event);
            }
        }
    }
}

// 带版本的数据
class VersionedData {
    private final String value;
    private final long version;
    private final LocalDateTime updatedAt;
    
    public VersionedData(String value, long version) {
        this.value = value;
        this.version = version;
        this.updatedAt = LocalDateTime.now();
    }
    
    // getters
}

一致性模型

强一致性

所有操作都是原子的,读取操作总是返回最新写入的值。

// 强一致性实现 - 使用分布式锁
public class StrongConsistentCache {
    
    private final DistributedLock lock;
    private final Cache<String, String> cache;
    
    public void put(String key, String value) {
        lock.lock(key);
        try {
            cache.put(key, value);
            // 确保数据持久化
            persistToAllNodes(key, value);
        } finally {
            lock.unlock(key);
        }
    }
    
    public String get(String key) {
        lock.lock(key);
        try {
            return cache.get(key);
        } finally {
            lock.unlock(key);
        }
    }
}

最终一致性

系统保证如果没有新的更新,最终所有副本都会收敛到相同的值。

// 最终一致性实现 - 使用事件驱动
public class EventuallyConsistentStore {
    
    private final Map<String, DataWithVersion> localStore = new ConcurrentHashMap<>();
    private final EventPublisher eventPublisher;
    
    public void put(String key, String value) {
        DataWithVersion data = new DataWithVersion(value, generateVersion());
        localStore.put(key, data);
        
        // 发布更新事件
        eventPublisher.publish(new DataUpdatedEvent(key, data));
    }
    
    public String get(String key) {
        DataWithVersion data = localStore.get(key);
        return data != null ? data.getValue() : null;
    }
    
    // 处理来自其他节点的更新
    @EventHandler
    public void onDataUpdated(DataUpdatedEvent event) {
        String key = event.getKey();
        DataWithVersion remoteData = event.getData();
        
        localStore.merge(key, remoteData, (existing, remote) -> {
            // 使用版本号解决冲突
            if (remote.getVersion() > existing.getVersion()) {
                return remote;
            }
            return existing;
        });
    }
}

// 带版本的数据
class DataWithVersion {
    private final String value;
    private final long version;
    
    // getters and constructors
}

网络分区处理

检测网络分区

// 心跳检测
@Component
public class PartitionDetector {
    
    private final Map<String, NodeStatus> nodeStatuses = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(this::checkNodes, 0, 5, TimeUnit.SECONDS);
    }
    
    private void checkNodes() {
        for (String nodeId : getAllNodes()) {
            try {
                // 发送心跳
                boolean alive = pingNode(nodeId);
                nodeStatuses.put(nodeId, new NodeStatus(alive, System.currentTimeMillis()));
            } catch (Exception e) {
                nodeStatuses.put(nodeId, new NodeStatus(false, System.currentTimeMillis()));
            }
        }
        
        // 检测分区
        detectPartition();
    }
    
    private void detectPartition() {
        List<String> aliveNodes = nodeStatuses.entrySet().stream()
                .filter(entry -> entry.getValue().isAlive())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        
        List<String> deadNodes = nodeStatuses.entrySet().stream()
                .filter(entry -> !entry.getValue().isAlive())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        
        if (!deadNodes.isEmpty()) {
            // 触发分区处理
            handlePartition(aliveNodes, deadNodes);
        }
    }
}

分区恢复

// 分区恢复策略
@Component
public class PartitionRecovery {
    
    private final DataSyncService syncService;
    private final ConflictResolver conflictResolver;
    
    public void recoverFromPartition(List<String> recoveredNodes) {
        for (String nodeId : recoveredNodes) {
            // 获取该节点的数据
            Map<String, DataWithVersion> nodeData = getDataFromNode(nodeId);
            
            // 获取本地数据
            Map<String, DataWithVersion> localData = getLocalData();
            
            // 解决冲突
            for (Map.Entry<String, DataWithVersion> entry : nodeData.entrySet()) {
                String key = entry.getKey();
                DataWithVersion remoteData = entry.getValue();
                DataWithVersion localDataValue = localData.get(key);
                
                if (localDataValue == null) {
                    // 本地没有数据,使用远程数据
                    saveLocal(key, remoteData);
                } else {
                    // 解决冲突
                    DataWithVersion resolved = conflictResolver.resolve(
                        localDataValue, remoteData
                    );
                    saveLocal(key, resolved);
                }
            }
        }
    }
}

时钟同步问题

逻辑时钟

// 向量时钟实现
public class VectorClock {
    
    private final Map<String, Long> clock;
    
    public VectorClock() {
        this.clock = new ConcurrentHashMap<>();
    }
    
    // 事件发生时递增本地时钟
    public void increment(String nodeId) {
        clock.merge(nodeId, 1L, Long::sum);
    }
    
    // 合并其他节点的时钟
    public void merge(VectorClock other) {
        for (Map.Entry<String, Long> entry : other.clock.entrySet()) {
            clock.merge(entry.getKey(), entry.getValue(), Math::max);
        }
    }
    
    // 比较两个时钟
    public int compare(VectorClock other) {
        boolean thisLessOrEqual = true;
        boolean otherLessOrEqual = true;
        
        for (String nodeId : clock.keySet()) {
            long thisValue = clock.get(nodeId);
            long otherValue = other.clock.getOrDefault(nodeId, 0L);
            
            if (thisValue > otherValue) {
                otherLessOrEqual = false;
            }
            if (thisValue < otherValue) {
                thisLessOrEqual = false;
            }
        }
        
        for (String nodeId : other.clock.keySet()) {
            if (!clock.containsKey(nodeId)) {
                thisLessOrEqual = false;
            }
        }
        
        if (thisLessOrEqual && otherLessOrEqual) {
            return 0; // 相等
        } else if (thisLessOrEqual) {
            return -1; // this < other
        } else if (otherLessOrEqual) {
            return 1; // this > other
        } else {
            return 2; // 并发
        }
    }
}

实践建议

  1. 明确需求:根据业务需求选择合适的一致性模型
  2. 监控告警:建立完善的监控体系,及时发现和处理问题
  3. 容错设计:设计系统时考虑各种故障场景
  4. 数据备份:定期备份数据,防止数据丢失
  5. 故障演练:定期进行故障演练,验证系统的容错能力