← 返回首页
🌐

分布式系统

📂 python ⏱ 7 min 1248 words

分布式系统

分布式系统是现代软件架构的核心。本文将深入探讨CAP定理、一致性模型和分布式事务处理,帮助开发者设计可靠的分布式系统。

CAP定理

CAP定理指出,分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)中的两个。

from dataclasses import dataclass
from typing import Dict, Any, Optional
import time
import random

@dataclass
class Node:
    id: str
    data: Dict[str, Any]
    is_alive: bool = True
    
    def read(self, key: str) -> Optional[Any]:
        if not self.is_alive:
            raise ConnectionError(f"节点 {self.id} 不可用")
        return self.data.get(key)
    
    def write(self, key: str, value: Any) -> bool:
        if not self.is_alive:
            raise ConnectionError(f"节点 {self.id} 不可用")
        self.data[key] = value
        return True

class CAPDemo:
    """CAP定理演示"""
    
    def __init__(self):
        self.nodes = {
            'node1': Node('node1', {'counter': 0}),
            'node2': Node('node2', {'counter': 0}),
            'node3': Node('node3', {'counter': 0})
        }
    
    def simulate_partition(self, node_ids: list):
        """模拟网络分区"""
        for node_id in node_ids:
            self.nodes[node_id].is_alive = False
        print(f"节点 {node_ids} 发生网络分区")
    
    def heal_partition(self):
        """恢复网络分区"""
        for node in self.nodes.values():
            node.is_alive = True
        print("网络分区已恢复")
    
    def consistency_example(self):
        """一致性示例:写操作需要所有节点确认"""
        print("\n一致性示例:")
        print("写操作需要所有节点确认")
        
        # 模拟写操作
        success_count = 0
        for node_id, node in self.nodes.items():
            if node.is_alive:
                node.write('counter', 1)
                success_count += 1
        
        print(f"成功写入 {success_count}/{len(self.nodes)} 个节点")
        
        # 读取所有节点
        for node_id, node in self.nodes.items():
            if node.is_alive:
                value = node.read('counter')
                print(f"节点 {node_id}: {value}")
    
    def availability_example(self):
        """可用性示例:任何节点都可以处理请求"""
        print("\n可用性示例:")
        print("任何节点都可以处理请求,即使部分节点不可用")
        
        # 模拟分区
        self.simulate_partition(['node2', 'node3'])
        
        # 尝试读写
        try:
            self.nodes['node1'].write('counter', 2)
            print(f"节点1写入成功: {self.nodes['node1'].read('counter')}")
        except ConnectionError as e:
            print(f"写入失败: {e}")
        
        # 恢复分区
        self.heal_partition()
    
    def partition_tolerance_example(self):
        """分区容错性示例:系统在网络分区时继续运行"""
        print("\n分区容错性示例:")
        print("系统在网络分区时继续运行")
        
        # 模拟分区
        self.simulate_partition(['node2'])
        
        # 部分节点不可用,但系统继续运行
        available_nodes = [n for n in self.nodes.values() if n.is_alive]
        print(f"可用节点: {len(available_nodes)}/{len(self.nodes)}")
        
        # 在可用节点上执行操作
        for node in available_nodes:
            node.write('counter', 3)
        
        self.heal_partition()

# 测试CAP
cap_demo = CAPDemo()
cap_demo.consistency_example()
cap_demo.availability_example()
cap_demo.partition_tolerance_example()

一致性模型

分布式系统有多种一致性模型,从强一致性到最终一致性。选择合适的一致性模型对于系统设计至关重要。

import time
import threading
from dataclasses import dataclass
from typing import Dict, List, Any
from enum import Enum

class ConsistencyLevel(Enum):
    STRONG = "strong"
    WEAK = "weak"
    EVENTUAL = "eventual"

@dataclass
class DistributedKVStore:
    """分布式键值存储"""
    
    nodes: Dict[str, Dict[str, Any]]
    consistency_level: ConsistencyLevel
    
    def __init__(self, consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL):
        self.nodes = {}
        self.consistency_level = consistency_level
        self.lock = threading.Lock()
    
    def add_node(self, node_id: str):
        self.nodes[node_id] = {}
    
    def write(self, key: str, value: Any, node_id: str = None):
        """写入操作"""
        if self.consistency_level == ConsistencyLevel.STRONG:
            return self._strong_write(key, value)
        elif self.consistency_level == ConsistencyLevel.WEAK:
            return self._weak_write(key, value, node_id)
        else:
            return self._eventual_write(key, value)
    
    def _strong_write(self, key: str, value: Any):
        """强一致性写入:所有节点必须确认"""
        with self.lock:
            success_count = 0
            for node_id in self.nodes:
                self.nodes[node_id][key] = value
                success_count += 1
            
            print(f"强一致性写入: {key}={value} 到 {success_count} 个节点")
            return success_count == len(self.nodes)
    
    def _weak_write(self, key: str, value: Any, node_id: str):
        """弱一致性写入:只写入一个节点"""
        with self.lock:
            if node_id in self.nodes:
                self.nodes[node_id][key] = value
                print(f"弱一致性写入: {key}={value} 到节点 {node_id}")
                return True
        return False
    
    def _eventual_write(self, key: str, value: Any):
        """最终一致性写入:异步复制到所有节点"""
        with self.lock:
            # 写入主节点
            primary_node = list(self.nodes.keys())[0]
            self.nodes[primary_node][key] = value
            
            # 异步复制到其他节点
            def replicate():
                time.sleep(0.1)  # 模拟网络延迟
                for node_id in list(self.nodes.keys())[1:]:
                    self.nodes[node_id][key] = value
            
            threading.Thread(target=replicate, daemon=True).start()
            print(f"最终一致性写入: {key}={value} 到主节点 {primary_node}")
            return True
    
    def read(self, key: str, node_id: str = None):
        """读取操作"""
        if self.consistency_level == ConsistencyLevel.STRONG:
            return self._strong_read(key)
        elif self.consistency_level == ConsistencyLevel.WEAK:
            return self._weak_read(key, node_id)
        else:
            return self._eventual_read(key)
    
    def _strong_read(self, key: str):
        """强一致性读取:从所有节点读取并验证一致性"""
        with self.lock:
            values = {}
            for node_id, node_data in self.nodes.items():
                if key in node_data:
                    values[node_id] = node_data[key]
            
            if len(set(values.values())) == 1:
                return list(values.values())[0]
            else:
                print(f"一致性冲突: {values}")
                return None
    
    def _weak_read(self, key: str, node_id: str):
        """弱一致性读取:从指定节点读取"""
        with self.lock:
            if node_id in self.nodes:
                return self.nodes[node_id].get(key)
        return None
    
    def _eventual_read(self, key: str):
        """最终一致性读取:从任意节点读取"""
        with self.lock:
            for node_data in self.nodes.values():
                if key in node_data:
                    return node_data[key]
        return None

# 测试一致性模型
print("强一致性模型:")
strong_store = DistributedKVStore(ConsistencyLevel.STRONG)
strong_store.add_node("node1")
strong_store.add_node("node2")
strong_store.add_node("node3")

strong_store.write("key1", "value1")
print(f"读取 key1: {strong_store.read('key1')}")

print("\n弱一致性模型:")
weak_store = DistributedKVStore(ConsistencyLevel.WEAK)
weak_store.add_node("node1")
weak_store.add_node("node2")
weak_store.add_node("node3")

weak_store.write("key2", "value2", "node1")
print(f"从node1读取 key2: {weak_store.read('key2', 'node1')}")
print(f"从node2读取 key2: {weak_store.read('key2', 'node2')}")

print("\n最终一致性模型:")
eventual_store = DistributedKVStore(ConsistencyLevel.EVENTUAL)
eventual_store.add_node("node1")
eventual_store.add_node("node2")
eventual_store.add_node("node3")

eventual_store.write("key3", "value3")
time.sleep(0.2)  # 等待复制完成
print(f"从node2读取 key3: {eventual_store.read('key3')}")

分布式事务

分布式事务是确保跨多个节点操作原子性的机制。常见的实现方式包括两阶段提交、三阶段提交和Saga模式。

import time
import threading
from dataclasses import dataclass
from typing import List, Callable, Any
from enum import Enum

class TransactionState(Enum):
    PENDING = "pending"
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
class TransactionParticipant:
    """事务参与者"""
    id: str
    state: TransactionState = TransactionState.PENDING
    data: dict = None
    
    def __post_init__(self):
        if self.data is None:
            self.data = {}
    
    def prepare(self) -> bool:
        """准备阶段"""
        print(f"参与者 {self.id}: 准备事务")
        self.state = TransactionState.PREPARED
        return True
    
    def commit(self) -> bool:
        """提交阶段"""
        print(f"参与者 {self.id}: 提交事务")
        self.state = TransactionState.COMMITTED
        return True
    
    def abort(self) -> bool:
        """回滚阶段"""
        print(f"参与者 {self.id}: 回滚事务")
        self.state = TransactionState.ABORTED
        return True

class TwoPhaseCommit:
    """两阶段提交协议"""
    
    def __init__(self):
        self.participants: List[TransactionParticipant] = []
        self.coordinator_state = TransactionState.PENDING
    
    def add_participant(self, participant: TransactionParticipant):
        self.participants.append(participant)
    
    def execute_transaction(self, operations: List[Callable]) -> bool:
        """执行分布式事务"""
        print("开始两阶段提交事务")
        
        # 阶段1:准备
        print("\n阶段1:准备")
        prepared = True
        for participant in self.participants:
            if not participant.prepare():
                prepared = False
                break
        
        if not prepared:
            print("准备阶段失败,回滚所有参与者")
            self._rollback_all()
            return False
        
        # 执行业务操作
        print("\n执行业务操作")
        for operation in operations:
            try:
                operation()
            except Exception as e:
                print(f"业务操作失败: {e}")
                self._rollback_all()
                return False
        
        # 阶段2:提交
        print("\n阶段2:提交")
        committed = True
        for participant in self.participants:
            if not participant.commit():
                committed = False
                break
        
        if committed:
            print("事务提交成功")
            self.coordinator_state = TransactionState.COMMITTED
            return True
        else:
            print("提交阶段失败")
            self.coordinator_state = TransactionState.ABORTED
            return False
    
    def _rollback_all(self):
        """回滚所有参与者"""
        print("回滚所有参与者")
        for participant in self.participants:
            participant.abort()
        self.coordinator_state = TransactionState.ABORTED

# 测试两阶段提交
print("两阶段提交测试:")
tc = TwoPhaseCommit()

# 添加参与者
participant1 = TransactionParticipant("participant1")
participant2 = TransactionParticipant("participant2")
tc.add_participant(participant1)
tc.add_participant(participant2)

# 定义业务操作
def operation1():
    participant1.data["balance"] = 1000
    print("操作1: 设置participant1余额为1000")

def operation2():
    participant2.data["balance"] = 2000
    print("操作2: 设置participant2余额为2000")

# 执行事务
success = tc.execute_transaction([operation1, operation2])
print(f"事务结果: {'成功' if success else '失败'}")

# 三阶段提交
class ThreePhaseCommit:
    """三阶段提交协议"""
    
    def __init__(self):
        self.participants: List[TransactionParticipant] = []
    
    def add_participant(self, participant: TransactionParticipant):
        self.participants.append(participant)
    
    def execute_transaction(self, operations: List[Callable]) -> bool:
        """执行三阶段提交事务"""
        print("开始三阶段提交事务")
        
        # 阶段1:CanCommit
        print("\n阶段1:CanCommit")
        can_commit = True
        for participant in self.participants:
            if not participant.prepare():
                can_commit = False
                break
        
        if not can_commit:
            print("CanCommit阶段失败")
            return False
        
        # 阶段2:PreCommit
        print("\n阶段2:PreCommit")
        for participant in self.participants:
            participant.state = TransactionState.PREPARED
        
        # 执行业务操作
        print("\n执行业务操作")
        for operation in operations:
            operation()
        
        # 阶段3:DoCommit
        print("\n阶段3:DoCommit")
        for participant in self.participants:
            participant.commit()
        
        print("三阶段提交事务完成")
        return True

# Saga模式
class Saga:
    """Saga模式"""
    
    def __init__(self):
        self.steps: List[Callable] = []
        self.compensations: List[Callable] = []
    
    def add_step(self, action: Callable, compensation: Callable):
        self.steps.append(action)
        self.compensations.append(compensation)
    
    def execute(self) -> bool:
        """执行Saga事务"""
        print("开始Saga事务")
        
        executed_steps = []
        
        for i, (step, compensation) in enumerate(zip(self.steps, self.compensations)):
            try:
                print(f"执行步骤 {i+1}")
                step()
                executed_steps.append(i)
            except Exception as e:
                print(f"步骤 {i+1} 失败: {e}")
                print("开始补偿操作")
                
                # 反向执行补偿操作
                for j in reversed(executed_steps):
                    print(f"补偿步骤 {j+1}")
                    self.compensations[j]()
                
                print("Saga事务失败")
                return False
        
        print("Saga事务成功")
        return True

# 测试Saga
print("\nSaga模式测试:")
saga = Saga()

# 定义步骤和补偿
def step1():
    print("步骤1: 创建订单")

def compensation1():
    print("补偿1: 取消订单")

def step2():
    print("步骤2: 扣减库存")

def compensation2():
    print("补偿2: 恢复库存")

def step3():
    print("步骤3: 处理支付")
    # 模拟支付失败
    raise Exception("支付失败")

def compensation3():
    print("补偿3: 退款")

saga.add_step(step1, compensation1)
saga.add_step(step2, compensation2)
saga.add_step(step3, compensation3)

# 执行Saga
saga.execute()

分布式锁

分布式锁是确保分布式系统中资源互斥访问的机制。常见的实现包括基于Redis的Redlock和基于ZooKeeper的分布式锁。

import time
import threading
import uuid
from dataclasses import dataclass
from typing import Optional

@dataclass
class DistributedLock:
    """分布式锁接口"""
    
    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        raise NotImplementedError
    
    def release(self, lock_name: str) -> bool:
        raise NotImplementedError

class RedisDistributedLock(DistributedLock):
    """基于Redis的分布式锁(简化实现)"""
    
    def __init__(self):
        self.locks = {}
        self.lock_owners = {}
    
    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """获取锁"""
        if lock_name not in self.locks:
            self.locks[lock_name] = True
            self.lock_owners[lock_name] = str(uuid.uuid4())
            print(f"获取锁成功: {lock_name}, 所有者: {self.lock_owners[lock_name]}")
            return True
        
        print(f"获取锁失败: {lock_name} 已被占用")
        return False
    
    def release(self, lock_name: str) -> bool:
        """释放锁"""
        if lock_name in self.locks:
            del self.locks[lock_name]
            if lock_name in self.lock_owners:
                del self.lock_owners[lock_name]
            print(f"释放锁成功: {lock_name}")
            return True
        
        print(f"释放锁失败: {lock_name} 不存在")
        return False
    
    def is_locked(self, lock_name: str) -> bool:
        """检查锁是否被持有"""
        return lock_name in self.locks

# 测试分布式锁
print("分布式锁测试:")
lock = RedisDistributedLock()

# 模拟并发访问
def worker(worker_id: int, lock_name: str):
    """工作线程"""
    print(f"工作线程 {worker_id}: 尝试获取锁")
    
    if lock.acquire(lock_name):
        try:
            print(f"工作线程 {worker_id}: 获取锁成功,执行任务")
            time.sleep(0.1)  # 模拟任务执行
        finally:
            lock.release(lock_name)
    else:
        print(f"工作线程 {worker_id}: 获取锁失败")

# 启动多个工作线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i, "resource_lock"))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 锁的高级特性
class ReentrantLock(DistributedLock):
    """可重入分布式锁"""
    
    def __init__(self):
        self.locks = {}
        self.lock_counts = {}
        self.lock_owners = {}
    
    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """获取可重入锁"""
        current_owner = str(uuid.uuid4())
        
        if lock_name not in self.locks:
            self.locks[lock_name] = True
            self.lock_counts[lock_name] = 1
            self.lock_owners[lock_name] = current_owner
            print(f"获取锁成功: {lock_name}, 计数: 1")
            return True
        
        if self.lock_owners.get(lock_name) == current_owner:
            self.lock_counts[lock_name] += 1
            print(f"重入锁成功: {lock_name}, 计数: {self.lock_counts[lock_name]}")
            return True
        
        print(f"获取锁失败: {lock_name} 已被占用")
        return False
    
    def release(self, lock_name: str) -> bool:
        """释放可重入锁"""
        if lock_name not in self.locks:
            print(f"释放锁失败: {lock_name} 不存在")
            return False
        
        self.lock_counts[lock_name] -= 1
        print(f"释放锁: {lock_name}, 剩余计数: {self.lock_counts[lock_name]}")
        
        if self.lock_counts[lock_name] == 0:
            del self.locks[lock_name]
            del self.lock_counts[lock_name]
            if lock_name in self.lock_owners:
                del self.lock_owners[lock_name]
            print(f"锁完全释放: {lock_name}")
        
        return True

# 测试可重入锁
print("\n可重入锁测试:")
reentrant_lock = ReentrantLock()

# 测试重入
lock_name = "reentrant_resource"
reentrant_lock.acquire(lock_name)
reentrant_lock.acquire(lock_name)  # 重入
reentrant_lock.acquire(lock_name)  # 再次重入

reentrant_lock.release(lock_name)
reentrant_lock.release(lock_name)
reentrant_lock.release(lock_name)

# 分布式锁最佳实践
print("\n分布式锁最佳实践:")
print("1. 设置锁的过期时间,防止死锁")
print("2. 使用唯一标识确保只有持有者能释放锁")
print("3. 实现锁的续期机制,防止业务未完成锁已过期")
print("4. 考虑使用Redlock算法提高可靠性")
print("5. 在业务层实现幂等性,防止重复执行")

分布式系统是现代软件架构的核心。通过深入理解CAP定理、一致性模型和分布式事务处理,开发者可以设计出可靠、可扩展的分布式系统。掌握这些概念不仅有助于解决分布式系统中的复杂问题,还能帮助开发者做出更好的架构设计决策。