LLM安全工具:构建安全的AI系统
---
title: "LLM安全工具:构建安全的AI系统"
description: "使用工具构建和维护LLM应用的安全性"
tags: ["安全工具", "AI安全", "防护机制", "LLM", "安全"]
category: "llm"
icon: "🔒"
---
LLM安全工具:构建安全的AI系统
安全概述
AI安全工具帮助构建和维护LLM应用的安全性,包括输入验证、输出过滤和攻击防御。
安全组件
1. 输入安全检查器
import re
from typing import Dict, List, Tuple
from dataclasses import dataclass
@dataclass
class SecurityCheckResult:
    """Outcome of scanning one piece of input text.

    Attributes:
        is_safe: True when no threat pattern matched.
        risk_level: One of "low", "medium", "high" or "critical".
        threats_detected: One dict per matched pattern with the keys
            "type", "pattern" and "match".
        recommendations: One suggestion string per detected threat.
    """

    is_safe: bool
    risk_level: str  # "low", "medium", "high", "critical"
    threats_detected: List[Dict]
    recommendations: List[str]


class InputSecurityChecker:
    """Regex-based scanner that flags suspicious user input.

    Every pattern in ``threat_patterns`` is searched case-insensitively in
    the input; each pattern that matches contributes one threat entry.
    """

    # Suggestion emitted for each threat category. Categories without an
    # entry here produce no recommendation.
    _RECOMMENDATIONS = {
        "prompt_injection": "检测到提示注入,建议拒绝处理",
        "jailbreak": "检测到越狱尝试,建议拒绝处理",
        "data_exfiltration": "检测到数据泄露风险,建议审查输出",
        "code_injection": "检测到代码注入风险,建议阻止执行",
    }

    def __init__(self):
        self.threat_patterns = self._load_threat_patterns()

    def _load_threat_patterns(self) -> Dict[str, List[str]]:
        """Return the built-in regex patterns, grouped by threat category."""
        return {
            "prompt_injection": [
                r"忽略.*指令",
                r"你现在是",
                r"系统提示",
                r"新指令",
            ],
            "jailbreak": [
                r"假设你没有限制",
                r"作为DAN",
                r"忽略安全",
                r"虚构角色",
            ],
            "data_exfiltration": [
                r"输出.*数据",
                r"发送.*信息",
                r"泄露.*内容",
            ],
            "code_injection": [
                r"执行.*代码",
                r"运行.*脚本",
                r"system.*call",
            ],
        }

    def check_input(self, text: str) -> SecurityCheckResult:
        """Scan ``text`` against every threat pattern.

        Args:
            text: The user input to inspect.

        Returns:
            A SecurityCheckResult listing every matched pattern, the derived
            risk level and one recommendation per threat.
        """
        threats = []
        for threat_type, patterns in self.threat_patterns.items():
            for pattern in patterns:
                # Search once and reuse the match object: repeating the
                # search without re.IGNORECASE returns None for inputs that
                # only match case-insensitively, and .group() then fails.
                match = re.search(pattern, text, re.IGNORECASE)
                if match is not None:
                    threats.append({
                        "type": threat_type,
                        "pattern": pattern,
                        "match": match.group(),
                    })

        return SecurityCheckResult(
            is_safe=not threats,
            risk_level=self._calculate_risk_level(threats),
            threats_detected=threats,
            recommendations=self._generate_recommendations(threats),
        )

    def _calculate_risk_level(self, threats: List[Dict]) -> str:
        """Map the number of detected threats to a risk level.

        0 threats -> "low", 1 -> "medium", 2 -> "high", 3 or more -> "critical".
        """
        threat_count = len(threats)
        if threat_count >= 3:
            return "critical"
        if threat_count == 2:
            return "high"
        if threat_count == 1:
            return "medium"
        return "low"

    def _generate_recommendations(self, threats: List[Dict]) -> List[str]:
        """Return one recommendation per threat whose type is known."""
        return [
            self._RECOMMENDATIONS[threat["type"]]
            for threat in threats
            if threat["type"] in self._RECOMMENDATIONS
        ]
2. 输出安全过滤器
class OutputSecurityFilter:
    """Keyword-based filter that masks policy-violating terms in model output."""

    # Text substituted for every occurrence of a violating term.
    _MASK = "***"

    def __init__(self):
        self.content_policies = self._load_content_policies()

    def _load_content_policies(self) -> Dict[str, List[str]]:
        """Return the built-in blocked terms, grouped by policy category."""
        return {
            "prohibited_content": [
                "暴力", "仇恨", "歧视", "色情", "非法",
            ],
            "sensitive_topics": [
                "政治", "宗教", "种族", "性别",
            ],
            "safety_violations": [
                "自残", "自杀", "伤害他人",
            ],
        }

    def filter_output(self, text: str) -> Dict:
        """Check ``text`` against the content policies and mask violations.

        Args:
            text: The model output to inspect.

        Returns:
            A dict that always contains "filtered", "original_text",
            "filtered_text", "violations" and "action". When nothing was
            masked, "filtered_text" equals the input and the legacy "text"
            key is also present. Each violation records the category, the
            term and the index of the term's first occurrence.
        """
        violations = []
        for category, terms in self.content_policies.items():
            for term in terms:
                position = text.find(term)
                if position != -1:
                    violations.append({
                        "category": category,
                        "term": term,
                        "position": position,
                    })

        if violations:
            return {
                "filtered": True,
                "original_text": text,
                "filtered_text": self._apply_filtering(text, violations),
                "violations": violations,
                "action": "filtered",
            }

        # Expose the same text keys as the filtered branch so callers can
        # read "filtered_text" without checking "filtered" first.
        return {
            "filtered": False,
            "text": text,
            "original_text": text,
            "filtered_text": text,
            "violations": [],
            "action": "passed",
        }

    def _apply_filtering(self, text: str, violations: List[Dict]) -> str:
        """Replace every occurrence of each violating term with the mask.

        str.replace substitutes all occurrences of a term at once, so the
        recorded positions are not needed and no ordering by position is
        required.
        """
        filtered_text = text
        masked_terms = set()
        for violation in violations:
            term = violation["term"]
            if term in masked_terms:
                continue
            masked_terms.add(term)
            filtered_text = filtered_text.replace(term, self._MASK)
        return filtered_text
3. 安全防护层
class SecurityGuardrail:
    """Wraps a model call with an input check before and an output filter after.

    Blocked inputs and filtered outputs are appended to ``security_log``.
    """

    def __init__(self):
        self.input_checker = InputSecurityChecker()
        self.output_filter = OutputSecurityFilter()
        self.security_log = []

    def process_request(self, input_text: str, model_func) -> Dict:
        """Run ``model_func`` on ``input_text`` behind the security checks.

        Args:
            input_text: The raw user input.
            model_func: Callable that takes the input text and returns the
                model output.

        Returns:
            A dict with "success". Blocked inputs add "error", "risk_level"
            and "threats"; a failing model call adds "error"; successful
            calls add "output" and "filtered", plus "violations" when the
            output was masked.
        """
        # 1. Reject the request before the model sees an unsafe input.
        input_check = self.input_checker.check_input(input_text)
        if not input_check.is_safe:
            self._log_security_event("input_blocked", input_check)
            return {
                "success": False,
                "error": "输入被安全策略阻止",
                "risk_level": input_check.risk_level,
                "threats": input_check.threats_detected,
            }

        # 2. Call the model; any failure is reported to the caller rather
        # than propagated.
        try:
            model_output = model_func(input_text)
        except Exception as e:
            return {
                "success": False,
                "error": f"模型调用失败: {e}",
            }

        # 3. Mask policy violations in the model output.
        output_filter_result = self.output_filter.filter_output(model_output)
        if output_filter_result["filtered"]:
            self._log_security_event("output_filtered", output_filter_result)
            return {
                "success": True,
                "output": output_filter_result["filtered_text"],
                "filtered": True,
                "violations": output_filter_result["violations"],
            }

        return {
            "success": True,
            "output": model_output,
            "filtered": False,
        }

    def _log_security_event(self, event_type: str, details):
        """Append a timestamped event to ``security_log``.

        Args:
            event_type: Short label such as "input_blocked" or
                "output_filtered".
            details: The check or filter result that triggered the event
                (an input check result object or an output filter dict).
        """
        # Imported here because this snippet has no datetime import of its
        # own; without it every logged event fails with NameError.
        from datetime import datetime

        self.security_log.append({
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details,
        })
安全监控
class SecurityMonitor:
    """Counts security incidents per risk level and flags when to alert."""

    def __init__(self):
        # Number of incidents at a level from which an alert is raised.
        self.alert_thresholds = {
            "critical": 1,
            "high": 5,
            "medium": 10,
        }
        self.incident_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}

    def monitor_request(self, security_result: Dict) -> Dict:
        """Record one request's risk level and report whether to alert.

        Args:
            security_result: Dict that may carry a "risk_level" key; a
                missing key counts as "low".

        Returns:
            A dict with "risk_level", "needs_alert" and "incident_counts".
            The counts are a snapshot taken at the time of the call.
        """
        risk_level = security_result.get("risk_level", "low")
        self.incident_counts[risk_level] = self.incident_counts.get(risk_level, 0) + 1

        return {
            "risk_level": risk_level,
            "needs_alert": self._check_alert_threshold(risk_level),
            # Copy so that later requests do not change results already
            # handed out, matching get_security_summary.
            "incident_counts": self.incident_counts.copy(),
        }

    def _check_alert_threshold(self, risk_level: str) -> bool:
        """Return True once the count for ``risk_level`` reaches its threshold.

        Levels without a configured threshold use 100.
        """
        threshold = self.alert_thresholds.get(risk_level, 100)
        return self.incident_counts.get(risk_level, 0) >= threshold

    def get_security_summary(self) -> Dict:
        """Return totals, per-level counts and the share of critical incidents.

        "critical_rate" is 0 when no incident has been recorded.
        """
        total_incidents = sum(self.incident_counts.values())
        critical_count = self.incident_counts.get("critical", 0)
        return {
            "total_incidents": total_incidents,
            "by_risk_level": self.incident_counts.copy(),
            "critical_rate": critical_count / total_incidents if total_incidents > 0 else 0,
        }
安全配置
class SecurityConfig:
    """Holds the security settings and checks them for risky values."""

    # Largest "max_request_length" that validate_config accepts.
    MAX_ALLOWED_REQUEST_LENGTH = 100000

    def __init__(self):
        self.settings = {
            "input_validation": True,
            "output_filtering": True,
            "rate_limiting": True,
            "max_request_length": 10000,
            "allowed_characters": "default",
            "blocked_patterns": [],
            "alert_email": None,
            "log_level": "INFO",
        }

    def update_setting(self, key: str, value):
        """Set ``key`` to ``value``; keys that are not known settings are ignored."""
        if key in self.settings:
            self.settings[key] = value

    def get_setting(self, key: str):
        """Return the value stored for ``key``, or None if there is none."""
        return self.settings.get(key)

    def validate_config(self) -> Dict:
        """Check the current settings for unsafe or unusable values.

        Returns:
            A dict with "is_valid" (True when no issue was found) and
            "issues" (a list of messages, one per problem).
        """
        issues = []

        max_length = self.settings["max_request_length"]
        # bool is a subclass of int, so rule it out explicitly. A zero,
        # negative or non-integer limit is reported as an issue instead of
        # passing validation or raising on the comparison below.
        is_positive_int = (
            isinstance(max_length, int)
            and not isinstance(max_length, bool)
            and max_length > 0
        )
        if not is_positive_int:
            issues.append("请求长度限制必须为正整数")
        elif max_length > self.MAX_ALLOWED_REQUEST_LENGTH:
            issues.append("请求长度限制过大")

        if not self.settings["input_validation"]:
            issues.append("输入验证已禁用")
        if not self.settings["output_filtering"]:
            issues.append("输出过滤已禁用")

        return {
            "is_valid": not issues,
            "issues": issues,
        }
# Usage example: adjust two settings, validate the configuration and print
# whether validation passed.
security_config = SecurityConfig()
security_config.update_setting("max_request_length", 5000)
security_config.update_setting("alert_email", "security@example.com")

config_validation = security_config.validate_config()
print("配置验证: " + ("通过" if config_validation["is_valid"] else "失败"))
最佳实践
- 多层防护:实施输入、处理、输出多层安全防护
- 持续监控:实时监控安全事件
- 定期更新:定期更新安全规则和模式
- 安全审计:定期进行安全审计
总结
AI安全工具是构建可靠LLM应用的重要保障。通过多层安全防护和持续监控,可以有效防范各种安全威胁。