分布式系统基础CAP/BASE
分布式系统基础CAP/BASE
分布式系统概述
分布式系统是由多个独立计算机组成的系统,这些计算机通过网络连接并协调工作,对外表现为一个统一的系统。分布式系统面临的主要挑战包括网络延迟、节点故障、数据一致性等问题。
CAP定理
CAP定理指出,一个分布式系统不可能同时满足以下三个属性:
- 一致性(Consistency):所有节点在同一时间看到的数据是一致的
- 可用性(Availability):每个请求都能在合理的时间内得到响应
- 分区容错性(Partition Tolerance):系统在网络分区时仍能继续运行
由于网络分区是不可避免的,系统必须在一致性和可用性之间做出权衡。
// CAP定理示例
// 强一致性系统 - 牺牲可用性
public class StrongConsistencySystem {
private final Map<String, String> data = new ConcurrentHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public void write(String key, String value) {
lock.writeLock().lock();
try {
data.put(key, value);
// 等待所有节点同步完成
syncToAllNodes(key, value);
} finally {
lock.writeLock().unlock();
}
}
public String read(String key) {
lock.readLock().lock();
try {
// 确保读取最新数据
return data.get(key);
} finally {
lock.readLock().unlock();
}
}
}
// 高可用性系统 - 牺牲强一致性
public class HighAvailabilitySystem {
private final Map<String, String> localData = new ConcurrentHashMap<>();
private final Queue<SyncTask> syncQueue = new ConcurrentLinkedQueue<>();
public void write(String key, String value) {
// 立即写入本地
localData.put(key, value);
// 异步同步到其他节点
syncQueue.offer(new SyncTask(key, value));
}
public String read(String key) {
// 读取本地数据,可能不是最新的
return localData.get(key);
}
}
CAP选择策略
| 策略 | 一致性 | 可用性 | 适用场景 |
|---|---|---|---|
| CP系统 | 强一致性 | 低可用性 | 金融交易、库存管理 |
| AP系统 | 最终一致性 | 高可用性 | 社交网络、内容分发 |
| CA系统 | 强一致性 | 高可用性 | 单机数据库(无网络分区) |
BASE理论
BASE理论是CAP定理的延伸,它提出了一种在保证最终一致性的前提下,提高系统可用性的方案:
- 基本可用(Basically Available):系统在出现故障时,仍然能保证核心功能可用
- 软状态(Soft State):系统中的状态可以存在中间状态,不要求强一致性
- 最终一致(Eventually Consistent):系统中的数据最终会达到一致状态
// BASE理论实现示例
public class EventuallyConsistentSystem {
private final Map<String, VersionedData> dataStore = new ConcurrentHashMap<>();
private final Queue<UpdateEvent> eventQueue = new ConcurrentLinkedQueue<>();
// 写入操作 - 本地立即成功,异步同步
public void write(String key, String value) {
VersionedData newData = new VersionedData(value, System.currentTimeMillis());
dataStore.put(key, newData);
// 发布更新事件
eventQueue.offer(new UpdateEvent(key, newData));
}
// 读取操作 - 可能读取到旧数据
public String read(String key) {
VersionedData data = dataStore.get(key);
return data != null ? data.getValue() : null;
}
// 后台同步任务
@Scheduled(fixedDelay = 1000)
public void syncData() {
UpdateEvent event;
while ((event = eventQueue.poll()) != null) {
try {
// 同步到其他节点
syncToOtherNodes(event.getKey(), event.getData());
} catch (Exception e) {
// 失败后重新入队
eventQueue.offer(event);
}
}
}
}
// 带版本的数据
class VersionedData {
private final String value;
private final long version;
private final LocalDateTime updatedAt;
public VersionedData(String value, long version) {
this.value = value;
this.version = version;
this.updatedAt = LocalDateTime.now();
}
// getters
}
一致性模型
强一致性
所有操作都是原子的,读取操作总是返回最新写入的值。
// 强一致性实现 - 使用分布式锁
public class StrongConsistentCache {
private final DistributedLock lock;
private final Cache<String, String> cache;
public void put(String key, String value) {
lock.lock(key);
try {
cache.put(key, value);
// 确保数据持久化
persistToAllNodes(key, value);
} finally {
lock.unlock(key);
}
}
public String get(String key) {
lock.lock(key);
try {
return cache.get(key);
} finally {
lock.unlock(key);
}
}
}
最终一致性
系统保证如果没有新的更新,最终所有副本都会收敛到相同的值。
// 最终一致性实现 - 使用事件驱动
public class EventuallyConsistentStore {
private final Map<String, DataWithVersion> localStore = new ConcurrentHashMap<>();
private final EventPublisher eventPublisher;
public void put(String key, String value) {
DataWithVersion data = new DataWithVersion(value, generateVersion());
localStore.put(key, data);
// 发布更新事件
eventPublisher.publish(new DataUpdatedEvent(key, data));
}
public String get(String key) {
DataWithVersion data = localStore.get(key);
return data != null ? data.getValue() : null;
}
// 处理来自其他节点的更新
@EventHandler
public void onDataUpdated(DataUpdatedEvent event) {
String key = event.getKey();
DataWithVersion remoteData = event.getData();
localStore.merge(key, remoteData, (existing, remote) -> {
// 使用版本号解决冲突
if (remote.getVersion() > existing.getVersion()) {
return remote;
}
return existing;
});
}
}
// 带版本的数据
class DataWithVersion {
private final String value;
private final long version;
// getters and constructors
}
网络分区处理
检测网络分区
// 心跳检测
@Component
public class PartitionDetector {
private final Map<String, NodeStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@PostConstruct
public void init() {
scheduler.scheduleAtFixedRate(this::checkNodes, 0, 5, TimeUnit.SECONDS);
}
private void checkNodes() {
for (String nodeId : getAllNodes()) {
try {
// 发送心跳
boolean alive = pingNode(nodeId);
nodeStatuses.put(nodeId, new NodeStatus(alive, System.currentTimeMillis()));
} catch (Exception e) {
nodeStatuses.put(nodeId, new NodeStatus(false, System.currentTimeMillis()));
}
}
// 检测分区
detectPartition();
}
private void detectPartition() {
List<String> aliveNodes = nodeStatuses.entrySet().stream()
.filter(entry -> entry.getValue().isAlive())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
List<String> deadNodes = nodeStatuses.entrySet().stream()
.filter(entry -> !entry.getValue().isAlive())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (!deadNodes.isEmpty()) {
// 触发分区处理
handlePartition(aliveNodes, deadNodes);
}
}
}
分区恢复
// 分区恢复策略
@Component
public class PartitionRecovery {
private final DataSyncService syncService;
private final ConflictResolver conflictResolver;
public void recoverFromPartition(List<String> recoveredNodes) {
for (String nodeId : recoveredNodes) {
// 获取该节点的数据
Map<String, DataWithVersion> nodeData = getDataFromNode(nodeId);
// 获取本地数据
Map<String, DataWithVersion> localData = getLocalData();
// 解决冲突
for (Map.Entry<String, DataWithVersion> entry : nodeData.entrySet()) {
String key = entry.getKey();
DataWithVersion remoteData = entry.getValue();
DataWithVersion localDataValue = localData.get(key);
if (localDataValue == null) {
// 本地没有数据,使用远程数据
saveLocal(key, remoteData);
} else {
// 解决冲突
DataWithVersion resolved = conflictResolver.resolve(
localDataValue, remoteData
);
saveLocal(key, resolved);
}
}
}
}
}
时钟同步问题
逻辑时钟
// 向量时钟实现
public class VectorClock {
private final Map<String, Long> clock;
public VectorClock() {
this.clock = new ConcurrentHashMap<>();
}
// 事件发生时递增本地时钟
public void increment(String nodeId) {
clock.merge(nodeId, 1L, Long::sum);
}
// 合并其他节点的时钟
public void merge(VectorClock other) {
for (Map.Entry<String, Long> entry : other.clock.entrySet()) {
clock.merge(entry.getKey(), entry.getValue(), Math::max);
}
}
// 比较两个时钟
public int compare(VectorClock other) {
boolean thisLessOrEqual = true;
boolean otherLessOrEqual = true;
for (String nodeId : clock.keySet()) {
long thisValue = clock.get(nodeId);
long otherValue = other.clock.getOrDefault(nodeId, 0L);
if (thisValue > otherValue) {
otherLessOrEqual = false;
}
if (thisValue < otherValue) {
thisLessOrEqual = false;
}
}
for (String nodeId : other.clock.keySet()) {
if (!clock.containsKey(nodeId)) {
thisLessOrEqual = false;
}
}
if (thisLessOrEqual && otherLessOrEqual) {
return 0; // 相等
} else if (thisLessOrEqual) {
return -1; // this < other
} else if (otherLessOrEqual) {
return 1; // this > other
} else {
return 2; // 并发
}
}
}
实践建议
- 明确需求:根据业务需求选择合适的一致性模型
- 监控告警:建立完善的监控体系,及时发现和处理问题
- 容错设计:设计系统时考虑各种故障场景
- 数据备份:定期备份数据,防止数据丢失
- 故障演练:定期进行故障演练,验证系统的容错能力