契约测试架构:消费者驱动的Pact实践
契约测试架构:消费者驱动的Pact实践
契约测试概述
契约测试验证服务间的接口契约是否一致,确保消费者和提供者对API的理解一致。相比集成测试,契约测试更轻量且可以独立运行,是微服务测试策略的核心组成部分。
# 契约测试核心概念
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
from urllib.parse import quote
@dataclass
class Interaction:
    """A single expected request/response exchange.

    Attributes:
        description: Label identifying the interaction.
        provider_state: Name of the provider state attached to the interaction.
        request: Mapping describing the request.
        response: Mapping describing the expected response.
    """

    description: str
    provider_state: str
    request: Dict[str, Any]
    response: Dict[str, Any]
@dataclass
class ConsumerContract:
    """The interactions one consumer expects from one provider.

    Attributes:
        consumer: Consumer name.
        provider: Provider name.
        interactions: Interactions that make up the contract.
        metadata: Free-form extra data; every instance gets its own empty
            dict when none is supplied.
    """

    consumer: str
    provider: str
    interactions: List["Interaction"]
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Pact:
    """A built contract document.

    Attributes:
        consumer: Mapping describing the consumer.
        provider: Mapping describing the provider.
        interactions: Interactions contained in the pact.
        metadata: Free-form extra data; defaults to a fresh empty dict.
        created_at: Defaults to ``datetime.now()`` at construction time,
            i.e. a naive timestamp without timezone information.
    """

    consumer: Dict[str, str]
    provider: Dict[str, str]
    interactions: List["Interaction"]
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
class ContractTestBase:
    """Fluent builder that collects interactions and packs them into a ``Pact``.

    An interaction is declared by chaining ``given()`` (optional),
    ``upon_receiving()``, ``with_request()`` and finally
    ``will_respond_with()``, which stores the finished interaction and
    clears the in-progress values so they cannot leak into the next one.
    """

    def __init__(self, consumer_name: str, provider_name: str):
        """Create an empty builder for the given consumer/provider pair."""
        self.consumer = consumer_name
        self.provider = provider_name
        self.interactions: List[Interaction] = []
        # Define the in-progress fields up front: a chain that skips a step
        # then fails with a clear ValueError instead of an AttributeError.
        self._reset_current()

    def _reset_current(self) -> None:
        """Forget the interaction currently being described."""
        self.current_state: str = ""
        self.current_description: Optional[str] = None
        self.current_request: Optional[Dict[str, Any]] = None

    def given(self, provider_state: str):
        """Set the provider state for the interaction being described.

        Returns:
            The builder itself, for chaining.
        """
        self.current_state = provider_state
        return self

    def upon_receiving(self, description: str):
        """Set the description of the interaction being described.

        Returns:
            The builder itself, for chaining.
        """
        self.current_description = description
        return self

    def with_request(self, method: str, path: str,
                     headers: Dict = None, body: Any = None):
        """Set the request of the interaction being described.

        Args:
            method: HTTP method.
            path: Request path.
            headers: Request headers; stored as a copy, ``{}`` when omitted.
            body: Request body, stored as given.

        Returns:
            The builder itself, for chaining.
        """
        self.current_request = {
            "method": method,
            "path": path,
            # Copy so later changes to the caller's dict do not alter the pact.
            "headers": dict(headers) if headers else {},
            "body": body,
        }
        return self

    def will_respond_with(self, status: int, headers: Dict = None,
                          body: Any = None):
        """Set the expected response and store the finished interaction.

        Args:
            status: Expected HTTP status code.
            headers: Expected response headers; ``{}`` when omitted.
            body: Expected response body, stored as given.

        Returns:
            The builder itself, for chaining.

        Raises:
            ValueError: If ``upon_receiving()`` or ``with_request()`` has not
                been called for this interaction.
        """
        if self.current_description is None:
            raise ValueError(
                "upon_receiving() must be called before will_respond_with()"
            )
        if self.current_request is None:
            raise ValueError(
                "with_request() must be called before will_respond_with()"
            )

        self.interactions.append(
            Interaction(
                description=self.current_description,
                # Empty string when given() was not called for this interaction.
                provider_state=self.current_state,
                request=self.current_request,
                response={
                    "status": status,
                    "headers": dict(headers) if headers else {},
                    "body": body,
                },
            )
        )
        self._reset_current()
        return self

    def build_pact(self) -> "Pact":
        """Build a ``Pact`` from the interactions collected so far.

        The pact receives its own list, so interactions added to the builder
        afterwards do not change a pact that was already built.
        """
        return Pact(
            consumer={"name": self.consumer},
            provider={"name": self.provider},
            interactions=list(self.interactions),
        )
# 使用示例
contract = ContractTestBase("OrderService", "UserService")

# Interaction 1: an existing user is returned with its details.
contract.given("user 123 exists").upon_receiving("a request to get user details")
contract.with_request("GET", "/users/123", headers={"Accept": "application/json"})
contract.will_respond_with(
    status=200,
    headers={"Content-Type": "application/json"},
    body={"id": 123, "name": "Test User", "email": "test@example.com"},
)

# Interaction 2: an unknown user yields a 404 with an error body.
contract.given("user 456 does not exist").upon_receiving("a request for non-existent user")
contract.with_request("GET", "/users/456")
contract.will_respond_with(status=404, body={"error": "User not found"})

# Every builder method returns the builder, so splitting the chain into
# statements records the same two interactions.
pact = contract.build_pact()
print(f"Created pact with {len(pact.interactions)} interactions")
消费者端测试
消费者端测试验证消费者对提供者API的使用是否正确。测试生成的契约文件描述了消费者的期望,可共享给提供者进行验证。
# 消费者端契约测试
import aiohttp
import asyncio
from typing import Callable
class ConsumerContractTest:
    """Consumer-side checks that call the provider API through a mock server URL."""

    def __init__(self, consumer_name: str):
        """Remember the consumer name; the mock server URL is set later."""
        self.consumer_name = consumer_name
        self.mock_server_url = None
        self.contract = None

    async def setup_mock_server(self, port: int = 1234):
        """Record and announce the mock server address.

        Only the URL is stored here; this method does not start a server.
        """
        self.mock_server_url = f"http://localhost:{port}"
        print(f"Mock server started at {self.mock_server_url}")

    @staticmethod
    def _require(condition: bool, message: str) -> None:
        """Raise ``AssertionError(message)`` when *condition* is false.

        Used instead of the ``assert`` statement so the checks still run
        under ``python -O`` and every failure carries a readable message.
        """
        if not condition:
            raise AssertionError(message)

    async def test_get_user(self) -> Dict[str, Any]:
        """Fetch user 123 and check the response.

        Returns:
            The decoded JSON body.

        Raises:
            AssertionError: If the status is not 200, the ``id`` is not 123,
                or ``name`` is missing.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.mock_server_url}/users/123",
                headers={"Accept": "application/json"},
            ) as response:
                self._require(
                    response.status == 200,
                    f"expected status 200, got {response.status}",
                )
                data = await response.json()
                self._require(
                    data.get("id") == 123,
                    f"expected id 123, got {data.get('id')!r}",
                )
                self._require("name" in data, "response body has no 'name' field")
                return data

    async def test_user_not_found(self) -> Dict[str, Any]:
        """Request a missing user and check that a 404 error body comes back.

        Returns:
            The decoded JSON body.

        Raises:
            AssertionError: If the status is not 404 or ``error`` is missing.
        """
        async with aiohttp.ClientSession() as session:
            # /users/456 is the path the contract example in this file
            # declares for the "user 456 does not exist" state.
            async with session.get(
                f"{self.mock_server_url}/users/456"
            ) as response:
                self._require(
                    response.status == 404,
                    f"expected status 404, got {response.status}",
                )
                data = await response.json()
                self._require("error" in data, "response body has no 'error' field")
                return data

    async def run_tests(self, mock_server_port: int = 1234) -> List[Dict[str, str]]:
        """Run every consumer test and collect one result entry per test.

        A failing test never aborts the run: any exception it raises is
        recorded as a ``"failed"`` entry with the message under ``"error"``.

        Args:
            mock_server_port: Port used to build the mock server URL.

        Returns:
            A list of dicts with ``"test"`` and ``"status"`` keys, plus
            ``"error"`` for failed tests, in execution order.
        """
        await self.setup_mock_server(mock_server_port)
        cases = (
            ("get_user", self.test_get_user),
            ("user_not_found", self.test_user_not_found),
        )
        results = []
        for name, case in cases:
            try:
                await case()
            except Exception as exc:
                results.append({"test": name, "status": "failed", "error": str(exc)})
            else:
                results.append({"test": name, "status": "passed"})
        return results
# 生成Pact文件
class PactFileGenerator:
    """Collects interactions as plain dicts and writes them out as a pact JSON file."""

    def __init__(self, consumer: str, provider: str):
        """Create a generator with no interactions for the given pair."""
        self.consumer = consumer
        self.provider = provider
        self.interactions: List[Dict[str, Any]] = []

    def add_interaction(self, description: str, provider_state: str,
                        request: Dict, response: Dict):
        """Append one interaction.

        Args:
            description: Label of the interaction.
            provider_state: Provider state name, written under ``providerState``.
            request: Request definition, stored as given.
            response: Expected response definition, stored as given.
        """
        # NOTE(review): the metadata below declares pactSpecification 3.0.0,
        # whose schema appears to use a "providerStates" list rather than a
        # single "providerState" string - confirm what readers of this file
        # expect before changing the key.
        self.interactions.append({
            "description": description,
            "providerState": provider_state,
            "request": request,
            "response": response,
        })

    def generate_pact_file(self) -> Dict:
        """Return the pact document as a dict.

        The ``interactions`` list is a copy, so interactions added afterwards
        do not change a document that was already generated.
        """
        return {
            "consumer": {"name": self.consumer},
            "provider": {"name": self.provider},
            "interactions": list(self.interactions),
            "metadata": {
                "pactSpecification": {"version": "3.0.0"},
                "client": {"name": "pact-python", "version": "1.0.0"},
            },
        }

    def save_pact(self, filepath: str):
        """Write the pact document to *filepath* as UTF-8 JSON.

        Missing parent directories are created first, so a path such as
        ``pacts/name.json`` works in a fresh checkout.

        Args:
            filepath: Destination file path.
        """
        pact_data = self.generate_pact_file()
        target = Path(filepath)
        target.parent.mkdir(parents=True, exist_ok=True)
        with target.open("w", encoding="utf-8") as f:
            # ensure_ascii=False keeps non-ASCII text readable in the file.
            json.dump(pact_data, f, indent=2, ensure_ascii=False)
        print(f"Pact saved to {filepath}")
# 使用示例
generator = PactFileGenerator("OrderService", "UserService")

# Arguments in order: description, provider state, request, expected response.
generator.add_interaction(
    "get user by id",
    "user 123 exists",
    {"method": "GET", "path": "/users/123"},
    {
        "status": 200,
        "headers": {"Content-Type": "application/json"},
        "body": {"id": 123, "name": "Test User"},
    },
)

# Write the collected interaction to the pact file.
generator.save_pact("pacts/order-service-user-service.json")
提供者端验证
提供者端验证确保实际API行为符合消费者定义的契约。使用Pact Broker存储和共享契约,支持多消费者场景和版本管理。
# 提供者端验证
from typing import List, Dict
import subprocess
class ProviderVerifier:
    """Checks a provider against pact files, locally or through a Pact Broker."""

    def __init__(self, provider_name: str, provider_base_url: str):
        """Store the provider identity; broker settings are configured later."""
        self.provider_name = provider_name
        self.provider_base_url = provider_base_url
        self.pact_broker_url = None
        # Defined here so the attribute exists even if set_pact_broker()
        # is never called.
        self.broker_token = None
        self.provider_app = None

    def set_pact_broker(self, broker_url: str, token: str = None):
        """Configure the Pact Broker URL and an optional bearer token."""
        self.pact_broker_url = broker_url
        self.broker_token = token

    def set_provider_app(self, app):
        """Store the provider application object."""
        self.provider_app = app

    def verify_pact(self, pact_file: str) -> Dict:
        """Verify every interaction of one pact file.

        Args:
            pact_file: Path of a JSON pact file (read as UTF-8).

        Returns:
            A summary dict with the provider name, the file path, the number
            of interactions checked, an overall ``passed`` flag and the
            per-interaction results. ``passed`` is ``True`` when the file
            contains no interactions.
        """
        with open(pact_file, "r", encoding="utf-8") as f:
            pact_data = json.load(f)

        results = [
            self._verify_interaction(interaction)
            for interaction in pact_data.get("interactions", [])
        ]
        return {
            "provider": self.provider_name,
            "pact_file": pact_file,
            "interactions_verified": len(results),
            "passed": all(r["passed"] for r in results),
            "results": results,
        }

    def _verify_interaction(self, interaction: Dict) -> Dict:
        """Verify one interaction and describe the outcome.

        Any exception - including a missing ``request`` or ``response`` key -
        is reported as a failed result carrying an ``error`` message, so one
        malformed interaction does not abort the whole file.
        """
        description = interaction.get("description")
        try:
            request = interaction["request"]
            expected_response = interaction["response"]
            actual_response = self._make_request(request)
            passed = (
                actual_response["status"] == expected_response["status"]
                and self._compare_body(actual_response.get("body"),
                                       expected_response.get("body"))
            )
            return {
                "interaction": description,
                "passed": passed,
                "expected_status": expected_response["status"],
                "actual_status": actual_response["status"],
            }
        except Exception as e:
            return {
                "interaction": description,
                "passed": False,
                "error": str(e),
            }

    def _make_request(self, request: Dict) -> Dict:
        """Return the provider response for *request*.

        Placeholder: no HTTP call is made and *request* is ignored; a fixed
        200 response is returned.
        """
        return {"status": 200, "body": {"id": 123, "name": "Test User"}}

    def _compare_body(self, actual: Any, expected: Any) -> bool:
        """Compare response bodies by equality; no expected body always matches."""
        if expected is None:
            return True
        return actual == expected

    def _build_verifier_command(self, consumer_tags: Optional[List[str]],
                                provider_version: str) -> List[str]:
        """Assemble the ``pact-provider-verifier`` argument list."""
        cmd = [
            "pact-provider-verifier",
            "--provider-base-url", self.provider_base_url,
            "--provider", self.provider_name,
            "--pact-broker-base-url", self.pact_broker_url,
            "--provider-app-version", provider_version,
            "--publish-verification-results",
        ]
        if self.broker_token:
            cmd.extend(["--broker-token", self.broker_token])
        # The flag is repeated per tag; a single flag followed by several
        # values would leave the extra tags as stray positional arguments.
        for tag in consumer_tags or []:
            cmd.extend(["--consumer-version-tag", tag])
        return cmd

    def verify_from_broker(self, consumer_tags: List[str] = None,
                           provider_version: str = "1.0.0") -> Dict:
        """Run ``pact-provider-verifier`` against pacts held by the broker.

        Args:
            consumer_tags: Consumer version tags to verify; one
                ``--consumer-version-tag`` flag is passed per tag.
            provider_version: Value passed as ``--provider-app-version``.

        Returns:
            A dict with ``success`` (exit code was 0), ``output`` (stdout)
            and ``errors`` (stderr).

        Raises:
            ValueError: If no broker URL has been configured.
        """
        if not self.pact_broker_url:
            raise ValueError("Pact Broker URL not set")

        cmd = self._build_verifier_command(consumer_tags, provider_version)

        # Keep the bearer token out of the printed command line.
        shown = [
            "****" if self.broker_token and part == self.broker_token else part
            for part in cmd
        ]
        print(f"Running: {' '.join(shown)}")

        result = subprocess.run(cmd, capture_output=True, text=True)
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "errors": result.stderr,
        }
# 提供者状态设置
class ProviderStateSetup:
    """Registry mapping provider state names to the callables that set them up."""

    def __init__(self):
        """Start with no registered states."""
        self.states = dict()

    def register_state(self, state_name: str, setup_fn: Callable):
        """Associate *setup_fn* with *state_name*, replacing any earlier entry."""
        self.states.update({state_name: setup_fn})

    def setup_state(self, state_name: str):
        """Run the setup callable for *state_name*, or report it as unknown."""
        handler = self.states.get(state_name)
        if not handler:
            print(f"Unknown state: {state_name}")
            return
        handler()
        print(f"Provider state set: {state_name}")

    def teardown_state(self, state_name: str):
        """Announce the teardown of *state_name*; nothing else is done."""
        print(f"Tearing down state: {state_name}")
# 使用示例
provider_setup = ProviderStateSetup()

# State used by the "get user" interaction.
provider_setup.register_state(
    state_name="user 123 exists",
    setup_fn=lambda: print("Setting up test user in database"),
)

# State used by the "user not found" interaction.
provider_setup.register_state(
    state_name="user 456 does not exist",
    setup_fn=lambda: print("Ensuring user 456 does not exist"),
)
Pact Broker集成
Pact Broker是契约的中央存储库,支持版本管理、标签、矩阵视图和Canary发布。通过Webhook实现自动化验证流程。
# Pact Broker客户端
import requests
from typing import List, Dict
class PactBrokerClient:
    """Small HTTP client for a Pact Broker."""

    def __init__(self, base_url: str, token: str = None, timeout: float = 10.0):
        """Configure the client.

        Args:
            base_url: Broker base URL; trailing slashes are removed.
            token: Optional bearer token sent in the ``Authorization`` header.
            timeout: Seconds to wait for each HTTP request. Without a timeout
                a stalled broker would block the caller indefinitely.
        """
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.timeout = timeout
        self.headers = {"Accept": "application/hal+json"}
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def _url(self, *segments: str) -> str:
        """Join *segments* onto the base URL, percent-encoding each one.

        Encoding keeps names, versions and tags that contain spaces or
        slashes from changing the shape of the path.
        """
        encoded = [quote(str(segment), safe="") for segment in segments]
        return "/".join([self.base_url, *encoded])

    @staticmethod
    def _self_link(response) -> Optional[str]:
        """Return ``_links.self.href`` from a response, or ``None``.

        ``None`` is also returned when the body is not a JSON object, which
        can happen for error responses.
        """
        try:
            payload = response.json()
        except ValueError:
            return None
        if not isinstance(payload, dict):
            return None
        return payload.get("_links", {}).get("self", {}).get("href")

    def publish_pact(self, pact_content: Dict,
                     consumer_version: str,
                     tags: List[str] = None) -> Dict:
        """Publish a pact and, if that succeeded, tag the consumer version.

        Args:
            pact_content: Pact document; must contain ``consumer.name`` and
                ``provider.name``.
            consumer_version: Version of the consumer that produced the pact.
            tags: Optional tags to apply to that consumer version.

        Returns:
            A dict with the HTTP ``status`` of the publish request and the
            ``url`` of the published pact (``None`` if not reported).
        """
        consumer = pact_content["consumer"]["name"]
        provider = pact_content["provider"]["name"]
        url = self._url("pacts", "provider", provider,
                        "consumer", consumer, "version", consumer_version)
        response = requests.put(
            url,
            json=pact_content,
            headers=self.headers,
            timeout=self.timeout,
        )

        # Do not tag a version whose pact was rejected.
        if tags and response.ok:
            for tag in tags:
                self._tag_version(consumer, consumer_version, tag)

        return {
            "status": response.status_code,
            "url": self._self_link(response),
        }

    def _tag_version(self, consumer: str, version: str, tag: str):
        """Apply *tag* to *version* of *consumer*; the response is not inspected."""
        url = self._url("pacticipants", consumer, "versions", version, "tags", tag)
        requests.put(url, headers=self.headers, timeout=self.timeout)

    def get_pacts_for_provider(self, provider: str,
                               consumer_version_tag: str = None) -> List[Dict]:
        """Fetch the latest pacts for *provider*, optionally for one consumer tag.

        Returns:
            The ``_embedded.pacts`` list of the response, or ``[]`` if absent.
        """
        segments = ["pacts", "provider", provider, "latest"]
        if consumer_version_tag:
            segments.append(consumer_version_tag)
        response = requests.get(self._url(*segments), headers=self.headers,
                                timeout=self.timeout)
        return response.json().get("_embedded", {}).get("pacts", [])

    def can_i_deploy(self, consumer: str, consumer_version: str,
                     provider: str, provider_version: str = None) -> Dict:
        """Ask the broker whether *consumer_version* can be deployed.

        Returns:
            The decoded JSON response.
        """
        # NOTE(review): the path segment was misspelled "pactsicipant"; it is
        # corrected to "pacticipant" here. The overall path shape should still
        # be confirmed against the broker's documented can-i-deploy and matrix
        # resources.
        segments = ["can-i-deploy", "pacticipant", consumer,
                    "version", consumer_version, "to", provider]
        if provider_version:
            segments.append(provider_version)
        response = requests.get(self._url(*segments), headers=self.headers,
                                timeout=self.timeout)
        return response.json()

    def get_verification_results(self, provider: str,
                                 provider_version: str) -> List[Dict]:
        """Fetch the verification results recorded for a provider version.

        Returns:
            The ``_embedded.verification-results`` list, or ``[]`` if absent.
        """
        url = self._url("pacts", "provider", provider,
                        "versions", provider_version, "verification-results")
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        return response.json().get("_embedded", {}).get("verification-results", [])
# Webhook配置
class PactWebhook:
    """Creates webhooks on a Pact Broker through an existing broker client."""

    def __init__(self, broker_client: "PactBrokerClient"):
        """Keep the broker client whose base URL and headers are reused."""
        self.broker = broker_client

    @staticmethod
    def _build_webhook(event: str, provider: str, webhook_url: str) -> Dict:
        """Build the webhook definition that is sent to the broker."""
        return {
            "description": f"Trigger CI for {provider} on {event}",
            "provider": {"name": provider},
            "events": [{"name": event}],
            "request": {
                "method": "POST",
                "url": webhook_url,
                "headers": {"Content-Type": "application/json"},
            },
        }

    def create_webhook(self, event: str, provider: str,
                       webhook_url: str, timeout: float = 10.0) -> Dict:
        """Register a webhook that POSTs to *webhook_url* when *event* occurs.

        Args:
            event: Name of the broker event that triggers the webhook.
            provider: Provider the webhook is scoped to.
            webhook_url: URL the broker should call.
            timeout: Seconds to wait for the broker, so a stalled broker
                cannot block the caller indefinitely.

        Returns:
            A dict with the HTTP ``status`` of the request and the
            ``webhook`` definition that was sent.
        """
        webhook = self._build_webhook(event, provider, webhook_url)
        url = f"{self.broker.base_url}/webhooks"
        response = requests.post(
            url,
            json=webhook,
            headers=self.broker.headers,
            timeout=timeout,
        )
        return {"status": response.status_code, "webhook": webhook}
# 使用示例
broker = PactBrokerClient("https://pact.example.com", token="xxx")

# Publish an (empty) pact for version 1.0.0 and tag that version.
# Arguments in order: pact content, consumer version, tags.
broker.publish_pact(
    {
        "consumer": {"name": "OrderService"},
        "provider": {"name": "UserService"},
        "interactions": [],
    },
    "1.0.0",
    ["main", "production"],
)

# Ask the broker whether that consumer version may be deployed.
result = broker.can_i_deploy("OrderService", "1.0.0", "UserService")
print(f"Can deploy: {result.get('summary', {}).get('deployable', False)}")
契约测试最佳实践
契约测试的成功依赖于良好的实践:清晰的交互描述、合理的提供者状态、版本管理和CI/CD集成。
# 契约测试最佳实践
class ContractTestBestPractices:
    """Guide to contract-testing best practices, grouped by topic."""

    @staticmethod
    def naming_conventions():
        """Return naming guidance keyed by the thing being named."""
        return dict(
            interaction_description="应使用清晰、具体的行为描述,如 'get user by valid id'",
            provider_state="使用 Given 风格,如 'user 123 exists with active status'",
            consumer_names="使用服务实际名称,如 'OrderService' 而非 'order'",
        )

    @staticmethod
    def interaction_design():
        """Return interaction design principles keyed by principle name."""
        return dict(
            minimal="每个交互只测试一个场景",
            specific="避免过于通用的交互描述",
            isolated="交互之间不应有依赖",
            realistic="使用真实的请求和响应示例",
        )

    @staticmethod
    def version_management():
        """Return version management strategies keyed by strategy name."""
        return dict(
            semver="使用语义化版本号",
            tags="使用标签标记重要版本(main, production)",
            branch="为每个分支维护独立版本",
            retention="定期清理旧版本契约",
        )
# 契约测试质量检查
class ContractTestValidator:
    """Quality checks for a pact document given as a plain dict."""

    def __init__(self):
        """Start with an empty issue list."""
        self.issues = []

    def validate_pact(self, pact: Dict) -> Dict:
        """Check a pact and report every issue found.

        Malformed parts (``None`` or wrongly typed values) are reported as
        issues rather than raising, since detecting them is the point of
        the validator.

        Args:
            pact: Pact document; its ``interactions`` list is inspected.

        Returns:
            A dict with ``valid`` (``True`` only when no issue of any
            severity was found) and ``issues`` (dicts with ``severity`` and
            ``message``).
        """
        self.issues = []

        interactions = pact.get("interactions") or []
        if not interactions:
            self.issues.append({"severity": "error", "message": "No interactions defined"})

        for interaction in interactions:
            self._validate_interaction(interaction)

        return {
            "valid": not self.issues,
            "issues": self.issues,
        }

    def _validate_interaction(self, interaction: Dict):
        """Append an issue for each problem found in one interaction."""
        # A non-dict entry is treated as an empty interaction so that every
        # missing part is reported instead of raising AttributeError.
        if not isinstance(interaction, dict):
            interaction = {}

        # Description: None or non-string values count as an empty description.
        description = str(interaction.get("description") or "")
        if len(description) < 10:
            self.issues.append({
                "severity": "warning",
                "message": f"Description too short: '{description}'"
            })

        # Provider state: accept the single "providerState" string as well
        # as a "providerStates" list, so pacts using the list form do not
        # get a false warning.
        provider_state = (
            interaction.get("providerState") or interaction.get("providerStates")
        )
        if not provider_state:
            self.issues.append({
                "severity": "warning",
                "message": "Missing provider state"
            })

        # Request: must be a mapping that names both method and path.
        request = interaction.get("request")
        if (
            not isinstance(request, dict)
            or "method" not in request
            or "path" not in request
        ):
            self.issues.append({
                "severity": "error",
                "message": "Invalid request definition"
            })

        # Response: must be a mapping that carries a status.
        response = interaction.get("response")
        if not isinstance(response, dict) or "status" not in response:
            self.issues.append({
                "severity": "error",
                "message": "Missing response status"
            })
# 使用示例
validator = ContractTestValidator()

# Validate a minimal pact that holds a single interaction.
result = validator.validate_pact(
    dict(
        consumer=dict(name="Test"),
        provider=dict(name="TestProvider"),
        interactions=[
            dict(
                description="get user",
                providerState="user exists",
                request=dict(method="GET", path="/users/123"),
                response=dict(status=200, body=dict(id=123)),
            )
        ],
    )
)
print(f"Validation: {result}")