← 返回首页
🧪

变异测试:代码质量与测试有效性评估

📂 architecture ⏱ 7 min 1370 words

变异测试:代码质量与测试有效性评估

变异测试原理

变异测试通过修改源代码(引入变异体),检查测试套件是否能检测到这些变更。如果测试通过了变异代码,说明测试存在盲点。变异分数(杀死的变异体比例)衡量测试的有效性。

# 变异测试核心框架
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
from enum import Enum
import ast
import copy

class MutationType(Enum):
    ARITHMETIC = "arithmetic"      # 算术运算符变异
    RELATIONAL = "relational"      # 关系运算符变异
    LOGICAL = "logical"            # 逻辑运算符变异
    BOUNDARY = "boundary"          # 边界值变异
    RETURN = "return"              # 返回值变异
    EXCEPTION = "exception"        # 异常变异

@dataclass
class Mutation:
    id: str
    mutation_type: MutationType
    original_code: str
    mutated_code: str
    location: Dict[str, int]  # line, column
    description: str

@dataclass
class MutationResult:
    mutation: Mutation
    killed: bool  # 测试是否检测到变异
    test_name: Optional[str]  # 杀死变异的测试
    execution_time: float = 0.0

class MutantGenerator:
    def __init__(self):
        self.mutation_operators = {
            MutationType.ARITHMETIC: self._arithmetic_mutations,
            MutationType.RELATIONAL: self._relational_mutations,
            MutationType.LOGICAL: self._logical_mutations,
            MutationType.BOUNDARY: self._boundary_mutations,
        }
    
    def generate_mutations(self, source_code: str, 
                          allowed_types: List[MutationType] = None) -> List[Mutation]:
        """生成变异体"""
        mutations = []
        
        # 解析AST
        tree = ast.parse(source_code)
        
        for mutation_type in (allowed_types or MutationType):
            generator = self.mutation_operators.get(mutation_type)
            if generator:
                mutations.extend(generator(tree, source_code))
        
        return mutations
    
    def _arithmetic_mutations(self, tree: ast.AST, 
                             source_code: str) -> List[Mutation]:
        """算术运算符变异"""
        mutations = []
        
        for node in ast.walk(tree):
            if isinstance(node, ast.BinOp):
                if isinstance(node.op, ast.Add):
                    mutated = self._replace_operator(source_code, node, "-")
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.ARITHMETIC,
                        original_code="+",
                        mutated_code="-",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed + to -"
                    ))
                elif isinstance(node.op, ast.Sub):
                    mutated = self._replace_operator(source_code, node, "+")
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.ARITHMETIC,
                        original_code="-",
                        mutated_code="+",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed - to +"
                    ))
        
        return mutations
    
    def _relational_mutations(self, tree: ast.AST,
                             source_code: str) -> List[Mutation]:
        """关系运算符变异"""
        mutations = []
        operator_map = {
            ast.Eq: ("==", "!="),
            ast.NotEq: ("!=", "=="),
            ast.Lt: ("<", ">="),
            ast.LtE: ("<=", ">"),
            ast.Gt: (">", "<="),
            ast.GtE: (">=", "<"),
        }
        
        for node in ast.walk(tree):
            if isinstance(node, ast.Compare):
                for op in node.ops:
                    if type(op) in operator_map:
                        original, mutated_op = operator_map[type(op)]
                        mutations.append(Mutation(
                            id=f"mut_{node.lineno}_{node.col_offset}",
                            mutation_type=MutationType.RELATIONAL,
                            original_code=original,
                            mutated_code=mutated_op,
                            location={"line": node.lineno, "col": node.col_offset},
                            description=f"Changed {original} to {mutated_op}"
                        ))
        
        return mutations
    
    def _logical_mutations(self, tree: ast.AST,
                          source_code: str) -> List[Mutation]:
        """逻辑运算符变异"""
        mutations = []
        
        for node in ast.walk(tree):
            if isinstance(node, ast.BoolOp):
                if isinstance(node.op, ast.And):
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.LOGICAL,
                        original_code="and",
                        mutated_code="or",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed and to or"
                    ))
                elif isinstance(node.op, ast.Or):
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.LOGICAL,
                        original_code="or",
                        mutated_code="and",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed or to and"
                    ))
        
        return mutations
    
    def _boundary_mutations(self, tree: ast.AST,
                           source_code: str) -> List[Mutation]:
        """边界值变异"""
        mutations = []
        
        for node in ast.walk(tree):
            if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
                # 变异数字常量
                if node.value == 0:
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.BOUNDARY,
                        original_code="0",
                        mutated_code="1",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed 0 to 1"
                    ))
                elif node.value == 1:
                    mutations.append(Mutation(
                        id=f"mut_{node.lineno}_{node.col_offset}",
                        mutation_type=MutationType.BOUNDARY,
                        original_code="1",
                        mutated_code="0",
                        location={"line": node.lineno, "col": node.col_offset},
                        description="Changed 1 to 0"
                    ))
        
        return mutations
    
    def _replace_operator(self, source_code: str, 
                         node: ast.BinOp, new_op: str) -> str:
        """替换运算符"""
        # 简化的替换逻辑
        lines = source_code.split('\n')
        if node.lineno <= len(lines):
            line = lines[node.lineno - 1]
            # 实际实现需要更精确的替换
            return source_code
        return source_code

# 变异测试执行器
class MutationTestRunner:
    def __init__(self, test_suite: Callable, target_module: Any):
        self.test_suite = test_suite
        self.target_module = target_module
        self.generator = MutantGenerator()
    
    def run_mutation_test(self, source_code: str,
                         allowed_types: List[MutationType] = None) -> Dict:
        """运行变异测试"""
        # 生成变异体
        mutations = self.generator.generate_mutations(source_code, allowed_types)
        
        results = []
        
        for mutation in mutations:
            # 应用变异
            mutated_code = self._apply_mutation(source_code, mutation)
            
            # 运行测试
            killed = self._run_tests_on_mutant(mutated_code)
            
            results.append(MutationResult(
                mutation=mutation,
                killed=killed,
                test_name="killing_test" if killed else None
            ))
        
        # 计算变异分数
        killed_count = sum(1 for r in results if r.killed)
        mutation_score = killed_count / len(results) if results else 0
        
        return {
            "total_mutations": len(mutations),
            "killed": killed_count,
            "survived": len(mutations) - killed_count,
            "mutation_score": mutation_score,
            "results": results
        }
    
    def _apply_mutation(self, source_code: str, mutation: Mutation) -> str:
        """应用变异"""
        lines = source_code.split('\n')
        line_idx = mutation.location["line"] - 1
        
        if 0 <= line_idx < len(lines):
            line = lines[line_idx]
            mutated_line = line.replace(
                mutation.original_code, 
                mutation.mutated_code, 
                1
            )
            lines[line_idx] = mutated_line
        
        return '\n'.join(lines)
    
    def _run_tests_on_mutant(self, mutated_code: str) -> bool:
        """在变异代码上运行测试"""
        # 简化:实际需要动态编译和执行
        # 返回True表示测试失败(变异被杀死)
        return False

# 变异分数分析
class MutationScoreAnalyzer:
    def __init__(self, results: List[MutationResult]):
        self.results = results
    
    def calculate_score(self) -> Dict:
        """计算变异分数"""
        total = len(self.results)
        killed = sum(1 for r in self.results if r.killed)
        
        return {
            "mutation_score": killed / total if total > 0 else 0,
            "total": total,
            "killed": killed,
            "survived": total - killed
        }
    
    def analyze_by_type(self) -> Dict:
        """按变异类型分析"""
        by_type = {}
        
        for result in self.results:
            mut_type = result.mutation.mutation_type.value
            if mut_type not in by_type:
                by_type[mut_type] = {"total": 0, "killed": 0}
            
            by_type[mut_type]["total"] += 1
            if result.killed:
                by_type[mut_type]["killed"] += 1
        
        # 计算每种类型的分数
        for mut_type, stats in by_type.items():
            stats["score"] = stats["killed"] / stats["total"] if stats["total"] > 0 else 0
        
        return by_type
    
    def identify_weak_spots(self) -> List[Dict]:
        """识别测试薄弱点"""
        weak_spots = []
        
        for result in self.results:
            if not result.killed:
                weak_spots.append({
                    "mutation_id": result.mutation.id,
                    "type": result.mutation.mutation_type.value,
                    "description": result.mutation.description,
                    "location": result.mutation.location,
                    "recommendation": self._get_recommendation(result.mutation)
                })
        
        return weak_spots
    
    def _get_recommendation(self, mutation: Mutation) -> str:
        """获取改进建议"""
        recommendations = {
            MutationType.ARITHMETIC: "添加算术运算的边界测试",
            MutationType.RELATIONAL: "添加条件分支的覆盖测试",
            MutationType.LOGICAL: "添加逻辑组合的测试用例",
            MutationType.BOUNDARY: "添加边界值测试",
        }
        
        return recommendations.get(mutation.mutation_type, "增强测试覆盖")

变异测试工具集成

变异测试可以集成到现有测试框架中,支持pytest、unittest等主流框架。

# pytest变异测试插件
import pytest
from typing import List, Dict

class MutationTestPlugin:
    """pytest变异测试插件"""
    
    def __init__(self):
        self.mutations = []
        self.results = []
    
    def pytest_collection_modifyitems(self, items):
        """修改测试收集"""
        # 标记变异测试
        for item in items:
            if "mutation" in item.nodeid:
                item.add_marker(pytest.mark.mutation)
    
    def pytest_runtest_makereport(self, item, call):
        """处理测试结果"""
        if call.when == "call" and "mutation" in item.nodeid:
            self.results.append({
                "test": item.nodeid,
                "passed": call.excinfo is None
            })

# 变异测试配置
@dataclass
class MutationTestConfig:
    """变异测试配置"""
    target_files: List[str] = field(default_factory=list)
    excluded_files: List[str] = field(default_factory=list)
    mutation_types: List[MutationType] = field(default_factory=list)
    timeout_per_mutation: int = 30
    max_mutations: int = 100
    parallel: bool = True

class MutationTestSuite:
    """变异测试套件"""
    
    def __init__(self, config: MutationTestConfig):
        self.config = config
        self.generator = MutantGenerator()
        self.results: List[MutationResult] = []
    
    def run(self, source_files: Dict[str, str]) -> Dict:
        """运行变异测试"""
        all_results = []
        
        for file_path, source_code in source_files.items():
            if self._should_mutate(file_path):
                file_results = self._mutate_file(file_path, source_code)
                all_results.extend(file_results)
        
        self.results = all_results
        
        return self._generate_report()
    
    def _should_mutate(self, file_path: str) -> bool:
        """判断是否应该对该文件进行变异"""
        # 检查排除列表
        for excluded in self.config.excluded_files:
            if excluded in file_path:
                return False
        
        # 检查目标列表
        if self.config.target_files:
            return any(target in file_path for target in self.config.target_files)
        
        return True
    
    def _mutate_file(self, file_path: str, source_code: str) -> List[MutationResult]:
        """对单个文件进行变异"""
        mutations = self.generator.generate_mutations(
            source_code, 
            self.config.mutation_types
        )
        
        # 限制变异数量
        if len(mutations) > self.config.max_mutations:
            mutations = mutations[:self.config.max_mutations]
        
        results = []
        for mutation in mutations:
            # 执行变异测试
            result = self._test_mutation(mutation)
            results.append(result)
        
        return results
    
    def _test_mutation(self, mutation: Mutation) -> MutationResult:
        """测试单个变异"""
        # 简化实现
        return MutationResult(
            mutation=mutation,
            killed=False,
            test_name=None
        )
    
    def _generate_report(self) -> Dict:
        """生成报告"""
        analyzer = MutationScoreAnalyzer(self.results)
        
        return {
            "summary": analyzer.calculate_score(),
            "by_type": analyzer.analyze_by_type(),
            "weak_spots": analyzer.identify_weak_spots()
        }

# 命令行接口
class MutationTestCLI:
    def __init__(self):
        self.parser = self._create_parser()
    
    def _create_parser(self):
        """创建命令行解析器"""
        import argparse
        
        parser = argparse.ArgumentParser(
            description="Mutation Testing Tool"
        )
        parser.add_argument(
            "target",
            help="Target file or directory"
        )
        parser.add_argument(
            "--types",
            nargs="+",
            choices=[t.value for t in MutationType],
            help="Mutation types to apply"
        )
        parser.add_argument(
            "--max-mutations",
            type=int,
            default=100,
            help="Maximum number of mutations"
        )
        parser.add_argument(
            "--timeout",
            type=int,
            default=30,
            help="Timeout per mutation (seconds)"
        )
        parser.add_argument(
            "--output",
            choices=["console", "json", "html"],
            default="console",
            help="Output format"
        )
        
        return parser
    
    def run(self, args=None):
        """运行命令"""
        parsed_args = self.parser.parse_args(args)
        
        config = MutationTestConfig(
            target_files=[parsed_args.target],
            max_mutations=parsed_args.max_mutations,
            timeout_per_mutation=parsed_args.timeout
        )
        
        if parsed_args.types:
            config.mutation_types = [MutationType(t) for t in parsed_args.types]
        
        suite = MutationTestSuite(config)
        
        # 读取目标文件
        import os
        source_files = {}
        if os.path.isfile(parsed_args.target):
            with open(parsed_args.target, 'r') as f:
                source_files[parsed_args.target] = f.read()
        
        # 运行测试
        results = suite.run(source_files)
        
        # 输出结果
        self._output_results(results, parsed_args.output)
    
    def _output_results(self, results: Dict, format: str):
        """输出结果"""
        if format == "console":
            self._print_console(results)
        elif format == "json":
            import json
            print(json.dumps(results, indent=2))
    
    def _print_console(self, results: Dict):
        """打印到控制台"""
        summary = results["summary"]
        print(f"\nMutation Testing Results:")
        print(f"  Total mutations: {summary['total']}")
        print(f"  Killed: {summary['killed']}")
        print(f"  Survived: {summary['survived']}")
        print(f"  Mutation Score: {summary['mutation_score']:.2%}")
        
        if results["weak_spots"]:
            print(f"\nWeak spots ({len(results['weak_spots'])} mutations survived):")
            for spot in results["weak_spots"][:5]:
                print(f"  - {spot['description']} at line {spot['location']['line']}")
                print(f"    Recommendation: {spot['recommendation']}")

变异测试最佳实践

变异测试需要平衡变异数量、执行时间和测试质量。以下是关键的最佳实践。

# 变异测试最佳实践
class MutationTestBestPractices:
    """变异测试最佳实践指南"""
    
    @staticmethod
    def get_target_selection_rules():
        """目标选择规则"""
        return {
            "优先级": [
                "核心业务逻辑",
                "历史缺陷多的代码",
                "复杂度高的代码",
                "新开发的功能"
            ],
            "排除": [
                "自动生成的代码",
                "简单的getter/setter",
                "配置文件",
                "第三方库代码"
            ]
        }
    
    @staticmethod
    def get_mutation_type_strategy():
        """变异类型策略"""
        return {
            "基础": [MutationType.ARITHMETIC, MutationType.RELATIONAL],
            "进阶": [MutationType.LOGICAL, MutationType.BOUNDARY],
            "高级": [MutationType.RETURN, MutationType.EXCEPTION]
        }
    
    @staticmethod
    def get_score_interpretation():
        """分数解释"""
        return {
            "90-100%": "优秀:测试套件非常有效",
            "70-89%": "良好:测试套件基本有效,有改进空间",
            "50-69%": "一般:测试套件需要加强",
            "below 50%": "较差:测试套件存在严重盲点"
        }

# 增量变异测试
class IncrementalMutationTest:
    """增量变异测试"""
    
    def __init__(self, baseline_score: float = 0.8):
        self.baseline_score = baseline_score
        self.history = []
    
    def should_run_mutation_test(self, changes: Dict) -> bool:
        """判断是否应该运行变异测试"""
        # 始终对新代码运行
        if changes.get("new_files"):
            return True
        
        # 对修改的文件运行
        if changes.get("modified_files"):
            return True
        
        # 定期运行(如每周)
        last_run = self.history[-1] if self.history else None
        if last_run:
            days_since = (datetime.now() - last_run["timestamp"]).days
            if days_since >= 7:
                return True
        
        return False
    
    def compare_with_baseline(self, current_score: float) -> Dict:
        """与基线对比"""
        baseline = self.baseline_score
        
        if self.history:
            baseline = self.history[-1]["score"]
        
        change = current_score - baseline
        
        return {
            "baseline": baseline,
            "current": current_score,
            "change": change,
            "regression": change < -0.05,  # 5%退化阈值
            "recommendation": "improve tests" if change < 0 else "maintain quality"
        }

# 变异测试报告
class MutationTestReport:
    def __init__(self, results: Dict):
        self.results = results
    
    def generate_html_report(self) -> str:
        """生成HTML报告"""
        summary = self.results["summary"]
        
        html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Mutation Test Report</title>
    <style>
        .score {{ font-size: 24px; font-weight: bold; }}
        .good {{ color: green; }}
        .warning {{ color: orange; }}
        .bad {{ color: red; }}
    </style>
</head>
<body>
    <h1>Mutation Test Report</h1>
    
    <div class="summary">
        <h2>Summary</h2>
        <p>Total Mutations: {summary['total']}</p>
        <p>Killed: {summary['killed']}</p>
        <p>Survived: {summary['survived']}</p>
        <p class="score {self._score_class(summary['mutation_score'])}">
            Mutation Score: {summary['mutation_score']:.2%}
        </p>
    </div>
    
    <div class="by-type">
        <h2>By Mutation Type</h2>
        <table>
            <tr><th>Type</th><th>Total</th><th>Killed</th><th>Score</th></tr>
"""
        
        for mut_type, stats in self.results.get("by_type", {}).items():
            html += f"""
            <tr>
                <td>{mut_type}</td>
                <td>{stats['total']}</td>
                <td>{stats['killed']}</td>
                <td>{stats['score']:.2%}</td>
            </tr>
"""
        
        html += """
        </table>
    </div>
    
    <div class="weak-spots">
        <h2>Weak Spots</h2>
        <ul>
"""
        
        for spot in self.results.get("weak_spots", [])[:10]:
            html += f"""
            <li>
                <strong>{spot['description']}</strong> at line {spot['location']['line']}<br>
                Recommendation: {spot['recommendation']}
            </li>
"""
        
        html += """
        </ul>
    </div>
</body>
</html>
"""
        return html
    
    def _score_class(self, score: float) -> str:
        if score >= 0.9:
            return "good"
        elif score >= 0.7:
            return "warning"
        return "bad"