LLM审计
---
title: "LLM审计"
description: "全面介绍LLM审计流程,包括算法审计、数据审计、第三方审计等关键环节和最佳实践"
tags: ["LLM审计", "算法审计", "数据审计", "第三方审计"]
category: "llm"
icon: "🧠"
---
LLM审计
LLM审计概述
LLM审计是对大语言模型的开发、训练、部署和使用过程进行系统性检查和评估的过程。审计旨在确保模型的安全性、公平性、合规性和可靠性。
审计框架
from datetime import datetime
from typing import Dict, List, Any
import json
class LLMAuditFramework:
    """Assemble audit findings for one model into a structured report.

    Findings are plain dicts stored in ``self.findings``. The report helpers
    read their ``severity``, ``category`` and ``title`` keys; a finding that
    lacks one of them is tolerated instead of aborting the whole report.
    """

    def __init__(self, model_name: str, audit_scope: str):
        """Create an empty audit for ``model_name`` covering ``audit_scope``."""
        self.model_name = model_name
        self.audit_scope = audit_scope
        # The id is derived from the creation time at one-second resolution.
        self.audit_id = f"audit_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.findings = []
        self.recommendations = []
        self.compliance_status = {}

    def conduct_audit(self) -> Dict[str, Any]:
        """Build the audit report from the findings recorded so far.

        Returns:
            A dict with the audit identity, a timestamp, the executive
            summary, the raw findings/recommendations/compliance status, the
            risk assessment and the suggested next steps.
        """
        return {
            "audit_id": self.audit_id,
            "model_name": self.model_name,
            "audit_scope": self.audit_scope,
            "timestamp": datetime.now().isoformat(),
            "executive_summary": self._generate_executive_summary(),
            "findings": self.findings,
            "recommendations": self.recommendations,
            "compliance_status": self.compliance_status,
            "risk_assessment": self._assess_risks(),
            "next_steps": self._determine_next_steps(),
        }

    def _count_severity(self, level: str) -> int:
        """Return how many findings carry the given ``severity`` value."""
        # .get() keeps a finding without a severity from raising KeyError,
        # matching how _categorize_risks treats a missing category.
        return sum(1 for finding in self.findings if finding.get("severity") == level)

    def _generate_executive_summary(self) -> str:
        """Return a one-line summary with total, critical and high counts."""
        critical_total = self._count_severity("critical")
        high_total = self._count_severity("high")
        summary = f"审计完成。发现 {len(self.findings)} 个问题,"
        summary += f"其中 {critical_total} 个严重,{high_total} 个高危。"
        return summary

    def _assess_risks(self) -> Dict[str, Any]:
        """Return the overall risk level, per-category counts and mitigations."""
        return {
            "overall_risk_level": self._calculate_risk_level(),
            "risk_categories": self._categorize_risks(),
            "mitigation_strategies": self._suggest_mitigations(),
        }

    def _calculate_risk_level(self) -> str:
        """Map severity counts to ``critical``/``high``/``medium``/``low``.

        Any critical finding yields ``critical``; more than two high findings
        yield ``high``; one or two yield ``medium``; otherwise ``low``.
        """
        if self._count_severity("critical") > 0:
            return "critical"
        high_total = self._count_severity("high")
        if high_total > 2:
            return "high"
        if high_total > 0:
            return "medium"
        return "low"

    def _categorize_risks(self) -> Dict[str, int]:
        """Count findings per ``category``; a missing category is ``unknown``."""
        categories = {}
        for finding in self.findings:
            category = finding.get("category", "unknown")
            categories[category] = categories.get(category, 0) + 1
        return categories

    def _suggest_mitigations(self) -> List[str]:
        """Return one mitigation entry per critical or high finding."""
        # A finding without a title falls back to its id, then to "unknown",
        # so one incomplete record cannot break the report.
        return [
            f"针对 {finding.get('title', finding.get('id', 'unknown'))} 的缓解措施"
            for finding in self.findings
            if finding.get("severity") in ("critical", "high")
        ]

    def _determine_next_steps(self) -> List[str]:
        """Return follow-up actions chosen from the overall risk level."""
        if self._calculate_risk_level() in ("critical", "high"):
            return ["立即修复严重和高危问题", "安排后续审计"]
        return ["监控已识别问题", "定期复审"]
算法审计
公平性审计
import numpy as np
from typing import List, Dict
class FairnessAuditor:
    """Compare model behaviour across groups defined by protected attributes.

    NOTE: ``_get_predictions`` and ``_get_true_labels`` are placeholders that
    return random 0/1 arrays and never consult ``self.model``, so the numeric
    results are not meaningful until they are replaced.
    """

    def __init__(self, model, protected_attributes: List[str],
                 parity_threshold: float = 0.1):
        """Store the model, the attributes to audit and the parity threshold.

        Args:
            model: The model under audit (not used by the placeholder helpers).
            protected_attributes: Keys looked up in each test record.
            parity_threshold: Largest gap between groups that still counts as
                passing. Defaults to 0.1.
        """
        self.model = model
        self.protected_attributes = protected_attributes
        self.parity_threshold = parity_threshold
        self.metrics = {}

    def audit_demographic_parity(self, test_data: List[Dict]) -> Dict[str, Dict[str, Any]]:
        """Report the positive-prediction rate per group for each attribute.

        Returns:
            ``{attribute: {"positive_rates", "disparity", "passes_threshold"}}``.
            With no test data there are no groups; the disparity is then 0.0
            and the check passes, matching ``_check_parity`` on empty input.
        """
        results = {}
        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)
            # float() turns the NumPy scalar into a plain Python float.
            positive_rates = {
                group_name: float(np.mean(self._get_predictions(members)))
                for group_name, members in groups.items()
            }
            rates = list(positive_rates.values())
            # max()/min() raise on an empty sequence, so guard the no-data case.
            disparity = max(rates) - min(rates) if rates else 0.0
            results[attr] = {
                "positive_rates": positive_rates,
                "disparity": disparity,
                "passes_threshold": bool(disparity < self.parity_threshold),
            }
        return results

    def audit_equalized_odds(self, test_data: List[Dict]) -> Dict[str, Dict[str, Any]]:
        """Report true/false positive rates per group and whether they agree.

        Returns:
            ``{attribute: {"tpr_by_group", "fpr_by_group", "tpr_parity",
            "fpr_parity"}}``.
        """
        results = {}
        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)
            tpr_by_group = {}
            fpr_by_group = {}
            for group_name, members in groups.items():
                predictions = self._get_predictions(members)
                true_labels = self._get_true_labels(members)
                tpr_by_group[group_name] = self._calculate_tpr(predictions, true_labels)
                fpr_by_group[group_name] = self._calculate_fpr(predictions, true_labels)
            results[attr] = {
                "tpr_by_group": tpr_by_group,
                "fpr_by_group": fpr_by_group,
                "tpr_parity": self._check_parity(tpr_by_group),
                "fpr_parity": self._check_parity(fpr_by_group),
            }
        return results

    def audit_calibration(self, test_data: List[Dict]) -> Dict[str, Dict[str, Any]]:
        """Report a calibration score per group and whether the scores agree.

        Returns:
            ``{attribute: {"calibration_scores", "calibration_parity"}}``.
        """
        results = {}
        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)
            calibration_scores = {}
            for group_name, members in groups.items():
                predictions = self._get_predictions(members)
                true_labels = self._get_true_labels(members)
                calibration_scores[group_name] = self._calculate_calibration(
                    predictions, true_labels
                )
            results[attr] = {
                "calibration_scores": calibration_scores,
                "calibration_parity": self._check_parity(calibration_scores),
            }
        return results

    def _split_by_attribute(self, data: List[Dict], attribute: str) -> Dict[str, List]:
        """Group records by ``attribute``; records lacking it go to ``unknown``."""
        groups = {}
        for item in data:
            groups.setdefault(item.get(attribute, "unknown"), []).append(item)
        return groups

    def _get_predictions(self, data: List[Dict]) -> np.ndarray:
        """Placeholder: return one random 0/1 prediction per record."""
        return np.random.randint(0, 2, len(data))

    def _get_true_labels(self, data: List[Dict]) -> np.ndarray:
        """Placeholder: return one random 0/1 label per record."""
        return np.random.randint(0, 2, len(data))

    def _calculate_tpr(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return TP / (TP + FN), or 0.0 when there are no positive labels."""
        tp = np.sum((predictions == 1) & (true_labels == 1))
        fn = np.sum((predictions == 0) & (true_labels == 1))
        return float(tp / (tp + fn)) if (tp + fn) > 0 else 0.0

    def _calculate_fpr(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return FP / (FP + TN), or 0.0 when there are no negative labels."""
        fp = np.sum((predictions == 1) & (true_labels == 0))
        tn = np.sum((predictions == 0) & (true_labels == 0))
        return float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0

    def _calculate_calibration(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return the mean absolute difference between predictions and labels."""
        return float(np.mean(np.abs(predictions - true_labels)))

    def _check_parity(self, scores: Dict[str, float]) -> bool:
        """Return True when the score spread is below the parity threshold.

        An empty ``scores`` mapping counts as passing.
        """
        values = list(scores.values())
        if not values:
            return True
        # bool() keeps the result a plain bool even for NumPy scalar inputs.
        return bool(max(values) - min(values) < self.parity_threshold)
鲁棒性审计
class RobustnessAuditor:
    """Probe a model's robustness to attacks, perturbation and data shift.

    NOTE: ``_simulate_attack``, ``_test_with_perturbation`` and
    ``_extract_features`` are placeholders that do not consult ``self.model``.
    """

    def __init__(self, model, attack_success_threshold: float = 0.05):
        """Store the model and the attack success rate that still passes.

        Args:
            model: The model under audit (not used by the placeholder helpers).
            attack_success_threshold: An attack passes when its success rate
                is strictly below this value. Defaults to 0.05.
        """
        self.model = model
        self.attack_methods = ["adversarial", "perturbation", "extraction"]
        self.attack_success_threshold = attack_success_threshold

    def audit_adversarial_robustness(self, test_samples: List) -> Dict[str, Dict[str, Any]]:
        """Return success rate and pass/fail for every configured attack."""
        results = {}
        for attack in self.attack_methods:
            success_rate = self._simulate_attack(test_samples, attack)
            results[attack] = {
                "success_rate": success_rate,
                "passes_threshold": success_rate < self.attack_success_threshold,
            }
        return results

    def audit_input_perturbation(self, test_samples: List,
                                 perturbation_levels=None) -> Dict[str, Dict[str, float]]:
        """Return accuracy and degradation at each perturbation level.

        Args:
            test_samples: Samples passed through to the perturbation test.
            perturbation_levels: Iterable of levels to test; defaults to
                0.01, 0.05, 0.1 and 0.2.

        Returns:
            ``{"perturbation_<level>": {"accuracy", "degradation"}}``.
        """
        if perturbation_levels is None:
            perturbation_levels = (0.01, 0.05, 0.1, 0.2)
        results = {}
        for level in perturbation_levels:
            accuracy = self._test_with_perturbation(test_samples, level)
            results[f"perturbation_{level}"] = {
                "accuracy": accuracy,
                "degradation": 1.0 - accuracy,
            }
        return results

    def audit_distribution_shift(self, train_data: List, test_data: List) -> Dict[str, Any]:
        """Return the train/test distribution distance and its severity label.

        Raises:
            ValueError: If either dataset is empty; a distance between empty
                distributions is undefined.
        """
        # Fail early with a clear message instead of deep inside scipy.
        if len(train_data) == 0 or len(test_data) == 0:
            raise ValueError("train_data and test_data must both be non-empty")
        train_features = self._extract_features(train_data)
        test_features = self._extract_features(test_data)
        distribution_distance = self._calculate_distribution_distance(
            train_features, test_features
        )
        return {
            "distribution_distance": distribution_distance,
            "shift_severity": self._classify_shift_severity(distribution_distance),
        }

    def _simulate_attack(self, samples: List, attack_type: str) -> float:
        """Placeholder: return a random success rate drawn from [0, 0.1)."""
        return np.random.uniform(0, 0.1)

    def _test_with_perturbation(self, samples: List, perturbation_level: float) -> float:
        """Placeholder: return 0.95 minus half the perturbation level."""
        return 0.95 - perturbation_level * 0.5

    def _extract_features(self, data: List) -> np.ndarray:
        """Placeholder: return random features of shape ``(len(data), 10)``."""
        return np.random.randn(len(data), 10)

    def _calculate_distribution_distance(self, dist1: np.ndarray, dist2: np.ndarray) -> float:
        """Return the Wasserstein distance between the two flattened arrays."""
        # Imported here so scipy is only required when this audit is used.
        from scipy.stats import wasserstein_distance
        return float(wasserstein_distance(dist1.flatten(), dist2.flatten()))

    def _classify_shift_severity(self, distance: float) -> str:
        """Label a distance: above 0.5 severe, above 0.2 moderate, else mild."""
        if distance > 0.5:
            return "severe"
        if distance > 0.2:
            return "moderate"
        return "mild"
数据审计
训练数据审计
class TrainingDataAuditor:
    """Audit training data for quality, privacy and bias.

    NOTE: only ``_check_completeness`` and ``_detect_pii`` inspect the data.
    The remaining helpers are placeholders that return fixed values.
    """

    def __init__(self):
        """Create an auditor with empty metric stores."""
        self.data_quality_metrics = {}
        self.privacy_metrics = {}
        self.bias_metrics = {}

    def audit_data_quality(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return completeness, consistency, accuracy, timeliness, uniqueness."""
        return {
            "completeness": self._check_completeness(training_data),
            "consistency": self._check_consistency(training_data),
            "accuracy": self._check_accuracy(training_data),
            "timeliness": self._check_timeliness(training_data),
            "uniqueness": self._check_uniqueness(training_data),
        }

    def audit_data_privacy(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return PII counts plus anonymization, consent and retention scores."""
        return {
            "pii_detection": self._detect_pii(training_data),
            "anonymization_level": self._assess_anonymization(training_data),
            "consent_coverage": self._check_consent_coverage(training_data),
            "retention_compliance": self._check_retention_compliance(training_data),
        }

    def audit_data_bias(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return representation, label, correlation and historical-bias data."""
        return {
            "representation_balance": self._check_representation_balance(training_data),
            "label_distribution": self._analyze_label_distribution(training_data),
            "attribute_correlation": self._analyze_attribute_correlation(training_data),
            "historical_bias": self._detect_historical_bias(training_data),
        }

    def _check_completeness(self, data: List[Dict]) -> Dict[str, float]:
        """Return the share of records whose field is present and not None.

        Checks the ``text``, ``label`` and ``metadata`` fields; every share
        is 0.0 when ``data`` is empty.
        """
        completeness_scores = {}
        for field in ("text", "label", "metadata"):
            present = sum(1 for item in data if item.get(field) is not None)
            completeness_scores[field] = present / len(data) if data else 0.0
        return completeness_scores

    def _check_consistency(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed consistency score."""
        return 0.95

    def _check_accuracy(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed accuracy score."""
        return 0.92

    def _check_timeliness(self, data: List[Dict]) -> Dict[str, Any]:
        """Placeholder: return fixed age and recency values."""
        return {
            "average_age_days": 180,
            "recency_score": 0.8,
        }

    def _check_uniqueness(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed uniqueness score."""
        return 0.98

    def _detect_pii(self, data: List[Dict]) -> Dict[str, int]:
        """Count records whose ``text`` contains an email or a phone number.

        Each record adds at most one to a category, however many matches it
        holds. ``id_card`` and ``name`` are reported but no detection is
        implemented for them, so they stay 0.
        """
        import re

        # Compiled once, outside the per-record loop. The top-level domain
        # class is [A-Za-z]; a "|" inside a character class is a literal pipe.
        email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
        # Eleven digits starting with 1 and then 3-9; the original comment
        # describes this as a mobile-number check.
        phone_pattern = re.compile(r'\b1[3-9]\d{9}\b')

        pii_counts = {"email": 0, "phone": 0, "id_card": 0, "name": 0}
        for item in data:
            text = str(item.get("text", ""))
            if email_pattern.search(text):
                pii_counts["email"] += 1
            if phone_pattern.search(text):
                pii_counts["phone"] += 1
        return pii_counts

    def _assess_anonymization(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed anonymization score."""
        return 0.85

    def _check_consent_coverage(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed consent-coverage score."""
        return 0.9

    def _check_retention_compliance(self, data: List[Dict]) -> float:
        """Placeholder: return a fixed retention-compliance score."""
        return 0.95

    def _check_representation_balance(self, data: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed gender, age and geographic balances."""
        return {
            "gender_balance": 0.48,
            "age_balance": 0.45,
            "geographic_balance": 0.42,
        }

    def _analyze_label_distribution(self, data: List[Dict]) -> Dict[str, int]:
        """Placeholder: return fixed positive and negative label counts."""
        return {"positive": 450, "negative": 550}

    def _analyze_attribute_correlation(self, data: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed attribute-to-label correlations."""
        return {"gender_label_corr": 0.15, "age_label_corr": 0.12}

    def _detect_historical_bias(self, data: List[Dict]) -> Dict[str, Any]:
        """Placeholder: report that no historical bias was detected."""
        return {"bias_detected": False, "bias_level": "low"}
推理数据审计
class InferenceDataAuditor:
    """Audit inference-time input and output logs.

    NOTE: apart from the volume figures and the volume anomaly check, the
    helpers are placeholders that return fixed values and ignore the logs.
    """

    def __init__(self, volume_anomaly_threshold: int = 10000):
        """Create an auditor.

        Args:
            volume_anomaly_threshold: A log batch with more entries than this
                is reported as a volume anomaly. Defaults to 10000.
        """
        self.input_patterns = {}
        self.output_patterns = {}
        self.volume_anomaly_threshold = volume_anomaly_threshold

    def audit_input_data(self, input_logs: List[Dict]) -> Dict[str, Any]:
        """Return volume, pattern, anomaly and privacy findings for inputs."""
        return {
            "volume_analysis": self._analyze_volume(input_logs),
            "pattern_detection": self._detect_patterns(input_logs),
            "anomaly_detection": self._detect_anomalies(input_logs),
            "privacy_assessment": self._assess_privacy(input_logs),
        }

    def audit_output_data(self, output_logs: List[Dict]) -> Dict[str, Any]:
        """Return quality, safety, consistency and performance for outputs."""
        return {
            "quality_metrics": self._assess_quality(output_logs),
            "safety_metrics": self._assess_safety(output_logs),
            "consistency_metrics": self._assess_consistency(output_logs),
            "performance_metrics": self._assess_performance(output_logs),
        }

    def _analyze_volume(self, logs: List[Dict]) -> Dict[str, Any]:
        """Return the request count and two figures derived from it.

        ``average_daily`` divides the count by 30 and ``peak_daily`` by 15;
        neither is measured from timestamps in the logs.
        """
        total_requests = len(logs)
        return {
            "total_requests": total_requests,
            "average_daily": total_requests / 30,
            "peak_daily": total_requests / 15,
        }

    def _detect_patterns(self, logs: List[Dict]) -> Dict[str, Any]:
        """Placeholder: return fixed pattern labels."""
        return {
            "temporal_patterns": "normal",
            "user_patterns": "diverse",
            "content_patterns": "varied",
        }

    def _detect_anomalies(self, logs: List[Dict]) -> List[Dict]:
        """Return a volume anomaly when the log count exceeds the threshold."""
        if len(logs) <= self.volume_anomaly_threshold:
            return []
        return [{
            "type": "volume_anomaly",
            "severity": "medium",
            "description": "异常高的请求量",
        }]

    def _assess_privacy(self, logs: List[Dict]) -> Dict[str, Any]:
        """Placeholder: return a fixed privacy assessment."""
        return {
            "pii_exposure_risk": "low",
            "data_retention_compliance": True,
            "encryption_status": "enabled",
        }

    def _assess_quality(self, logs: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed accuracy, relevance and coherence."""
        return {
            "accuracy": 0.92,
            "relevance": 0.88,
            "coherence": 0.85,
        }

    def _assess_safety(self, logs: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed harmful-content, bias and misinformation rates."""
        return {
            "harmful_content_rate": 0.02,
            "bias_detection_rate": 0.05,
            "misinformation_rate": 0.03,
        }

    def _assess_consistency(self, logs: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed output, style and fact consistency."""
        return {
            "output_consistency": 0.9,
            "style_consistency": 0.88,
            "fact_consistency": 0.85,
        }

    def _assess_performance(self, logs: List[Dict]) -> Dict[str, float]:
        """Placeholder: return fixed latency and throughput figures."""
        return {
            "average_latency": 0.5,
            "p95_latency": 1.2,
            "throughput": 1000,
        }
第三方审计
审计流程管理
class ThirdPartyAuditManager:
    """Register external auditors and record the audits they perform.

    NOTE: findings, recommendations, certification status, remediation
    tracking and the compliance flags are placeholders with fixed values.
    """

    def __init__(self):
        """Create a manager with no auditors, reports or certifications."""
        self.auditors = []
        self.audit_reports = []
        self.compliance_certifications = []

    def engage_auditor(self, auditor_info: Dict[str, Any]) -> str:
        """Register an auditor and return the id assigned to it.

        Args:
            auditor_info: Must contain ``name``, ``credentials`` and
                ``specialization``.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        auditor_id = f"auditor_{len(self.auditors) + 1}"
        self.auditors.append({
            "id": auditor_id,
            "name": auditor_info["name"],
            "credentials": auditor_info["credentials"],
            "specialization": auditor_info["specialization"],
            "engagement_date": datetime.now().isoformat(),
        })
        return auditor_id

    def conduct_third_party_audit(self, auditor_id: str, audit_scope: str) -> Dict[str, Any]:
        """Create, store and return an audit report for a registered auditor.

        Raises:
            ValueError: If ``auditor_id`` is not registered.
        """
        auditor = self._get_auditor(auditor_id)
        audit_report = {
            "audit_id": f"tpa_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "auditor": auditor["name"],
            "scope": audit_scope,
            "methodology": self._define_methodology(audit_scope),
            "findings": self._collect_findings(auditor_id),
            "recommendations": self._generate_recommendations(auditor_id),
            "certification_status": self._determine_certification(auditor_id),
        }
        self.audit_reports.append(audit_report)
        return audit_report

    def _get_auditor(self, auditor_id: str) -> Dict[str, Any]:
        """Return the auditor record with the given id.

        Raises:
            ValueError: If no registered auditor has that id.
        """
        for auditor in self.auditors:
            if auditor["id"] == auditor_id:
                return auditor
        raise ValueError(f"Auditor {auditor_id} not found")

    def _define_methodology(self, scope: str) -> Dict[str, Any]:
        """Return tools, standards and duration for the given scope.

        Known scopes are ``security``, ``fairness`` and ``privacy``; any
        other scope falls back to the ``security`` methodology.
        """
        methodologies = {
            "security": {
                "tools": ["penetration_testing", "vulnerability_scanning"],
                "standards": ["ISO_27001", "SOC_2"],
                "duration": "4-6 weeks",
            },
            "fairness": {
                "tools": ["fairness_toolkit", "bias_detection"],
                "standards": ["IEEE_7010", "NIST_AI_100-1"],
                "duration": "2-3 weeks",
            },
            "privacy": {
                "tools": ["privacy_assessment", "data_flow_analysis"],
                "standards": ["GDPR", "CCPA", "ISO_27701"],
                "duration": "3-4 weeks",
            },
        }
        return methodologies.get(scope, methodologies["security"])

    def _collect_findings(self, auditor_id: str) -> List[Dict]:
        """Placeholder: return one fixed medium-severity finding."""
        return [
            {
                "id": "F001",
                "title": "输入验证不足",
                "severity": "medium",
                "description": "某些API端点缺少输入验证",
                "remediation": "添加输入验证逻辑",
            }
        ]

    def _generate_recommendations(self, auditor_id: str) -> List[Dict]:
        """Placeholder: return one fixed high-priority recommendation."""
        return [
            {
                "priority": "high",
                "category": "security",
                "recommendation": "实施API网关和速率限制",
                "timeline": "30 days",
            }
        ]

    def _determine_certification(self, auditor_id: str) -> Dict[str, Any]:
        """Placeholder: return a fixed certification with a hard-coded expiry."""
        return {
            "certified": True,
            "certification_level": "A",
            "valid_until": "2027-06-25",
            "conditions": [],
        }

    def track_remediation(self, finding_id: str, remediation_plan: Dict) -> bool:
        """Placeholder: always report the remediation as tracked."""
        return True

    def generate_compliance_report(self) -> Dict[str, Any]:
        """Return a compliance report summarising the stored audits.

        ``next_audit_date`` is a hard-coded placeholder value.
        """
        return {
            "report_date": datetime.now().isoformat(),
            "audit_summary": self._summarize_audits(),
            "compliance_status": self._assess_compliance(),
            "certifications": self.compliance_certifications,
            "next_audit_date": "2027-06-25",
        }

    def _summarize_audits(self) -> Dict[str, Any]:
        """Return the total audit count and the count for the current year."""
        # Audit ids are "tpa_<YYYYmmdd>_<HHMMSS>", so the current year's audits
        # share this prefix. Deriving it from the clock keeps the count right
        # in every year instead of only the one that was hard-coded.
        current_year_prefix = datetime.now().strftime("tpa_%Y")
        audits_this_year = sum(
            1 for report in self.audit_reports
            if report["audit_id"].startswith(current_year_prefix)
        )
        return {
            "total_audits": len(self.audit_reports),
            "audits_this_year": audits_this_year,
        }

    def _assess_compliance(self) -> Dict[str, bool]:
        """Placeholder: report every listed standard as compliant."""
        return {
            "GDPR": True,
            "SOC_2": True,
            "ISO_27001": True,
            "CCPA": True,
        }
审计工具集成
自动化审计工具
class LLMAuditTools:
    """Run the individual auditors together and merge them into one report."""

    def __init__(self):
        """Create a default set of auditors, none bound to a model."""
        self.tools = {
            "fairness": FairnessAuditor(None, ["gender", "age", "race"]),
            "robustness": RobustnessAuditor(None),
            "data_quality": TrainingDataAuditor(),
            "third_party": ThirdPartyAuditManager(),
        }

    def run_comprehensive_audit(self, model, training_data: List[Dict],
                                test_data: List[Dict],
                                model_name: str = "model_name") -> Dict[str, Any]:
        """Run fairness, robustness and data-quality audits and build a report.

        Args:
            model: The model handed to the fairness and robustness auditors.
            training_data: Records used for the data-quality audit.
            test_data: Records used for the fairness and robustness audits.
            model_name: Name recorded in the report. The default keeps the
                label the report has always carried.

        Returns:
            The ``LLMAuditFramework`` report, with the raw per-auditor results
            attached under ``detailed_results``.
        """
        audit_results = {}

        # Fresh auditors are built per run so they are bound to ``model``;
        # the instances in ``self.tools`` are not used here.
        fairness_auditor = FairnessAuditor(model, ["gender", "age"])
        audit_results["fairness"] = fairness_auditor.audit_demographic_parity(test_data)

        robustness_auditor = RobustnessAuditor(model)
        audit_results["robustness"] = robustness_auditor.audit_adversarial_robustness(test_data)

        data_auditor = TrainingDataAuditor()
        audit_results["data_quality"] = data_auditor.audit_data_quality(training_data)

        audit_framework = LLMAuditFramework(model_name, "comprehensive")
        audit_framework.findings = self._extract_findings(audit_results)
        audit_report = audit_framework.conduct_audit()
        # Only fairness failures become findings, so keep the raw robustness
        # and data-quality results available instead of discarding them.
        audit_report["detailed_results"] = audit_results
        return audit_report

    def _extract_findings(self, audit_results: Dict) -> List[Dict]:
        """Turn failed fairness checks into high-severity findings.

        Only the ``fairness`` entry of ``audit_results`` is inspected. An
        attribute whose metrics lack ``passes_threshold`` counts as passing.
        """
        findings = []
        for attr, metrics in audit_results.get("fairness", {}).items():
            if metrics.get("passes_threshold", True):
                continue
            findings.append({
                "id": f"fairness_{attr}",
                "title": f"{attr}公平性问题",
                "severity": "high",
                "category": "fairness",
                "description": f"{attr}存在显著差异",
            })
        return findings
总结
LLM审计是确保模型安全、公平和合规的重要过程。通过算法审计、数据审计和第三方审计,可以全面评估模型的风险和性能。建立完善的审计框架和工具,有助于组织及时发现和解决问题,建立用户信任。