← 返回首页
🧪

契约测试架构:消费者驱动的Pact实践

📂 architecture ⏱ 7 min 1308 words

契约测试架构:消费者驱动的Pact实践

契约测试概述

契约测试验证服务间的接口契约是否一致,确保消费者和提供者对API的理解一致。相比集成测试,契约测试更轻量且可以独立运行,是微服务测试策略的核心组成部分。

# 契约测试核心概念
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
import json

@dataclass
class Interaction:
    description: str
    provider_state: str
    request: Dict
    response: Dict

@dataclass
class ConsumerContract:
    consumer: str
    provider: str
    interactions: List[Interaction]
    metadata: Dict = field(default_factory=dict)

@dataclass
class Pact:
    consumer: Dict[str, str]
    provider: Dict[str, str]
    interactions: List[Interaction]
    metadata: Dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)

class ContractTestBase:
    def __init__(self, consumer_name: str, provider_name: str):
        self.consumer = consumer_name
        self.provider = provider_name
        self.interactions: List[Interaction] = []
    
    def given(self, provider_state: str):
        """设置提供者状态"""
        self.current_state = provider_state
        return self
    
    def upon_receiving(self, description: str):
        """描述交互"""
        self.current_description = description
        return self
    
    def with_request(self, method: str, path: str, 
                    headers: Dict = None, body: Any = None):
        """设置请求"""
        self.current_request = {
            "method": method,
            "path": path,
            "headers": headers or {},
            "body": body
        }
        return self
    
    def will_respond_with(self, status: int, headers: Dict = None,
                         body: Any = None):
        """设置预期响应"""
        interaction = Interaction(
            description=self.current_description,
            provider_state=self.current_state,
            request=self.current_request,
            response={
                "status": status,
                "headers": headers or {},
                "body": body
            }
        )
        self.interactions.append(interaction)
        return self
    
    def build_pact(self) -> Pact:
        """构建Pact"""
        return Pact(
            consumer={"name": self.consumer},
            provider={"name": self.provider},
            interactions=self.interactions
        )

# 使用示例
contract = ContractTestBase("OrderService", "UserService")

pact = (
    contract
    .given("user 123 exists")
    .upon_receiving("a request to get user details")
    .with_request("GET", "/users/123", headers={"Accept": "application/json"})
    .will_respond_with(
        status=200,
        headers={"Content-Type": "application/json"},
        body={"id": 123, "name": "Test User", "email": "test@example.com"}
    )
    .given("user 456 does not exist")
    .upon_receiving("a request for non-existent user")
    .with_request("GET", "/users/456")
    .will_respond_with(status=404, body={"error": "User not found"})
    .build_pact()
)

print(f"Created pact with {len(pact.interactions)} interactions")

消费者端测试

消费者端测试验证消费者对提供者API的使用是否正确。测试生成的契约文件描述了消费者的期望,可共享给提供者进行验证。

# 消费者端契约测试
import aiohttp
import asyncio
from typing import Callable

class ConsumerContractTest:
    def __init__(self, consumer_name: str):
        self.consumer_name = consumer_name
        self.mock_server_url = None
        self.contract = None
    
    async def setup_mock_server(self, port: int = 1234):
        """设置Mock服务器"""
        self.mock_server_url = f"http://localhost:{port}"
        print(f"Mock server started at {self.mock_server_url}")
    
    async def test_get_user(self):
        """测试获取用户接口"""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.mock_server_url}/users/123",
                headers={"Accept": "application/json"}
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["id"] == 123
                assert "name" in data
                return data
    
    async def test_user_not_found(self):
        """测试用户不存在的情况"""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.mock_server_url}/users/999"
            ) as response:
                assert response.status == 404
                data = await response.json()
                assert "error" in data
                return data
    
    async def run_tests(self, mock_server_port: int = 1234):
        """运行所有消费者测试"""
        await self.setup_mock_server(mock_server_port)
        
        results = []
        
        try:
            result = await self.test_get_user()
            results.append({"test": "get_user", "status": "passed"})
        except Exception as e:
            results.append({"test": "get_user", "status": "failed", "error": str(e)})
        
        try:
            result = await self.test_user_not_found()
            results.append({"test": "user_not_found", "status": "passed"})
        except Exception as e:
            results.append({"test": "user_not_found", "status": "failed", "error": str(e)})
        
        return results

# 生成Pact文件
class PactFileGenerator:
    def __init__(self, consumer: str, provider: str):
        self.consumer = consumer
        self.provider = provider
        self.interactions = []
    
    def add_interaction(self, description: str, provider_state: str,
                       request: Dict, response: Dict):
        """添加交互"""
        self.interactions.append({
            "description": description,
            "providerState": provider_state,
            "request": request,
            "response": response
        })
    
    def generate_pact_file(self) -> Dict:
        """生成Pact文件"""
        return {
            "consumer": {"name": self.consumer},
            "provider": {"name": self.provider},
            "interactions": self.interactions,
            "metadata": {
                "pactSpecification": {"version": "3.0.0"},
                "client": {"name": "pact-python", "version": "1.0.0"}
            }
        }
    
    def save_pact(self, filepath: str):
        """保存Pact文件"""
        pact_data = self.generate_pact_file()
        with open(filepath, 'w') as f:
            json.dump(pact_data, f, indent=2)
        print(f"Pact saved to {filepath}")

# 使用示例
generator = PactFileGenerator("OrderService", "UserService")
generator.add_interaction(
    description="get user by id",
    provider_state="user 123 exists",
    request={"method": "GET", "path": "/users/123"},
    response={
        "status": 200,
        "headers": {"Content-Type": "application/json"},
        "body": {"id": 123, "name": "Test User"}
    }
)
generator.save_pact("pacts/order-service-user-service.json")

提供者端验证

提供者端验证确保实际API行为符合消费者定义的契约。使用Pact Broker存储和共享契约,支持多消费者场景和版本管理。

# 提供者端验证
from typing import List, Dict
import subprocess

class ProviderVerifier:
    def __init__(self, provider_name: str, provider_base_url: str):
        self.provider_name = provider_name
        self.provider_base_url = provider_base_url
        self.pact_broker_url = None
        self.provider_app = None
    
    def set_pact_broker(self, broker_url: str, token: str = None):
        """设置Pact Broker"""
        self.pact_broker_url = broker_url
        self.broker_token = token
    
    def set_provider_app(self, app):
        """设置提供者应用"""
        self.provider_app = app
    
    def verify_pact(self, pact_file: str) -> Dict:
        """验证单个Pact文件"""
        # 读取Pact文件
        with open(pact_file, 'r') as f:
            pact_data = json.load(f)
        
        results = []
        
        for interaction in pact_data.get("interactions", []):
            result = self._verify_interaction(interaction)
            results.append(result)
        
        return {
            "provider": self.provider_name,
            "pact_file": pact_file,
            "interactions_verified": len(results),
            "passed": all(r["passed"] for r in results),
            "results": results
        }
    
    def _verify_interaction(self, interaction: Dict) -> Dict:
        """验证单个交互"""
        description = interaction["description"]
        request = interaction["request"]
        expected_response = interaction["response"]
        
        try:
            # 模拟API调用
            actual_response = self._make_request(request)
            
            # 验证响应
            passed = (
                actual_response["status"] == expected_response["status"] and
                self._compare_body(actual_response.get("body"), 
                                  expected_response.get("body"))
            )
            
            return {
                "interaction": description,
                "passed": passed,
                "expected_status": expected_response["status"],
                "actual_status": actual_response["status"]
            }
        
        except Exception as e:
            return {
                "interaction": description,
                "passed": False,
                "error": str(e)
            }
    
    def _make_request(self, request: Dict) -> Dict:
        """发送请求"""
        # 模拟实际HTTP请求
        return {"status": 200, "body": {"id": 123, "name": "Test User"}}
    
    def _compare_body(self, actual: Any, expected: Any) -> bool:
        """比较响应体"""
        if expected is None:
            return True
        return actual == expected
    
    def verify_from_broker(self, consumer_tags: List[str] = None) -> Dict:
        """从Pact Broker验证"""
        if not self.pact_broker_url:
            raise ValueError("Pact Broker URL not set")
        
        # 构建验证命令
        cmd = [
            "pact-provider-verifier",
            "--provider-base-url", self.provider_base_url,
            "--provider", self.provider_name,
            "--pact-broker-base-url", self.pact_broker_url,
            "--provider-app-version", "1.0.0",
            "--publish-verification-results"
        ]
        
        if consumer_tags:
            cmd.extend(["--consumer-version-tag"] + consumer_tags)
        
        print(f"Running: {' '.join(cmd)}")
        
        # 执行验证
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "errors": result.stderr
        }

# 提供者状态设置
class ProviderStateSetup:
    def __init__(self):
        self.states = {}
    
    def register_state(self, state_name: str, setup_fn: Callable):
        """注册状态设置函数"""
        self.states[state_name] = setup_fn
    
    def setup_state(self, state_name: str):
        """执行状态设置"""
        setup_fn = self.states.get(state_name)
        if setup_fn:
            setup_fn()
            print(f"Provider state set: {state_name}")
        else:
            print(f"Unknown state: {state_name}")
    
    def teardown_state(self, state_name: str):
        """清理状态"""
        print(f"Tearing down state: {state_name}")

# 使用示例
provider_setup = ProviderStateSetup()
provider_setup.register_state(
    "user 123 exists",
    lambda: print("Setting up test user in database")
)
provider_setup.register_state(
    "user 456 does not exist",
    lambda: print("Ensuring user 456 does not exist")
)

Pact Broker集成

Pact Broker是契约的中央存储库,支持版本管理、标签、矩阵视图和Canary发布。通过Webhook实现自动化验证流程。

# Pact Broker客户端
import requests
from typing import List, Dict

class PactBrokerClient:
    def __init__(self, base_url: str, token: str = None):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.headers = {"Accept": "application/hal+json"}
        if token:
            self.headers["Authorization"] = f"Bearer {token}"
    
    def publish_pact(self, pact_content: Dict, 
                    consumer_version: str,
                    tags: List[str] = None) -> Dict:
        """发布Pact到Broker"""
        consumer = pact_content["consumer"]["name"]
        provider = pact_content["provider"]["name"]
        
        url = f"{self.base_url}/pacts/provider/{provider}/consumer/{consumer}/version/{consumer_version}"
        
        response = requests.put(
            url,
            json=pact_content,
            headers=self.headers
        )
        
        # 添加标签
        if tags:
            for tag in tags:
                self._tag_version(consumer, consumer_version, tag)
        
        return {
            "status": response.status_code,
            "url": response.json().get("_links", {}).get("self", {}).get("href")
        }
    
    def _tag_version(self, consumer: str, version: str, tag: str):
        """给版本打标签"""
        url = f"{self.base_url}/pacticipants/{consumer}/versions/{version}/tags/{tag}"
        requests.put(url, headers=self.headers)
    
    def get_pacts_for_provider(self, provider: str, 
                              consumer_version_tag: str = None) -> List[Dict]:
        """获取提供者的所有契约"""
        url = f"{self.base_url}/pacts/provider/{provider}/latest"
        
        if consumer_version_tag:
            url += f"/{consumer_version_tag}"
        
        response = requests.get(url, headers=self.headers)
        return response.json().get("_embedded", {}).get("pacts", [])
    
    def can_i_deploy(self, consumer: str, consumer_version: str,
                    provider: str, provider_version: str = None) -> Dict:
        """检查是否可以部署"""
        url = f"{self.base_url}/can-i-deploy/pactsicipant/{consumer}/version/{consumer_version}/to/{provider}"
        
        if provider_version:
            url += f"/{provider_version}"
        
        response = requests.get(url, headers=self.headers)
        return response.json()
    
    def get_verification_results(self, provider: str, 
                                provider_version: str) -> List[Dict]:
        """获取验证结果"""
        url = f"{self.base_url}/pacts/provider/{provider}/versions/{provider_version}/verification-results"
        
        response = requests.get(url, headers=self.headers)
        return response.json().get("_embedded", {}).get("verification-results", [])

# Webhook配置
class PactWebhook:
    def __init__(self, broker_client: PactBrokerClient):
        self.broker = broker_client
    
    def create_webhook(self, event: str, provider: str, 
                      webhook_url: str) -> Dict:
        """创建Webhook"""
        webhook = {
            "description": f"Trigger CI for {provider} on {event}",
            "provider": {"name": provider},
            "events": [{"name": event}],
            "request": {
                "method": "POST",
                "url": webhook_url,
                "headers": {"Content-Type": "application/json"}
            }
        }
        
        url = f"{self.broker.base_url}/webhooks"
        response = requests.post(url, json=webhook, headers=self.broker.headers)
        
        return {"status": response.status_code, "webhook": webhook}

# 使用示例
broker = PactBrokerClient("https://pact.example.com", token="xxx")
broker.publish_pact(
    pact_content={
        "consumer": {"name": "OrderService"},
        "provider": {"name": "UserService"},
        "interactions": []
    },
    consumer_version="1.0.0",
    tags=["main", "production"]
)

result = broker.can_i_deploy(
    consumer="OrderService",
    consumer_version="1.0.0",
    provider="UserService"
)
print(f"Can deploy: {result.get('summary', {}).get('deployable', False)}")

契约测试最佳实践

契约测试的成功依赖于良好的实践:清晰的交互描述、合理的提供者状态、版本管理和CI/CD集成。

# 契约测试最佳实践
class ContractTestBestPractices:
    """契约测试最佳实践指南"""
    
    @staticmethod
    def naming_conventions():
        """命名规范"""
        return {
            "interaction_description": "应使用清晰、具体的行为描述,如 'get user by valid id'",
            "provider_state": "使用 Given 风格,如 'user 123 exists with active status'",
            "consumer_names": "使用服务实际名称,如 'OrderService' 而非 'order'"
        }
    
    @staticmethod
    def interaction_design():
        """交互设计原则"""
        return {
            "minimal": "每个交互只测试一个场景",
            "specific": "避免过于通用的交互描述",
            "isolated": "交互之间不应有依赖",
            "realistic": "使用真实的请求和响应示例"
        }
    
    @staticmethod
    def version_management():
        """版本管理策略"""
        return {
            "semver": "使用语义化版本号",
            "tags": "使用标签标记重要版本(main, production)",
            "branch": "为每个分支维护独立版本",
            "retention": "定期清理旧版本契约"
        }

# 契约测试质量检查
class ContractTestValidator:
    def __init__(self):
        self.issues = []
    
    def validate_pact(self, pact: Dict) -> Dict:
        """验证Pact质量"""
        self.issues = []
        
        # 检查交互数量
        interactions = pact.get("interactions", [])
        if len(interactions) == 0:
            self.issues.append({"severity": "error", "message": "No interactions defined"})
        
        # 检查每个交互
        for interaction in interactions:
            self._validate_interaction(interaction)
        
        return {
            "valid": len(self.issues) == 0,
            "issues": self.issues
        }
    
    def _validate_interaction(self, interaction: Dict):
        """验证单个交互"""
        # 检查描述
        description = interaction.get("description", "")
        if len(description) < 10:
            self.issues.append({
                "severity": "warning",
                "message": f"Description too short: '{description}'"
            })
        
        # 检查提供者状态
        provider_state = interaction.get("providerState", "")
        if not provider_state:
            self.issues.append({
                "severity": "warning",
                "message": "Missing provider state"
            })
        
        # 检查请求
        request = interaction.get("request", {})
        if "method" not in request or "path" not in request:
            self.issues.append({
                "severity": "error",
                "message": "Invalid request definition"
            })
        
        # 检查响应
        response = interaction.get("response", {})
        if "status" not in response:
            self.issues.append({
                "severity": "error",
                "message": "Missing response status"
            })

# 使用示例
validator = ContractTestValidator()
result = validator.validate_pact({
    "consumer": {"name": "Test"},
    "provider": {"name": "TestProvider"},
    "interactions": [{
        "description": "get user",
        "providerState": "user exists",
        "request": {"method": "GET", "path": "/users/123"},
        "response": {"status": 200, "body": {"id": 123}}
    }]
})
print(f"Validation: {result}")