输出过滤:保护LLM输出安全
---
title: "输出过滤:保护LLM输出安全"
description: "过滤和验证LLM生成的内容,确保输出安全可靠"
tags: ["输出过滤", "内容过滤", "安全输出", "LLM", "防护"]
category: "llm"
icon: "🔒"
---
输出过滤:保护LLM输出安全
输出过滤概述
输出过滤是确保LLM生成内容安全、准确和适当的关键环节,用于在内容返回给用户之前拦截有害内容,并防止敏感信息泄露。
过滤组件
1. 输出过滤器
import re
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class FilterResult:
    """Outcome of one :meth:`OutputFilter.filter_output` call."""

    # True when at least one content policy matched the original text.
    filtered: bool
    # The text exactly as it was passed in.
    original_text: str
    # The text after the strategy of every triggered policy has been applied.
    filtered_text: str
    # One dict per triggered policy with keys "policy", "matches", "action".
    violations: List[Dict]
    # Comma-separated names of the strategies actually applied, in the order
    # they first ran, or "none" when nothing matched.
    action_taken: str


class OutputFilter:
    """Apply regex-based content policies to generated text.

    Policies are evaluated in registration order. Each policy that matches
    the original text is recorded as a violation and its strategy
    ("block", "redact", "replace" or "refuse") is applied to the running
    filtered text.
    """

    def __init__(self):
        self.content_policies = []
        # Maps an action name to the method implementing it.
        self.filter_strategies = {
            "block": self._block_content,
            "redact": self._redact_content,
            "replace": self._replace_content,
            "refuse": self._refuse_content,
        }

    def add_content_policy(self, name: str, pattern: str, action: str = "redact"):
        """Register a policy.

        Args:
            name: Label reported in ``FilterResult.violations``.
            pattern: Regular expression, matched case-insensitively.
            action: Name of the strategy to apply when the pattern matches.
        """
        self.content_policies.append({
            "name": name,
            "pattern": pattern,
            "action": action,
        })

    def filter_output(self, text: str, default_action: str = "redact") -> FilterResult:
        """Run every registered policy against ``text``.

        Args:
            text: The generated text to inspect.
            default_action: Strategy used when a policy names an action that
                is not in ``filter_strategies``. If this is unknown as well,
                redaction is used.

        Returns:
            A ``FilterResult`` whose ``action_taken`` lists the strategies
            that were really applied (not merely the default).
        """
        violations = []
        applied_actions = []
        filtered_text = text

        for policy in self.content_policies:
            pattern = policy["pattern"]
            # finditer + group(0) always yields the full matched text;
            # findall would return group tuples for patterns with groups.
            matches = [m.group(0) for m in re.finditer(pattern, text, re.IGNORECASE)]
            if not matches:
                continue

            violations.append({
                "policy": policy["name"],
                "matches": matches,
                "action": policy["action"],
            })

            # Resolve the strategy: policy action -> default_action -> redact.
            action = policy["action"]
            strategy = self.filter_strategies.get(action)
            if strategy is None:
                action = default_action
                strategy = self.filter_strategies.get(action)
            if strategy is None:
                action, strategy = "redact", self._redact_content

            filtered_text = strategy(filtered_text, pattern)
            if action not in applied_actions:
                applied_actions.append(action)

        return FilterResult(
            filtered=bool(violations),
            original_text=text,
            filtered_text=filtered_text,
            violations=violations,
            action_taken=",".join(applied_actions) if applied_actions else "none",
        )

    def _block_content(self, text: str, pattern: str) -> str:
        """Replace the whole text with a fixed "blocked" placeholder."""
        return "[内容被阻止]"

    def _redact_content(self, text: str, pattern: str) -> str:
        """Replace every match of ``pattern`` with ``[REDACTED]``."""
        return re.sub(pattern, "[REDACTED]", text, flags=re.IGNORECASE)

    def _replace_content(self, text: str, pattern: str) -> str:
        """Replace every match of ``pattern`` with ``***``."""
        return re.sub(pattern, "***", text, flags=re.IGNORECASE)

    def _refuse_content(self, text: str, pattern: str) -> str:
        """Replace the whole text with a fixed refusal message."""
        return "抱歉,我无法生成包含此类内容的响应。"
2. 敏感信息过滤
class SensitiveInfoFilter:
    """Detect and redact common kinds of personal data in text."""

    def __init__(self):
        # Digit-based patterns are wrapped in (?<!\d)...(?!\d) so they only
        # match a complete digit run. Without the boundaries a phone or
        # 16-digit card pattern matches *inside* an 18-digit ID number,
        # redacting part of it and leaking the remaining digits.
        # Longer digit patterns are listed before shorter ones.
        self.sensitive_patterns = {
            "email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            "id_card": r'(?<!\d)\d{17}[\dXx](?!\d)',
            "credit_card": r'(?<!\d)\d{16}(?!\d)',
            "phone": r'(?<!\d)1[3-9]\d{9}(?!\d)',
            "ssn": r'(?<!\d)\d{3}-\d{2}-\d{4}(?!\d)',
        }

    def filter_sensitive_info(self, text: str) -> Dict:
        """Redact every occurrence of each sensitive pattern.

        Args:
            text: The text to scan.

        Returns:
            A dict with:
            - "filtered_text": ``text`` with each match replaced by
              ``[<TYPE>_REDACTED]``;
            - "found_sensitive_info": one ``{"type", "count", "pattern"}``
              entry per type that was replaced at least once;
            - "has_sensitive_info": whether anything was replaced.
        """
        filtered_text = text
        found_info = []

        for info_type, pattern in self.sensitive_patterns.items():
            # subn detects and replaces on the same (running) text, so the
            # reported count always equals the number of replacements made.
            filtered_text, count = re.subn(
                pattern, f'[{info_type.upper()}_REDACTED]', filtered_text
            )
            if count:
                found_info.append({
                    "type": info_type,
                    "count": count,
                    "pattern": pattern,
                })

        return {
            "filtered_text": filtered_text,
            "found_sensitive_info": found_info,
            "has_sensitive_info": len(found_info) > 0,
        }
3. 质量过滤器
class QualityFilter:
    """Run a list of pluggable quality checks against a piece of text."""

    def __init__(self):
        self.quality_checks = []

    def add_quality_check(self, name: str, check_func):
        """Register ``check_func`` under ``name``.

        The callable receives the text and returns a dict; the keys read
        from it are "passed" (defaults to True) and "message".
        """
        entry = {"name": name, "check": check_func}
        self.quality_checks.append(entry)

    def check_quality(self, text: str) -> Dict:
        """Evaluate every registered check and summarise the outcome.

        A check that raises is reported as an issue rather than propagated.

        Returns:
            A dict with "quality_score" (fraction of checks without an
            issue, 1.0 when no checks are registered), "issues" (list of
            ``{"check", "message"}``) and "passed" (True when no issues).
        """
        problems = []

        for entry in self.quality_checks:
            try:
                outcome = entry["check"](text)
                if outcome.get("passed", True):
                    continue
                message = outcome.get("message", "质量检查失败")
            except Exception as exc:
                # Best effort: a broken check counts as a failed check.
                message = f"检查执行失败: {str(exc)}"
            problems.append({"check": entry["name"], "message": message})

        total = len(self.quality_checks)
        if total:
            score = 1.0 - (len(problems) / total)
        else:
            score = 1.0

        return {
            "quality_score": score,
            "issues": problems,
            "passed": not problems,
        }
# Register the default quality checks on a module-level QualityFilter.
quality_filter = QualityFilter()
for _check_name, _check_func in (
    # Fails when the text has 10 characters or fewer.
    ("length", lambda t: {"passed": len(t) > 10, "message": "内容过短"}),
    # Fails when the first 10 characters occur three or more times.
    ("no_repetition", lambda t: {"passed": t.count(t[:10]) < 3, "message": "内容重复"}),
    # Fails when a text of 50+ characters contains no "。".
    ("coherence", lambda t: {"passed": "。" in t or len(t) < 50, "message": "内容缺乏标点"}),
):
    quality_filter.add_quality_check(_check_name, _check_func)
高级过滤
1. 上下文感知过滤
class ContextAwareFilter:
    """Evaluate text against rules that also see recent conversation history."""

    def __init__(self):
        self.conversation_history = []
        self.context_rules = []

    def add_context_rule(self, name: str, rule_func):
        """Register ``rule_func`` under ``name``.

        The callable is invoked as ``rule_func(text, history)`` and returns
        a dict; the keys read from it are "passed" (defaults to True) and
        "message".
        """
        entry = {"name": name, "rule": rule_func}
        self.context_rules.append(entry)

    def filter_with_context(self, text: str, context: Dict = None) -> Dict:
        """Run every context rule against ``text``.

        When ``context`` carries a "history" entry, its last 10 items
        replace the stored history; otherwise the history kept from an
        earlier call is reused. A rule that raises is reported as a
        violation rather than propagated.

        Returns:
            A dict with "filtered" (True when any rule was violated),
            "violations" (list of ``{"rule", "message"}``) and
            "context_applied" (True when the stored history is non-empty).
        """
        if context and "history" in context:
            self.conversation_history = context["history"][-10:]

        history = self.conversation_history
        broken = []
        for entry in self.context_rules:
            try:
                verdict = entry["rule"](text, history)
                if verdict.get("passed", True):
                    continue
                message = verdict.get("message", "")
            except Exception as exc:
                # Best effort: a failing rule counts as a violation.
                message = f"规则执行失败: {str(exc)}"
            broken.append({"rule": entry["name"], "message": message})

        return {
            "filtered": bool(broken),
            "violations": broken,
            "context_applied": bool(self.conversation_history),
        }
2. 实时过滤
class RealTimeFilter:
    """Filter streamed output token by token using a small hold-back buffer.

    ``filter_queue`` is a list of regular expressions; matches are replaced
    with ``[FILTERED]``. Text is held in ``buffer`` while its tail might
    still be part of sensitive content. Call :meth:`flush` at the end of
    the stream to release whatever is still buffered.
    """

    def __init__(self):
        self.buffer = ""
        self.filter_queue = []

    def process_stream(self, token: str) -> str:
        """Consume one token and return the text that can be emitted now.

        Args:
            token: The next chunk of streamed text.

        Returns:
            The filtered buffer when any pattern matched, the raw buffer
            when it is safe to release, or "" while text is being held.
        """
        self.buffer += token

        # Apply every pattern, not just the first one that matches;
        # otherwise later patterns leak through in the returned chunk.
        filtered = self.buffer
        any_match = False
        for pattern in self.filter_queue:
            filtered, count = re.subn(
                pattern, "[FILTERED]", filtered, flags=re.IGNORECASE
            )
            if count:
                any_match = True
        if any_match:
            self.buffer = ""
            return filtered

        if self._is_safe_to_output(self.buffer):
            output = self.buffer
            self.buffer = ""
            return output
        return ""

    def _is_safe_to_output(self, text: str) -> bool:
        """Return False while the tail of ``text`` may still be sensitive.

        Simplified heuristic. The text is held when, for any sensitive word:
        - its *last* occurrence ends within the final 5 characters (what
          follows the word may not have arrived yet), or
        - the text ends with a proper prefix of the word (the word itself
          may be split across tokens).
        Matching is case-insensitive, like the filter patterns.
        """
        sensitive_words = ["密码", "密钥", "token"]
        lowered = text.lower()
        for word in sensitive_words:
            # rfind: an early occurrence must not mask one near the tail.
            last = lowered.rfind(word)
            if last != -1 and last + len(word) > len(lowered) - 5:
                return False
            for size in range(1, len(word)):
                if lowered.endswith(word[:size]):
                    return False
        return True

    def flush(self) -> str:
        """Return whatever is still buffered, unfiltered, and clear the buffer."""
        output = self.buffer
        self.buffer = ""
        return output
过滤配置
class FilterConfiguration:
    """Hold the key/value settings that control the filters."""

    def __init__(self):
        # Defaults; insertion order is preserved by dict.
        self.config = dict(
            enabled=True,
            strict_mode=False,
            log_level="INFO",
            default_action="redact",
            sensitive_info_filter=True,
            content_policy_filter=True,
            quality_filter=True,
        )

    def update_config(self, key: str, value):
        """Set ``key`` to ``value``; unknown keys are added as-is."""
        self.config.update({key: value})

    def get_config(self) -> Dict:
        """Return a shallow copy, so callers cannot mutate the stored dict."""
        return dict(self.config)
完整过滤管道
class OutputFilterPipeline:
    """Chain content filtering, sensitive-info redaction and quality checks."""

    def __init__(self):
        self.content_filter = OutputFilter()
        self.sensitive_filter = SensitiveInfoFilter()
        self.quality_filter = QualityFilter()
        self._setup_default_policies()

    def _setup_default_policies(self):
        """Register the built-in content policies, all using "redact"."""
        default_policies = (
            ("violence", r"暴力|伤害|杀死"),
            ("hate", r"仇恨|歧视|种族"),
            ("illegal", r"非法|违法|犯罪"),
        )
        for policy_name, pattern in default_policies:
            self.content_filter.add_content_policy(policy_name, pattern, "redact")

    def filter(self, text: str) -> Dict:
        """Run ``text`` through the three stages in order.

        The content filter runs first, sensitive-info redaction runs on its
        output, and the quality check runs on the redacted text.

        Returns:
            A dict with "final_text", "content_filtered",
            "sensitive_info_filtered", "quality_passed", "violations"
            (from the content filter) and "quality_issues".
        """
        # Stage 1: content policies.
        content_stage = self.content_filter.filter_output(text)

        # Stage 2: sensitive-info redaction on the policy-filtered text.
        sensitive_stage = self.sensitive_filter.filter_sensitive_info(
            content_stage.filtered_text
        )
        final_text = sensitive_stage["filtered_text"]

        # Stage 3: quality checks on the final text.
        quality_stage = self.quality_filter.check_quality(final_text)

        return {
            "final_text": final_text,
            "content_filtered": content_stage.filtered,
            "sensitive_info_filtered": sensitive_stage["has_sensitive_info"],
            "quality_passed": quality_stage["passed"],
            "violations": content_stage.violations,
            "quality_issues": quality_stage["issues"],
        }
# Usage example: run one sample string through the pipeline and print the
# original text, the filtered text and the two filter flags.
pipeline = OutputFilterPipeline()
test_output = "用户密码是123456,邮箱是test@example.com,这是一段正常内容。"
result = pipeline.filter(test_output)
print(
    f"原始文本: {test_output}",
    f"过滤后: {result['final_text']}",
    f"内容过滤: {result['content_filtered']}",
    f"敏感信息过滤: {result['sensitive_info_filtered']}",
    sep="\n",
)
最佳实践
- 多层过滤:实施内容、敏感信息、质量多层过滤
- 可配置性:提供灵活的过滤策略配置
- 性能优化:预编译并复用正则表达式,避免过滤环节成为(尤其是流式)输出的延迟瓶颈
- 日志记录:完整记录过滤事件(触发的策略、采取的动作),但不要在日志中写入被过滤的敏感原文
总结
输出过滤是确保LLM生成内容安全可靠的关键环节。通过多层过滤机制,可以有效防止有害信息和敏感数据泄露。