← 返回首页
🧪

回归测试架构:风险分析与选择性执行

📂 architecture ⏱ 8 min 1499 words

回归测试架构:风险分析与选择性执行

回归测试策略

回归测试确保代码变更没有引入新的缺陷。随着代码库增长,全量回归测试变得不可行,需要智能策略选择最有价值的测试执行。核心策略包括:基于风险的选择、变更影响分析和测试优先级排序。

# 回归测试框架
from dataclasses import dataclass, field
from typing import Dict, List, Set, Callable, Optional
from enum import Enum
import time

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class TestImpact:
    test_id: str
    affected_components: Set[str]
    risk_score: float
    execution_time: float
    last_run: Optional[float] = None
    failure_history: List[bool] = field(default_factory=list)

@dataclass
class CodeChange:
    file_path: str
    change_type: str  # added, modified, deleted
    lines_changed: int
    components: Set[str]

class RegressionTestSelector:
    def __init__(self):
        self.test_impacts: Dict[str, TestImpact] = {}
        self.component_graph: Dict[str, Set[str]] = {}
        self.risk_weights = {
            "file_complexity": 0.3,
            "change_frequency": 0.2,
            "test_failure_rate": 0.3,
            "component_criticality": 0.2
        }
    
    def register_test(self, test_id: str, components: Set[str],
                     execution_time: float):
        """注册测试"""
        self.test_impacts[test_id] = TestImpact(
            test_id=test_id,
            affected_components=components,
            risk_score=0.0,
            execution_time=execution_time
        )
    
    def register_component_dependency(self, component: str,
                                     dependencies: Set[str]):
        """注册组件依赖"""
        self.component_graph[component] = dependencies
    
    def analyze_changes(self, changes: List[CodeChange]) -> Set[str]:
        """分析代码变更影响"""
        affected_components = set()
        
        for change in changes:
            affected_components.update(change.components)
        
        # 扩展到依赖组件
        expanded = self._expand_dependencies(affected_components)
        
        return expanded
    
    def _expand_dependencies(self, components: Set[str]) -> Set[str]:
        """扩展依赖关系"""
        expanded = set(components)
        queue = list(components)
        
        while queue:
            component = queue.pop(0)
            deps = self.component_graph.get(component, set())
            
            for dep in deps:
                if dep not in expanded:
                    expanded.add(dep)
                    queue.append(dep)
        
        return expanded
    
    def calculate_risk_scores(self, changes: List[CodeChange]) -> Dict[str, float]:
        """计算风险分数"""
        affected_components = self.analyze_changes(changes)
        
        scores = {}
        for test_id, impact in self.test_impacts.items():
            # 计算组件重叠度
            component_overlap = len(impact.affected_components & affected_components)
            component_score = component_overlap / max(len(impact.affected_components), 1)
            
            # 计算失败历史分数
            failure_rate = sum(impact.failure_history) / max(len(impact.failure_history), 1)
            
            # 计算综合风险分数
            risk_score = (
                component_score * self.risk_weights["component_criticality"] +
                failure_rate * self.risk_weights["test_failure_rate"]
            )
            
            impact.risk_score = risk_score
            scores[test_id] = risk_score
        
        return scores
    
    def select_tests(self, changes: List[CodeChange],
                    max_tests: int = None,
                    max_time: int = None) -> List[str]:
        """选择回归测试"""
        # 计算风险分数
        self.calculate_risk_scores(changes)
        
        # 按风险分数排序
        sorted_tests = sorted(
            self.test_impacts.items(),
            key=lambda x: x[1].risk_score,
            reverse=True
        )
        
        selected = []
        total_time = 0
        
        for test_id, impact in sorted_tests:
            if max_tests and len(selected) >= max_tests:
                break
            
            if max_time and total_time + impact.execution_time > max_time:
                continue
            
            selected.append(test_id)
            total_time += impact.execution_time
        
        return selected
    
    def record_test_result(self, test_id: str, passed: bool):
        """记录测试结果"""
        if test_id in self.test_impacts:
            self.test_impacts[test_id].failure_history.append(passed)
            # 保持最近100次结果
            if len(self.test_impacts[test_id].failure_history) > 100:
                self.test_impacts[test_id].failure_history = \
                    self.test_impacts[test_id].failure_history[-100:]

# 使用示例
selector = RegressionTestSelector()

# 注册测试
selector.register_test("test_user_login", {"auth", "user"}, 2.5)
selector.register_test("test_order_creation", {"order", "payment"}, 5.0)
selector.register_test("test_inventory_update", {"inventory", "product"}, 3.0)

# 注册组件依赖
selector.register_component_dependency("order", {"payment", "inventory"})
selector.register_component_dependency("payment", {"user"})

# 分析变更
changes = [
    CodeChange("order/service.py", "modified", 50, {"order"}),
    CodeChange("payment/gateway.py", "modified", 30, {"payment"})
]

# 选择测试
selected = selector.select_tests(changes, max_time=10)
print(f"Selected tests: {selected}")

变更影响分析

变更影响分析追踪代码变更如何影响测试用例。通过代码-测试映射关系,快速定位需要重新执行的测试。

# 变更影响分析器
from typing import Dict, List, Set
from collections import defaultdict

class ChangeImpactAnalyzer:
    def __init__(self):
        self.code_to_tests: Dict[str, Set[str]] = defaultdict(set)
        self.test_to_code: Dict[str, Set[str]] = defaultdict(set)
        self.test_metadata: Dict[str, Dict] = {}
    
    def register_mapping(self, test_id: str, code_files: Set[str]):
        """注册代码-测试映射"""
        for file_path in code_files:
            self.code_to_tests[file_path].add(test_id)
            self.test_to_code[test_id].add(file_path)
    
    def register_test_metadata(self, test_id: str, metadata: Dict):
        """注册测试元数据"""
        self.test_metadata[test_id] = metadata
    
    def analyze_impact(self, changed_files: Set[str]) -> Dict[str, Set[str]]:
        """分析变更影响"""
        impacted_tests = defaultdict(set)
        
        for file_path in changed_files:
            # 直接影响的测试
            direct_tests = self.code_to_tests.get(file_path, set())
            
            for test_id in direct_tests:
                impacted_tests[test_id].add("direct")
        
        # 扩展影响(通过测试间依赖)
        expanded = self._expand_test_dependencies(impacted_tests)
        
        return expanded
    
    def _expand_test_dependencies(self, 
                                 impacted: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
        """扩展测试依赖"""
        # 简化实现:实际应考虑测试套件依赖关系
        return impacted
    
    def get_impact_report(self, changed_files: Set[str]) -> Dict:
        """生成影响报告"""
        impacted = self.analyze_impact(changed_files)
        
        report = {
            "changed_files": list(changed_files),
            "total_impacted_tests": len(impacted),
            "tests_by_impact": {
                "direct": [],
                "indirect": []
            },
            "summary": {}
        }
        
        for test_id, impact_types in impacted.items():
            if "direct" in impact_types:
                report["tests_by_impact"]["direct"].append(test_id)
            else:
                report["tests_by_impact"]["indirect"].append(test_id)
        
        # 添加测试详情
        report["test_details"] = []
        for test_id in impacted:
            metadata = self.test_metadata.get(test_id, {})
            report["test_details"].append({
                "test_id": test_id,
                "impact_types": list(impacted[test_id]),
                "execution_time": metadata.get("execution_time", 0),
                "risk_level": metadata.get("risk_level", "unknown")
            })
        
        return report

# 代码覆盖率分析
class CoverageAnalyzer:
    """代码覆盖率分析"""
    
    def __init__(self):
        self.coverage_data: Dict[str, Dict[str, bool]] = defaultdict(dict)
    
    def record_coverage(self, test_id: str, file_path: str,
                       lines: List[int]):
        """记录覆盖率"""
        for line in lines:
            self.coverage_data[file_path][f"{test_id}:{line}"] = True
    
    def get_file_coverage(self, file_path: str) -> Dict:
        """获取文件覆盖率"""
        lines = self.coverage_data.get(file_path, {})
        
        if not lines:
            return {"coverage": 0, "total_lines": 0, "covered_lines": 0}
        
        # 按行号分组
        covered_lines = set()
        for key in lines:
            test_id, line = key.split(":")
            covered_lines.add(int(line))
        
        return {
            "coverage": len(covered_lines) / max(len(lines), 1),
            "total_lines": len(lines),
            "covered_lines": len(covered_lines)
        }
    
    def get_test_coverage(self, test_id: str) -> Dict:
        """获取测试覆盖率"""
        covered_files = []
        
        for file_path, lines in self.coverage_data.items():
            test_lines = [k for k in lines.keys() if k.startswith(f"{test_id}:")]
            if test_lines:
                covered_files.append({
                    "file": file_path,
                    "lines": len(test_lines)
                })
        
        return {
            "test_id": test_id,
            "files_covered": len(covered_files),
            "details": covered_files
        }
    
    def find_uncovered_code(self, file_path: str) -> List[int]:
        """查找未覆盖的代码"""
        lines = self.coverage_data.get(file_path, {})
        
        covered_lines = set()
        for key in lines:
            _, line = key.split(":")
            covered_lines.add(int(line))
        
        # 假设代码行号从1到某个最大值
        max_line = max(covered_lines) if covered_lines else 0
        
        return [i for i in range(1, max_line + 1) if i not in covered_lines]

# 测试优先级排序
class TestPrioritizer:
    """测试优先级排序"""
    
    def __init__(self):
        self.test_history: Dict[str, List[Dict]] = defaultdict(list)
    
    def record_execution(self, test_id: str, passed: bool,
                        execution_time: float, timestamp: float):
        """记录执行历史"""
        self.test_history[test_id].append({
            "passed": passed,
            "execution_time": execution_time,
            "timestamp": timestamp
        })
    
    def calculate_priority(self, test_id: str) -> float:
        """计算测试优先级"""
        history = self.test_history.get(test_id, [])
        
        if not history:
            return 0.5  # 默认中等优先级
        
        # 计算失败率
        failure_rate = sum(1 for h in history if not h["passed"]) / len(history)
        
        # 计算平均执行时间
        avg_time = sum(h["execution_time"] for h in history) / len(history)
        
        # 计算优先级(失败率高、执行时间短的优先)
        priority = failure_rate * 0.7 + (1 / (avg_time + 1)) * 0.3
        
        return min(max(priority, 0), 1)  # 限制在0-1范围
    
    def prioritize_tests(self, test_ids: List[str]) -> List[str]:
        """对测试进行优先级排序"""
        prioritized = sorted(
            test_ids,
            key=lambda x: self.calculate_priority(x),
            reverse=True
        )
        
        return prioritized

# 智能测试选择
class SmartTestSelector:
    """智能测试选择器"""
    
    def __init__(self):
        self.impact_analyzer = ChangeImpactAnalyzer()
        self.prioritizer = TestPrioritizer()
        self.coverage_analyzer = CoverageAnalyzer()
    
    def select_optimal_tests(self, changed_files: Set[str],
                            time_budget: int,
                            coverage_target: float = 0.8) -> Dict:
        """选择最优测试集"""
        # 获取受影响的测试
        impacted = self.impact_analyzer.analyze_impact(changed_files)
        
        # 对测试进行优先级排序
        prioritized = self.prioritizer.prioritize_tests(list(impacted.keys()))
        
        # 选择测试直到满足时间预算
        selected = []
        total_time = 0
        
        for test_id in prioritized:
            if total_time >= time_budget:
                break
            
            # 估计执行时间
            test_time = self._estimate_execution_time(test_id)
            
            if total_time + test_time <= time_budget:
                selected.append(test_id)
                total_time += test_time
        
        # 计算覆盖率
        coverage = self._calculate_selection_coverage(selected, changed_files)
        
        return {
            "selected_tests": selected,
            "total_time": total_time,
            "time_budget": time_budget,
            "estimated_coverage": coverage,
            "meets_target": coverage >= coverage_target
        }
    
    def _estimate_execution_time(self, test_id: str) -> float:
        """估计测试执行时间"""
        # 简化:使用历史平均时间
        history = self.prioritizer.test_history.get(test_id, [])
        if history:
            return sum(h["execution_time"] for h in history) / len(history)
        return 5.0  # 默认5秒
    
    def _calculate_selection_coverage(self, selected: List[str],
                                     changed_files: Set[str]) -> float:
        """计算选择覆盖率"""
        if not changed_files:
            return 1.0
        
        covered_files = set()
        for test_id in selected:
            test_files = self.impact_analyzer.test_to_code.get(test_id, set())
            covered_files.update(test_files & changed_files)
        
        return len(covered_files) / len(changed_files) if changed_files else 1.0

回归测试优化

回归测试优化通过并行执行、测试聚合和智能调度提高测试效率。

# 回归测试优化器
from typing import List, Dict, Set
from concurrent.futures import ThreadPoolExecutor, as_completed
import heapq

class RegressionTestOptimizer:
    """回归测试优化器"""
    
    def __init__(self, max_parallel: int = 4):
        self.max_parallel = max_parallel
        self.test_groups: Dict[str, List[str]] = {}
    
    def group_tests(self, tests: List[str], 
                   grouping_fn: Callable) -> Dict[str, List[str]]:
        """对测试进行分组"""
        groups = defaultdict(list)
        
        for test in tests:
            group_key = grouping_fn(test)
            groups[group_key].append(test)
        
        self.test_groups = dict(groups)
        return self.test_groups
    
    def parallelize_execution(self, tests: List[str],
                             test_fn: Callable) -> List[Dict]:
        """并行执行测试"""
        results = []
        
        with ThreadPoolExecutor(max_workers=self.max_parallel) as executor:
            future_to_test = {
                executor.submit(test_fn, test): test
                for test in tests
            }
            
            for future in as_completed(future_to_test):
                test = future_to_test[future]
                try:
                    result = future.result()
                    results.append({
                        "test": test,
                        "status": "passed",
                        "result": result
                    })
                except Exception as e:
                    results.append({
                        "test": test,
                        "status": "failed",
                        "error": str(e)
                    })
        
        return results
    
    def schedule_tests(self, tests: List[Dict[str, float]],
                      max_time: int) -> List[List[str]]:
        """调度测试(装箱问题)"""
        # 按执行时间降序排序
        sorted_tests = sorted(tests, key=lambda x: x["time"], reverse=True)
        
        bins = []
        current_bin = []
        current_time = 0
        
        for test in sorted_tests:
            if current_time + test["time"] <= max_time:
                current_bin.append(test["id"])
                current_time += test["time"]
            else:
                if current_bin:
                    bins.append(current_bin)
                current_bin = [test["id"]]
                current_time = test["time"]
        
        if current_bin:
            bins.append(current_bin)
        
        return bins
    
    def optimize_test_order(self, tests: List[str],
                           dependency_graph: Dict[str, Set[str]]) -> List[str]:
        """优化测试执行顺序"""
        # 拓扑排序
        in_degree = defaultdict(int)
        for test in tests:
            in_degree[test] = len(dependency_graph.get(test, set()))
        
        queue = [test for test in tests if in_degree[test] == 0]
        result = []
        
        while queue:
            # 选择执行时间最短的(启发式)
            current = min(queue, key=lambda x: self._estimate_time(x))
            result.append(current)
            queue.remove(current)
            
            # 更新入度
            for test in tests:
                if current in dependency_graph.get(test, set()):
                    in_degree[test] -= 1
                    if in_degree[test] == 0:
                        queue.append(test)
        
        return result
    
    def _estimate_time(self, test_id: str) -> float:
        """估计测试执行时间"""
        # 简化:返回默认值
        return 5.0

# 测试结果缓存
class TestResultCache:
    """测试结果缓存"""
    
    def __init__(self):
        self.cache: Dict[str, Dict] = {}
    
    def get_cached_result(self, test_id: str, 
                         code_hash: str) -> Optional[Dict]:
        """获取缓存结果"""
        cached = self.cache.get(test_id)
        
        if cached and cached.get("code_hash") == code_hash:
            return cached.get("result")
        
        return None
    
    def cache_result(self, test_id: str, code_hash: str,
                    result: Dict):
        """缓存测试结果"""
        self.cache[test_id] = {
            "code_hash": code_hash,
            "result": result,
            "timestamp": time.time()
        }
    
    def invalidate(self, test_ids: List[str]):
        """使缓存失效"""
        for test_id in test_ids:
            if test_id in self.cache:
                del self.cache[test_id]
    
    def get_cache_stats(self) -> Dict:
        """获取缓存统计"""
        return {
            "total_cached": len(self.cache),
            "cache_size_mb": sum(
                len(str(v)) for v in self.cache.values()
            ) / 1024 / 1024
        }

# 持续回归测试
class ContinuousRegressionTest:
    """持续回归测试"""
    
    def __init__(self):
        self.selector = SmartTestSelector()
        self.optimizer = RegressionTestOptimizer()
        self.cache = TestResultCache()
    
    def on_code_change(self, changed_files: Set[str],
                      time_budget: int = 600) -> Dict:
        """代码变更时触发"""
        # 计算代码哈希
        code_hash = self._calculate_code_hash(changed_files)
        
        # 选择测试
        selection = self.selector.select_optimal_tests(
            changed_files, time_budget
        )
        
        # 检查缓存
        tests_to_run = []
        cached_results = []
        
        for test_id in selection["selected_tests"]:
            cached = self.cache.get_cached_result(test_id, code_hash)
            if cached:
                cached_results.append({"test": test_id, "result": cached})
            else:
                tests_to_run.append(test_id)
        
        # 执行测试
        if tests_to_run:
            results = self.optimizer.parallelize_execution(
                tests_to_run, self._run_test
            )
            
            # 缓存结果
            for result in results:
                self.cache.cache_result(
                    result["test"], code_hash, result
                )
        else:
            results = []
        
        return {
            "total_selected": len(selection["selected_tests"]),
            "from_cache": len(cached_results),
            "executed": len(tests_to_run),
            "results": results + cached_results,
            "estimated_coverage": selection["estimated_coverage"]
        }
    
    def _calculate_code_hash(self, files: Set[str]) -> str:
        """计算代码哈希"""
        import hashlib
        
        hasher = hashlib.md5()
        for file_path in sorted(files):
            hasher.update(file_path.encode())
        
        return hasher.hexdigest()
    
    def _run_test(self, test_id: str) -> Dict:
        """运行测试"""
        # 简化:实际应执行测试
        return {"status": "passed", "test": test_id}

# 回归测试报告
class RegressionTestReport:
    """回归测试报告"""
    
    def __init__(self, results: List[Dict]):
        self.results = results
    
    def generate_summary(self) -> Dict:
        """生成摘要"""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.get("status") == "passed")
        
        return {
            "total": total,
            "passed": passed,
            "failed": total - passed,
            "pass_rate": passed / total if total > 0 else 0
        }
    
    def generate_html_report(self) -> str:
        """生成HTML报告"""
        summary = self.generate_summary()
        
        html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Regression Test Report</title>
    <style>
        .passed {{ color: green; }}
        .failed {{ color: red; }}
        .summary {{ font-size: 18px; }}
    </style>
</head>
<body>
    <h1>Regression Test Report</h1>
    
    <div class="summary">
        <h2>Summary</h2>
        <p>Total: {summary['total']}</p>
        <p class="passed">Passed: {summary['passed']}</p>
        <p class="failed">Failed: {summary['failed']}</p>
        <p>Pass Rate: {summary['pass_rate']:.1%}</p>
    </div>
    
    <h2>Test Results</h2>
    <table>
        <tr><th>Test</th><th>Status</th><th>Time</th></tr>
"""
        
        for result in self.results:
            status_class = "passed" if result.get("status") == "passed" else "failed"
            html += f"""
        <tr>
            <td>{result.get('test', 'Unknown')}</td>
            <td class="{status_class}">{result.get('status', 'Unknown')}</td>
            <td>{result.get('time', 0):.2f}s</td>
        </tr>
"""
        
        html += """
    </table>
</body>
</html>
"""
        return html