微服务测试架构:契约测试、集成测试与E2E策略
微服务测试架构:契约测试、集成测试与E2E策略
微服务测试挑战
微服务架构带来分布式系统的复杂性:服务间依赖众多、数据一致性难以保证、故障影响范围难以预测。测试策略需要从单体应用的"测试金字塔"演进为"测试蜂巢",在不同层次建立信心。
# 微服务测试策略定义
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class TestLevel(Enum):
UNIT = "unit"
CONTRACT = "contract"
INTEGRATION = "integration"
E2E = "e2e"
PERFORMANCE = "performance"
@dataclass
class TestStrategy:
name: str
levels: List[TestLevel]
coverage_target: float
automation_target: float
description: str = ""
@dataclass
class ServiceTestConfig:
service_name: str
dependencies: List[str]
test_strategy: TestStrategy
mock_services: List[str] = field(default_factory=list)
test_data_requirements: List[str] = field(default_factory=list)
class MicroserviceTestSuite:
def __init__(self, config: ServiceTestConfig):
self.config = config
self.test_results = []
def run_all_levels(self) -> Dict:
"""按测试层次执行"""
results = {}
for level in self.config.test_strategy.levels:
print(f"Running {level.value} tests...")
level_results = self._run_level(level)
results[level.value] = level_results
return {
"service": self.config.service_name,
"results": results,
"overall_status": self._calculate_status(results)
}
def _run_level(self, level: TestLevel) -> Dict:
"""执行指定层次的测试"""
if level == TestLevel.UNIT:
return self._run_unit_tests()
elif level == TestLevel.CONTRACT:
return self._run_contract_tests()
elif level == TestLevel.INTEGRATION:
return self._run_integration_tests()
elif level == TestLevel.E2E:
return self._run_e2e_tests()
return {"status": "skipped"}
def _run_unit_tests(self) -> Dict:
return {"passed": 150, "failed": 2, "skipped": 5}
def _run_contract_tests(self) -> Dict:
return {"passed": 25, "failed": 0, "skipped": 0}
def _run_integration_tests(self) -> Dict:
return {"passed": 40, "failed": 3, "skipped": 2}
def _run_e2e_tests(self) -> Dict:
return {"passed": 20, "failed": 1, "skipped": 0}
def _calculate_status(self, results: Dict) -> str:
for level, result in results.items():
if result.get("failed", 0) > 0:
return "failed"
return "passed"
服务虚拟化与Mock
服务虚拟化模拟依赖服务的行为,使测试可以在没有真实依赖的情况下进行。支持录制-回放模式和动态响应配置。
# 服务虚拟化框架
from typing import Callable
import json
from datetime import datetime
@dataclass
class MockEndpoint:
path: str
method: str
response_body: Any
status_code: int = 200
delay_ms: int = 0
headers: Dict = field(default_factory=dict)
class ServiceVirtualization:
def __init__(self, service_name: str):
self.service_name = service_name
self.endpoints: List[MockEndpoint] = []
self.recorded_requests = []
self.request_log = []
def mock_endpoint(self, path: str, method: str,
response: Any, status: int = 200,
delay: int = 0):
"""配置Mock端点"""
self.endpoints.append(MockEndpoint(
path=path,
method=method,
response_body=response,
status_code=status,
delay_ms=delay
))
def handle_request(self, method: str, path: str,
body: Dict = None) -> Dict:
"""处理模拟请求"""
# 记录请求
self.request_log.append({
"method": method,
"path": path,
"body": body,
"timestamp": datetime.now().isoformat()
})
# 查找匹配的Mock
for endpoint in self.endpoints:
if endpoint.path == path and endpoint.method == method:
return {
"status": endpoint.status_code,
"body": endpoint.response_body,
"headers": endpoint.headers
}
return {"status": 404, "body": {"error": "Not found"}}
def record_interaction(self, real_service_url: str,
requests: List[Dict]):
"""录制真实服务交互"""
for req in requests:
self.recorded_requests.append({
"request": req,
"recorded_at": datetime.now().isoformat()
})
def playback(self, request: Dict) -> Dict:
"""回放录制的交互"""
for recorded in self.recorded_requests:
if self._match_request(request, recorded["request"]):
return recorded.get("response", {"status": 200})
return {"status": 404}
def _match_request(self, actual: Dict, expected: Dict) -> bool:
return (actual.get("path") == expected.get("path") and
actual.get("method") == expected.get("method"))
# 使用示例
virtualization = ServiceVirtualization("user-service")
virtualization.mock_endpoint(
path="/users/123",
method="GET",
response={"id": 123, "name": "Test User", "email": "test@example.com"}
)
virtualization.mock_endpoint(
path="/users/123/orders",
method="GET",
response=[{"order_id": "ORD001", "amount": 99.99}]
)
response = virtualization.handle_request("GET", "/users/123")
print(response)
集成测试策略
集成测试验证服务间的实际交互,包括同步调用、异步消息和数据一致性。使用Testcontainers在测试中启动真实的依赖服务。
# 集成测试框架
from typing import List, Dict
import asyncio
class IntegrationTestBase:
def __init__(self):
self.containers = {}
self.test_fixtures = {}
async def setup(self):
"""启动测试依赖"""
# 启动数据库容器
self.containers["postgres"] = await self._start_container(
image="postgres:14",
ports={5432: 5432},
env={"POSTGRES_DB": "test_db"}
)
# 启动Redis容器
self.containers["redis"] = await self._start_container(
image="redis:7",
ports={6379: 6379}
)
# 加载测试数据
await self._load_test_data()
async def teardown(self):
"""清理测试环境"""
for container in self.containers.values():
await self._stop_container(container)
async def _start_container(self, image: str, ports: Dict,
env: Dict = None) -> Any:
"""启动容器"""
print(f"Starting container: {image}")
return {"image": image, "status": "running"}
async def _stop_container(self, container):
"""停止容器"""
print(f"Stopping container: {container['image']}")
async def _load_test_data(self):
"""加载测试数据"""
print("Loading test fixtures...")
class ServiceIntegrationTest(IntegrationTestBase):
def __init__(self, service_under_test):
super().__init__()
self.service = service_under_test
async def test_create_order_flow(self):
"""测试订单创建流程"""
await self.setup()
try:
# 1. 创建用户
user = await self.service.create_user({
"name": "Test User",
"email": "test@example.com"
})
assert user["id"] is not None
# 2. 创建订单
order = await self.service.create_order({
"user_id": user["id"],
"items": [{"product_id": "P001", "quantity": 2}]
})
assert order["status"] == "pending"
# 3. 支付订单
payment = await self.service.process_payment(order["id"])
assert payment["status"] == "success"
# 4. 验证订单状态
updated_order = await self.service.get_order(order["id"])
assert updated_order["status"] == "paid"
print("Test passed: create order flow")
finally:
await self.teardown()
async def test_concurrent_operations(self):
"""测试并发操作"""
await self.setup()
try:
# 并发更新库存
tasks = []
for i in range(10):
task = self.service.update_inventory(
product_id="P001",
quantity_change=-1
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 验证没有竞态条件
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) == 0, f"Concurrent update errors: {errors}"
# 验证最终库存正确
inventory = await self.service.get_inventory("P001")
assert inventory["quantity"] == 90 # 初始100 - 10
print("Test passed: concurrent operations")
finally:
await self.teardown()
# 数据一致性测试
class ConsistencyTest:
def __init__(self, service_a, service_b):
self.service_a = service_a
self.service_b = service_b
async def test_eventual_consistency(self):
"""测试最终一致性"""
# 在服务A创建数据
await self.service_a.create_record({"id": "R001", "value": "test"})
# 等待同步
await asyncio.sleep(1)
# 验证服务B最终有一致的数据
for _ in range(10):
record = await self.service_b.get_record("R001")
if record and record["value"] == "test":
return True
await asyncio.sleep(0.5)
return False
async def test_failure_recovery(self):
"""测试故障恢复"""
# 模拟服务B暂时不可用
await self.service_b.simulate_failure(duration=5)
# 服务A应该能够继续处理并重试
result = await self.service_a.create_record_with_retry(
{"id": "R002", "value": "test"},
max_retries=3,
retry_delay=2
)
# 恢复后验证数据同步
await asyncio.sleep(3)
record = await self.service_b.get_record("R002")
return record is not None
测试数据管理
测试数据需要覆盖各种场景,同时保护敏感信息。使用工厂模式生成测试数据,数据脱敏保护隐私,数据快照支持测试重现。
# 测试数据管理
import random
import string
from faker import Faker
class TestDataFactory:
def __init__(self):
self.faker = Faker()
def create_user(self, **overrides) -> Dict:
"""创建测试用户"""
default = {
"id": str(random.randint(1000, 9999)),
"name": self.faker.name(),
"email": self.faker.email(),
"phone": self.faker.phone_number(),
"created_at": self.faker.date_time().isoformat()
}
default.update(overrides)
return default
def create_order(self, user_id: str = None, **overrides) -> Dict:
"""创建测试订单"""
default = {
"order_id": f"ORD{random.randint(100000, 999999)}",
"user_id": user_id or str(random.randint(1000, 9999)),
"items": [
{
"product_id": f"P{random.randint(100, 999)}",
"quantity": random.randint(1, 5),
"price": round(random.uniform(10, 100), 2)
}
],
"status": random.choice(["pending", "paid", "shipped"]),
"total": round(random.uniform(50, 500), 2)
}
default.update(overrides)
return default
def create_batch(self, factory_fn, count: int, **kwargs) -> List[Dict]:
"""批量创建测试数据"""
return [factory_fn(**kwargs) for _ in range(count)]
class DataSanitizer:
"""测试数据脱敏"""
def __init__(self):
self.faker = Faker()
def sanitize_user(self, user: Dict) -> Dict:
"""脱敏用户数据"""
sanitized = user.copy()
sanitized["name"] = self.faker.name()
sanitized["email"] = self.faker.email()
sanitized["phone"] = self.faker.phone_number()
sanitized["address"] = self.faker.address()
return sanitized
def sanitize_batch(self, records: List[Dict],
sensitive_fields: List[str]) -> List[Dict]:
"""批量脱敏"""
sanitized = []
for record in records:
new_record = record.copy()
for field in sensitive_fields:
if field in new_record:
new_record[field] = self._mask_value(new_record[field])
sanitized.append(new_record)
return sanitized
def _mask_value(self, value) -> str:
"""掩码处理"""
if isinstance(value, str):
if "@" in value:
return self.faker.email()
return "***MASKED***"
return str(value)
# 测试数据快照
class TestDataSnapshot:
def __init__(self, storage_path: str):
self.storage_path = storage_path
def create_snapshot(self, name: str, data: Dict) -> str:
"""创建数据快照"""
snapshot = {
"name": name,
"data": data,
"created_at": datetime.now().isoformat()
}
snapshot_id = f"snapshot_{name}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
print(f"Created snapshot: {snapshot_id}")
return snapshot_id
def restore_snapshot(self, snapshot_id: str) -> Dict:
"""恢复数据快照"""
print(f"Restoring snapshot: {snapshot_id}")
return {"status": "restored"}
测试执行与报告
统一的测试执行框架支持多层测试的编排、并行执行和结果聚合,生成可视化的测试报告。
# 测试执行器
import time
from dataclasses import dataclass
from typing import Callable
@dataclass
class TestCase:
name: str
test_fn: Callable
level: TestLevel
timeout: int = 30
tags: List[str] = field(default_factory=list)
class TestExecutor:
def __init__(self):
self.test_cases: List[TestCase] = []
self.results = []
def add_test(self, test: TestCase):
"""添加测试用例"""
self.test_cases.append(test)
def run(self, levels: List[TestLevel] = None,
tags: List[str] = None) -> Dict:
"""执行测试"""
# 过滤测试用例
tests_to_run = self.test_cases
if levels:
tests_to_run = [t for t in tests_to_run if t.level in levels]
if tags:
tests_to_run = [t for t in tests_to_run
if any(tag in t.tags for tag in tags)]
# 执行测试
results = []
for test in tests_to_run:
start_time = time.time()
try:
test.test_fn()
duration = time.time() - start_time
results.append({
"name": test.name,
"status": "passed",
"duration": duration,
"level": test.level.value
})
except AssertionError as e:
duration = time.time() - start_time
results.append({
"name": test.name,
"status": "failed",
"duration": duration,
"error": str(e),
"level": test.level.value
})
except Exception as e:
duration = time.time() - start_time
results.append({
"name": test.name,
"status": "error",
"duration": duration,
"error": str(e),
"level": test.level.value
})
self.results = results
return self._generate_report()
def _generate_report(self) -> Dict:
"""生成测试报告"""
total = len(self.results)
passed = sum(1 for r in self.results if r["status"] == "passed")
failed = sum(1 for r in self.results if r["status"] == "failed")
errors = sum(1 for r in self.results if r["status"] == "error")
# 按层次统计
by_level = {}
for result in self.results:
level = result["level"]
if level not in by_level:
by_level[level] = {"passed": 0, "failed": 0, "errors": 0}
by_level[level][result["status"] + "s" if result["status"] != "error" else "errors"] += 1
return {
"summary": {
"total": total,
"passed": passed,
"failed": failed,
"errors": errors,
"pass_rate": passed / total if total > 0 else 0
},
"by_level": by_level,
"details": self.results
}