← 返回首页

事件驱动架构:Event Sourcing、CQRS与事件溯源

📂 python ⏱ 3 min 413 words

事件驱动架构:Event Sourcing、CQRS与事件溯源

事件驱动架构(Event-Driven Architecture, EDA)是一种以事件的产生、检测、消费和响应为核心的软件架构模式。在现代分布式系统中,EDA已成为构建高可扩展性、高可观测性和最终一致性的关键架构选择。

事件溯源(Event Sourcing)

事件溯源的核心思想是:不直接存储实体的当前状态,而是存储导致状态变化的所有事件序列。系统的当前状态可以通过重放所有历史事件来重建。

from dataclasses import dataclass, field
from typing import List, Any
from datetime import datetime

@dataclass
class Event:
    event_type: str
    payload: dict
    timestamp: datetime = field(default_factory=datetime.now)

class BankAccount:
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0
        self.events: List[Event] = []
    
    def deposit(self, amount: float):
        event = Event(
            event_type="Deposit",
            payload={"amount": amount}
        )
        self._apply(event)
        self.events.append(event)
    
    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("余额不足")
        event = Event(
            event_type="Withdraw",
            payload={"amount": amount}
        )
        self._apply(event)
        self.events.append(event)
    
    def _apply(self, event: Event):
        if event.event_type == "Deposit":
            self.balance += event.payload["amount"]
        elif event.event_type == "Withdraw":
            self.balance -= event.payload["amount"]
    
    def get_current_state(self) -> dict:
        return {
            "account_id": self.account_id,
            "balance": self.balance,
            "event_count": len(self.events)
        }

Event Store实现

事件存储是事件溯源的基础设施,负责持久化事件并支持事件重放:

import json
import sqlite3
from typing import List

class EventStore:
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._create_table()
    
    def _create_table(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                aggregate_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                version INTEGER NOT NULL
            )
        ''')
        self.conn.commit()
    
    def append(self, aggregate_id: str, event: Event, version: int):
        self.conn.execute(
            'INSERT INTO events (aggregate_id, event_type, payload, timestamp, version) '
            'VALUES (?, ?, ?, ?, ?)',
            (aggregate_id, event.event_type, 
             json.dumps(event.payload), event.timestamp.isoformat(), version)
        )
        self.conn.commit()
    
    def get_events(self, aggregate_id: str) -> List[Event]:
        cursor = self.conn.execute(
            'SELECT event_type, payload, timestamp FROM events '
            'WHERE aggregate_id = ? ORDER BY version',
            (aggregate_id,)
        )
        events = []
        for row in cursor.fetchall():
            events.append(Event(
                event_type=row[0],
                payload=json.loads(row[1]),
                timestamp=datetime.fromisoformat(row[2])
            ))
        return events
    
    def rebuild_aggregate(self, aggregate_id: str) -> BankAccount:
        account = BankAccount(aggregate_id)
        events = self.get_events(aggregate_id)
        for event in events:
            account._apply(event)
        return account

CQRS(命令查询职责分离)

CQRS将系统的读操作和写操作分离到不同的模型中,读写可以独立优化和扩展:

from abc import ABC, abstractcommand
from typing import Optional

class Command(ABC):
    pass

class DepositCommand(Command):
    def __init__(self, account_id: str, amount: float):
        self.account_id = account_id
        self.amount = amount

class WithdrawCommand(Command):
    def __init__(self, account_id: str, amount: float):
        self.account_id = account_id
        self.amount = amount

class CommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle(self, command: Command):
        if isinstance(command, DepositCommand):
            account = self.event_store.rebuild_aggregate(command.account_id)
            account.deposit(command.amount)
            self.event_store.append(
                command.account_id,
                account.events[-1],
                len(account.events)
            )
        elif isinstance(command, WithdrawCommand):
            account = self.event_store.rebuild_aggregate(command.account_id)
            account.withdraw(command.amount)
            self.event_store.append(
                command.account_id,
                account.events[-1],
                len(account.events)
            )

物化视图(Materialized View)

物化视图是CQRS的读侧实现,通过消费事件流来维护查询友好的数据视图:

class AccountSummaryView:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.summary = {}
    
    def project_event(self, event: Event, account_id: str):
        if account_id not in self.summary:
            self.summary[account_id] = {
                "total_deposits": 0,
                "total_withdrawals": 0,
                "current_balance": 0,
                "transaction_count": 0
            }
        
        view = self.summary[account_id]
        if event.event_type == "Deposit":
            view["total_deposits"] += event.payload["amount"]
            view["current_balance"] += event.payload["amount"]
        elif event.event_type == "Withdraw":
            view["total_withdrawals"] += event.payload["amount"]
            view["current_balance"] -= event.payload["amount"]
        
        view["transaction_count"] += 1
    
    def rebuild(self):
        self.summary.clear()
        for account_id in self.get_all_accounts():
            events = self.event_store.get_events(account_id)
            for event in events:
                self.project_event(event, account_id)

最佳实践与权衡

事件版本管理:当事件schema需要演化时,使用事件版本控制来确保向后兼容性。

快照机制:对于事件数量巨大的聚合根,定期创建快照以避免每次查询都重放所有事件。

事件存储选型:PostgreSQL、EventStoreDB、Kafka等都可以作为事件存储,选择时需考虑持久性、查询性能和扩展性。

事件驱动架构通过将系统行为建模为事件流,提供了出色的可观测性和可审计性,是构建复杂分布式系统的有力工具。