变异测试:代码质量与测试有效性评估
变异测试:代码质量与测试有效性评估
变异测试原理
变异测试通过对源代码做小幅修改(每处修改生成一个变异体),检查测试套件是否能检测到这些变更。如果测试在变异后的代码上仍然全部通过,说明该变异体"存活",测试在此处存在盲点。变异分数 = 被杀死的变异体数 ÷ 变异体总数,用于衡量测试的有效性。
# 变异测试核心框架
import ast
import copy
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class MutationType(Enum):
    """Categories of source-level mutations.

    Only ARITHMETIC, RELATIONAL, LOGICAL and BOUNDARY have an operator
    registered in ``MutantGenerator``; RETURN and EXCEPTION are declared
    but currently produce no mutations.
    """

    ARITHMETIC = "arithmetic"  # arithmetic operator mutation
    RELATIONAL = "relational"  # relational operator mutation
    LOGICAL = "logical"  # logical operator mutation
    BOUNDARY = "boundary"  # boundary value mutation
    RETURN = "return"  # return value mutation
    EXCEPTION = "exception"  # exception mutation


@dataclass
class Mutation:
    """A single textual change to apply to the source under test."""

    id: str  # "mut_<line>_<col>", built from ``location``
    mutation_type: MutationType
    original_code: str  # token as written in the source, e.g. "+"
    mutated_code: str  # token that replaces it, e.g. "-"
    # "line": 1-based line holding the mutated token.
    # "col": 0-based character column of that token on the line (the start
    # of the enclosing AST node if the token could not be located).
    location: Dict[str, int]
    description: str


@dataclass
class MutationResult:
    """Outcome of running the tests against one mutation."""

    mutation: Mutation
    killed: bool  # whether the tests detected the mutation
    test_name: Optional[str]  # test that killed the mutation, if any
    execution_time: float = 0.0


class MutantGenerator:
    """Produce ``Mutation`` descriptors for Python source by walking its AST.

    Each mutation records the position of the token to change (the operator
    itself, or the numeric literal), so that nested and chained operators
    get distinct ids and multi-line expressions report the correct line.
    """

    # AST operator class -> (token in source, replacement token).
    _ARITHMETIC_SWAPS = {
        ast.Add: ("+", "-"),
        ast.Sub: ("-", "+"),
    }
    _RELATIONAL_SWAPS = {
        ast.Eq: ("==", "!="),
        ast.NotEq: ("!=", "=="),
        ast.Lt: ("<", ">="),
        ast.LtE: ("<=", ">"),
        ast.Gt: (">", "<="),
        ast.GtE: (">=", "<"),
    }
    _LOGICAL_SWAPS = {
        ast.And: ("and", "or"),
        ast.Or: ("or", "and"),
    }
    # Source spelling of every binary operator, used by _replace_operator.
    _BINOP_TOKENS = {
        ast.Add: "+",
        ast.Sub: "-",
        ast.Mult: "*",
        ast.Div: "/",
        ast.FloorDiv: "//",
        ast.Mod: "%",
        ast.Pow: "**",
        ast.LShift: "<<",
        ast.RShift: ">>",
        ast.BitOr: "|",
        ast.BitXor: "^",
        ast.BitAnd: "&",
        ast.MatMult: "@",
    }

    def __init__(self):
        self.mutation_operators = {
            MutationType.ARITHMETIC: self._arithmetic_mutations,
            MutationType.RELATIONAL: self._relational_mutations,
            MutationType.LOGICAL: self._logical_mutations,
            MutationType.BOUNDARY: self._boundary_mutations,
        }

    def generate_mutations(self, source_code: str,
                           allowed_types: Optional[List[MutationType]] = None) -> List[Mutation]:
        """Generate mutations for ``source_code``.

        Args:
            source_code: Python source text to analyse.
            allowed_types: Mutation types to generate. ``None`` or an empty
                list means every member of ``MutationType``; types without a
                registered operator are skipped.

        Returns:
            Mutations grouped by type, in the order the types were given.

        Raises:
            SyntaxError: if ``source_code`` cannot be parsed by ``ast.parse``.
        """
        tree = ast.parse(source_code)
        mutations: List[Mutation] = []
        for mutation_type in (allowed_types or MutationType):
            generator = self.mutation_operators.get(mutation_type)
            if generator:
                mutations.extend(generator(tree, source_code))
        return mutations

    def _arithmetic_mutations(self, tree: ast.AST,
                              source_code: str) -> List[Mutation]:
        """Swap ``+`` and ``-`` in binary operations."""
        lines = source_code.split('\n')
        mutations = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.BinOp):
                continue
            swap = self._ARITHMETIC_SWAPS.get(type(node.op))
            if swap is None:
                continue
            original, mutated = swap
            line, col = (self._find_operator(lines, node.left, node.right, original)
                         or self._node_position(lines, node))
            mutations.append(self._build_mutation(
                MutationType.ARITHMETIC, original, mutated, line, col))
        return mutations

    def _relational_mutations(self, tree: ast.AST,
                              source_code: str) -> List[Mutation]:
        """Negate each comparison operator (one mutation per operator)."""
        lines = source_code.split('\n')
        mutations = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.Compare):
                continue
            # In a chained comparison, operator i sits between operand i
            # and operand i + 1.
            operands = [node.left, *node.comparators]
            for index, op in enumerate(node.ops):
                swap = self._RELATIONAL_SWAPS.get(type(op))
                if swap is None:
                    continue
                original, mutated = swap
                line, col = (self._find_operator(
                    lines, operands[index], operands[index + 1], original)
                    or self._node_position(lines, node))
                mutations.append(self._build_mutation(
                    MutationType.RELATIONAL, original, mutated, line, col))
        return mutations

    def _logical_mutations(self, tree: ast.AST,
                           source_code: str) -> List[Mutation]:
        """Swap ``and`` and ``or``.

        One mutation is produced per ``BoolOp`` node, located at the first
        operator of that node (``a and b and c`` is a single node).
        """
        lines = source_code.split('\n')
        mutations = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.BoolOp):
                continue
            swap = self._LOGICAL_SWAPS.get(type(node.op))
            if swap is None or len(node.values) < 2:
                continue
            original, mutated = swap
            line, col = (self._find_operator(
                lines, node.values[0], node.values[1], original)
                or self._node_position(lines, node))
            mutations.append(self._build_mutation(
                MutationType.LOGICAL, original, mutated, line, col))
        return mutations

    def _boundary_mutations(self, tree: ast.AST,
                            source_code: str) -> List[Mutation]:
        """Flip numeric literals equal to 0 or 1."""
        lines = source_code.split('\n')
        mutations = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.Constant):
                continue
            value = node.value
            # bool is a subclass of int; True/False are not spelled "1"/"0"
            # in the source, so a textual 1 <-> 0 swap cannot apply to them.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if value == 0:
                original, mutated = "0", "1"
            elif value == 1:
                original, mutated = "1", "0"
            else:
                continue
            line, col = self._node_position(lines, node)
            mutations.append(self._build_mutation(
                MutationType.BOUNDARY, original, mutated, line, col))
        return mutations

    def _replace_operator(self, source_code: str,
                          node: ast.BinOp, new_op: str) -> str:
        """Return ``source_code`` with the operator of ``node`` replaced.

        The source is returned unchanged when the operator type is unknown
        or its token cannot be found between the two operands.
        """
        token = self._BINOP_TOKENS.get(type(node.op))
        if token is None:
            return source_code
        lines = source_code.split('\n')
        position = self._find_operator(lines, node.left, node.right, token)
        if position is None:
            return source_code
        line, col = position
        text = lines[line - 1]
        lines[line - 1] = text[:col] + new_op + text[col + len(token):]
        return '\n'.join(lines)

    @staticmethod
    def _build_mutation(mutation_type: MutationType, original: str,
                        mutated: str, line: int, col: int) -> Mutation:
        """Assemble a ``Mutation`` whose id and description derive from its position and tokens."""
        return Mutation(
            id=f"mut_{line}_{col}",
            mutation_type=mutation_type,
            original_code=original,
            mutated_code=mutated,
            location={"line": line, "col": col},
            description=f"Changed {original} to {mutated}",
        )

    @staticmethod
    def _char_column(lines: List[str], lineno: int, byte_col: int) -> int:
        """Convert an AST ``col_offset`` (UTF-8 byte offset) to a character column."""
        if 1 <= lineno <= len(lines):
            prefix = lines[lineno - 1].encode("utf-8")[:byte_col]
            return len(prefix.decode("utf-8", errors="ignore"))
        return byte_col

    def _node_position(self, lines: List[str], node: ast.AST) -> tuple:
        """Return ``(line, character column)`` of the start of ``node``."""
        return node.lineno, self._char_column(lines, node.lineno, node.col_offset)

    @staticmethod
    def _find_operator(lines: List[str], left: ast.AST, right: ast.AST,
                       token: str) -> Optional[tuple]:
        """Locate ``token`` in the gap between two adjacent operands.

        Returns ``(1-based line, 0-based character column)``, or ``None``
        when the operands carry no end position or the token is not found.
        """
        start_line = getattr(left, "end_lineno", None)
        start_col = getattr(left, "end_col_offset", None)
        if start_line is None or start_col is None:
            return None
        needle = token.encode("utf-8")
        last_line = min(right.lineno, len(lines))
        for lineno in range(start_line, last_line + 1):
            # AST column offsets count UTF-8 bytes, so slice the encoded line.
            raw = lines[lineno - 1].encode("utf-8")
            lo = start_col if lineno == start_line else 0
            hi = right.col_offset if lineno == right.lineno else len(raw)
            gap = raw[lo:hi]
            # No string literal can sit between two operands, so a '#'
            # here always starts a comment.
            comment_at = gap.find(b"#")
            if comment_at != -1:
                gap = gap[:comment_at]
            hit = gap.find(needle)
            if hit != -1:
                return lineno, len(raw[:lo + hit].decode("utf-8"))
        return None
# 变异测试执行器
class MutationTestRunner:
    """Generate mutants for a piece of source and score a test suite against them.

    NOTE: ``_run_tests_on_mutant`` is still a placeholder that never reports
    a kill, so every mutation is currently counted as survived.
    """

    def __init__(self, test_suite: Callable, target_module: Any):
        """Store the test callable and target module; neither is used yet by the stub runner."""
        self.test_suite = test_suite
        self.target_module = target_module
        self.generator = MutantGenerator()

    def run_mutation_test(self, source_code: str,
                          allowed_types: Optional[List[MutationType]] = None) -> Dict:
        """Run the tests once per generated mutant and summarise the outcome.

        Args:
            source_code: Python source text to mutate.
            allowed_types: Mutation types to generate; ``None`` means all.

        Returns:
            A dict with ``total_mutations``, ``killed``, ``survived``,
            ``mutation_score`` (killed / total, 0 when there are no
            mutations) and ``results`` (list of ``MutationResult``).
        """
        mutations = self.generator.generate_mutations(source_code, allowed_types)
        results = []
        for mutation in mutations:
            mutated_code = self._apply_mutation(source_code, mutation)
            killed = self._run_tests_on_mutant(mutated_code)
            results.append(MutationResult(
                mutation=mutation,
                killed=killed,
                # Placeholder name: the stub runner cannot tell which test failed.
                test_name="killing_test" if killed else None
            ))
        killed_count = sum(1 for r in results if r.killed)
        mutation_score = killed_count / len(results) if results else 0
        return {
            "total_mutations": len(mutations),
            "killed": killed_count,
            "survived": len(mutations) - killed_count,
            "mutation_score": mutation_score,
            "results": results
        }

    def _apply_mutation(self, source_code: str, mutation: Mutation) -> str:
        """Return ``source_code`` with ``mutation`` applied to its target line.

        When ``location["col"]`` points exactly at ``original_code`` the
        token at that column is replaced; otherwise the first occurrence of
        ``original_code`` on the line is replaced, as a best-effort fallback.
        An out-of-range line leaves the source unchanged.
        """
        lines = source_code.split('\n')
        line_idx = mutation.location["line"] - 1
        if 0 <= line_idx < len(lines):
            line = lines[line_idx]
            original = mutation.original_code
            col = mutation.location.get("col")
            if col is not None and original and line.startswith(original, col):
                # Replace exactly the recorded token, so that earlier look-alike
                # text on the same line (e.g. the "0" in "x10 = 0") is untouched.
                lines[line_idx] = (line[:col] + mutation.mutated_code
                                   + line[col + len(original):])
            else:
                lines[line_idx] = line.replace(original, mutation.mutated_code, 1)
        return '\n'.join(lines)

    def _run_tests_on_mutant(self, mutated_code: str) -> bool:
        """Run the test suite against ``mutated_code``.

        Returns True when the tests fail (the mutant is killed). Simplified
        stub: a real implementation has to compile and execute the mutant;
        this one always returns False.
        """
        return False
# 变异分数分析
class MutationScoreAnalyzer:
    """Derive summary statistics from a list of mutation results."""

    def __init__(self, results: List[MutationResult]):
        self.results = results

    def calculate_score(self) -> Dict:
        """Return overall totals and the mutation score (0 when there are no results)."""
        total = len(self.results)
        killed = len([entry for entry in self.results if entry.killed])
        if total > 0:
            score = killed / total
        else:
            score = 0
        return {
            "mutation_score": score,
            "total": total,
            "killed": killed,
            "survived": total - killed
        }

    def analyze_by_type(self) -> Dict:
        """Return per-mutation-type totals, kills and score, keyed by the type's value."""
        stats_by_type = {}
        for entry in self.results:
            bucket = stats_by_type.setdefault(
                entry.mutation.mutation_type.value, {"total": 0, "killed": 0})
            bucket["total"] += 1
            if entry.killed:
                bucket["killed"] += 1
        # Second pass: attach the score once the counts are final.
        for bucket in stats_by_type.values():
            if bucket["total"] > 0:
                bucket["score"] = bucket["killed"] / bucket["total"]
            else:
                bucket["score"] = 0
        return stats_by_type

    def identify_weak_spots(self) -> List[Dict]:
        """Return one entry, with a recommendation, for every surviving mutation."""
        survivors = (entry.mutation for entry in self.results if not entry.killed)
        return [
            {
                "mutation_id": mutation.id,
                "type": mutation.mutation_type.value,
                "description": mutation.description,
                "location": mutation.location,
                "recommendation": self._get_recommendation(mutation)
            }
            for mutation in survivors
        ]

    def _get_recommendation(self, mutation: Mutation) -> str:
        """Return the improvement advice for the mutation's type, or a generic one."""
        advice_by_type = {
            MutationType.ARITHMETIC: "添加算术运算的边界测试",
            MutationType.RELATIONAL: "添加条件分支的覆盖测试",
            MutationType.LOGICAL: "添加逻辑组合的测试用例",
            MutationType.BOUNDARY: "添加边界值测试",
        }
        try:
            return advice_by_type[mutation.mutation_type]
        except KeyError:
            return "增强测试覆盖"
变异测试工具集成
变异测试可以集成到现有测试框架(如pytest、unittest等主流框架)中。下面以pytest插件为例,并给出配置类、测试套件和命令行接口。
# pytest变异测试插件
import pytest
from typing import List, Dict
class MutationTestPlugin:
    """pytest plugin that marks mutation tests and records their outcomes.

    A test counts as a mutation test when its node id contains the
    substring "mutation".
    """

    def __init__(self):
        self.mutations = []
        # One {"test": nodeid, "passed": bool} dict per executed mutation test.
        self.results = []

    def pytest_configure(self, config):
        """Register the ``mutation`` marker.

        Without registration pytest emits PytestUnknownMarkWarning for the
        marker applied below, and rejects it under ``--strict-markers``.
        """
        config.addinivalue_line(
            "markers", "mutation: test selected for mutation testing"
        )

    def pytest_collection_modifyitems(self, items):
        """Add the ``mutation`` marker to every collected mutation test."""
        for item in items:
            if "mutation" in item.nodeid:
                item.add_marker(pytest.mark.mutation)

    def pytest_runtest_makereport(self, item, call):
        """Record pass/fail for the call phase of each mutation test."""
        # Setup and teardown phases are ignored; only the test body counts.
        if call.when == "call" and "mutation" in item.nodeid:
            self.results.append({
                "test": item.nodeid,
                "passed": call.excinfo is None
            })
# 变异测试配置
@dataclass
class MutationTestConfig:
    """Configuration for a mutation testing run."""
    # Substrings of file paths to mutate; an empty list means every file
    # that is not excluded (see MutationTestSuite._should_mutate).
    target_files: List[str] = field(default_factory=list)
    # Substrings of file paths to skip; exclusion is checked before targets.
    excluded_files: List[str] = field(default_factory=list)
    # Mutation types to generate; an empty list selects all types.
    mutation_types: List[MutationType] = field(default_factory=list)
    # Seconds, per the CLI help text. NOTE(review): not read by any code
    # visible in this file — confirm where the timeout is enforced.
    timeout_per_mutation: int = 30
    # Upper bound on mutations kept per file.
    max_mutations: int = 100
    # NOTE(review): not read by any code visible in this file.
    parallel: bool = True
class MutationTestSuite:
    """Apply mutation testing to a set of source files and build a report."""

    def __init__(self, config: MutationTestConfig):
        self.config = config
        self.generator = MutantGenerator()
        self.results: List[MutationResult] = []

    def run(self, source_files: Dict[str, str]) -> Dict:
        """Mutate every eligible file in ``source_files`` (path -> source) and return the report."""
        collected = []
        for path, code in source_files.items():
            if not self._should_mutate(path):
                continue
            collected += self._mutate_file(path, code)
        self.results = collected
        return self._generate_report()

    def _should_mutate(self, file_path: str) -> bool:
        """Decide whether ``file_path`` is subject to mutation.

        Exclusions win over targets; both are matched as substrings. With
        no targets configured, every non-excluded file qualifies.
        """
        if any(pattern in file_path for pattern in self.config.excluded_files):
            return False
        targets = self.config.target_files
        return not targets or any(pattern in file_path for pattern in targets)

    def _mutate_file(self, file_path: str, source_code: str) -> List[MutationResult]:
        """Generate mutations for one file, cap their number, and test each."""
        generated = self.generator.generate_mutations(
            source_code,
            self.config.mutation_types
        )
        # Slicing is a no-op when the list is already within the limit.
        capped = generated[:self.config.max_mutations]
        return [self._test_mutation(candidate) for candidate in capped]

    def _test_mutation(self, mutation: Mutation) -> MutationResult:
        """Test a single mutation (simplified: always reported as not killed)."""
        return MutationResult(mutation=mutation, killed=False, test_name=None)

    def _generate_report(self) -> Dict:
        """Build the report dict from the stored results."""
        analyzer = MutationScoreAnalyzer(self.results)
        summary = analyzer.calculate_score()
        by_type = analyzer.analyze_by_type()
        weak_spots = analyzer.identify_weak_spots()
        return {
            "summary": summary,
            "by_type": by_type,
            "weak_spots": weak_spots
        }
# 命令行接口
class MutationTestCLI:
    """Command-line front end for the mutation testing suite."""

    def __init__(self):
        self.parser = self._create_parser()

    def _create_parser(self):
        """Create the command-line argument parser."""
        import argparse
        parser = argparse.ArgumentParser(
            description="Mutation Testing Tool"
        )
        parser.add_argument(
            "target",
            help="Target file or directory"
        )
        parser.add_argument(
            "--types",
            nargs="+",
            choices=[t.value for t in MutationType],
            help="Mutation types to apply"
        )
        parser.add_argument(
            "--max-mutations",
            type=int,
            default=100,
            help="Maximum number of mutations"
        )
        parser.add_argument(
            "--timeout",
            type=int,
            default=30,
            help="Timeout per mutation (seconds)"
        )
        parser.add_argument(
            "--output",
            choices=["console", "json", "html"],
            default="console",
            help="Output format"
        )
        return parser

    def run(self, args=None):
        """Parse ``args`` (``sys.argv`` when None), run the suite and print the report.

        Exits through ``parser.error`` when the target path does not exist.
        """
        parsed_args = self.parser.parse_args(args)
        config = MutationTestConfig(
            target_files=[parsed_args.target],
            max_mutations=parsed_args.max_mutations,
            timeout_per_mutation=parsed_args.timeout
        )
        if parsed_args.types:
            config.mutation_types = [MutationType(t) for t in parsed_args.types]
        suite = MutationTestSuite(config)
        source_files = self._collect_sources(parsed_args.target)
        results = suite.run(source_files)
        self._output_results(results, parsed_args.output)

    def _collect_sources(self, target: str) -> Dict[str, str]:
        """Read the target file, or every ``.py`` file below a target directory.

        Returns a mapping of path -> source text.
        """
        import os
        import tokenize

        if os.path.isfile(target):
            paths = [target]
        elif os.path.isdir(target):
            paths = []
            for root, dirs, files in os.walk(target):
                dirs.sort()  # deterministic traversal order
                paths.extend(
                    os.path.join(root, name)
                    for name in sorted(files)
                    if name.endswith(".py")
                )
        else:
            # A mistyped path used to yield an empty, misleading 0% report.
            self.parser.error(f"target not found: {target}")
        sources = {}
        for path in paths:
            # tokenize.open decodes using the file's declared source
            # encoding (UTF-8 by default) rather than the locale encoding.
            with tokenize.open(path) as handle:
                sources[path] = handle.read()
        return sources

    def _output_results(self, results: Dict, format: str):
        """Print ``results`` in the requested format: console, json or html."""
        if format == "console":
            self._print_console(results)
        elif format == "json":
            import json
            # ensure_ascii=False keeps the non-ASCII recommendation text readable.
            print(json.dumps(results, indent=2, ensure_ascii=False))
        elif format == "html":
            print(MutationTestReport(results).generate_html_report())

    def _print_console(self, results: Dict):
        """Print the summary and up to five surviving mutations to stdout."""
        summary = results["summary"]
        print("\nMutation Testing Results:")
        print(f" Total mutations: {summary['total']}")
        print(f" Killed: {summary['killed']}")
        print(f" Survived: {summary['survived']}")
        print(f" Mutation Score: {summary['mutation_score']:.2%}")
        if results["weak_spots"]:
            print(f"\nWeak spots ({len(results['weak_spots'])} mutations survived):")
            for spot in results["weak_spots"][:5]:
                print(f" - {spot['description']} at line {spot['location']['line']}")
                print(f" Recommendation: {spot['recommendation']}")
变异测试最佳实践
变异测试需要平衡变异数量、执行时间和测试质量。以下是关键的最佳实践。
# 变异测试最佳实践
class MutationTestBestPractices:
    """Reference tables of mutation testing best practices."""

    @staticmethod
    def get_target_selection_rules():
        """Return what to prioritise and what to exclude when choosing mutation targets."""
        prioritized = [
            "核心业务逻辑",
            "历史缺陷多的代码",
            "复杂度高的代码",
            "新开发的功能",
        ]
        excluded = [
            "自动生成的代码",
            "简单的getter/setter",
            "配置文件",
            "第三方库代码",
        ]
        return {"优先级": prioritized, "排除": excluded}

    @staticmethod
    def get_mutation_type_strategy():
        """Return the mutation types grouped into three adoption tiers."""
        tiers = (
            ("基础", [MutationType.ARITHMETIC, MutationType.RELATIONAL]),
            ("进阶", [MutationType.LOGICAL, MutationType.BOUNDARY]),
            ("高级", [MutationType.RETURN, MutationType.EXCEPTION]),
        )
        return dict(tiers)

    @staticmethod
    def get_score_interpretation():
        """Return a textual interpretation for each mutation score band."""
        bands = (
            ("90-100%", "优秀:测试套件非常有效"),
            ("70-89%", "良好:测试套件基本有效,有改进空间"),
            ("50-69%", "一般:测试套件需要加强"),
            ("below 50%", "较差:测试套件存在严重盲点"),
        )
        return dict(bands)
# 增量变异测试
class IncrementalMutationTest:
    """Decide when to re-run mutation testing and compare scores over time.

    ``history`` holds one dict per past run; the methods read the keys
    ``"timestamp"`` (a ``datetime``) and ``"score"`` of the latest entry.
    Nothing in this class appends to ``history``; callers maintain it.
    """

    def __init__(self, baseline_score: float = 0.8):
        self.baseline_score = baseline_score
        self.history = []

    def should_run_mutation_test(self, changes: Dict) -> bool:
        """Return True when mutation testing should run for ``changes``.

        Runs whenever ``changes`` lists new or modified files, and otherwise
        when the most recent recorded run is at least seven days old.
        """
        from datetime import datetime

        # New and modified code is always tested.
        if changes.get("new_files") or changes.get("modified_files"):
            return True
        # Periodic run (weekly).
        last_run = self.history[-1] if self.history else None
        if not last_run:
            return False
        last_timestamp = last_run["timestamp"]
        # Use the timestamp's own tzinfo so naive and aware values are never
        # mixed (subtracting them would raise TypeError).
        now = datetime.now(tz=getattr(last_timestamp, "tzinfo", None))
        return (now - last_timestamp).days >= 7

    def compare_with_baseline(self, current_score: float) -> Dict:
        """Compare ``current_score`` with the last recorded score.

        Falls back to ``baseline_score`` when there is no history. A drop of
        more than 0.05 is flagged as a regression.
        """
        baseline = self.baseline_score
        if self.history:
            baseline = self.history[-1]["score"]
        change = current_score - baseline
        return {
            "baseline": baseline,
            "current": current_score,
            "change": change,
            "regression": change < -0.05,  # 5% regression threshold
            "recommendation": "improve tests" if change < 0 else "maintain quality"
        }
# 变异测试报告
class MutationTestReport:
    """Render a mutation testing report dict as HTML.

    ``results`` must contain ``"summary"`` (with ``total``, ``killed``,
    ``survived`` and ``mutation_score``); ``"by_type"`` and ``"weak_spots"``
    are optional.
    """

    def __init__(self, results: Dict):
        self.results = results

    def generate_html_report(self) -> str:
        """Return the report as a complete HTML document.

        Text values are HTML-escaped because mutation descriptions contain
        markup characters (for example "Changed < to >="). At most ten weak
        spots are listed.
        """
        from html import escape

        summary = self.results["summary"]
        parts = [f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Mutation Test Report</title>
<style>
.score {{ font-size: 24px; font-weight: bold; }}
.good {{ color: green; }}
.warning {{ color: orange; }}
.bad {{ color: red; }}
</style>
</head>
<body>
<h1>Mutation Test Report</h1>
<div class="summary">
<h2>Summary</h2>
<p>Total Mutations: {summary['total']}</p>
<p>Killed: {summary['killed']}</p>
<p>Survived: {summary['survived']}</p>
<p class="score {self._score_class(summary['mutation_score'])}">
Mutation Score: {summary['mutation_score']:.2%}
</p>
</div>
<div class="by-type">
<h2>By Mutation Type</h2>
<table>
<tr><th>Type</th><th>Total</th><th>Killed</th><th>Score</th></tr>
"""]
        for mut_type, stats in self.results.get("by_type", {}).items():
            parts.append(f"""
<tr>
<td>{escape(str(mut_type))}</td>
<td>{stats['total']}</td>
<td>{stats['killed']}</td>
<td>{stats['score']:.2%}</td>
</tr>
""")
        parts.append("""
</table>
</div>
<div class="weak-spots">
<h2>Weak Spots</h2>
<ul>
""")
        for spot in self.results.get("weak_spots", [])[:10]:
            parts.append(f"""
<li>
<strong>{escape(str(spot['description']))}</strong> at line {escape(str(spot['location']['line']))}<br>
Recommendation: {escape(str(spot['recommendation']))}
</li>
""")
        parts.append("""
</ul>
</div>
</body>
</html>
""")
        # Join once instead of growing a string with repeated +=.
        return "".join(parts)

    def _score_class(self, score: float) -> str:
        """Map a score in [0, 1] to the CSS class good / warning / bad."""
        if score >= 0.9:
            return "good"
        elif score >= 0.7:
            return "warning"
        return "bad"