观察者模式:事件通知与响应式架构
观察者模式:事件通知与响应式架构
观察者模式核心原理
观察者模式定义了对象间的一对多依赖关系,当一个对象状态改变时,所有依赖它的对象都会收到通知并自动更新。这是事件驱动架构的基础模式,实现了发布者和订阅者的松耦合。
// 事件定义
public abstract class DomainEvent {
private final String eventId;
private final Instant timestamp;
protected DomainEvent() {
this.eventId = UUID.randomUUID().toString();
this.timestamp = Instant.now();
}
}
// 观察者接口
public interface EventObserver<T extends DomainEvent> {
void onEvent(T event);
boolean supports(Class<?> eventType);
}
// 事件总线
public class EventBus {
private final Map<Class<?>, List<EventObserver<?>>> observers = new ConcurrentHashMap<>();
public <T extends DomainEvent> void subscribe(Class<T> eventType, EventObserver<T> observer) {
observers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(observer);
}
public <T extends DomainEvent> void publish(T event) {
List<EventObserver<?>> eventObservers = observers.get(event.getClass());
if (eventObservers != null) {
eventObservers.forEach(observer -> {
try {
((EventObserver<T>) observer).onEvent(event);
} catch (Exception e) {
log.error("Event handling failed: {}", event.getClass().getSimpleName(), e);
}
});
}
}
}
发布订阅模式实现
发布订阅模式是观察者模式的变体,引入中间件解耦发布者和订阅者,支持消息持久化、重试和广播。
// 内存发布订阅总线
class PubSubBus {
private subscriptions = new Map<string, Set<Function>>();
subscribe(topic: string, handler: Function): () => void {
if (!this.subscriptions.has(topic)) {
this.subscriptions.set(topic, new Set());
}
this.subscriptions.get(topic)!.add(handler);
// 返回取消订阅函数
return () => {
this.subscriptions.get(topic)?.delete(handler);
};
}
publish(topic: string, data: any): void {
const handlers = this.subscriptions.get(topic);
if (handlers) {
handlers.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`Handler error for topic ${topic}:`, error);
}
});
}
}
// 支持通配符订阅
subscribePattern(pattern: string, handler: Function): () => void {
const regex = new RegExp('^' + pattern.replace('*', '.*') + '$');
const allTopics = Array.from(this.subscriptions.keys());
allTopics.forEach(topic => {
if (regex.test(topic)) {
this.subscribe(topic, handler);
}
});
return () => {
allTopics.forEach(topic => {
if (regex.test(topic)) {
this.subscriptions.get(topic)?.delete(handler);
}
});
};
}
}
事件溯源架构
事件溯源将所有状态变更记录为不可变的事件序列,通过重放事件重建当前状态。这是观察者模式的高级应用,提供了完整的审计追踪和时间旅行能力。
// 事件存储
type EventStore struct {
events []DomainEvent
mu sync.RWMutex
}
func (es *EventStore) Append(event DomainEvent) {
es.mu.Lock()
defer es.mu.Unlock()
es.events = append(es.events, event)
}
// 通过事件重放重建状态
func (es *EventStore) Replay(aggregateID string) *Aggregate {
es.mu.RLock()
defer es.mu.RUnlock()
agg := NewAggregate(aggregateID)
for _, event := range es.events {
if event.AggregateID() == aggregateID {
agg.Apply(event)
}
}
return agg
}
// 事件订阅者 - 投影到读模型
type ProjectionHandler struct {
readStore ReadModelStore
}
func (h *ProjectionHandler) Handle(event DomainEvent) {
switch e := event.(type) {
case OrderCreatedEvent:
h.readStore.CreateOrderReadModel(e.OrderID, e.CustomerID, e.Amount)
case OrderPaidEvent:
h.readStore.UpdateOrderStatus(e.OrderID, "PAID")
case OrderShippedEvent:
h.readStore.UpdateOrderStatus(e.OrderID, "SHIPPED")
}
}
响应式流处理
响应式架构将观察者模式扩展为背压感知的异步数据流,支持高吞吐量的实时数据处理。
// 使用Reactor实现响应式事件处理
@Service
public class OrderEventReactor {
private final Sinks.Many<OrderEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer();
public Flux<OrderEvent> subscribe() {
return eventSink.asFlux();
}
public void publishEvent(OrderEvent event) {
Sinks.EmitResult result = eventSink.tryEmitNext(event);
if (result.isFailure()) {
log.warn("Failed to emit event: {}", result);
}
}
@PostConstruct
public void init() {
subscribe()
.bufferTimeout(100, Duration.ofMillis(500))
.flatMap(this::batchProcess)
.subscribe();
}
private Mono<Void> batchProcess(List<OrderEvent> events) {
return Flux.fromIterable(events)
.flatMap(event -> processEvent(event))
.then();
}
}