← 返回首页
📐

一致性模型:强一致、因果一致与最终一致

📂 architecture ⏱ 4 min 624 words

一致性模型:强一致、因果一致与最终一致

一致性模型概览

一致性模型定义了分布式系统中数据可见性的保证级别。从强到弱,主要有一致性模型包括:线性一致性、顺序一致性、因果一致性、最终一致性。

一致性模型层次:

线性一致性(Linearizability):
  最强的一致性模型
  任何读操作都能读到最新的写操作结果
  保证操作的全局实时顺序

顺序一致性(Sequential Consistency):
  保证所有操作的全局顺序一致
  但不要求是最新值
  比线性一致性弱

因果一致性(Causal Consistency):
  保证因果关系的操作顺序
  没有因果关系的操作可以任意顺序
  比顺序一致性弱

最终一致性(Eventual Consistency):
  最弱的一致性模型
  不保证读到最新值,但最终会一致
  性能最好

线性一致性实现

线性一致性要求任何读操作都能读到最新的写操作结果,实现起来最复杂。

// 线性一致性实现
public class LinearizabilityService {
    private final Map<String, VersionedValue> store = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
    // 写操作
    public void write(String key, String value) {
        lock.writeLock().lock();
        try {
            // 获取当前版本号
            VersionedValue current = store.get(key);
            long newVersion = (current == null) ? 1 : current.getVersion() + 1;
            
            // 写入新值
            store.put(key, new VersionedValue(value, newVersion));
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    // 读操作
    public String read(String key) {
        lock.readLock().lock();
        try {
            VersionedValue value = store.get(key);
            return value == null ? null : value.getValue();
        } finally {
            lock.readLock().unlock();
        }
    }
    
    // CAS操作(Compare-And-Swap)
    public boolean compareAndSwap(String key, String expected, String newValue) {
        lock.writeLock().lock();
        try {
            VersionedValue current = store.get(key);
            String currentValue = current == null ? null : current.getValue();
            
            if (expected.equals(currentValue)) {
                long newVersion = (current == null) ? 1 : current.getVersion() + 1;
                store.put(key, new VersionedValue(newValue, newVersion));
                return true;
            }
            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

// 版本化值
class VersionedValue {
    private final String value;
    private final long version;
    
    public VersionedValue(String value, long version) {
        this.value = value;
        this.version = version;
    }
    
    public String getValue() { return value; }
    public long getVersion() { return version; }
}

因果一致性实现

因果一致性保证有因果关系的操作按顺序执行,但没有因果关系的操作可以任意顺序。

# 因果一致性实现
class CausalConsistencyService:
    def __init__(self):
        self.vector_clocks = {}  # node_id -> VectorClock
        self.pending_messages = {}  # VectorClock -> message
    
    def send_message(self, sender_id, message):
        """发送消息"""
        # 1. 更新向量时钟
        if sender_id not in self.vector_clocks:
            self.vector_clocks[sender_id] = VectorClock()
        
        self.vector_clocks[sender_id].increment(sender_id)
        message.vector_clock = self.vector_clocks[sender_id].copy()
        
        # 2. 发送消息
        self.broadcast(message)
    
    def receive_message(self, receiver_id, message):
        """接收消息"""
        # 1. 检查是否可以立即处理
        if self.can_process_immediately(message):
            self.process_message(message)
            self.update_vector_clock(receiver_id, message.vector_clock)
        else:
            # 2. 保存到待处理队列
            self.pending_messages[message.id] = message
    
    def can_process_immediately(self, message):
        """检查是否可以立即处理"""
        # 检查是否所有因果相关的消息都已处理
        sender_clock = message.vector_clock
        
        for node_id, counter in sender_clock.clocks.items():
            if node_id == message.sender_id:
                # 发送者的时钟应该比本地时钟大1
                local_counter = self.vector_clocks.get(message.sender_id, VectorClock()).get(node_id)
                if counter != local_counter + 1:
                    return False
            else:
                # 其他节点的时钟应该不大于本地时钟
                local_counter = self.vector_clocks.get(node_id, VectorClock()).get(node_id)
                if counter > local_counter:
                    return False
        
        return True
    
    def process_pending_messages(self, receiver_id):
        """处理待处理的消息"""
        processed = True
        while processed:
            processed = False
            for msg_id, message in list(self.pending_messages.items()):
                if self.can_process_immediately(message):
                    self.process_message(message)
                    self.update_vector_clock(receiver_id, message.vector_clock)
                    del self.pending_messages[msg_id]
                    processed = True

# 向量时钟
class VectorClock:
    def __init__(self):
        self.clocks = {}
    
    def increment(self, node_id):
        self.clocks[node_id] = self.clocks.get(node_id, 0) + 1
    
    def get(self, node_id):
        return self.clocks.get(node_id, 0)
    
    def merge(self, other):
        for node_id, counter in other.clocks.items():
            self.clocks[node_id] = max(self.get(node_id), counter)
    
    def happens_before(self, other):
        """检查是否在其他之前发生"""
        for node_id, counter in self.clocks.items():
            if counter > other.get(node_id):
                return False
        return True

最终一致性实现

最终一致性是最弱的一致性模型,但性能最好,实现最简单。

// 最终一致性实现
@Service
public class EventualConsistencyService {
    @Autowired
    private ReplicationManager replicationManager;
    @Autowired
    private ConflictResolver conflictResolver;
    
    // 本地写入
    public void write(String key, String value) {
        // 1. 写入本地
        localStore.put(key, value);
        
        // 2. 异步复制到其他副本
        replicationManager.replicateAsync(key, value);
    }
    
    // 读取(可能读到旧数据)
    public String read(String key) {
        // 从任意副本读取
        return readFromAnyReplica(key);
    }
    
    // 读取最新版本
    public String readLatest(String key) {
        // 从主副本读取
        return readFromPrimaryReplica(key);
    }
}

// 冲突解决器
@Component
public class ConflictResolver {
    
    // 解决写冲突
    public ResolvedConflict resolveWriteConflict(String key, 
                                                  List<WriteOperation> conflicts) {
        // 1. 使用Last-Write-Wins策略
        WriteOperation latest = conflicts.stream()
            .max(Comparator.comparing(WriteOperation::getTimestamp))
            .orElse(null);
        
        // 2. 或使用业务逻辑解决
        if (latest == null) {
            latest = resolveByBusinessLogic(key, conflicts);
        }
        
        return ResolvedConflict.builder()
            .key(key)
            .resolvedValue(latest.getValue())
            .strategy("LWW")
            .build();
    }
    
    // 业务逻辑解决
    private WriteOperation resolveByBusinessLogic(String key, 
                                                   List<WriteOperation> conflicts) {
        // 根据具体业务场景实现
        // 例如:购物车合并
        if (key.startsWith("cart:")) {
            return mergeCartOperations(conflicts);
        }
        
        // 默认使用LWW
        return conflicts.stream()
            .max(Comparator.comparing(WriteOperation::getTimestamp))
            .orElse(null);
    }
}

一致性模型选择

选择一致性模型需要根据业务需求、性能要求、数据重要性等因素综合考虑。

一致性模型选择指南:

金融系统:
  银行转账:线性一致性
  账户余额:线性一致性
  交易记录:顺序一致性

电商系统:
  库存扣减:线性一致性
  订单状态:顺序一致性
  商品信息:最终一致性

社交系统:
  用户资料:最终一致性
  关注关系:因果一致性
  消息系统:因果一致性

内容系统:
  文章发布:最终一致性
  评论系统:因果一致性
  协作编辑:线性一致性

一致性与可用性权衡

一致性级别的选择直接影响系统的可用性和性能。

# 一致性与可用性分析
class ConsistencyAvailabilityAnalyzer:
    def analyze_tradeoff(self, consistency_model):
        """分析一致性和可用性的权衡"""
        tradeoffs = {
            'LINEARIZABILITY': {
                'consistency': 'STRONG',
                'availability': 'LOW',
                'performance': 'LOW',
                'complexity': 'HIGH',
                'use_cases': ['金融交易', '库存管理'],
            },
            'SEQUENTIAL': {
                'consistency': 'MEDIUM',
                'availability': 'MEDIUM',
                'performance': 'MEDIUM',
                'complexity': 'MEDIUM',
                'use_cases': ['订单系统', '用户认证'],
            },
            'CAUSAL': {
                'consistency': 'MEDIUM',
                'availability': 'HIGH',
                'performance': 'HIGH',
                'complexity': 'MEDIUM',
                'use_cases': ['消息系统', '协作编辑'],
            },
            'EVENTUAL': {
                'consistency': 'LOW',
                'availability': 'HIGH',
                'performance': 'HIGH',
                'complexity': 'LOW',
                'use_cases': ['缓存系统', '社交Feed'],
            },
        }
        
        return tradeoffs.get(consistency_model)