事件驱动架构:Event Sourcing、CQRS与事件溯源
事件驱动架构:Event Sourcing、CQRS与事件溯源
事件驱动架构(Event-Driven Architecture, EDA)是一种以事件的产生、检测、消费和响应为核心的软件架构模式。在现代分布式系统中,EDA已成为构建高可扩展性、高可观测性和最终一致性的关键架构选择。
事件溯源(Event Sourcing)
事件溯源的核心思想是:不直接存储实体的当前状态,而是存储导致状态变化的所有事件序列。系统的当前状态可以通过重放所有历史事件来重建。
from dataclasses import dataclass, field
from typing import List, Any
from datetime import datetime
@dataclass
class Event:
event_type: str
payload: dict
timestamp: datetime = field(default_factory=datetime.now)
class BankAccount:
def __init__(self, account_id: str):
self.account_id = account_id
self.balance = 0
self.events: List[Event] = []
def deposit(self, amount: float):
event = Event(
event_type="Deposit",
payload={"amount": amount}
)
self._apply(event)
self.events.append(event)
def withdraw(self, amount: float):
if amount > self.balance:
raise ValueError("余额不足")
event = Event(
event_type="Withdraw",
payload={"amount": amount}
)
self._apply(event)
self.events.append(event)
def _apply(self, event: Event):
if event.event_type == "Deposit":
self.balance += event.payload["amount"]
elif event.event_type == "Withdraw":
self.balance -= event.payload["amount"]
def get_current_state(self) -> dict:
return {
"account_id": self.account_id,
"balance": self.balance,
"event_count": len(self.events)
}
Event Store实现
事件存储是事件溯源的基础设施,负责持久化事件并支持事件重放:
import json
import sqlite3
from typing import List
class EventStore:
def __init__(self, db_path: str):
self.conn = sqlite3.connect(db_path)
self._create_table()
def _create_table(self):
self.conn.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
timestamp TEXT NOT NULL,
version INTEGER NOT NULL
)
''')
self.conn.commit()
def append(self, aggregate_id: str, event: Event, version: int):
self.conn.execute(
'INSERT INTO events (aggregate_id, event_type, payload, timestamp, version) '
'VALUES (?, ?, ?, ?, ?)',
(aggregate_id, event.event_type,
json.dumps(event.payload), event.timestamp.isoformat(), version)
)
self.conn.commit()
def get_events(self, aggregate_id: str) -> List[Event]:
cursor = self.conn.execute(
'SELECT event_type, payload, timestamp FROM events '
'WHERE aggregate_id = ? ORDER BY version',
(aggregate_id,)
)
events = []
for row in cursor.fetchall():
events.append(Event(
event_type=row[0],
payload=json.loads(row[1]),
timestamp=datetime.fromisoformat(row[2])
))
return events
def rebuild_aggregate(self, aggregate_id: str) -> BankAccount:
account = BankAccount(aggregate_id)
events = self.get_events(aggregate_id)
for event in events:
account._apply(event)
return account
CQRS(命令查询职责分离)
CQRS将系统的读操作和写操作分离到不同的模型中,读写可以独立优化和扩展:
from abc import ABC, abstractcommand
from typing import Optional
class Command(ABC):
pass
class DepositCommand(Command):
def __init__(self, account_id: str, amount: float):
self.account_id = account_id
self.amount = amount
class WithdrawCommand(Command):
def __init__(self, account_id: str, amount: float):
self.account_id = account_id
self.amount = amount
class CommandHandler:
def __init__(self, event_store: EventStore):
self.event_store = event_store
def handle(self, command: Command):
if isinstance(command, DepositCommand):
account = self.event_store.rebuild_aggregate(command.account_id)
account.deposit(command.amount)
self.event_store.append(
command.account_id,
account.events[-1],
len(account.events)
)
elif isinstance(command, WithdrawCommand):
account = self.event_store.rebuild_aggregate(command.account_id)
account.withdraw(command.amount)
self.event_store.append(
command.account_id,
account.events[-1],
len(account.events)
)
物化视图(Materialized View)
物化视图是CQRS的读侧实现,通过消费事件流来维护查询友好的数据视图:
class AccountSummaryView:
def __init__(self, event_store: EventStore):
self.event_store = event_store
self.summary = {}
def project_event(self, event: Event, account_id: str):
if account_id not in self.summary:
self.summary[account_id] = {
"total_deposits": 0,
"total_withdrawals": 0,
"current_balance": 0,
"transaction_count": 0
}
view = self.summary[account_id]
if event.event_type == "Deposit":
view["total_deposits"] += event.payload["amount"]
view["current_balance"] += event.payload["amount"]
elif event.event_type == "Withdraw":
view["total_withdrawals"] += event.payload["amount"]
view["current_balance"] -= event.payload["amount"]
view["transaction_count"] += 1
def rebuild(self):
self.summary.clear()
for account_id in self.get_all_accounts():
events = self.event_store.get_events(account_id)
for event in events:
self.project_event(event, account_id)
最佳实践与权衡
事件版本管理:当事件schema需要演化时,使用事件版本控制来确保向后兼容性。
快照机制:对于事件数量巨大的聚合根,定期创建快照以避免每次查询都重放所有事件。
事件存储选型:PostgreSQL、EventStoreDB、Kafka等都可以作为事件存储,选择时需考虑持久性、查询性能和扩展性。
事件驱动架构通过将系统行为建模为事件流,提供了出色的可观测性和可审计性,是构建复杂分布式系统的有力工具。