← 返回首页
🔗

Webhook架构

📂 architecture ⏱ 2 min 230 words

Webhook架构

Webhook工作原理

Webhook是一种基于HTTP的事件推送机制,也被称为反向API或回调URL。与传统的轮询模式不同,Webhook采用推送模式:当源系统中发生特定事件时,源系统主动向预先注册的URL发送HTTP请求,通知目标系统事件的发生。

Webhook的工作流程:首先,消费者(接收方)向提供者(发送方)注册一个回调URL和感兴趣的事件类型;然后,当事件发生时,提供者向回调URL发送HTTP POST请求,包含事件的详细数据;最后,消费者处理接收到的数据并返回适当的响应。

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

WEBHOOK_SECRET = "your-webhook-secret"

@app.route("/webhooks/payments", methods=["POST"])
def handle_payment_webhook():
    """处理支付网关的Webhook通知"""
    # 验证签名
    signature = request.headers.get("X-Webhook-Signature")
    if not verify_signature(request.data, signature, WEBHOOK_SECRET):
        return jsonify({"error": "签名验证失败"}), 401
    
    event = request.json
    event_type = event.get("type")
    
    if event_type == "payment.succeeded":
        handle_payment_success(event["data"])
    elif event_type == "payment.failed":
        handle_payment_failure(event["data"])
    
    return jsonify({"received": True}), 200

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """验证Webhook签名"""
    expected = hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

签名与安全

Webhook的安全性至关重要,因为回调URL暴露在公网上。签名验证是确保Webhook来源可信的核心机制。提供者在发送Webhook时,使用共享密钥对请求体进行签名,并将签名放在请求头中;消费者收到请求后,使用相同的密钥重新计算签名并比对。

除了签名验证,Webhook的安全实践还包括:使用HTTPS传输、验证来源IP地址、设置合理的超时时间、实现幂等性处理(避免重复处理)、记录完整的请求日志。

class WebhookSecurity:
    def __init__(self, secret: str):
        self.secret = secret
        self.trusted_ips = []
    
    def generate_signature(self, payload: bytes) -> str:
        """生成HMAC-SHA256签名"""
        return hmac.new(
            self.secret.encode(), payload, hashlib.sha256
        ).hexdigest()
    
    def verify_request(self, request) -> bool:
        """验证Webhook请求的完整性和来源"""
        # 1. 验证IP白名单
        if self.trusted_ips and request.remote_addr not in self.trusted_ips:
            return False
        
        # 2. 验证签名
        signature = request.headers.get("X-Webhook-Signature")
        expected = self.generate_signature(request.data)
        if not hmac.compare_digest(signature, expected):
            return False
        
        # 3. 验证时间戳(防止重放攻击)
        timestamp = int(request.headers.get("X-Webhook-Timestamp", 0))
        if abs(time.time() - timestamp) > 300:  # 5分钟有效期
            return False
        
        return True

重试机制

Webhook的重试机制是确保事件可靠传递的关键。当消费者返回非2xx响应或连接超时时,提供者应该按照退避策略重试发送。重试策略通常采用指数退避(Exponential Backoff),即每次重试的间隔时间逐渐增加。

重试机制的设计要点包括:设置最大重试次数(通常3-5次)、使用指数退避算法、实现死信队列(存储多次重试失败的事件)、提供手动重试接口。消费者端应该实现幂等性处理,确保重复的Webhook不会导致重复的业务操作。

class WebhookRetryHandler:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def send_with_retry(self, url: str, payload: dict) -> bool:
        """带重试的Webhook发送"""
        for attempt in range(self.max_retries + 1):
            try:
                response = requests.post(url, json=payload, timeout=10)
                if response.status_code == 200:
                    return True
                if response.status_code >= 500:
                    # 服务器错误,继续重试
                    delay = self.base_delay * (2 ** attempt)
                    time.sleep(delay)
                    continue
                # 客户端错误,不重试
                return False
            except requests.RequestException:
                if attempt < self.max_retries:
                    delay = self.base_delay * (2 ** attempt)
                    time.sleep(delay)
                continue
        return False