← 返回首页
🧪

微服务测试架构:契约测试、集成测试与E2E策略

📂 architecture ⏱ 7 min 1209 words

微服务测试架构:契约测试、集成测试与E2E策略

微服务测试挑战

微服务架构带来分布式系统的复杂性:服务间依赖众多、数据一致性难以保证、故障影响范围难以预测。测试策略需要从单体应用的"测试金字塔"演进为"测试蜂巢"——即减少针对实现细节的单元测试,把重心放在服务级的集成测试上,只保留少量端到端测试——从而在不同层次建立信心。

# 微服务测试策略定义
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

class TestLevel(Enum):
    """Test layers a service's strategy can list; each value is the layer's string label."""
    UNIT = "unit"
    CONTRACT = "contract"
    INTEGRATION = "integration"
    E2E = "e2e"
    # MicroserviceTestSuite._run_level in this file has no runner for this
    # member and reports it as skipped.
    PERFORMANCE = "performance"

@dataclass
class TestStrategy:
    """Describes which test levels a service runs and the targets it aims for."""
    # Human-readable strategy name.
    name: str
    # Levels to execute; MicroserviceTestSuite.run_all_levels iterates them in this order.
    levels: List[TestLevel]
    # Presumably a ratio or percentage; the unit is not shown here - TODO confirm.
    coverage_target: float
    # Presumably a ratio or percentage; the unit is not shown here - TODO confirm.
    automation_target: float
    # Optional free-text description; defaults to the empty string.
    description: str = ""

@dataclass
class ServiceTestConfig:
    """Per-service test configuration consumed by MicroserviceTestSuite."""
    # Name reported under the "service" key of the suite result.
    service_name: str
    # Names of the services this one depends on.
    dependencies: List[str]
    # Strategy whose `levels` drive the suite run.
    test_strategy: TestStrategy
    # Presumably the dependencies to replace with mocks - verify against callers.
    mock_services: List[str] = field(default_factory=list)
    # Presumably identifiers of the data sets the tests need - verify against callers.
    test_data_requirements: List[str] = field(default_factory=list)

class MicroserviceTestSuite:
    """Runs the test levels configured for one service and aggregates the outcome.

    The per-level runners are placeholders that return fixed counts.
    """

    def __init__(self, config: ServiceTestConfig):
        self.test_results = []
        self.config = config

    def run_all_levels(self) -> Dict:
        """Run every level of the configured strategy, in declaration order.

        Returns:
            A dict with the service name, the per-level result dicts keyed by
            level label, and the overall status ("passed" or "failed").
        """
        per_level = {}
        for test_level in self.config.test_strategy.levels:
            label = test_level.value
            print(f"Running {label} tests...")
            per_level[label] = self._run_level(test_level)

        return {
            "service": self.config.service_name,
            "results": per_level,
            "overall_status": self._calculate_status(per_level)
        }

    def _run_level(self, level: TestLevel) -> Dict:
        """Dispatch to the runner for `level`; levels without a runner are skipped."""
        runners = (
            (TestLevel.UNIT, self._run_unit_tests),
            (TestLevel.CONTRACT, self._run_contract_tests),
            (TestLevel.INTEGRATION, self._run_integration_tests),
            (TestLevel.E2E, self._run_e2e_tests),
        )
        for candidate, runner in runners:
            if level == candidate:
                return runner()
        return {"status": "skipped"}

    def _run_unit_tests(self) -> Dict:
        """Placeholder unit-test run returning fixed counts."""
        return {"passed": 150, "failed": 2, "skipped": 5}

    def _run_contract_tests(self) -> Dict:
        """Placeholder contract-test run returning fixed counts."""
        return {"passed": 25, "failed": 0, "skipped": 0}

    def _run_integration_tests(self) -> Dict:
        """Placeholder integration-test run returning fixed counts."""
        return {"passed": 40, "failed": 3, "skipped": 2}

    def _run_e2e_tests(self) -> Dict:
        """Placeholder end-to-end run returning fixed counts."""
        return {"passed": 20, "failed": 1, "skipped": 0}

    def _calculate_status(self, results: Dict) -> str:
        """Return "failed" if any level reports at least one failure, else "passed"."""
        has_failure = any(
            outcome.get("failed", 0) > 0 for outcome in results.values()
        )
        return "failed" if has_failure else "passed"

服务虚拟化与Mock

服务虚拟化模拟依赖服务的行为,使测试可以在没有真实依赖的情况下进行。支持录制-回放模式和动态响应配置。

# 服务虚拟化框架
from typing import Callable
import json
from datetime import datetime

@dataclass
class MockEndpoint:
    """One canned response, matched by exact path and HTTP method."""
    # Request path to match exactly.
    path: str
    # HTTP method to match exactly (the comparison is case-sensitive).
    method: str
    # Payload returned under the "body" key of the simulated response.
    response_body: Any
    # Status returned under the "status" key; defaults to 200.
    status_code: int = 200
    # Intended response delay in milliseconds; defaults to no delay.
    delay_ms: int = 0
    # Headers returned under the "headers" key; a fresh dict per instance.
    headers: Dict = field(default_factory=dict)

class ServiceVirtualization:
    """In-memory stand-in for a dependency service.

    Supports statically configured mock endpoints (`mock_endpoint` /
    `handle_request`) and a simple record/playback list
    (`record_interaction` / `playback`). Every handled request is appended
    to `request_log`.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.endpoints: List[MockEndpoint] = []
        self.recorded_requests = []
        self.request_log = []

    def mock_endpoint(self, path: str, method: str,
                      response: Any, status: int = 200,
                      delay: int = 0,
                      headers: Optional[Dict] = None):
        """Register a mock endpoint.

        Args:
            path: Request path to match exactly.
            method: HTTP method to match exactly.
            response: Payload returned as the response body.
            status: Status code of the simulated response.
            delay: Delay in milliseconds applied before responding.
            headers: Optional response headers; copied so later changes to
                the caller's dict do not leak into the mock.
        """
        self.endpoints.append(MockEndpoint(
            path=path,
            method=method,
            response_body=response,
            status_code=status,
            delay_ms=delay,
            headers=dict(headers) if headers else {},
        ))

    def handle_request(self, method: str, path: str,
                       body: Optional[Dict] = None) -> Dict:
        """Answer a simulated request from the configured endpoints.

        The request is always logged. The first endpoint whose path and
        method both match wins; its configured delay is honoured before the
        response is returned. Unmatched requests get a 404 response.
        """
        # Local import: the snippet that defines this class does not import
        # `time` at module level.
        import time

        self.request_log.append({
            "method": method,
            "path": path,
            "body": body,
            "timestamp": datetime.now().isoformat()
        })

        for endpoint in self.endpoints:
            if endpoint.path == path and endpoint.method == method:
                # Simulate latency only when a positive delay was configured.
                if endpoint.delay_ms > 0:
                    time.sleep(endpoint.delay_ms / 1000)
                return {
                    "status": endpoint.status_code,
                    "body": endpoint.response_body,
                    "headers": endpoint.headers
                }

        return {"status": 404, "body": {"error": "Not found"}}

    def record_interaction(self, real_service_url: str,
                           requests: List[Dict]):
        """Store each request with a timestamp for later playback.

        NOTE(review): `real_service_url` is not used and no response is
        captured, so `playback` falls back to its default response for every
        recorded request - confirm whether responses should be recorded.
        """
        for req in requests:
            self.recorded_requests.append({
                "request": req,
                "recorded_at": datetime.now().isoformat()
            })

    def playback(self, request: Dict) -> Dict:
        """Return the response of the first recorded request matching `request`.

        A recorded entry without a "response" yields `{"status": 200}`; when
        nothing matches, `{"status": 404}` is returned.
        """
        for recorded in self.recorded_requests:
            if self._match_request(request, recorded["request"]):
                return recorded.get("response", {"status": 200})

        return {"status": 404}

    def _match_request(self, actual: Dict, expected: Dict) -> bool:
        """Two requests match when both their "path" and "method" are equal."""
        return (actual.get("path") == expected.get("path") and
                actual.get("method") == expected.get("method"))

# Usage example
# Build a virtual "user-service" and register two GET endpoints on it.
virtualization = ServiceVirtualization("user-service")
virtualization.mock_endpoint(
    path="/users/123",
    method="GET",
    response={"id": 123, "name": "Test User", "email": "test@example.com"}
)
virtualization.mock_endpoint(
    path="/users/123/orders",
    method="GET",
    response=[{"order_id": "ORD001", "amount": 99.99}]
)

# Matches the first endpoint above, so the printed dict carries status 200
# and the user payload.
response = virtualization.handle_request("GET", "/users/123")
print(response)

集成测试策略

集成测试验证服务间的实际交互,包括同步调用、异步消息和数据一致性。使用Testcontainers在测试中启动真实的依赖服务。

# 集成测试框架
from typing import List, Dict
import asyncio

class IntegrationTestBase:
    """Base class that provisions and tears down test dependencies.

    Container handling is simulated here: `_start_container` and
    `_stop_container` only print and return/consume a small dict.
    """

    def __init__(self):
        self.containers = {}
        self.test_fixtures = {}

    async def setup(self):
        """Start the database and cache containers, then load test data."""
        self.containers["postgres"] = await self._start_container(
            image="postgres:14",
            ports={5432: 5432},
            env={"POSTGRES_DB": "test_db"}
        )

        self.containers["redis"] = await self._start_container(
            image="redis:7",
            ports={6379: 6379}
        )

        await self._load_test_data()

    async def teardown(self):
        """Stop every started container and forget it.

        Each container is removed from `self.containers` before it is
        stopped, so calling teardown twice never stops a container twice.
        A failure while stopping one container does not prevent the others
        from being stopped; the first such error is re-raised at the end.
        """
        first_error = None
        for name in list(self.containers):
            container = self.containers.pop(name)
            try:
                await self._stop_container(container)
            except Exception as exc:
                # Keep going so the remaining containers are not leaked.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    async def _start_container(self, image: str, ports: Dict,
                               env: Optional[Dict] = None) -> Any:
        """Start a container (simulated) and return its handle dict."""
        print(f"Starting container: {image}")
        return {"image": image, "status": "running"}

    async def _stop_container(self, container):
        """Stop a container (simulated)."""
        print(f"Stopping container: {container['image']}")

    async def _load_test_data(self):
        """Load test fixtures (simulated)."""
        print("Loading test fixtures...")

class ServiceIntegrationTest(IntegrationTestBase):
    """Integration scenarios run against `service_under_test`.

    Each scenario provisions its dependencies, drives the service through
    its async API, and always tears the dependencies down afterwards.
    """

    def __init__(self, service_under_test):
        super().__init__()
        self.service = service_under_test

    async def test_create_order_flow(self):
        """Exercise the create-user -> create-order -> pay -> verify flow."""
        try:
            # setup() runs inside the try so that containers started before a
            # partial setup failure are still torn down.
            await self.setup()

            # 1. Create a user.
            user = await self.service.create_user({
                "name": "Test User",
                "email": "test@example.com"
            })
            assert user["id"] is not None

            # 2. Create an order for that user.
            order = await self.service.create_order({
                "user_id": user["id"],
                "items": [{"product_id": "P001", "quantity": 2}]
            })
            assert order["status"] == "pending"

            # 3. Pay for the order.
            payment = await self.service.process_payment(order["id"])
            assert payment["status"] == "success"

            # 4. The order must now be reported as paid.
            updated_order = await self.service.get_order(order["id"])
            assert updated_order["status"] == "paid"

            print("Test passed: create order flow")

        finally:
            await self.teardown()

    async def test_concurrent_operations(self):
        """Issue ten concurrent inventory decrements and check the final count."""
        try:
            # setup() runs inside the try so that containers started before a
            # partial setup failure are still torn down.
            await self.setup()

            # Ten concurrent decrements of the same product.
            tasks = [
                self.service.update_inventory(
                    product_id="P001",
                    quantity_change=-1
                )
                for _ in range(10)
            ]

            # return_exceptions=True collects failures instead of aborting.
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # No update may have failed.
            errors = [r for r in results if isinstance(r, Exception)]
            assert len(errors) == 0, f"Concurrent update errors: {errors}"

            # Assumes the fixture inventory starts at 100: 100 - 10 = 90.
            inventory = await self.service.get_inventory("P001")
            assert inventory["quantity"] == 90

            print("Test passed: concurrent operations")

        finally:
            await self.teardown()

# 数据一致性测试
class ConsistencyTest:
    """Cross-service data-consistency checks between two services.

    Both services are driven through their async APIs; the delays and retry
    counts are parameters so callers can tune them (defaults match the
    original fixed values).
    """

    def __init__(self, service_a, service_b):
        self.service_a = service_a
        self.service_b = service_b

    async def test_eventual_consistency(self, initial_delay: float = 1,
                                        attempts: int = 10,
                                        poll_interval: float = 0.5):
        """Create a record in service A and poll service B until it appears.

        Args:
            initial_delay: Seconds to wait before the first poll.
            attempts: Maximum number of polls against service B.
            poll_interval: Seconds to wait after each unsuccessful poll.

        Returns:
            True once service B returns the record with the expected value,
            False if it never does within `attempts` polls.
        """
        await self.service_a.create_record({"id": "R001", "value": "test"})

        # Give replication a head start before polling.
        await asyncio.sleep(initial_delay)

        for _ in range(attempts):
            record = await self.service_b.get_record("R001")
            if record and record["value"] == "test":
                return True
            await asyncio.sleep(poll_interval)

        return False

    async def test_failure_recovery(self, outage_duration: float = 5,
                                    max_retries: int = 3,
                                    retry_delay: float = 2,
                                    settle_delay: float = 3):
        """Check that a record written during a service B outage still arrives.

        Args:
            outage_duration: Duration passed to service B's `simulate_failure`.
            max_retries: Retry budget passed to service A.
            retry_delay: Delay between retries passed to service A.
            settle_delay: Seconds to wait before reading from service B.

        Returns:
            True if service B returns the record after the wait.
        """
        await self.service_b.simulate_failure(duration=outage_duration)

        # Service A is expected to keep working and retry on its own; only
        # the eventual state in service B is checked below.
        await self.service_a.create_record_with_retry(
            {"id": "R002", "value": "test"},
            max_retries=max_retries,
            retry_delay=retry_delay
        )

        await asyncio.sleep(settle_delay)
        record = await self.service_b.get_record("R002")

        return record is not None

测试数据管理

测试数据需要覆盖各种场景,同时保护敏感信息。使用工厂模式生成测试数据,数据脱敏保护隐私,数据快照支持测试重现。

# 测试数据管理
import random
import string
from faker import Faker

class TestDataFactory:
    """Factory producing randomized user and order dicts for tests."""

    def __init__(self):
        self.faker = Faker()

    def create_user(self, **overrides) -> Dict:
        """Build a user dict with fake personal data.

        Args:
            **overrides: Field values that replace (or extend) the defaults.
        """
        default = {
            "id": str(random.randint(1000, 9999)),
            "name": self.faker.name(),
            "email": self.faker.email(),
            "phone": self.faker.phone_number(),
            "created_at": self.faker.date_time().isoformat()
        }
        default.update(overrides)
        return default

    def create_order(self, user_id: Optional[str] = None, **overrides) -> Dict:
        """Build an order dict with one random line item.

        Unless the caller overrides "total", it is derived from the final
        items (price * quantity, rounded to 2 decimals) so that generated
        orders are internally consistent.

        Args:
            user_id: Owner of the order; a random id is generated when falsy.
            **overrides: Field values that replace (or extend) the defaults.
        """
        order = {
            "order_id": f"ORD{random.randint(100000, 999999)}",
            "user_id": user_id or str(random.randint(1000, 9999)),
            "items": [
                {
                    "product_id": f"P{random.randint(100, 999)}",
                    "quantity": random.randint(1, 5),
                    "price": round(random.uniform(10, 100), 2)
                }
            ],
            "status": random.choice(["pending", "paid", "shipped"]),
        }
        order.update(overrides)
        if "total" not in overrides:
            # Computed after the merge so overridden items are reflected too.
            order["total"] = self._order_total(order["items"])
        return order

    @staticmethod
    def _order_total(items) -> float:
        """Sum price * quantity over dict items; missing fields count as 0."""
        return round(
            sum(
                item.get("price", 0) * item.get("quantity", 0)
                for item in (items or [])
                if isinstance(item, dict)
            ),
            2,
        )

    def create_batch(self, factory_fn, count: int, **kwargs) -> List[Dict]:
        """Call `factory_fn(**kwargs)` `count` times and return the results."""
        return [factory_fn(**kwargs) for _ in range(count)]

class DataSanitizer:
    """Replaces or masks sensitive values in test records."""

    def __init__(self):
        self.faker = Faker()

    def sanitize_user(self, user: Dict) -> Dict:
        """Return a shallow copy of `user` with personal fields replaced by fakes.

        The name, email, phone and address keys are always set on the copy,
        even when the input did not contain them.
        """
        sanitized = user.copy()
        sanitized["name"] = self.faker.name()
        sanitized["email"] = self.faker.email()
        sanitized["phone"] = self.faker.phone_number()
        sanitized["address"] = self.faker.address()
        return sanitized

    def sanitize_batch(self, records: List[Dict],
                       sensitive_fields: List[str]) -> List[Dict]:
        """Return shallow copies of `records` with the listed fields masked.

        Fields that a record does not contain are left absent; the input
        records are not modified.
        """
        sanitized = []
        for record in records:
            new_record = record.copy()
            for field_name in sensitive_fields:
                if field_name in new_record:
                    new_record[field_name] = self._mask_value(new_record[field_name])
            sanitized.append(new_record)
        return sanitized

    def _mask_value(self, value) -> str:
        """Mask a single sensitive value.

        Strings containing "@" are replaced by a fake e-mail address; every
        other value - including non-string values such as numeric ids - is
        replaced by a fixed placeholder so the original never leaks through.
        """
        if isinstance(value, str) and "@" in value:
            return self.faker.email()
        return "***MASKED***"

# 测试数据快照
class TestDataSnapshot:
    """Names and "restores" test-data snapshots (placeholder implementation).

    NOTE(review): the snapshot payload is built but never written to
    `storage_path`, and restoring returns only a status - confirm whether
    persistence is intended.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path

    def create_snapshot(self, name: str, data: Dict) -> str:
        """Announce a new snapshot and return its id.

        The id has the form ``snapshot_<name>_<YYYYmmddHHMMSS>``.
        """
        payload = dict(
            name=name,
            data=data,
            created_at=datetime.now().isoformat(),
        )

        stamp = datetime.now().strftime('%Y%m%d%H%M%S')
        snapshot_id = f"snapshot_{name}_{stamp}"
        print(f"Created snapshot: {snapshot_id}")
        return snapshot_id

    def restore_snapshot(self, snapshot_id: str) -> Dict:
        """Announce a restore and report it as done."""
        print(f"Restoring snapshot: {snapshot_id}")
        outcome = {"status": "restored"}
        return outcome

测试执行与报告

统一的测试执行框架支持多层测试的编排、并行执行和结果聚合,生成可视化的测试报告。

# 测试执行器
import time
from dataclasses import dataclass
from typing import Callable

@dataclass
class TestCase:
    """A single executable test registered with TestExecutor."""
    # Name reported in the result details.
    name: str
    # Zero-argument callable; TestExecutor.run calls it synchronously.
    test_fn: Callable
    # Level used for filtering and for the per-level report breakdown.
    level: TestLevel
    # Presumably seconds; TestExecutor.run in this file does not enforce it.
    timeout: int = 30
    # Free-form tags; a test is selected when any requested tag is present.
    tags: List[str] = field(default_factory=list)

class TestExecutor:
    """Runs registered test cases sequentially and builds a summary report.

    Type names defined elsewhere in the file are quoted in annotations so
    the class does not depend on definition order.
    """

    # Maps a result status to its counter key in the per-level breakdown.
    _LEVEL_COUNTER_KEYS = {
        "passed": "passed",
        "failed": "failed",
        "error": "errors",
    }

    def __init__(self):
        self.test_cases: List["TestCase"] = []
        self.results = []

    def add_test(self, test: "TestCase"):
        """Register a test case."""
        self.test_cases.append(test)

    def run(self, levels: Optional[List["TestLevel"]] = None,
            tags: Optional[List[str]] = None) -> Dict:
        """Run the registered tests, optionally filtered, and return a report.

        Args:
            levels: When given and non-empty, only tests whose level is in
                this list are run.
            tags: When given and non-empty, only tests carrying at least one
                of these tags are run.

        Returns:
            The report dict built by `_generate_report`.
        """
        tests_to_run = self.test_cases

        if levels:
            tests_to_run = [t for t in tests_to_run if t.level in levels]

        if tags:
            tests_to_run = [t for t in tests_to_run
                            if any(tag in t.tags for tag in tags)]

        self.results = [self._execute(test) for test in tests_to_run]
        return self._generate_report()

    def _execute(self, test) -> Dict:
        """Run one test and return its result record.

        An AssertionError marks the test "failed"; any other Exception marks
        it "error". The test function is called synchronously and the test's
        `timeout` is not enforced.
        """
        status, error = "passed", None
        # perf_counter is monotonic, so durations cannot go negative when the
        # wall clock is adjusted mid-run.
        started = time.perf_counter()
        try:
            test.test_fn()
        except AssertionError as exc:
            status, error = "failed", str(exc)
        except Exception as exc:
            status, error = "error", str(exc)
        duration = time.perf_counter() - started

        outcome = {"name": test.name, "status": status, "duration": duration}
        if error is not None:
            outcome["error"] = error
        outcome["level"] = test.level.value
        return outcome

    def _generate_report(self) -> Dict:
        """Aggregate `self.results` into summary, per-level counts and details."""
        total = len(self.results)
        passed = sum(1 for r in self.results if r["status"] == "passed")
        failed = sum(1 for r in self.results if r["status"] == "failed")
        errors = sum(1 for r in self.results if r["status"] == "error")

        by_level = {}
        for result in self.results:
            counters = by_level.setdefault(
                result["level"], {"passed": 0, "failed": 0, "errors": 0}
            )
            # Explicit mapping: only "error" is pluralized in the counters.
            counters[self._LEVEL_COUNTER_KEYS[result["status"]]] += 1

        return {
            "summary": {
                "total": total,
                "passed": passed,
                "failed": failed,
                "errors": errors,
                "pass_rate": passed / total if total > 0 else 0
            },
            "by_level": by_level,
            "details": self.results
        }