← 返回首页
🧩

观察者模式:事件通知与响应式架构

📂 architecture ⏱ 2 min 394 words

观察者模式:事件通知与响应式架构

观察者模式核心原理

观察者模式定义了对象间的一对多依赖关系,当一个对象状态改变时,所有依赖它的对象都会收到通知并自动更新。这是事件驱动架构的基础模式,实现了发布者和订阅者的松耦合。

// 事件定义
public abstract class DomainEvent {
    private final String eventId;
    private final Instant timestamp;
    
    protected DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        this.timestamp = Instant.now();
    }
}

// 观察者接口
public interface EventObserver<T extends DomainEvent> {
    void onEvent(T event);
    boolean supports(Class<?> eventType);
}

// 事件总线
public class EventBus {
    private final Map<Class<?>, List<EventObserver<?>>> observers = new ConcurrentHashMap<>();
    
    public <T extends DomainEvent> void subscribe(Class<T> eventType, EventObserver<T> observer) {
        observers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(observer);
    }
    
    public <T extends DomainEvent> void publish(T event) {
        List<EventObserver<?>> eventObservers = observers.get(event.getClass());
        if (eventObservers != null) {
            eventObservers.forEach(observer -> {
                try {
                    ((EventObserver<T>) observer).onEvent(event);
                } catch (Exception e) {
                    log.error("Event handling failed: {}", event.getClass().getSimpleName(), e);
                }
            });
        }
    }
}

发布订阅模式实现

发布订阅模式是观察者模式的变体,引入中间件解耦发布者和订阅者,支持消息持久化、重试和广播。

// 内存发布订阅总线
class PubSubBus {
  private subscriptions = new Map<string, Set<Function>>();
  
  subscribe(topic: string, handler: Function): () => void {
    if (!this.subscriptions.has(topic)) {
      this.subscriptions.set(topic, new Set());
    }
    this.subscriptions.get(topic)!.add(handler);
    
    // 返回取消订阅函数
    return () => {
      this.subscriptions.get(topic)?.delete(handler);
    };
  }
  
  publish(topic: string, data: any): void {
    const handlers = this.subscriptions.get(topic);
    if (handlers) {
      handlers.forEach(handler => {
        try {
          handler(data);
        } catch (error) {
          console.error(`Handler error for topic ${topic}:`, error);
        }
      });
    }
  }
  
  // 支持通配符订阅
  subscribePattern(pattern: string, handler: Function): () => void {
    const regex = new RegExp('^' + pattern.replace('*', '.*') + '$');
    const allTopics = Array.from(this.subscriptions.keys());
    
    allTopics.forEach(topic => {
      if (regex.test(topic)) {
        this.subscribe(topic, handler);
      }
    });
    
    return () => {
      allTopics.forEach(topic => {
        if (regex.test(topic)) {
          this.subscriptions.get(topic)?.delete(handler);
        }
      });
    };
  }
}

事件溯源架构

事件溯源将所有状态变更记录为不可变的事件序列,通过重放事件重建当前状态。这是观察者模式的高级应用,提供了完整的审计追踪和时间旅行能力。

// 事件存储
type EventStore struct {
    events []DomainEvent
    mu     sync.RWMutex
}

func (es *EventStore) Append(event DomainEvent) {
    es.mu.Lock()
    defer es.mu.Unlock()
    es.events = append(es.events, event)
}

// 通过事件重放重建状态
func (es *EventStore) Replay(aggregateID string) *Aggregate {
    es.mu.RLock()
    defer es.mu.RUnlock()
    
    agg := NewAggregate(aggregateID)
    for _, event := range es.events {
        if event.AggregateID() == aggregateID {
            agg.Apply(event)
        }
    }
    return agg
}

// 事件订阅者 - 投影到读模型
type ProjectionHandler struct {
    readStore ReadModelStore
}

func (h *ProjectionHandler) Handle(event DomainEvent) {
    switch e := event.(type) {
    case OrderCreatedEvent:
        h.readStore.CreateOrderReadModel(e.OrderID, e.CustomerID, e.Amount)
    case OrderPaidEvent:
        h.readStore.UpdateOrderStatus(e.OrderID, "PAID")
    case OrderShippedEvent:
        h.readStore.UpdateOrderStatus(e.OrderID, "SHIPPED")
    }
}

响应式流处理

响应式架构将观察者模式扩展为背压感知的异步数据流,支持高吞吐量的实时数据处理。

// 使用Reactor实现响应式事件处理
@Service
public class OrderEventReactor {
    private final Sinks.Many<OrderEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer();
    
    public Flux<OrderEvent> subscribe() {
        return eventSink.asFlux();
    }
    
    public void publishEvent(OrderEvent event) {
        Sinks.EmitResult result = eventSink.tryEmitNext(event);
        if (result.isFailure()) {
            log.warn("Failed to emit event: {}", result);
        }
    }
    
    @PostConstruct
    public void init() {
        subscribe()
            .bufferTimeout(100, Duration.ofMillis(500))
            .flatMap(this::batchProcess)
            .subscribe();
    }
    
    private Mono<Void> batchProcess(List<OrderEvent> events) {
        return Flux.fromIterable(events)
            .flatMap(event -> processEvent(event))
            .then();
    }
}