Webhook架构
Webhook架构
Webhook工作原理
Webhook是一种基于HTTP的事件推送机制,也被称为反向API或回调URL。与传统的轮询模式不同,Webhook采用推送模式:当源系统中发生特定事件时,源系统主动向预先注册的URL发送HTTP请求,通知目标系统事件的发生。
Webhook的工作流程:首先,消费者(接收方)向提供者(发送方)注册一个回调URL和感兴趣的事件类型;然后,当事件发生时,提供者向回调URL发送HTTP POST请求,包含事件的详细数据;最后,消费者处理接收到的数据并返回适当的响应。
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
WEBHOOK_SECRET = "your-webhook-secret"
@app.route("/webhooks/payments", methods=["POST"])
def handle_payment_webhook():
"""处理支付网关的Webhook通知"""
# 验证签名
signature = request.headers.get("X-Webhook-Signature")
if not verify_signature(request.data, signature, WEBHOOK_SECRET):
return jsonify({"error": "签名验证失败"}), 401
event = request.json
event_type = event.get("type")
if event_type == "payment.succeeded":
handle_payment_success(event["data"])
elif event_type == "payment.failed":
handle_payment_failure(event["data"])
return jsonify({"received": True}), 200
def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
"""验证Webhook签名"""
expected = hmac.new(
secret.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
签名与安全
Webhook的安全性至关重要,因为回调URL暴露在公网上。签名验证是确保Webhook来源可信的核心机制。提供者在发送Webhook时,使用共享密钥对请求体进行签名,并将签名放在请求头中;消费者收到请求后,使用相同的密钥重新计算签名并比对。
除了签名验证,Webhook的安全实践还包括:使用HTTPS传输、验证来源IP地址、设置合理的超时时间、实现幂等性处理(避免重复处理)、记录完整的请求日志。
class WebhookSecurity:
def __init__(self, secret: str):
self.secret = secret
self.trusted_ips = []
def generate_signature(self, payload: bytes) -> str:
"""生成HMAC-SHA256签名"""
return hmac.new(
self.secret.encode(), payload, hashlib.sha256
).hexdigest()
def verify_request(self, request) -> bool:
"""验证Webhook请求的完整性和来源"""
# 1. 验证IP白名单
if self.trusted_ips and request.remote_addr not in self.trusted_ips:
return False
# 2. 验证签名
signature = request.headers.get("X-Webhook-Signature")
expected = self.generate_signature(request.data)
if not hmac.compare_digest(signature, expected):
return False
# 3. 验证时间戳(防止重放攻击)
timestamp = int(request.headers.get("X-Webhook-Timestamp", 0))
if abs(time.time() - timestamp) > 300: # 5分钟有效期
return False
return True
重试机制
Webhook的重试机制是确保事件可靠传递的关键。当消费者返回非2xx响应或连接超时时,提供者应该按照退避策略重试发送。重试策略通常采用指数退避(Exponential Backoff),即每次重试的间隔时间逐渐增加。
重试机制的设计要点包括:设置最大重试次数(通常3-5次)、使用指数退避算法、实现死信队列(存储多次重试失败的事件)、提供手动重试接口。消费者端应该实现幂等性处理,确保重复的Webhook不会导致重复的业务操作。
class WebhookRetryHandler:
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
def send_with_retry(self, url: str, payload: dict) -> bool:
"""带重试的Webhook发送"""
for attempt in range(self.max_retries + 1):
try:
response = requests.post(url, json=payload, timeout=10)
if response.status_code == 200:
return True
if response.status_code >= 500:
# 服务器错误,继续重试
delay = self.base_delay * (2 ** attempt)
time.sleep(delay)
continue
# 客户端错误,不重试
return False
except requests.RequestException:
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt)
time.sleep(delay)
continue
return False