← 返回首页
🧠

LLM审计

📂 llm ⏱ 9 min 1648 words

---
title: "LLM审计"
description: "全面介绍LLM审计流程,包括算法审计、数据审计、第三方审计等关键环节和最佳实践"
tags: ["LLM审计", "算法审计", "数据审计", "第三方审计"]
category: "llm"
icon: "🧠"
---

LLM审计

LLM审计概述

LLM审计是对大语言模型的开发、训练、部署和使用过程进行系统性检查和评估的过程。审计旨在确保模型的安全性、公平性、合规性和可靠性。

审计框架

from datetime import datetime
from typing import Dict, List, Any
import json

class LLMAuditFramework:
    """Collect audit findings for one model and assemble them into a report.

    Callers fill ``findings`` (a list of dicts), ``recommendations`` and
    ``compliance_status`` and then call :meth:`conduct_audit`.  Findings are
    read through the keys ``severity``, ``category`` and ``title``; a finding
    that lacks one of them is tolerated rather than raising ``KeyError``.
    """

    def __init__(self, model_name: str, audit_scope: str):
        """Create an empty audit for ``model_name`` covering ``audit_scope``."""
        self.model_name = model_name
        self.audit_scope = audit_scope
        # The identifier embeds the local creation time with second resolution.
        self.audit_id = f"audit_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.findings: List[Dict[str, Any]] = []
        self.recommendations: List[Any] = []
        self.compliance_status: Dict[str, Any] = {}

    def conduct_audit(self) -> Dict[str, Any]:
        """Build and return the audit report from the current state.

        Returns:
            A dict with the audit metadata, the executive summary, the
            findings/recommendations/compliance status as currently stored on
            the instance (same objects, not copies), the risk assessment and
            the suggested next steps.
        """
        return {
            "audit_id": self.audit_id,
            "model_name": self.model_name,
            "audit_scope": self.audit_scope,
            "timestamp": datetime.now().isoformat(),
            "executive_summary": self._generate_executive_summary(),
            "findings": self.findings,
            "recommendations": self.recommendations,
            "compliance_status": self.compliance_status,
            "risk_assessment": self._assess_risks(),
            "next_steps": self._determine_next_steps(),
        }

    def _count_severity(self, severity: str) -> int:
        """Return how many findings carry exactly the given ``severity``."""
        # ``.get`` keeps a finding without a severity from aborting the report,
        # matching how ``_categorize_risks`` treats a missing category.
        return sum(1 for f in self.findings if f.get("severity") == severity)

    def _generate_executive_summary(self) -> str:
        """Return a one-line summary with total, critical and high counts."""
        critical_total = self._count_severity("critical")
        high_total = self._count_severity("high")

        summary = f"审计完成。发现 {len(self.findings)} 个问题,"
        summary += f"其中 {critical_total} 个严重,{high_total} 个高危。"
        return summary

    def _assess_risks(self) -> Dict[str, Any]:
        """Return overall risk level, per-category counts and mitigations."""
        return {
            "overall_risk_level": self._calculate_risk_level(),
            "risk_categories": self._categorize_risks(),
            "mitigation_strategies": self._suggest_mitigations(),
        }

    def _calculate_risk_level(self) -> str:
        """Map the severity counts to ``critical``/``high``/``medium``/``low``.

        Any critical finding yields ``critical``; more than two high findings
        yield ``high``; one or two high findings yield ``medium``; otherwise
        the level is ``low``.
        """
        if self._count_severity("critical") > 0:
            return "critical"

        high_total = self._count_severity("high")
        if high_total > 2:
            return "high"
        if high_total > 0:
            return "medium"
        return "low"

    def _categorize_risks(self) -> Dict[str, int]:
        """Count findings per ``category`` (``"unknown"`` when absent)."""
        categories: Dict[str, int] = {}
        for finding in self.findings:
            category = finding.get("category", "unknown")
            categories[category] = categories.get(category, 0) + 1
        return categories

    def _suggest_mitigations(self) -> List[str]:
        """Return one mitigation placeholder per critical or high finding."""
        mitigations = []
        for finding in self.findings:
            if finding.get("severity") not in ("critical", "high"):
                continue
            # Fall back to the finding id, then to "unknown", so an untitled
            # finding still produces an entry instead of a KeyError.
            label = finding.get("title", finding.get("id", "unknown"))
            mitigations.append(f"针对 {label} 的缓解措施")
        return mitigations

    def _determine_next_steps(self) -> List[str]:
        """Return follow-up actions chosen by the overall risk level."""
        if self._calculate_risk_level() in ("critical", "high"):
            return ["立即修复严重和高危问题", "安排后续审计"]
        return ["监控已识别问题", "定期复审"]

算法审计

公平性审计

import numpy as np
from typing import List, Dict

class FairnessAuditor:
    """Compute group-fairness metrics for each protected attribute.

    NOTE: ``_get_predictions`` and ``_get_true_labels`` are placeholders that
    return random 0/1 arrays and never consult ``self.model`` or the records,
    so reported numbers are not meaningful until those two are replaced.
    """

    def __init__(self, model, protected_attributes: List[str],
                 parity_threshold: float = 0.1):
        """Store the model, the attributes to audit and the parity threshold.

        Args:
            model: Model under audit (not used by the placeholder helpers).
            protected_attributes: Record keys used to split data into groups.
            parity_threshold: Largest max-min gap between groups that still
                counts as parity.  Defaults to 0.1, the value that used to be
                hard-coded.
        """
        self.model = model
        self.protected_attributes = protected_attributes
        self.parity_threshold = parity_threshold
        self.metrics = {}

    def audit_demographic_parity(self, test_data: List[Dict]) -> Dict[str, Dict]:
        """Compare the positive-prediction rate across groups.

        Returns:
            Per attribute: ``positive_rates`` by group, ``disparity`` (max
            minus min rate, 0.0 when there are no groups) and
            ``passes_threshold``.
        """
        results = {}

        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)
            positive_rates = {
                group_name: float(np.mean(self._get_predictions(group_data)))
                for group_name, group_data in groups.items()
            }

            # Empty test data yields no groups; treat that as zero disparity
            # instead of letting max()/min() raise on an empty sequence.
            disparity = self._spread(positive_rates)

            results[attr] = {
                "positive_rates": positive_rates,
                "disparity": disparity,
                "passes_threshold": disparity < self.parity_threshold,
            }

        return results

    def audit_equalized_odds(self, test_data: List[Dict]) -> Dict[str, Dict]:
        """Compare true-positive and false-positive rates across groups.

        Returns:
            Per attribute: ``tpr_by_group``, ``fpr_by_group`` and the boolean
            ``tpr_parity`` / ``fpr_parity`` checks.
        """
        results = {}

        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)

            tpr_by_group = {}
            fpr_by_group = {}
            for group_name, group_data in groups.items():
                predictions = self._get_predictions(group_data)
                true_labels = self._get_true_labels(group_data)
                tpr_by_group[group_name] = self._calculate_tpr(predictions, true_labels)
                fpr_by_group[group_name] = self._calculate_fpr(predictions, true_labels)

            results[attr] = {
                "tpr_by_group": tpr_by_group,
                "fpr_by_group": fpr_by_group,
                "tpr_parity": self._check_parity(tpr_by_group),
                "fpr_parity": self._check_parity(fpr_by_group),
            }

        return results

    def audit_calibration(self, test_data: List[Dict]) -> Dict[str, Dict]:
        """Compare the calibration score across groups.

        Returns:
            Per attribute: ``calibration_scores`` by group and the boolean
            ``calibration_parity`` check.
        """
        results = {}

        for attr in self.protected_attributes:
            groups = self._split_by_attribute(test_data, attr)

            calibration_scores = {}
            for group_name, group_data in groups.items():
                predictions = self._get_predictions(group_data)
                true_labels = self._get_true_labels(group_data)
                calibration_scores[group_name] = self._calculate_calibration(
                    predictions, true_labels
                )

            results[attr] = {
                "calibration_scores": calibration_scores,
                "calibration_parity": self._check_parity(calibration_scores),
            }

        return results

    def _split_by_attribute(self, data: List[Dict], attribute: str) -> Dict[str, List]:
        """Group records by ``attribute``; records without it go to ``"unknown"``."""
        groups: Dict[str, List] = {}
        for item in data:
            groups.setdefault(item.get(attribute, "unknown"), []).append(item)
        return groups

    def _get_predictions(self, data: List[Dict]) -> np.ndarray:
        """Return one 0/1 prediction per record.

        Placeholder: values are random and independent of model and records.
        """
        return np.random.randint(0, 2, len(data))

    def _get_true_labels(self, data: List[Dict]) -> np.ndarray:
        """Return one 0/1 label per record.

        Placeholder: values are random and independent of the records.
        """
        return np.random.randint(0, 2, len(data))

    def _calculate_tpr(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return TP / (TP + FN), or 0.0 when there are no positive labels."""
        predictions = np.asarray(predictions)
        true_labels = np.asarray(true_labels)
        tp = int(np.sum((predictions == 1) & (true_labels == 1)))
        fn = int(np.sum((predictions == 0) & (true_labels == 1)))
        return tp / (tp + fn) if (tp + fn) > 0 else 0.0

    def _calculate_fpr(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return FP / (FP + TN), or 0.0 when there are no negative labels."""
        predictions = np.asarray(predictions)
        true_labels = np.asarray(true_labels)
        fp = int(np.sum((predictions == 1) & (true_labels == 0)))
        tn = int(np.sum((predictions == 0) & (true_labels == 0)))
        return fp / (fp + tn) if (fp + tn) > 0 else 0.0

    def _calculate_calibration(self, predictions: np.ndarray, true_labels: np.ndarray) -> float:
        """Return the mean absolute difference between predictions and labels.

        Simplified stand-in for a calibration metric.
        """
        # Cast to float first: NumPy refuses to subtract boolean arrays and
        # unsigned integer arrays would wrap around on negative differences.
        diff = np.asarray(predictions, dtype=float) - np.asarray(true_labels, dtype=float)
        return float(np.mean(np.abs(diff)))

    def _spread(self, scores: Dict[str, float]) -> float:
        """Return max minus min of the score values, or 0.0 when empty."""
        values = list(scores.values())
        if not values:
            return 0.0
        return float(max(values) - min(values))

    def _check_parity(self, scores: Dict[str, float]) -> bool:
        """Return True when the score spread is below ``parity_threshold``."""
        return self._spread(scores) < self.parity_threshold

鲁棒性审计

class RobustnessAuditor:
    """Run robustness checks (attacks, perturbation, distribution shift).

    NOTE: ``_simulate_attack``, ``_test_with_perturbation`` and
    ``_extract_features`` are placeholders that do not consult ``self.model``
    or the samples, so reported numbers are illustrative only.
    """

    def __init__(self, model, success_threshold: float = 0.05):
        """Store the model and the audit configuration.

        Args:
            model: Model under audit (not used by the placeholder helpers).
            success_threshold: An attack passes when its success rate is
                strictly below this value.  Defaults to 0.05, the value that
                used to be hard-coded.
        """
        self.model = model
        self.attack_methods = ["adversarial", "perturbation", "extraction"]
        self.success_threshold = success_threshold
        # Perturbation magnitudes exercised by audit_input_perturbation.
        self.perturbation_levels = [0.01, 0.05, 0.1, 0.2]

    def audit_adversarial_robustness(self, test_samples: List) -> Dict[str, Dict[str, Any]]:
        """Return success rate and pass/fail flag for every attack method."""
        results = {}

        for attack in self.attack_methods:
            success_rate = float(self._simulate_attack(test_samples, attack))
            results[attack] = {
                "success_rate": success_rate,
                "passes_threshold": success_rate < self.success_threshold,
            }

        return results

    def audit_input_perturbation(self, test_samples: List) -> Dict[str, Dict[str, float]]:
        """Return accuracy and degradation (1 - accuracy) per perturbation level.

        Result keys have the form ``perturbation_<level>``.
        """
        results = {}

        for level in self.perturbation_levels:
            accuracy = self._test_with_perturbation(test_samples, level)
            results[f"perturbation_{level}"] = {
                "accuracy": accuracy,
                "degradation": 1.0 - accuracy,
            }

        return results

    def audit_distribution_shift(self, train_data: List, test_data: List) -> Dict[str, Any]:
        """Measure the distance between train and test feature distributions.

        Returns:
            ``distribution_distance`` and its ``shift_severity`` label.

        Raises:
            ValueError: If either dataset is empty; a distance between empty
                distributions is undefined (SciPy rejects it as well).
        """
        if len(train_data) == 0 or len(test_data) == 0:
            raise ValueError("train_data and test_data must both be non-empty")

        train_features = self._extract_features(train_data)
        test_features = self._extract_features(test_data)
        distribution_distance = self._calculate_distribution_distance(
            train_features, test_features
        )

        return {
            "distribution_distance": distribution_distance,
            "shift_severity": self._classify_shift_severity(distribution_distance),
        }

    def _simulate_attack(self, samples: List, attack_type: str) -> float:
        """Return an attack success rate.

        Placeholder: a random value drawn uniformly from [0, 0.1).
        """
        return np.random.uniform(0, 0.1)

    def _test_with_perturbation(self, samples: List, perturbation_level: float) -> float:
        """Return accuracy under perturbation.

        Placeholder: the fixed formula ``0.95 - 0.5 * perturbation_level``.
        """
        return 0.95 - perturbation_level * 0.5

    def _extract_features(self, data: List) -> np.ndarray:
        """Return a ``(len(data), 10)`` feature matrix.

        Placeholder: standard-normal noise, independent of ``data``.
        """
        return np.random.randn(len(data), 10)

    def _calculate_distribution_distance(self, dist1: np.ndarray, dist2: np.ndarray) -> float:
        """Return the 1-D Wasserstein distance between the flattened arrays."""
        # Imported lazily so SciPy is only required when this audit is run.
        from scipy.stats import wasserstein_distance
        return float(wasserstein_distance(dist1.flatten(), dist2.flatten()))

    def _classify_shift_severity(self, distance: float) -> str:
        """Label a distance: > 0.5 severe, > 0.2 moderate, otherwise mild."""
        if distance > 0.5:
            return "severe"
        if distance > 0.2:
            return "moderate"
        return "mild"

数据审计

训练数据审计

class TrainingDataAuditor:
    """Audit training data for quality, privacy and bias.

    Only ``_check_completeness`` and ``_detect_pii`` inspect the records; the
    remaining private helpers are placeholders returning fixed values.
    """

    def __init__(self):
        """Initialise empty metric containers."""
        self.data_quality_metrics = {}
        self.privacy_metrics = {}
        self.bias_metrics = {}

    def audit_data_quality(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return completeness, consistency, accuracy, timeliness, uniqueness."""
        return {
            "completeness": self._check_completeness(training_data),
            "consistency": self._check_consistency(training_data),
            "accuracy": self._check_accuracy(training_data),
            "timeliness": self._check_timeliness(training_data),
            "uniqueness": self._check_uniqueness(training_data),
        }

    def audit_data_privacy(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return PII counts plus anonymization, consent and retention scores."""
        return {
            "pii_detection": self._detect_pii(training_data),
            "anonymization_level": self._assess_anonymization(training_data),
            "consent_coverage": self._check_consent_coverage(training_data),
            "retention_compliance": self._check_retention_compliance(training_data),
        }

    def audit_data_bias(self, training_data: List[Dict]) -> Dict[str, Any]:
        """Return representation, label, correlation and historical-bias data."""
        return {
            "representation_balance": self._check_representation_balance(training_data),
            "label_distribution": self._analyze_label_distribution(training_data),
            "attribute_correlation": self._analyze_attribute_correlation(training_data),
            "historical_bias": self._detect_historical_bias(training_data),
        }

    def _check_completeness(self, data: List[Dict]) -> Dict[str, float]:
        """Return the fraction of records with a non-None value per field.

        The fields checked are ``text``, ``label`` and ``metadata``; every
        fraction is 0.0 when ``data`` is empty.
        """
        completeness_scores = {}
        for field in ("text", "label", "metadata"):
            non_null_count = sum(1 for item in data if item.get(field) is not None)
            completeness_scores[field] = non_null_count / len(data) if data else 0.0
        return completeness_scores

    def _check_consistency(self, data: List[Dict]) -> float:
        """Return a consistency score (placeholder: always 0.95)."""
        return 0.95

    def _check_accuracy(self, data: List[Dict]) -> float:
        """Return an accuracy score (placeholder: always 0.92)."""
        return 0.92

    def _check_timeliness(self, data: List[Dict]) -> Dict[str, Any]:
        """Return timeliness figures (placeholder: fixed values)."""
        return {
            "average_age_days": 180,
            "recency_score": 0.8,
        }

    def _check_uniqueness(self, data: List[Dict]) -> float:
        """Return a uniqueness score (placeholder: always 0.98)."""
        return 0.98

    def _detect_pii(self, data: List[Dict]) -> Dict[str, int]:
        """Count records whose ``text`` contains an email or a phone number.

        Each record contributes at most one to each counter.  ``id_card`` and
        ``name`` are reported but not detected yet, so they always stay 0.
        """
        import re

        # Compile once instead of on every record.  The TLD class is
        # [A-Za-z]; the former [A-Z|a-z] also accepted a literal '|'.
        email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
        # 11 digits starting with 1 and a second digit in 3-9.
        phone_pattern = re.compile(r'\b1[3-9]\d{9}\b')

        pii_counts = {"email": 0, "phone": 0, "id_card": 0, "name": 0}

        for item in data:
            text = str(item.get("text", ""))
            if email_pattern.search(text):
                pii_counts["email"] += 1
            if phone_pattern.search(text):
                pii_counts["phone"] += 1

        return pii_counts

    def _assess_anonymization(self, data: List[Dict]) -> float:
        """Return an anonymization score (placeholder: always 0.85)."""
        return 0.85

    def _check_consent_coverage(self, data: List[Dict]) -> float:
        """Return the consent coverage (placeholder: always 0.9)."""
        return 0.9

    def _check_retention_compliance(self, data: List[Dict]) -> float:
        """Return the retention compliance (placeholder: always 0.95)."""
        return 0.95

    def _check_representation_balance(self, data: List[Dict]) -> Dict[str, float]:
        """Return representation balance figures (placeholder: fixed values)."""
        return {
            "gender_balance": 0.48,
            "age_balance": 0.45,
            "geographic_balance": 0.42,
        }

    def _analyze_label_distribution(self, data: List[Dict]) -> Dict[str, int]:
        """Return label counts (placeholder: fixed values)."""
        return {"positive": 450, "negative": 550}

    def _analyze_attribute_correlation(self, data: List[Dict]) -> Dict[str, float]:
        """Return attribute/label correlations (placeholder: fixed values)."""
        return {"gender_label_corr": 0.15, "age_label_corr": 0.12}

    def _detect_historical_bias(self, data: List[Dict]) -> Dict[str, Any]:
        """Return a historical-bias verdict (placeholder: fixed values)."""
        return {"bias_detected": False, "bias_level": "low"}

推理数据审计

class InferenceDataAuditor:
    """Audit inference-time request and response logs.

    Apart from the volume figures and the volume anomaly check, the helpers
    return fixed placeholder values and do not inspect the logs.
    """

    def __init__(self):
        """Initialise empty pattern containers."""
        self.input_patterns = {}
        self.output_patterns = {}

    def audit_input_data(self, input_logs: List[Dict]) -> Dict[str, Any]:
        """Return the volume, pattern, anomaly and privacy sections for inputs."""
        sections = {}
        sections["volume_analysis"] = self._analyze_volume(input_logs)
        sections["pattern_detection"] = self._detect_patterns(input_logs)
        sections["anomaly_detection"] = self._detect_anomalies(input_logs)
        sections["privacy_assessment"] = self._assess_privacy(input_logs)
        return sections

    def audit_output_data(self, output_logs: List[Dict]) -> Dict[str, Any]:
        """Return the quality, safety, consistency and performance sections."""
        sections = {}
        sections["quality_metrics"] = self._assess_quality(output_logs)
        sections["safety_metrics"] = self._assess_safety(output_logs)
        sections["consistency_metrics"] = self._assess_consistency(output_logs)
        sections["performance_metrics"] = self._assess_performance(output_logs)
        return sections

    def _analyze_volume(self, logs: List[Dict]) -> Dict[str, Any]:
        """Return the request count with derived daily figures.

        The daily average divides the count by 30 and the daily peak divides
        it by 15; both are rough stand-ins rather than measured values.
        """
        request_count = len(logs)
        return {
            "total_requests": request_count,
            "average_daily": request_count / 30,
            "peak_daily": request_count / 15,
        }

    def _detect_patterns(self, logs: List[Dict]) -> Dict[str, Any]:
        """Return pattern labels (placeholder: fixed values)."""
        labels = {"temporal_patterns": "normal"}
        labels["user_patterns"] = "diverse"
        labels["content_patterns"] = "varied"
        return labels

    def _detect_anomalies(self, logs: List[Dict]) -> List[Dict]:
        """Return a single volume anomaly when there are more than 10000 logs."""
        if not len(logs) > 10000:
            return []
        volume_anomaly = {
            "type": "volume_anomaly",
            "severity": "medium",
            "description": "异常高的请求量",
        }
        return [volume_anomaly]

    def _assess_privacy(self, logs: List[Dict]) -> Dict[str, Any]:
        """Return a privacy assessment (placeholder: fixed values)."""
        assessment = {"pii_exposure_risk": "low"}
        assessment["data_retention_compliance"] = True
        assessment["encryption_status"] = "enabled"
        return assessment

    def _assess_quality(self, logs: List[Dict]) -> Dict[str, float]:
        """Return quality metrics (placeholder: fixed values)."""
        metrics = {"accuracy": 0.92}
        metrics["relevance"] = 0.88
        metrics["coherence"] = 0.85
        return metrics

    def _assess_safety(self, logs: List[Dict]) -> Dict[str, float]:
        """Return safety metrics (placeholder: fixed values)."""
        metrics = {"harmful_content_rate": 0.02}
        metrics["bias_detection_rate"] = 0.05
        metrics["misinformation_rate"] = 0.03
        return metrics

    def _assess_consistency(self, logs: List[Dict]) -> Dict[str, float]:
        """Return consistency metrics (placeholder: fixed values)."""
        metrics = {"output_consistency": 0.9}
        metrics["style_consistency"] = 0.88
        metrics["fact_consistency"] = 0.85
        return metrics

    def _assess_performance(self, logs: List[Dict]) -> Dict[str, float]:
        """Return performance metrics (placeholder: fixed values)."""
        metrics = {"average_latency": 0.5}
        metrics["p95_latency"] = 1.2
        metrics["throughput"] = 1000
        return metrics

第三方审计

审计流程管理

class ThirdPartyAuditManager:
    """Track external auditors, their audit reports and certifications.

    Findings, recommendations, certification and compliance status come from
    placeholder helpers that return fixed values.
    """

    def __init__(self):
        """Start with no auditors, reports or certifications."""
        self.auditors = []
        self.audit_reports = []
        self.compliance_certifications = []

    def engage_auditor(self, auditor_info: Dict[str, Any]) -> str:
        """Register an auditor and return the generated id.

        Args:
            auditor_info: Must contain ``name``, ``credentials`` and
                ``specialization``.

        Returns:
            The id ``auditor_<n>``, where ``n`` is the 1-based position of the
            auditor in the registration order.

        Raises:
            KeyError: If a required key is missing from ``auditor_info``.
        """
        auditor_id = f"auditor_{len(self.auditors) + 1}"

        self.auditors.append({
            "id": auditor_id,
            "name": auditor_info["name"],
            "credentials": auditor_info["credentials"],
            "specialization": auditor_info["specialization"],
            "engagement_date": datetime.now().isoformat(),
        })

        return auditor_id

    def conduct_third_party_audit(self, auditor_id: str, audit_scope: str) -> Dict[str, Any]:
        """Create an audit report, store it and return it.

        Raises:
            ValueError: If ``auditor_id`` has not been registered.
        """
        auditor = self._get_auditor(auditor_id)

        audit_report = {
            # The id starts with "tpa_<year>"; _summarize_audits relies on it.
            "audit_id": f"tpa_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "auditor": auditor["name"],
            "scope": audit_scope,
            "methodology": self._define_methodology(audit_scope),
            "findings": self._collect_findings(auditor_id),
            "recommendations": self._generate_recommendations(auditor_id),
            "certification_status": self._determine_certification(auditor_id),
        }

        self.audit_reports.append(audit_report)
        return audit_report

    def _get_auditor(self, auditor_id: str) -> Dict[str, Any]:
        """Return the registered auditor with ``auditor_id``.

        Raises:
            ValueError: If no such auditor exists.
        """
        match = next((a for a in self.auditors if a["id"] == auditor_id), None)
        if match is None:
            raise ValueError(f"Auditor {auditor_id} not found")
        return match

    def _define_methodology(self, scope: str) -> Dict[str, Any]:
        """Return tools, standards and duration for ``scope``.

        Known scopes are ``security``, ``fairness`` and ``privacy``; any other
        scope falls back to the ``security`` methodology.
        """
        methodologies = {
            "security": {
                "tools": ["penetration_testing", "vulnerability_scanning"],
                "standards": ["ISO_27001", "SOC_2"],
                "duration": "4-6 weeks",
            },
            "fairness": {
                "tools": ["fairness_toolkit", "bias_detection"],
                "standards": ["IEEE_7010", "NIST_AI_100-1"],
                "duration": "2-3 weeks",
            },
            "privacy": {
                "tools": ["privacy_assessment", "data_flow_analysis"],
                "standards": ["GDPR", "CCPA", "ISO_27701"],
                "duration": "3-4 weeks",
            },
        }

        return methodologies.get(scope, methodologies["security"])

    def _collect_findings(self, auditor_id: str) -> List[Dict]:
        """Return the auditor's findings (placeholder: one fixed finding)."""
        return [
            {
                "id": "F001",
                "title": "输入验证不足",
                "severity": "medium",
                "description": "某些API端点缺少输入验证",
                "remediation": "添加输入验证逻辑",
            }
        ]

    def _generate_recommendations(self, auditor_id: str) -> List[Dict]:
        """Return recommendations (placeholder: one fixed recommendation)."""
        return [
            {
                "priority": "high",
                "category": "security",
                "recommendation": "实施API网关和速率限制",
                "timeline": "30 days",
            }
        ]

    def _determine_certification(self, auditor_id: str) -> Dict[str, Any]:
        """Return the certification status (placeholder: fixed values)."""
        # NOTE(review): "valid_until" is a hard-coded date, not derived from
        # the audit date.
        return {
            "certified": True,
            "certification_level": "A",
            "valid_until": "2027-06-25",
            "conditions": [],
        }

    def track_remediation(self, finding_id: str, remediation_plan: Dict) -> bool:
        """Record a remediation plan (placeholder: always returns True)."""
        return True

    def generate_compliance_report(self) -> Dict[str, Any]:
        """Return a compliance report covering the audits stored so far."""
        # NOTE(review): "next_audit_date" is a hard-coded date.
        return {
            "report_date": datetime.now().isoformat(),
            "audit_summary": self._summarize_audits(),
            "compliance_status": self._assess_compliance(),
            "certifications": self.compliance_certifications,
            "next_audit_date": "2027-06-25",
        }

    def _summarize_audits(self) -> Dict[str, Any]:
        """Return the total number of audits and how many are from this year."""
        # Derive the prefix from the current year, formatted the same way as
        # the audit id, instead of matching a fixed year.
        current_year_prefix = f"tpa_{datetime.now().strftime('%Y')}"
        audits_this_year = sum(
            1 for report in self.audit_reports
            if report["audit_id"].startswith(current_year_prefix)
        )
        return {
            "total_audits": len(self.audit_reports),
            "audits_this_year": audits_this_year,
        }

    def _assess_compliance(self) -> Dict[str, bool]:
        """Return per-standard compliance flags (placeholder: all True)."""
        return {
            "GDPR": True,
            "SOC_2": True,
            "ISO_27001": True,
            "CCPA": True,
        }

审计工具集成

自动化审计工具

class LLMAuditTools:
    """Run the individual auditors together and produce one audit report."""

    def __init__(self):
        """Create a default, model-less instance of every auditor.

        ``run_comprehensive_audit`` does not use these instances; it builds
        fresh auditors bound to the model it is given.
        """
        self.tools = {
            "fairness": FairnessAuditor(None, ["gender", "age", "race"]),
            "robustness": RobustnessAuditor(None),
            "data_quality": TrainingDataAuditor(),
            "third_party": ThirdPartyAuditManager()
        }

    def run_comprehensive_audit(self, model, training_data: List[Dict],
                               test_data: List[Dict]) -> Dict[str, Any]:
        """Run fairness, robustness and data-quality audits and report them.

        Args:
            model: Model under audit, passed to the fairness and robustness
                auditors.
            training_data: Records for the data-quality audit.
            test_data: Records for the fairness and robustness audits.

        Returns:
            The report built by ``LLMAuditFramework.conduct_audit`` with an
            extra ``detailed_results`` key holding the raw per-auditor output.
        """
        audit_results = {}

        # Fairness audit
        fairness_auditor = FairnessAuditor(model, ["gender", "age"])
        audit_results["fairness"] = fairness_auditor.audit_demographic_parity(test_data)

        # Robustness audit
        robustness_auditor = RobustnessAuditor(model)
        audit_results["robustness"] = robustness_auditor.audit_adversarial_robustness(test_data)

        # Data-quality audit
        data_auditor = TrainingDataAuditor()
        audit_results["data_quality"] = data_auditor.audit_data_quality(training_data)

        # Label the report with the model's own name when it exposes one.
        # NOTE(review): assumes a ``name`` attribute is the right source;
        # models without it keep the previous literal "model_name".
        model_name = getattr(model, "name", "model_name")

        audit_framework = LLMAuditFramework(model_name, "comprehensive")
        audit_framework.findings = self._extract_findings(audit_results)
        audit_report = audit_framework.conduct_audit()

        # Keep the raw metrics; otherwise robustness and data-quality output
        # would be computed and then discarded.
        audit_report["detailed_results"] = audit_results

        return audit_report

    def _extract_findings(self, audit_results: Dict) -> List[Dict]:
        """Turn failed checks in ``audit_results`` into finding dicts.

        A fairness or robustness entry becomes a ``high`` severity finding
        when its ``passes_threshold`` flag is present and falsy.  Entries
        without the flag are treated as passing.
        """
        findings = []

        for attr, metrics in audit_results.get("fairness", {}).items():
            if not metrics.get("passes_threshold", True):
                findings.append({
                    "id": f"fairness_{attr}",
                    "title": f"{attr}公平性问题",
                    "severity": "high",
                    "category": "fairness",
                    "description": f"{attr}存在显著差异"
                })

        # Failed attack checks were previously computed but never reported.
        for attack, metrics in audit_results.get("robustness", {}).items():
            if not metrics.get("passes_threshold", True):
                findings.append({
                    "id": f"robustness_{attack}",
                    "title": f"{attack}鲁棒性问题",
                    "severity": "high",
                    "category": "robustness",
                    "description": f"{attack}攻击成功率超过阈值"
                })

        return findings

总结

LLM审计是确保模型安全、公平和合规的重要过程。通过算法审计、数据审计和第三方审计,可以全面评估模型的风险和性能。建立完善的审计框架和工具,有助于组织及时发现和解决问题,建立用户信任。