回归测试架构:风险分析与选择性执行
回归测试架构:风险分析与选择性执行
回归测试策略
回归测试确保代码变更没有引入新的缺陷。随着代码库增长,全量回归测试变得不可行,需要智能策略选择最有价值的测试执行。核心策略包括:基于风险的选择、变更影响分析和测试优先级排序。
# 回归测试框架
from dataclasses import dataclass, field
from typing import Dict, List, Set, Callable, Optional
from enum import Enum
import time
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class TestImpact:
test_id: str
affected_components: Set[str]
risk_score: float
execution_time: float
last_run: Optional[float] = None
failure_history: List[bool] = field(default_factory=list)
@dataclass
class CodeChange:
file_path: str
change_type: str # added, modified, deleted
lines_changed: int
components: Set[str]
class RegressionTestSelector:
def __init__(self):
self.test_impacts: Dict[str, TestImpact] = {}
self.component_graph: Dict[str, Set[str]] = {}
self.risk_weights = {
"file_complexity": 0.3,
"change_frequency": 0.2,
"test_failure_rate": 0.3,
"component_criticality": 0.2
}
def register_test(self, test_id: str, components: Set[str],
execution_time: float):
"""注册测试"""
self.test_impacts[test_id] = TestImpact(
test_id=test_id,
affected_components=components,
risk_score=0.0,
execution_time=execution_time
)
def register_component_dependency(self, component: str,
dependencies: Set[str]):
"""注册组件依赖"""
self.component_graph[component] = dependencies
def analyze_changes(self, changes: List[CodeChange]) -> Set[str]:
"""分析代码变更影响"""
affected_components = set()
for change in changes:
affected_components.update(change.components)
# 扩展到依赖组件
expanded = self._expand_dependencies(affected_components)
return expanded
def _expand_dependencies(self, components: Set[str]) -> Set[str]:
"""扩展依赖关系"""
expanded = set(components)
queue = list(components)
while queue:
component = queue.pop(0)
deps = self.component_graph.get(component, set())
for dep in deps:
if dep not in expanded:
expanded.add(dep)
queue.append(dep)
return expanded
def calculate_risk_scores(self, changes: List[CodeChange]) -> Dict[str, float]:
"""计算风险分数"""
affected_components = self.analyze_changes(changes)
scores = {}
for test_id, impact in self.test_impacts.items():
# 计算组件重叠度
component_overlap = len(impact.affected_components & affected_components)
component_score = component_overlap / max(len(impact.affected_components), 1)
# 计算失败历史分数
failure_rate = sum(impact.failure_history) / max(len(impact.failure_history), 1)
# 计算综合风险分数
risk_score = (
component_score * self.risk_weights["component_criticality"] +
failure_rate * self.risk_weights["test_failure_rate"]
)
impact.risk_score = risk_score
scores[test_id] = risk_score
return scores
def select_tests(self, changes: List[CodeChange],
max_tests: int = None,
max_time: int = None) -> List[str]:
"""选择回归测试"""
# 计算风险分数
self.calculate_risk_scores(changes)
# 按风险分数排序
sorted_tests = sorted(
self.test_impacts.items(),
key=lambda x: x[1].risk_score,
reverse=True
)
selected = []
total_time = 0
for test_id, impact in sorted_tests:
if max_tests and len(selected) >= max_tests:
break
if max_time and total_time + impact.execution_time > max_time:
continue
selected.append(test_id)
total_time += impact.execution_time
return selected
def record_test_result(self, test_id: str, passed: bool):
"""记录测试结果"""
if test_id in self.test_impacts:
self.test_impacts[test_id].failure_history.append(passed)
# 保持最近100次结果
if len(self.test_impacts[test_id].failure_history) > 100:
self.test_impacts[test_id].failure_history = \
self.test_impacts[test_id].failure_history[-100:]
# 使用示例
selector = RegressionTestSelector()
# 注册测试
selector.register_test("test_user_login", {"auth", "user"}, 2.5)
selector.register_test("test_order_creation", {"order", "payment"}, 5.0)
selector.register_test("test_inventory_update", {"inventory", "product"}, 3.0)
# 注册组件依赖
selector.register_component_dependency("order", {"payment", "inventory"})
selector.register_component_dependency("payment", {"user"})
# 分析变更
changes = [
CodeChange("order/service.py", "modified", 50, {"order"}),
CodeChange("payment/gateway.py", "modified", 30, {"payment"})
]
# 选择测试
selected = selector.select_tests(changes, max_time=10)
print(f"Selected tests: {selected}")
变更影响分析
变更影响分析追踪代码变更如何影响测试用例。通过代码-测试映射关系,快速定位需要重新执行的测试。
# 变更影响分析器
from typing import Dict, List, Set
from collections import defaultdict
class ChangeImpactAnalyzer:
def __init__(self):
self.code_to_tests: Dict[str, Set[str]] = defaultdict(set)
self.test_to_code: Dict[str, Set[str]] = defaultdict(set)
self.test_metadata: Dict[str, Dict] = {}
def register_mapping(self, test_id: str, code_files: Set[str]):
"""注册代码-测试映射"""
for file_path in code_files:
self.code_to_tests[file_path].add(test_id)
self.test_to_code[test_id].add(file_path)
def register_test_metadata(self, test_id: str, metadata: Dict):
"""注册测试元数据"""
self.test_metadata[test_id] = metadata
def analyze_impact(self, changed_files: Set[str]) -> Dict[str, Set[str]]:
"""分析变更影响"""
impacted_tests = defaultdict(set)
for file_path in changed_files:
# 直接影响的测试
direct_tests = self.code_to_tests.get(file_path, set())
for test_id in direct_tests:
impacted_tests[test_id].add("direct")
# 扩展影响(通过测试间依赖)
expanded = self._expand_test_dependencies(impacted_tests)
return expanded
def _expand_test_dependencies(self,
impacted: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
"""扩展测试依赖"""
# 简化实现:实际应考虑测试套件依赖关系
return impacted
def get_impact_report(self, changed_files: Set[str]) -> Dict:
"""生成影响报告"""
impacted = self.analyze_impact(changed_files)
report = {
"changed_files": list(changed_files),
"total_impacted_tests": len(impacted),
"tests_by_impact": {
"direct": [],
"indirect": []
},
"summary": {}
}
for test_id, impact_types in impacted.items():
if "direct" in impact_types:
report["tests_by_impact"]["direct"].append(test_id)
else:
report["tests_by_impact"]["indirect"].append(test_id)
# 添加测试详情
report["test_details"] = []
for test_id in impacted:
metadata = self.test_metadata.get(test_id, {})
report["test_details"].append({
"test_id": test_id,
"impact_types": list(impacted[test_id]),
"execution_time": metadata.get("execution_time", 0),
"risk_level": metadata.get("risk_level", "unknown")
})
return report
# 代码覆盖率分析
class CoverageAnalyzer:
"""代码覆盖率分析"""
def __init__(self):
self.coverage_data: Dict[str, Dict[str, bool]] = defaultdict(dict)
def record_coverage(self, test_id: str, file_path: str,
lines: List[int]):
"""记录覆盖率"""
for line in lines:
self.coverage_data[file_path][f"{test_id}:{line}"] = True
def get_file_coverage(self, file_path: str) -> Dict:
"""获取文件覆盖率"""
lines = self.coverage_data.get(file_path, {})
if not lines:
return {"coverage": 0, "total_lines": 0, "covered_lines": 0}
# 按行号分组
covered_lines = set()
for key in lines:
test_id, line = key.split(":")
covered_lines.add(int(line))
return {
"coverage": len(covered_lines) / max(len(lines), 1),
"total_lines": len(lines),
"covered_lines": len(covered_lines)
}
def get_test_coverage(self, test_id: str) -> Dict:
"""获取测试覆盖率"""
covered_files = []
for file_path, lines in self.coverage_data.items():
test_lines = [k for k in lines.keys() if k.startswith(f"{test_id}:")]
if test_lines:
covered_files.append({
"file": file_path,
"lines": len(test_lines)
})
return {
"test_id": test_id,
"files_covered": len(covered_files),
"details": covered_files
}
def find_uncovered_code(self, file_path: str) -> List[int]:
"""查找未覆盖的代码"""
lines = self.coverage_data.get(file_path, {})
covered_lines = set()
for key in lines:
_, line = key.split(":")
covered_lines.add(int(line))
# 假设代码行号从1到某个最大值
max_line = max(covered_lines) if covered_lines else 0
return [i for i in range(1, max_line + 1) if i not in covered_lines]
# 测试优先级排序
class TestPrioritizer:
"""测试优先级排序"""
def __init__(self):
self.test_history: Dict[str, List[Dict]] = defaultdict(list)
def record_execution(self, test_id: str, passed: bool,
execution_time: float, timestamp: float):
"""记录执行历史"""
self.test_history[test_id].append({
"passed": passed,
"execution_time": execution_time,
"timestamp": timestamp
})
def calculate_priority(self, test_id: str) -> float:
"""计算测试优先级"""
history = self.test_history.get(test_id, [])
if not history:
return 0.5 # 默认中等优先级
# 计算失败率
failure_rate = sum(1 for h in history if not h["passed"]) / len(history)
# 计算平均执行时间
avg_time = sum(h["execution_time"] for h in history) / len(history)
# 计算优先级(失败率高、执行时间短的优先)
priority = failure_rate * 0.7 + (1 / (avg_time + 1)) * 0.3
return min(max(priority, 0), 1) # 限制在0-1范围
def prioritize_tests(self, test_ids: List[str]) -> List[str]:
"""对测试进行优先级排序"""
prioritized = sorted(
test_ids,
key=lambda x: self.calculate_priority(x),
reverse=True
)
return prioritized
# 智能测试选择
class SmartTestSelector:
"""智能测试选择器"""
def __init__(self):
self.impact_analyzer = ChangeImpactAnalyzer()
self.prioritizer = TestPrioritizer()
self.coverage_analyzer = CoverageAnalyzer()
def select_optimal_tests(self, changed_files: Set[str],
time_budget: int,
coverage_target: float = 0.8) -> Dict:
"""选择最优测试集"""
# 获取受影响的测试
impacted = self.impact_analyzer.analyze_impact(changed_files)
# 对测试进行优先级排序
prioritized = self.prioritizer.prioritize_tests(list(impacted.keys()))
# 选择测试直到满足时间预算
selected = []
total_time = 0
for test_id in prioritized:
if total_time >= time_budget:
break
# 估计执行时间
test_time = self._estimate_execution_time(test_id)
if total_time + test_time <= time_budget:
selected.append(test_id)
total_time += test_time
# 计算覆盖率
coverage = self._calculate_selection_coverage(selected, changed_files)
return {
"selected_tests": selected,
"total_time": total_time,
"time_budget": time_budget,
"estimated_coverage": coverage,
"meets_target": coverage >= coverage_target
}
def _estimate_execution_time(self, test_id: str) -> float:
"""估计测试执行时间"""
# 简化:使用历史平均时间
history = self.prioritizer.test_history.get(test_id, [])
if history:
return sum(h["execution_time"] for h in history) / len(history)
return 5.0 # 默认5秒
def _calculate_selection_coverage(self, selected: List[str],
changed_files: Set[str]) -> float:
"""计算选择覆盖率"""
if not changed_files:
return 1.0
covered_files = set()
for test_id in selected:
test_files = self.impact_analyzer.test_to_code.get(test_id, set())
covered_files.update(test_files & changed_files)
return len(covered_files) / len(changed_files) if changed_files else 1.0
回归测试优化
回归测试优化通过并行执行、测试聚合和智能调度提高测试效率。
# 回归测试优化器
from typing import List, Dict, Set
from concurrent.futures import ThreadPoolExecutor, as_completed
import heapq
class RegressionTestOptimizer:
"""回归测试优化器"""
def __init__(self, max_parallel: int = 4):
self.max_parallel = max_parallel
self.test_groups: Dict[str, List[str]] = {}
def group_tests(self, tests: List[str],
grouping_fn: Callable) -> Dict[str, List[str]]:
"""对测试进行分组"""
groups = defaultdict(list)
for test in tests:
group_key = grouping_fn(test)
groups[group_key].append(test)
self.test_groups = dict(groups)
return self.test_groups
def parallelize_execution(self, tests: List[str],
test_fn: Callable) -> List[Dict]:
"""并行执行测试"""
results = []
with ThreadPoolExecutor(max_workers=self.max_parallel) as executor:
future_to_test = {
executor.submit(test_fn, test): test
for test in tests
}
for future in as_completed(future_to_test):
test = future_to_test[future]
try:
result = future.result()
results.append({
"test": test,
"status": "passed",
"result": result
})
except Exception as e:
results.append({
"test": test,
"status": "failed",
"error": str(e)
})
return results
def schedule_tests(self, tests: List[Dict[str, float]],
max_time: int) -> List[List[str]]:
"""调度测试(装箱问题)"""
# 按执行时间降序排序
sorted_tests = sorted(tests, key=lambda x: x["time"], reverse=True)
bins = []
current_bin = []
current_time = 0
for test in sorted_tests:
if current_time + test["time"] <= max_time:
current_bin.append(test["id"])
current_time += test["time"]
else:
if current_bin:
bins.append(current_bin)
current_bin = [test["id"]]
current_time = test["time"]
if current_bin:
bins.append(current_bin)
return bins
def optimize_test_order(self, tests: List[str],
dependency_graph: Dict[str, Set[str]]) -> List[str]:
"""优化测试执行顺序"""
# 拓扑排序
in_degree = defaultdict(int)
for test in tests:
in_degree[test] = len(dependency_graph.get(test, set()))
queue = [test for test in tests if in_degree[test] == 0]
result = []
while queue:
# 选择执行时间最短的(启发式)
current = min(queue, key=lambda x: self._estimate_time(x))
result.append(current)
queue.remove(current)
# 更新入度
for test in tests:
if current in dependency_graph.get(test, set()):
in_degree[test] -= 1
if in_degree[test] == 0:
queue.append(test)
return result
def _estimate_time(self, test_id: str) -> float:
"""估计测试执行时间"""
# 简化:返回默认值
return 5.0
# 测试结果缓存
class TestResultCache:
"""测试结果缓存"""
def __init__(self):
self.cache: Dict[str, Dict] = {}
def get_cached_result(self, test_id: str,
code_hash: str) -> Optional[Dict]:
"""获取缓存结果"""
cached = self.cache.get(test_id)
if cached and cached.get("code_hash") == code_hash:
return cached.get("result")
return None
def cache_result(self, test_id: str, code_hash: str,
result: Dict):
"""缓存测试结果"""
self.cache[test_id] = {
"code_hash": code_hash,
"result": result,
"timestamp": time.time()
}
def invalidate(self, test_ids: List[str]):
"""使缓存失效"""
for test_id in test_ids:
if test_id in self.cache:
del self.cache[test_id]
def get_cache_stats(self) -> Dict:
"""获取缓存统计"""
return {
"total_cached": len(self.cache),
"cache_size_mb": sum(
len(str(v)) for v in self.cache.values()
) / 1024 / 1024
}
# 持续回归测试
class ContinuousRegressionTest:
"""持续回归测试"""
def __init__(self):
self.selector = SmartTestSelector()
self.optimizer = RegressionTestOptimizer()
self.cache = TestResultCache()
def on_code_change(self, changed_files: Set[str],
time_budget: int = 600) -> Dict:
"""代码变更时触发"""
# 计算代码哈希
code_hash = self._calculate_code_hash(changed_files)
# 选择测试
selection = self.selector.select_optimal_tests(
changed_files, time_budget
)
# 检查缓存
tests_to_run = []
cached_results = []
for test_id in selection["selected_tests"]:
cached = self.cache.get_cached_result(test_id, code_hash)
if cached:
cached_results.append({"test": test_id, "result": cached})
else:
tests_to_run.append(test_id)
# 执行测试
if tests_to_run:
results = self.optimizer.parallelize_execution(
tests_to_run, self._run_test
)
# 缓存结果
for result in results:
self.cache.cache_result(
result["test"], code_hash, result
)
else:
results = []
return {
"total_selected": len(selection["selected_tests"]),
"from_cache": len(cached_results),
"executed": len(tests_to_run),
"results": results + cached_results,
"estimated_coverage": selection["estimated_coverage"]
}
def _calculate_code_hash(self, files: Set[str]) -> str:
"""计算代码哈希"""
import hashlib
hasher = hashlib.md5()
for file_path in sorted(files):
hasher.update(file_path.encode())
return hasher.hexdigest()
def _run_test(self, test_id: str) -> Dict:
"""运行测试"""
# 简化:实际应执行测试
return {"status": "passed", "test": test_id}
# 回归测试报告
class RegressionTestReport:
"""回归测试报告"""
def __init__(self, results: List[Dict]):
self.results = results
def generate_summary(self) -> Dict:
"""生成摘要"""
total = len(self.results)
passed = sum(1 for r in self.results if r.get("status") == "passed")
return {
"total": total,
"passed": passed,
"failed": total - passed,
"pass_rate": passed / total if total > 0 else 0
}
def generate_html_report(self) -> str:
"""生成HTML报告"""
summary = self.generate_summary()
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Regression Test Report</title>
<style>
.passed {{ color: green; }}
.failed {{ color: red; }}
.summary {{ font-size: 18px; }}
</style>
</head>
<body>
<h1>Regression Test Report</h1>
<div class="summary">
<h2>Summary</h2>
<p>Total: {summary['total']}</p>
<p class="passed">Passed: {summary['passed']}</p>
<p class="failed">Failed: {summary['failed']}</p>
<p>Pass Rate: {summary['pass_rate']:.1%}</p>
</div>
<h2>Test Results</h2>
<table>
<tr><th>Test</th><th>Status</th><th>Time</th></tr>
"""
for result in self.results:
status_class = "passed" if result.get("status") == "passed" else "failed"
html += f"""
<tr>
<td>{result.get('test', 'Unknown')}</td>
<td class="{status_class}">{result.get('status', 'Unknown')}</td>
<td>{result.get('time', 0):.2f}s</td>
</tr>
"""
html += """
</table>
</body>
</html>
"""
return html