分布式系统
分布式系统
分布式系统是现代软件架构的核心。本文将深入探讨CAP定理、一致性模型、分布式事务处理和分布式锁,帮助开发者设计可靠的分布式系统。
CAP定理
CAP定理指出,分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)中的两个。由于网络分区在实际环境中无法避免,真正的取舍发生在分区出现时:系统要么牺牲可用性以保持一致性(CP),要么牺牲一致性以保持可用性(AP)。
from dataclasses import dataclass
from typing import Dict, Any, Optional
import time
import random
@dataclass
class Node:
    """A single in-memory replica used by the CAP demonstrations.

    Attributes:
        id: Identifier included in the error raised by this node.
        data: Key/value storage held by the node.
        is_alive: False while the node is treated as unreachable.
    """

    id: str
    data: Dict[str, Any]
    is_alive: bool = True

    def _ensure_alive(self) -> None:
        """Raise ConnectionError when the node is marked as not alive."""
        if self.is_alive:
            return
        raise ConnectionError(f"节点 {self.id} 不可用")

    def read(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or None when it is absent.

        Raises:
            ConnectionError: If the node is not alive.
        """
        self._ensure_alive()
        return self.data.get(key)

    def write(self, key: str, value: Any) -> bool:
        """Store ``value`` under ``key`` and return True.

        Raises:
            ConnectionError: If the node is not alive.
        """
        self._ensure_alive()
        self.data[key] = value
        return True
class CAPDemo:
    """Walk through the three CAP properties on a three-node in-memory cluster."""

    def __init__(self):
        # Three replicas, each starting with its own {'counter': 0} store.
        self.nodes = {
            name: Node(name, {'counter': 0})
            for name in ('node1', 'node2', 'node3')
        }

    def simulate_partition(self, node_ids: list):
        """Mark every node named in ``node_ids`` as not alive.

        Raises:
            KeyError: If an id is not a key of ``self.nodes``.
        """
        for unreachable in node_ids:
            self.nodes[unreachable].is_alive = False
        print(f"节点 {node_ids} 发生网络分区")

    def heal_partition(self):
        """Mark every node as alive again."""
        for replica in self.nodes.values():
            replica.is_alive = True
        print("网络分区已恢复")

    def consistency_example(self):
        """Write the counter to each alive node, then print each alive node's value."""
        print("\n一致性示例:")
        print("写操作需要所有节点确认")

        # Write phase: nodes that are not alive are skipped, not retried.
        success_count = 0
        for replica in self.nodes.values():
            if not replica.is_alive:
                continue
            replica.write('counter', 1)
            success_count += 1
        print(f"成功写入 {success_count}/{len(self.nodes)} 个节点")

        # Read phase: report what every alive node currently holds.
        for name, replica in self.nodes.items():
            if not replica.is_alive:
                continue
            print(f"节点 {name}: {replica.read('counter')}")

    def availability_example(self):
        """Partition node2 and node3, write and read through node1, then heal."""
        print("\n可用性示例:")
        print("任何节点都可以处理请求,即使部分节点不可用")

        self.simulate_partition(['node2', 'node3'])

        # node1 is not part of the partition, so it is the one asked to serve.
        survivor = self.nodes['node1']
        try:
            survivor.write('counter', 2)
            print(f"节点1写入成功: {survivor.read('counter')}")
        except ConnectionError as e:
            print(f"写入失败: {e}")

        self.heal_partition()

    def partition_tolerance_example(self):
        """Partition node2, write to the nodes still alive, then heal."""
        print("\n分区容错性示例:")
        print("系统在网络分区时继续运行")

        self.simulate_partition(['node2'])

        # Only the nodes still alive take part in the write.
        available_nodes = list(filter(lambda n: n.is_alive, self.nodes.values()))
        print(f"可用节点: {len(available_nodes)}/{len(self.nodes)}")
        for replica in available_nodes:
            replica.write('counter', 3)

        self.heal_partition()
# Run the three CAP demonstrations in order on one shared cluster.
cap_demo = CAPDemo()
for _run_demo in (
    cap_demo.consistency_example,
    cap_demo.availability_example,
    cap_demo.partition_tolerance_example,
):
    _run_demo()
一致性模型
分布式系统有多种一致性模型,从强一致性到最终一致性。选择合适的一致性模型对于系统设计至关重要。
import time
import threading
from dataclasses import dataclass
from typing import Dict, List, Any
from enum import Enum
class ConsistencyLevel(Enum):
    """Consistency modes that DistributedKVStore dispatches on."""

    STRONG = "strong"      # writes go to every node; reads compare every replica
    WEAK = "weak"          # reads and writes touch only the node named by the caller
    EVENTUAL = "eventual"  # writes hit the first node, then are copied in the background


@dataclass
class DistributedKVStore:
    """In-memory simulation of a replicated key-value store.

    Each node is a plain dict held in ``nodes``. ``consistency_level`` selects
    which private read/write strategy the public ``read`` and ``write`` use.
    All access to ``nodes`` made by this class is serialized through ``lock``.
    """

    nodes: Dict[str, Dict[str, Any]]
    consistency_level: ConsistencyLevel

    def __init__(self, consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL):
        """Create an empty store that uses ``consistency_level``."""
        # This hand-written initializer is the one in effect: @dataclass does
        # not replace an __init__ that the class body already defines.
        self.nodes = {}
        self.consistency_level = consistency_level
        self.lock = threading.Lock()

    def add_node(self, node_id: str):
        """Register ``node_id`` with empty storage (replacing any existing data)."""
        # Taken under the lock so a background replication pass never sees the
        # node table change size while it is iterating.
        with self.lock:
            self.nodes[node_id] = {}

    def write(self, key: str, value: Any, node_id: str = None):
        """Write ``key`` using the configured consistency level.

        ``node_id`` is only used by the WEAK level. Returns True on success
        and False when the write could not be applied.
        """
        if self.consistency_level == ConsistencyLevel.STRONG:
            return self._strong_write(key, value)
        elif self.consistency_level == ConsistencyLevel.WEAK:
            return self._weak_write(key, value, node_id)
        else:
            return self._eventual_write(key, value)

    def _strong_write(self, key: str, value: Any):
        """Write to every node while holding the lock."""
        with self.lock:
            success_count = 0
            for node_data in self.nodes.values():
                node_data[key] = value
                success_count += 1
            print(f"强一致性写入: {key}={value} 到 {success_count} 个节点")
            return success_count == len(self.nodes)

    def _weak_write(self, key: str, value: Any, node_id: str):
        """Write to ``node_id`` only; return False when that node is unknown."""
        with self.lock:
            if node_id not in self.nodes:
                return False
            self.nodes[node_id][key] = value
            print(f"弱一致性写入: {key}={value} 到节点 {node_id}")
            return True

    def _eventual_write(self, key: str, value: Any):
        """Write to the first node now and copy to the others in the background.

        Returns False when the store has no nodes (there is no primary to
        write to), True otherwise.
        """
        with self.lock:
            if not self.nodes:
                return False
            primary_node = next(iter(self.nodes))
            self.nodes[primary_node][key] = value

        def replicate():
            time.sleep(0.1)  # simulated network delay
            # Readers hold self.lock, so replication must hold it too;
            # otherwise a read can observe a half-applied copy.
            with self.lock:
                for node_id, node_data in self.nodes.items():
                    if node_id != primary_node:
                        node_data[key] = value

        threading.Thread(target=replicate, daemon=True).start()
        print(f"最终一致性写入: {key}={value} 到主节点 {primary_node}")
        return True

    def read(self, key: str, node_id: str = None):
        """Read ``key`` using the configured consistency level.

        ``node_id`` is only used by the WEAK level. Returns None when no value
        can be returned.
        """
        if self.consistency_level == ConsistencyLevel.STRONG:
            return self._strong_read(key)
        elif self.consistency_level == ConsistencyLevel.WEAK:
            return self._weak_read(key, node_id)
        else:
            return self._eventual_read(key)

    def _strong_read(self, key: str):
        """Return the value only if every node holding ``key`` agrees on it.

        Returns None when no node holds the key, and also (after printing the
        divergent replicas) when the nodes that hold it disagree.
        """
        with self.lock:
            values = {
                node_id: node_data[key]
                for node_id, node_data in self.nodes.items()
                if key in node_data
            }
            if not values:
                # A key nobody has written is absent, not a conflict.
                return None
            replicas = list(values.values())
            reference = replicas[0]
            # Compare with == instead of building a set so that unhashable
            # values (lists, dicts) can be read back.
            if all(v is reference or v == reference for v in replicas[1:]):
                return reference
            print(f"一致性冲突: {values}")
            return None

    def _weak_read(self, key: str, node_id: str):
        """Return ``node_id``'s value for ``key``; None if node or key is unknown."""
        with self.lock:
            if node_id in self.nodes:
                return self.nodes[node_id].get(key)
            return None

    def _eventual_read(self, key: str):
        """Return the value from the first node (in insertion order) holding ``key``."""
        with self.lock:
            for node_data in self.nodes.values():
                if key in node_data:
                    return node_data[key]
            return None
# Demo: exercise each consistency model on its own three-node store.
print("强一致性模型:")
strong_store = DistributedKVStore(ConsistencyLevel.STRONG)
strong_store.add_node("node1")
strong_store.add_node("node2")
strong_store.add_node("node3")
strong_store.write("key1", "value1")
print(f"读取 key1: {strong_store.read('key1')}")

print("\n弱一致性模型:")
weak_store = DistributedKVStore(ConsistencyLevel.WEAK)
weak_store.add_node("node1")
weak_store.add_node("node2")
weak_store.add_node("node3")
weak_store.write("key2", "value2", "node1")
print(f"从node1读取 key2: {weak_store.read('key2', 'node1')}")
print(f"从node2读取 key2: {weak_store.read('key2', 'node2')}")

print("\n最终一致性模型:")
eventual_store = DistributedKVStore(ConsistencyLevel.EVENTUAL)
eventual_store.add_node("node1")
eventual_store.add_node("node2")
eventual_store.add_node("node3")
eventual_store.write("key3", "value3")
time.sleep(0.2)  # wait for the background replication to finish
# Look at node2's own replica. eventual_store.read() returns the first node
# that holds the key (the primary), so it would print "value3" even if
# replication had never happened.
with eventual_store.lock:
    node2_value = eventual_store.nodes["node2"].get("key3")
print(f"从node2读取 key3: {node2_value}")
分布式事务
分布式事务是确保跨多个节点操作原子性的机制。常见的实现方式包括两阶段提交、三阶段提交和Saga模式。
import time
import threading
from dataclasses import dataclass
from typing import List, Callable, Any
from enum import Enum
class TransactionState(Enum):
    """States a transaction participant or coordinator can be in."""

    PENDING = "pending"      # initial state, nothing decided yet
    PREPARED = "prepared"    # set by the prepare step
    COMMITTED = "committed"  # set by the commit step
    ABORTED = "aborted"      # set by the abort / rollback step
@dataclass
class TransactionParticipant:
    """One resource taking part in a distributed transaction.

    Attributes:
        id: Identifier shown in the progress messages.
        state: Current TransactionState; starts as PENDING.
        data: Per-participant payload; an empty dict is created when omitted.
    """

    id: str
    state: TransactionState = TransactionState.PENDING
    data: dict = None

    def __post_init__(self):
        # The None default stands in for "no dict supplied" so that each
        # instance gets its own dict instead of sharing one default object.
        self.data = {} if self.data is None else self.data

    def _enter(self, new_state: TransactionState, message: str) -> bool:
        """Print ``message``, move to ``new_state`` and report success."""
        print(message)
        self.state = new_state
        return True

    def prepare(self) -> bool:
        """Move to PREPARED; always returns True."""
        return self._enter(TransactionState.PREPARED, f"参与者 {self.id}: 准备事务")

    def commit(self) -> bool:
        """Move to COMMITTED; always returns True."""
        return self._enter(TransactionState.COMMITTED, f"参与者 {self.id}: 提交事务")

    def abort(self) -> bool:
        """Move to ABORTED; always returns True."""
        return self._enter(TransactionState.ABORTED, f"参与者 {self.id}: 回滚事务")
class TwoPhaseCommit:
    """Coordinator for a simplified two-phase commit.

    ``coordinator_state`` records the outcome of the most recent
    ``execute_transaction`` call (COMMITTED or ABORTED).
    """

    def __init__(self):
        self.participants: List[TransactionParticipant] = []
        self.coordinator_state = TransactionState.PENDING

    def add_participant(self, participant: TransactionParticipant):
        """Register ``participant`` for subsequent transactions."""
        self.participants.append(participant)

    def execute_transaction(self, operations: List[Callable]) -> bool:
        """Prepare all participants, run ``operations``, then commit.

        Returns True when every phase succeeds. Returns False when a
        participant refuses to prepare or an operation raises (both trigger a
        rollback of all participants), or when a participant fails to commit.
        """
        print("开始两阶段提交事务")

        # Phase 1: all() stops asking at the first participant that says no.
        print("\n阶段1:准备")
        if not all(member.prepare() for member in self.participants):
            print("准备阶段失败,回滚所有参与者")
            self._rollback_all()
            return False

        # Business operations run between the two phases.
        print("\n执行业务操作")
        for operation in operations:
            try:
                operation()
            except Exception as e:
                print(f"业务操作失败: {e}")
                self._rollback_all()
                return False

        # Phase 2: again stops at the first participant that fails.
        print("\n阶段2:提交")
        if not all(member.commit() for member in self.participants):
            print("提交阶段失败")
            self.coordinator_state = TransactionState.ABORTED
            return False

        print("事务提交成功")
        self.coordinator_state = TransactionState.COMMITTED
        return True

    def _rollback_all(self):
        """Abort every participant and mark the coordinator as ABORTED."""
        print("回滚所有参与者")
        for member in self.participants:
            member.abort()
        self.coordinator_state = TransactionState.ABORTED
# Demo: run one two-phase commit across two participants.
print("两阶段提交测试:")
tc = TwoPhaseCommit()

# Register both participants with the coordinator.
participant1 = TransactionParticipant("participant1")
participant2 = TransactionParticipant("participant2")
for _member in (participant1, participant2):
    tc.add_participant(_member)


# Business operations executed between the prepare and commit phases.
def operation1():
    """Set participant1's balance."""
    participant1.data.update(balance=1000)
    print("操作1: 设置participant1余额为1000")


def operation2():
    """Set participant2's balance."""
    participant2.data.update(balance=2000)
    print("操作2: 设置participant2余额为2000")


# Run the transaction and report the outcome.
success = tc.execute_transaction([operation1, operation2])
print(f"事务结果: {'成功' if success else '失败'}")
# 三阶段提交
class ThreePhaseCommit:
    """Coordinator for a simplified three-phase commit.

    The phases are CanCommit, PreCommit and DoCommit. Failure handling mirrors
    TwoPhaseCommit: any failure before DoCommit aborts every participant and
    the call returns False instead of leaving participants half-prepared.
    """

    def __init__(self):
        self.participants: List[TransactionParticipant] = []

    def add_participant(self, participant: TransactionParticipant):
        """Register ``participant`` for subsequent transactions."""
        self.participants.append(participant)

    def execute_transaction(self, operations: List[Callable]) -> bool:
        """Run CanCommit, PreCommit, ``operations`` and DoCommit in order.

        Returns:
            True when every phase succeeds. False when a participant refuses
            in CanCommit or an operation raises (all participants are aborted
            in both cases), or when a participant reports a failed commit.
        """
        print("开始三阶段提交事务")

        # Phase 1: stop asking at the first participant that refuses.
        print("\n阶段1:CanCommit")
        if not all(participant.prepare() for participant in self.participants):
            print("CanCommit阶段失败")
            # Participants asked before the refusal are already PREPARED;
            # abort them so none is left waiting.
            self._abort_all()
            return False

        # Phase 2
        print("\n阶段2:PreCommit")
        for participant in self.participants:
            participant.state = TransactionState.PREPARED

        # Business operations: a failure here must not leave participants
        # stuck in PREPARED, so abort them and report failure.
        print("\n执行业务操作")
        for operation in operations:
            try:
                operation()
            except Exception as e:
                print(f"业务操作失败: {e}")
                self._abort_all()
                return False

        # Phase 3: ask every participant to commit, then check the answers.
        print("\n阶段3:DoCommit")
        commit_results = [participant.commit() for participant in self.participants]
        if not all(commit_results):
            print("DoCommit阶段失败")
            return False

        print("三阶段提交事务完成")
        return True

    def _abort_all(self):
        """Abort every registered participant."""
        print("回滚所有参与者")
        for participant in self.participants:
            participant.abort()
# Saga模式
class Saga:
    """Run a sequence of steps, undoing completed ones when a later step fails.

    Steps and their compensations are stored in two parallel lists; the
    compensation at index ``i`` undoes the step at index ``i``.
    """

    def __init__(self):
        self.steps: List[Callable] = []
        self.compensations: List[Callable] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Append ``action`` together with the ``compensation`` that undoes it."""
        self.steps.append(action)
        self.compensations.append(compensation)

    def execute(self) -> bool:
        """Run the steps in order.

        Returns:
            True when every step succeeds. False when a step raises; in that
            case the compensations of all previously completed steps are run
            in reverse order before returning. The failed step's own
            compensation is not run.
        """
        print("开始Saga事务")
        executed_steps = []

        for i, (step, _) in enumerate(zip(self.steps, self.compensations)):
            number = i + 1
            try:
                print(f"执行步骤 {number}")
                step()
            except Exception as e:
                print(f"步骤 {number} 失败: {e}")
                self._compensate(executed_steps)
                print("Saga事务失败")
                return False
            executed_steps.append(i)

        print("Saga事务成功")
        return True

    def _compensate(self, executed_steps: List[int]) -> None:
        """Run the compensations for ``executed_steps`` in reverse order."""
        print("开始补偿操作")
        for j in reversed(executed_steps):
            number = j + 1
            print(f"补偿步骤 {number}")
            try:
                self.compensations[j]()
            except Exception as e:
                # Keep going: stopping here would leave every earlier step
                # uncompensated. The failure is reported, not hidden.
                print(f"补偿步骤 {number} 失败: {e}")
# Demo: a three-step saga whose last step fails, triggering compensation.
print("\nSaga模式测试:")
saga = Saga()


# Each step is paired with the compensation that undoes it.
def step1():
    """Step 1 of the demo; only prints."""
    print("步骤1: 创建订单")


def compensation1():
    """Undo for step 1; only prints."""
    print("补偿1: 取消订单")


def step2():
    """Step 2 of the demo; only prints."""
    print("步骤2: 扣减库存")


def compensation2():
    """Undo for step 2; only prints."""
    print("补偿2: 恢复库存")


def step3():
    """Step 3 of the demo; always raises to simulate a failure."""
    print("步骤3: 处理支付")
    # Simulated payment failure.
    raise Exception("支付失败")


def compensation3():
    """Undo for step 3; only prints."""
    print("补偿3: 退款")


for _action, _undo in (
    (step1, compensation1),
    (step2, compensation2),
    (step3, compensation3),
):
    saga.add_step(_action, _undo)

# Run the saga.
saga.execute()
分布式锁
分布式锁是确保分布式系统中资源互斥访问的机制。常见的实现包括基于Redis的Redlock和基于ZooKeeper的分布式锁。
import time
import threading
import uuid
from dataclasses import dataclass
from typing import Optional
class DistributedLock:
    """Interface that distributed lock implementations override.

    Deliberately a plain class rather than a dataclass: it has no fields, and
    a field-less dataclass would make every instance of a given class compare
    equal and be unhashable, which is wrong for objects that each hold their
    own lock state.
    """

    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """Try to take ``lock_name``; subclasses return True on success.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def release(self, lock_name: str) -> bool:
        """Give up ``lock_name``; subclasses return True on success.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class RedisDistributedLock(DistributedLock):
    """Simplified, in-process stand-in for a Redis-backed lock.

    Lock state lives in two dicts on the instance. Because one instance is
    shared by several threads, every check-and-update of those dicts happens
    under a private mutex so two threads cannot both acquire the same name.
    """

    def __init__(self):
        self.locks = {}
        self.lock_owners = {}
        # Guards the check-then-set sequences on the two dicts above.
        self._mutex = threading.Lock()

    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """Take ``lock_name`` if it is free.

        This simplified implementation does not wait: ``timeout`` is accepted
        for interface compatibility but is not used.

        Returns:
            True if the lock was taken, False if it was already held.
        """
        with self._mutex:
            already_held = lock_name in self.locks
            if not already_held:
                owner = str(uuid.uuid4())
                self.locks[lock_name] = True
                self.lock_owners[lock_name] = owner

        if already_held:
            print(f"获取锁失败: {lock_name} 已被占用")
            return False
        print(f"获取锁成功: {lock_name}, 所有者: {owner}")
        return True

    def release(self, lock_name: str) -> bool:
        """Release ``lock_name``.

        No ownership check is made: any caller can release a held lock.

        Returns:
            True if the lock was held and is now released, False otherwise.
        """
        with self._mutex:
            was_held = lock_name in self.locks
            if was_held:
                del self.locks[lock_name]
                self.lock_owners.pop(lock_name, None)

        if was_held:
            print(f"释放锁成功: {lock_name}")
            return True
        print(f"释放锁失败: {lock_name} 不存在")
        return False

    def is_locked(self, lock_name: str) -> bool:
        """Return True while ``lock_name`` is held."""
        with self._mutex:
            return lock_name in self.locks
# Demo: five threads compete for the same named lock.
print("分布式锁测试:")
lock = RedisDistributedLock()


def worker(worker_id: int, lock_name: str):
    """Try once to take ``lock_name``; on success hold it briefly, then release."""
    print(f"工作线程 {worker_id}: 尝试获取锁")
    if not lock.acquire(lock_name):
        print(f"工作线程 {worker_id}: 获取锁失败")
        return
    try:
        print(f"工作线程 {worker_id}: 获取锁成功,执行任务")
        time.sleep(0.1)  # stand-in for real work
    finally:
        lock.release(lock_name)


# Start the workers one after another, then wait for all of them.
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i, "resource_lock"))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
# 锁的高级特性
class ReentrantLock(DistributedLock):
    """Lock that its current holder may acquire repeatedly.

    The holder is identified by this instance's client id combined with the
    calling thread's identifier, so the identity is the same on every call
    from one thread. Each successful acquire must be matched by one release;
    the lock is freed when the count returns to zero.
    """

    def __init__(self):
        self.locks = {}
        self.lock_counts = {}
        self.lock_owners = {}
        # Stable per-instance component of the owner identity.
        self._client_id = str(uuid.uuid4())
        # Guards the check-then-update sequences on the dicts above.
        self._mutex = threading.Lock()

    def _current_owner(self) -> str:
        """Return the identity of the caller: client id plus thread ident."""
        # A fresh UUID per call (the previous behavior) can never equal the
        # stored owner, which made re-entry impossible.
        return f"{self._client_id}:{threading.get_ident()}"

    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """Take ``lock_name``, or re-enter it if the caller already holds it.

        This implementation does not wait: ``timeout`` is accepted for
        interface compatibility but is not used.

        Returns:
            True on first acquisition or re-entry, False when another owner
            holds the lock.
        """
        current_owner = self._current_owner()
        with self._mutex:
            if lock_name not in self.locks:
                self.locks[lock_name] = True
                self.lock_counts[lock_name] = 1
                self.lock_owners[lock_name] = current_owner
                print(f"获取锁成功: {lock_name}, 计数: 1")
                return True

            if self.lock_owners.get(lock_name) == current_owner:
                self.lock_counts[lock_name] += 1
                print(f"重入锁成功: {lock_name}, 计数: {self.lock_counts[lock_name]}")
                return True

            print(f"获取锁失败: {lock_name} 已被占用")
            return False

    def release(self, lock_name: str) -> bool:
        """Undo one acquire of ``lock_name`` made by the caller.

        Returns:
            True when the hold count was decremented (the lock is freed once
            it reaches zero). False when the lock is not held at all or is
            held by a different owner.
        """
        current_owner = self._current_owner()
        with self._mutex:
            if lock_name not in self.locks:
                print(f"释放锁失败: {lock_name} 不存在")
                return False

            # Only the holder may decrement; otherwise another caller could
            # free a lock that is still in use.
            if self.lock_owners.get(lock_name) != current_owner:
                print(f"释放锁失败: {lock_name} 不属于当前持有者")
                return False

            self.lock_counts[lock_name] -= 1
            print(f"释放锁: {lock_name}, 剩余计数: {self.lock_counts[lock_name]}")

            if self.lock_counts[lock_name] == 0:
                del self.locks[lock_name]
                del self.lock_counts[lock_name]
                self.lock_owners.pop(lock_name, None)
                print(f"锁完全释放: {lock_name}")
            return True
# Demo: acquire the same lock three times, then release it three times.
print("\n可重入锁测试:")
reentrant_lock = ReentrantLock()

lock_name = "reentrant_resource"
for _attempt in range(3):  # the first call acquires, the next two re-enter
    reentrant_lock.acquire(lock_name)
for _attempt in range(3):
    reentrant_lock.release(lock_name)

# Print the best-practice checklist for distributed locks.
print("\n分布式锁最佳实践:")
for _tip in (
    "1. 设置锁的过期时间,防止死锁",
    "2. 使用唯一标识确保只有持有者能释放锁",
    "3. 实现锁的续期机制,防止业务未完成锁已过期",
    "4. 考虑使用Redlock算法提高可靠性",
    "5. 在业务层实现幂等性,防止重复执行",
):
    print(_tip)
分布式系统是现代软件架构的核心。通过深入理解CAP定理、一致性模型、分布式事务处理和分布式锁,开发者可以设计出可靠、可扩展的分布式系统。掌握这些概念不仅有助于解决分布式系统中的复杂问题,还能帮助开发者做出更好的架构设计决策。