← 返回首页
🧠

输出过滤:保护LLM输出安全

📂 llm ⏱ 4 min 688 words

---
title: "输出过滤:保护LLM输出安全"
description: "过滤和验证LLM生成的内容,确保输出安全可靠"
tags: ["输出过滤", "内容过滤", "安全输出", "LLM", "防护"]
category: "llm"
icon: "🔒"
---

输出过滤:保护LLM输出安全

输出过滤概述

输出过滤是确保LLM生成内容安全、准确和适当的关键环节,防止有害信息泄露。

过滤组件

1. 输出过滤器

import re
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class FilterResult:
    """Outcome of a single ``OutputFilter.filter_output`` call."""
    # True when at least one content policy matched the text.
    filtered: bool
    # The text exactly as it was passed in.
    original_text: str
    # The text after the applicable strategies have been applied.
    filtered_text: str
    # One dict per matched policy with keys "policy", "matches", "action".
    violations: List[Dict]
    # The action(s) actually applied, or "none" when nothing matched.
    action_taken: str


class OutputFilter:
    """Apply regex-based content policies to generated text.

    Each policy pairs a regular expression with an action name. Matching is
    case-insensitive and is always evaluated against the original text.
    """

    # Actions that replace the whole text. Once one has fired, later policies
    # are still recorded as violations but no longer rewrite the text.
    _TERMINAL_ACTIONS = frozenset({"block", "refuse"})

    def __init__(self):
        self.content_policies = []
        self.filter_strategies = {
            "block": self._block_content,
            "redact": self._redact_content,
            "replace": self._replace_content,
            "refuse": self._refuse_content
        }

    def add_content_policy(self, name: str, pattern: str, action: str = "redact"):
        """Register a policy.

        Args:
            name: Label reported in the violation entries.
            pattern: Regular expression searched for in the text.
            action: Key of ``filter_strategies`` to apply on a match.
        """
        self.content_policies.append({
            "name": name,
            "pattern": pattern,
            "action": action
        })

    def filter_output(self, text: str, default_action: str = "redact") -> FilterResult:
        """Run every registered policy against ``text``.

        Args:
            text: The text to inspect.
            default_action: Action used when a policy names an action that is
                not present in ``filter_strategies``.

        Returns:
            A ``FilterResult``. ``action_taken`` is ``"none"`` when nothing
            matched, the terminal action when a ``block``/``refuse`` fired,
            and otherwise the distinct applied actions joined with ``","``
            in the order they were first applied.
        """
        violations = []
        applied_actions = []
        terminal_action = None
        filtered_text = text

        for policy in self.content_policies:
            pattern = policy["pattern"]
            # group(0) keeps the full matched text even when the pattern
            # contains capture groups.
            matches = [m.group(0) for m in re.finditer(pattern, text, re.IGNORECASE)]
            if not matches:
                continue

            action = self._resolve_action(policy["action"], default_action)
            violations.append({
                "policy": policy["name"],
                "matches": matches,
                "action": action
            })

            if terminal_action is not None:
                # The text has already been replaced wholesale; rewriting the
                # replacement message would only corrupt it.
                continue

            strategy = self.filter_strategies.get(action, self._redact_content)
            filtered_text = strategy(filtered_text, pattern)
            if action not in applied_actions:
                applied_actions.append(action)
            if action in self._TERMINAL_ACTIONS:
                terminal_action = action

        if terminal_action is not None:
            action_taken = terminal_action
        elif applied_actions:
            action_taken = ",".join(applied_actions)
        else:
            action_taken = "none"

        return FilterResult(
            filtered=bool(violations),
            original_text=text,
            filtered_text=filtered_text,
            violations=violations,
            action_taken=action_taken
        )

    def _resolve_action(self, action: str, default_action: str) -> str:
        """Return a usable action name, preferring ``action``, then the default, then "redact"."""
        if action in self.filter_strategies:
            return action
        if default_action in self.filter_strategies:
            return default_action
        return "redact"

    def _block_content(self, text: str, pattern: str) -> str:
        """Replace the whole text with a fixed "blocked" marker."""
        return "[内容被阻止]"

    def _redact_content(self, text: str, pattern: str) -> str:
        """Replace every match of ``pattern`` with ``[REDACTED]``."""
        return re.sub(pattern, "[REDACTED]", text, flags=re.IGNORECASE)

    def _replace_content(self, text: str, pattern: str) -> str:
        """Replace every match of ``pattern`` with ``***``."""
        return re.sub(pattern, "***", text, flags=re.IGNORECASE)

    def _refuse_content(self, text: str, pattern: str) -> str:
        """Replace the whole text with a fixed refusal message."""
        return "抱歉,我无法生成包含此类内容的响应。"

2. 敏感信息过滤

class SensitiveInfoFilter:
    """Detect and mask common kinds of personal data with regular expressions."""

    def __init__(self):
        # The numeric patterns are fenced with (?<!\d) / (?!\d) so that each
        # one only matches a complete run of digits. Without the fences the
        # phone pattern fires inside an 18-character ID number and leaves
        # part of the ID unredacted.
        self.sensitive_patterns = {
            "email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            "phone": r'(?<!\d)1[3-9]\d{9}(?!\d)',
            "id_card": r'(?<!\d)\d{17}[\dXx](?!\d)',
            "credit_card": r'(?<!\d)\d{16}(?!\d)',
            "ssn": r'(?<!\d)\d{3}-\d{2}-\d{4}(?!\d)'
        }

    def filter_sensitive_info(self, text: str) -> Dict:
        """Mask every match of the configured patterns.

        Args:
            text: The text to scan.

        Returns:
            A dict with:
              * ``filtered_text``: ``text`` with each match replaced by
                ``[<TYPE>_REDACTED]`` (type name upper-cased);
              * ``found_sensitive_info``: one entry per pattern that matched,
                with keys ``type``, ``count`` and ``pattern``;
              * ``has_sensitive_info``: whether anything was found.
        """
        filtered_text = text
        found_info = []

        for info_type, pattern in self.sensitive_patterns.items():
            placeholder = f'[{info_type.upper()}_REDACTED]'
            # Count and replace in one pass over the already-filtered text so
            # the reported count always equals the number of replacements.
            # A callable replacement keeps the placeholder literal, with no
            # backslash-template processing.
            filtered_text, count = re.subn(
                pattern, lambda _match, _token=placeholder: _token, filtered_text
            )
            if count:
                found_info.append({
                    "type": info_type,
                    "count": count,
                    "pattern": pattern
                })

        return {
            "filtered_text": filtered_text,
            "found_sensitive_info": found_info,
            "has_sensitive_info": bool(found_info)
        }

3. 质量过滤器

class QualityFilter:
    """Run a list of named pass/fail checks against a piece of text."""

    def __init__(self):
        self.quality_checks = []

    def add_quality_check(self, name: str, check_func):
        """Register ``check_func`` under ``name``.

        ``check_func`` is called with the text and is expected to return a
        mapping; its "passed" and "message" keys are read by ``check_quality``.
        """
        entry = {"name": name, "check": check_func}
        self.quality_checks.append(entry)

    def check_quality(self, text: str) -> Dict:
        """Evaluate every registered check and summarise the result.

        A check counts as failed when its result has a falsy "passed" value
        or when calling it raises; a missing "passed" key counts as passed.

        Returns:
            A dict with ``quality_score`` (share of checks that did not fail,
            1.0 when no checks are registered), ``issues`` (one entry per
            failed check) and ``passed`` (True when there are no issues).
        """
        issues = []

        for entry in self.quality_checks:
            try:
                outcome = entry["check"](text)
                if outcome.get("passed", True):
                    continue
                problem = outcome.get("message", "质量检查失败")
            except Exception as exc:
                # A crashing check is reported as an issue instead of
                # aborting the remaining checks.
                problem = f"检查执行失败: {str(exc)}"
            issues.append({"check": entry["name"], "message": problem})

        if self.quality_checks:
            score = 1.0 - (len(issues) / len(self.quality_checks))
        else:
            score = 1.0

        return {
            "quality_score": score,
            "issues": issues,
            "passed": not issues
        }

# Register the default quality checks on a module-level filter instance.
quality_filter = QualityFilter()

# Text must be longer than 10 characters.
quality_filter.add_quality_check(
    "length",
    lambda text: {"passed": len(text) > 10, "message": "内容过短"},
)

# The first 10 characters must occur fewer than 3 times in the text.
quality_filter.add_quality_check(
    "no_repetition",
    lambda text: {"passed": text.count(text[:10]) < 3, "message": "内容重复"},
)

# Text must contain a "。" unless it is shorter than 50 characters.
quality_filter.add_quality_check(
    "coherence",
    lambda text: {"passed": "。" in text or len(text) < 50, "message": "内容缺乏标点"},
)

高级过滤

1. 上下文感知过滤

class ContextAwareFilter:
    """Evaluate rules that see both the text and recent conversation history."""

    def __init__(self):
        self.conversation_history = []
        self.context_rules = []

    def add_context_rule(self, name: str, rule_func):
        """Register ``rule_func`` under ``name``.

        ``rule_func`` is called as ``rule_func(text, history)`` and is expected
        to return a mapping; its "passed" and "message" keys are read by
        ``filter_with_context``.
        """
        entry = {"name": name, "rule": rule_func}
        self.context_rules.append(entry)

    def filter_with_context(self, text: str, context: Dict = None) -> Dict:
        """Run every rule against ``text`` and the stored history.

        When ``context`` carries a "history" entry, its last 10 items replace
        the stored history; otherwise the history kept from an earlier call
        is used as-is.

        Returns:
            A dict with ``filtered`` (True when any rule failed or raised),
            ``violations`` (one entry per failed rule) and
            ``context_applied`` (True when the stored history is non-empty).
        """
        if context and "history" in context:
            recent = context["history"][-10:]
            self.conversation_history = recent

        violations = []

        for entry in self.context_rules:
            try:
                verdict = entry["rule"](text, self.conversation_history)
                if verdict.get("passed", True):
                    continue
                detail = verdict.get("message", "")
            except Exception as exc:
                # A crashing rule is recorded as a violation so that the
                # remaining rules still run.
                detail = f"规则执行失败: {str(exc)}"
            violations.append({"rule": entry["name"], "message": detail})

        return {
            "filtered": bool(violations),
            "violations": violations,
            "context_applied": bool(self.conversation_history)
        }

2. 实时过滤

class RealTimeFilter:
    """Filter streamed output token by token.

    Tokens accumulate in ``buffer``. The buffer is emitted, and cleared, as
    soon as a pattern from ``filter_queue`` matches (with the matches masked)
    or once it is judged safe to output. Because emitted text leaves the
    buffer, a pattern whose match straddles an already-emitted chunk is not
    detected.
    """

    # Words that keep the buffer from being emitted while they sit at its tail.
    _SENSITIVE_WORDS = ("密码", "密钥", "token")
    # A sensitive word ending within this many characters of the end of the
    # buffer delays output until more tokens have arrived.
    _HOLDBACK_CHARS = 5

    def __init__(self):
        self.buffer = ""
        # Regular expressions (matched case-insensitively) to mask.
        self.filter_queue = []

    def process_stream(self, token: str) -> str:
        """Append ``token`` to the buffer and return whatever may be emitted.

        Returns:
            The buffered text with every ``filter_queue`` match replaced by
            ``[FILTERED]`` when at least one pattern matched; the plain
            buffered text when it is safe to output; otherwise ``""`` while
            the text is being held back.
        """
        self.buffer += token

        # Apply every pattern before flushing. Stopping at the first matching
        # pattern would emit text that later patterns should have masked.
        masked = self.buffer
        total_hits = 0
        for pattern in self.filter_queue:
            masked, hits = re.subn(pattern, "[FILTERED]", masked, flags=re.IGNORECASE)
            total_hits += hits
        if total_hits:
            self.buffer = ""
            return masked

        if self._is_safe_to_output(self.buffer):
            output = self.buffer
            self.buffer = ""
            return output

        return ""

    def _is_safe_to_output(self, text: str) -> bool:
        """Return False while a sensitive word sits near the end of ``text``."""
        for word in self._SENSITIVE_WORDS:
            # rfind: the occurrence closest to the tail is the one that
            # decides; an earlier occurrence must not hide it.
            position = text.rfind(word)
            if position != -1 and position + len(word) > len(text) - self._HOLDBACK_CHARS:
                return False
        return True

    def flush(self) -> str:
        """Return the remaining buffered text and clear the buffer."""
        output = self.buffer
        self.buffer = ""
        return output

过滤配置

class FilterConfiguration:
    """Hold the filter settings in a plain dictionary."""

    def __init__(self):
        defaults = {
            # Master switch and strictness.
            "enabled": True,
            "strict_mode": False,
            "log_level": "INFO",
            # Action name used when none is specified.
            "default_action": "redact",
            # Per-stage switches.
            "sensitive_info_filter": True,
            "content_policy_filter": True,
            "quality_filter": True
        }
        self.config = defaults

    def update_config(self, key: str, value):
        """Set ``key`` to ``value``; keys that do not exist yet are added."""
        self.config.update({key: value})

    def get_config(self) -> Dict:
        """Return a shallow copy so callers cannot mutate the stored settings."""
        return dict(self.config)

完整过滤管道

class OutputFilterPipeline:
    """Chain content filtering, sensitive-data masking and quality checks."""

    def __init__(
        self,
        content_filter: Optional["OutputFilter"] = None,
        sensitive_filter: Optional["SensitiveInfoFilter"] = None,
        quality_filter: Optional["QualityFilter"] = None,
    ):
        """Build the pipeline.

        Every argument is optional. A stage that is not supplied is created
        here and given its defaults; a supplied stage is used exactly as
        configured by the caller.
        """
        self.content_filter = OutputFilter() if content_filter is None else content_filter
        self.sensitive_filter = SensitiveInfoFilter() if sensitive_filter is None else sensitive_filter
        self.quality_filter = QualityFilter() if quality_filter is None else quality_filter

        if content_filter is None:
            self._setup_default_policies()
        if quality_filter is None:
            # A freshly created QualityFilter has no checks and would pass
            # every text, making the quality stage a no-op.
            self._setup_default_quality_checks()

    def _setup_default_policies(self):
        """Register the default content policies (all use the "redact" action)."""
        self.content_filter.add_content_policy("violence", r"暴力|伤害|杀死", "redact")
        self.content_filter.add_content_policy("hate", r"仇恨|歧视|种族", "redact")
        self.content_filter.add_content_policy("illegal", r"非法|违法|犯罪", "redact")

    def _setup_default_quality_checks(self):
        """Register the default length, repetition and punctuation checks."""
        self.quality_filter.add_quality_check(
            "length",
            lambda t: {"passed": len(t) > 10, "message": "内容过短"},
        )
        self.quality_filter.add_quality_check(
            "no_repetition",
            lambda t: {"passed": t.count(t[:10]) < 3, "message": "内容重复"},
        )
        self.quality_filter.add_quality_check(
            "coherence",
            lambda t: {"passed": "。" in t or len(t) < 50, "message": "内容缺乏标点"},
        )

    def filter(self, text: str) -> Dict:
        """Run ``text`` through the three stages in order.

        The sensitive-data stage receives the output of the content stage,
        and the quality stage inspects the output of the sensitive-data stage.

        Returns:
            A dict with ``final_text``, ``content_filtered``,
            ``sensitive_info_filtered``, ``quality_passed``, ``violations``
            (from the content stage only) and ``quality_issues``.
        """
        # 1. Content policies.
        content_result = self.content_filter.filter_output(text)

        # 2. Sensitive-data masking on the already content-filtered text.
        sensitive_result = self.sensitive_filter.filter_sensitive_info(content_result.filtered_text)

        # 3. Quality checks on the text that will actually be returned.
        quality_result = self.quality_filter.check_quality(sensitive_result["filtered_text"])

        return {
            "final_text": sensitive_result["filtered_text"],
            "content_filtered": content_result.filtered,
            "sensitive_info_filtered": sensitive_result["has_sensitive_info"],
            "quality_passed": quality_result["passed"],
            "violations": content_result.violations,
            "quality_issues": quality_result["issues"]
        }

# Usage example: run one sample string through the pipeline and print the
# original text, the filtered text and the two filter flags.
pipeline = OutputFilterPipeline()

test_output = "用户密码是123456,邮箱是test@example.com,这是一段正常内容。"
result = pipeline.filter(test_output)

print(
    f"原始文本: {test_output}",
    f"过滤后: {result['final_text']}",
    f"内容过滤: {result['content_filtered']}",
    f"敏感信息过滤: {result['sensitive_info_filtered']}",
    sep="\n",
)

最佳实践

  1. 多层过滤:实施内容、敏感信息、质量多层过滤
  2. 可配置性:提供灵活的过滤策略配置
  3. 性能优化:预编译并复用正则表达式,避免过滤环节成为输出延迟的瓶颈
  4. 日志记录:完整记录过滤事件

总结

输出过滤是确保LLM生成内容安全可靠的关键环节。通过多层过滤机制,可以有效防止有害信息和敏感数据泄露。