← 返回首页
🧠

LLM安全工具:构建安全的AI系统

📂 llm ⏱ 4 min 638 words

---
title: "LLM安全工具:构建安全的AI系统"
description: "使用工具构建和维护LLM应用的安全性"
tags: ["安全工具", "AI安全", "防护机制", "LLM", "安全"]
category: "llm"
icon: "🔒"
---

LLM安全工具:构建安全的AI系统

安全概述

AI安全工具帮助构建和维护LLM应用的安全性,包括输入验证、输出过滤和攻击防御。

安全组件

1. 输入安全检查器

import re
from typing import Dict, List, Tuple
from dataclasses import dataclass

@dataclass
class SecurityCheckResult:
    """Outcome of scanning one piece of input text."""
    # True only when no threat pattern matched.
    is_safe: bool
    # One of "low", "medium", "high", "critical".
    risk_level: str
    # One dict per matched pattern, with keys "type", "pattern" and "match".
    threats_detected: List[Dict]
    # One human-readable recommendation per detected threat.
    recommendations: List[str]


class InputSecurityChecker:
    """Scan input text for known attack phrasings using regex patterns.

    Patterns are grouped by threat type in ``self.threat_patterns`` and are
    matched case-insensitively against the whole input.
    """

    # Recommendation text for each known threat type.
    _RECOMMENDATIONS = {
        "prompt_injection": "检测到提示注入,建议拒绝处理",
        "jailbreak": "检测到越狱尝试,建议拒绝处理",
        "data_exfiltration": "检测到数据泄露风险,建议审查输出",
        "code_injection": "检测到代码注入风险,建议阻止执行",
    }

    def __init__(self):
        self.threat_patterns = self._load_threat_patterns()

    def _load_threat_patterns(self) -> Dict[str, List[str]]:
        """Return the built-in regex patterns, keyed by threat type."""
        return {
            "prompt_injection": [
                r"忽略.*指令",
                r"你现在是",
                r"系统提示",
                r"新指令"
            ],
            "jailbreak": [
                r"假设你没有限制",
                r"作为DAN",
                r"忽略安全",
                r"虚构角色"
            ],
            "data_exfiltration": [
                r"输出.*数据",
                r"发送.*信息",
                r"泄露.*内容"
            ],
            "code_injection": [
                r"执行.*代码",
                r"运行.*脚本",
                r"system.*call"
            ]
        }

    def check_input(self, text: str) -> SecurityCheckResult:
        """Match ``text`` against every threat pattern and summarise the result.

        Args:
            text: The raw input to inspect.

        Returns:
            A ``SecurityCheckResult`` listing every pattern that matched, the
            derived risk level and one recommendation per match.
        """
        threats = []

        for threat_type, patterns in self.threat_patterns.items():
            for pattern in patterns:
                # Search once and reuse the match object. Searching a second
                # time without re.IGNORECASE returned None for hits that only
                # match case-insensitively, and .group() then raised.
                match = re.search(pattern, text, re.IGNORECASE)
                if match:
                    threats.append({
                        "type": threat_type,
                        "pattern": pattern,
                        "match": match.group()
                    })

        return SecurityCheckResult(
            is_safe=not threats,
            risk_level=self._calculate_risk_level(threats),
            threats_detected=threats,
            recommendations=self._generate_recommendations(threats)
        )

    def _calculate_risk_level(self, threats: List[Dict]) -> str:
        """Map the number of detected threats to a risk level.

        0 threats -> "low", 1 -> "medium", 2 -> "high", 3 or more -> "critical".
        """
        threat_count = len(threats)
        if threat_count >= 3:
            return "critical"
        if threat_count == 2:
            return "high"
        if threat_count == 1:
            return "medium"
        return "low"

    def _generate_recommendations(self, threats: List[Dict]) -> List[str]:
        """Return one recommendation per threat, in detection order.

        Threats whose type has no known recommendation are skipped.
        """
        return [
            self._RECOMMENDATIONS[threat["type"]]
            for threat in threats
            if threat["type"] in self._RECOMMENDATIONS
        ]

2. 输出安全过滤器

class OutputSecurityFilter:
    """Mask policy-violating terms in model output.

    Terms are plain substrings grouped by category in ``self.content_policies``.
    """

    def __init__(self):
        self.content_policies = self._load_content_policies()

    def _load_content_policies(self) -> Dict[str, List[str]]:
        """Return the built-in term lists, keyed by policy category."""
        policies = {}
        policies["prohibited_content"] = ["暴力", "仇恨", "歧视", "色情", "非法"]
        policies["sensitive_topics"] = ["政治", "宗教", "种族", "性别"]
        policies["safety_violations"] = ["自残", "自杀", "伤害他人"]
        return policies

    def filter_output(self, text: str) -> Dict:
        """Check ``text`` against every policy term and mask any hits.

        Returns:
            When nothing matched: a dict with "filtered" False, the untouched
            "text", an empty "violations" list and "action" "passed".
            Otherwise: a dict with "filtered" True, "original_text",
            "filtered_text", the "violations" list and "action" "filtered".
            Each violation records the category, the term and the index of the
            term's first occurrence.
        """
        hits = [
            {"category": category, "term": term, "position": text.find(term)}
            for category, terms in self.content_policies.items()
            for term in terms
            if term in text
        ]

        # Nothing matched: hand the text back unchanged.
        if not hits:
            return {
                "filtered": False,
                "text": text,
                "violations": [],
                "action": "passed"
            }

        return {
            "filtered": True,
            "original_text": text,
            "filtered_text": self._apply_filtering(text, hits),
            "violations": hits,
            "action": "filtered"
        }

    def _apply_filtering(self, text: str, violations: List[Dict]) -> str:
        """Replace every occurrence of each violating term with "***".

        Terms are processed from the highest recorded position to the lowest.
        """
        ordered = sorted(violations, key=lambda hit: hit["position"], reverse=True)
        masked = text
        for hit in ordered:
            masked = masked.replace(hit["term"], "***")
        return masked

3. 安全防护层

class SecurityGuardrail:
    """Wrap a model call with an input check before and an output filter after.

    Blocked inputs and filtered outputs are appended to ``self.security_log``.
    """

    def __init__(self):
        self.input_checker = InputSecurityChecker()
        self.output_filter = OutputSecurityFilter()
        self.security_log = []

    def process_request(self, input_text: str, model_func) -> Dict:
        """Run ``model_func`` on ``input_text`` behind the security checks.

        Args:
            input_text: The user input to check and forward to the model.
            model_func: Callable invoked as ``model_func(input_text)``; its
                return value is passed to the output filter.

        Returns:
            A dict whose "success" key is False when the input was blocked or
            the model call raised (with an "error" message), and True
            otherwise (with "output" and a "filtered" flag).
        """
        # 1. Reject unsafe input before it reaches the model.
        input_check = self.input_checker.check_input(input_text)

        if not input_check.is_safe:
            self._log_security_event("input_blocked", input_check)
            return {
                "success": False,
                "error": "输入被安全策略阻止",
                "risk_level": input_check.risk_level,
                "threats": input_check.threats_detected
            }

        # 2. Call the model; any failure is reported to the caller, not raised.
        try:
            model_output = model_func(input_text)
        except Exception as e:
            return {
                "success": False,
                "error": f"模型调用失败: {str(e)}"
            }

        # 3. Mask policy violations in the model output.
        output_filter_result = self.output_filter.filter_output(model_output)

        if output_filter_result["filtered"]:
            self._log_security_event("output_filtered", output_filter_result)
            return {
                "success": True,
                "output": output_filter_result["filtered_text"],
                "filtered": True,
                "violations": output_filter_result["violations"]
            }

        return {
            "success": True,
            "output": model_output,
            "filtered": False
        }

    def _log_security_event(self, event_type: str, details: object):
        """Append a timestamped event record to ``self.security_log``.

        Args:
            event_type: Short event label, e.g. "input_blocked".
            details: The input check result or the output filter result dict.
        """
        # Imported here because the surrounding file never imports datetime;
        # without it every logged event raised NameError.
        from datetime import datetime

        self.security_log.append({
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details
        })

安全监控

class SecurityMonitor:
    """Count security incidents per risk level and flag alert conditions."""

    def __init__(self):
        # Number of incidents at a level that triggers an alert.
        self.alert_thresholds = {
            "critical": 1,
            "high": 5,
            "medium": 10
        }
        self.incident_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}

    def monitor_request(self, security_result: Dict) -> Dict:
        """Record one request's risk level and report whether to alert.

        Args:
            security_result: Dict that may carry a "risk_level" key; a missing
                key is counted as "low".

        Returns:
            A dict with the "risk_level", a "needs_alert" flag and a snapshot
            of "incident_counts" taken after this request was counted.
        """
        risk_level = security_result.get("risk_level", "low")

        # Levels not seen before start from zero.
        self.incident_counts[risk_level] = self.incident_counts.get(risk_level, 0) + 1

        return {
            "risk_level": risk_level,
            "needs_alert": self._check_alert_threshold(risk_level),
            # Return a copy so later requests do not change earlier results.
            "incident_counts": self.incident_counts.copy()
        }

    def _check_alert_threshold(self, risk_level: str) -> bool:
        """Return True once the count for ``risk_level`` reaches its threshold.

        Levels without a configured threshold use 100. Counts are never reset,
        so every request at a level past its threshold reports True.
        """
        threshold = self.alert_thresholds.get(risk_level, 100)
        return self.incident_counts.get(risk_level, 0) >= threshold

    def get_security_summary(self) -> Dict:
        """Return total incidents, a per-level copy and the critical share.

        "critical_rate" is the fraction of incidents at "critical" level, or 0
        when nothing has been recorded.
        """
        total_incidents = sum(self.incident_counts.values())

        return {
            "total_incidents": total_incidents,
            "by_risk_level": self.incident_counts.copy(),
            "critical_rate": self.incident_counts["critical"] / total_incidents if total_incidents > 0 else 0
        }

安全配置

class SecurityConfig:
    """Hold security-related settings and sanity-check them."""

    def __init__(self):
        self.settings = dict(
            input_validation=True,
            output_filtering=True,
            rate_limiting=True,
            max_request_length=10000,
            allowed_characters="default",
            blocked_patterns=[],
            alert_email=None,
            log_level="INFO",
        )

    def update_setting(self, key: str, value):
        """Overwrite an existing setting; unknown keys are ignored."""
        if key not in self.settings:
            return
        self.settings[key] = value

    def get_setting(self, key: str):
        """Return the value stored under ``key``, or None if it is unknown."""
        return self.settings[key] if key in self.settings else None

    def validate_config(self) -> Dict:
        """Report settings that weaken protection.

        Returns:
            A dict with "is_valid" (True when no issue was found) and "issues",
            the list of issue messages in check order.
        """
        current = self.settings
        # Each entry pairs a failure condition with the message to report.
        checks = (
            (current["max_request_length"] > 100000, "请求长度限制过大"),
            (not current["input_validation"], "输入验证已禁用"),
            (not current["output_filtering"], "输出过滤已禁用"),
        )
        problems = [message for failed, message in checks if failed]

        return {
            "is_valid": not problems,
            "issues": problems
        }

# Usage example: adjust two settings, then report whether the config is valid.
security_config = SecurityConfig()
security_config.update_setting("max_request_length", 5000)
security_config.update_setting("alert_email", "security@example.com")

config_validation = security_config.validate_config()
if config_validation["is_valid"]:
    print("配置验证: 通过")
else:
    print("配置验证: 失败")

最佳实践

  1. 多层防护:实施输入、处理、输出多层安全防护
  2. 持续监控:实时监控安全事件
  3. 定期更新:定期更新安全规则和模式
  4. 安全审计:定期进行安全审计

总结

AI安全工具是构建可靠LLM应用的重要保障。通过多层安全防护和持续监控,可以有效防范各种安全威胁。