← 返回首页
🧠

LLM毒性:检测和过滤有害内容

📂 llm ⏱ 4 min 751 words

---
title: "LLM毒性:检测和过滤有害内容"
description: "检测和过滤LLM生成的有毒、有害内容,确保输出安全"
tags: ["毒性检测", "内容安全", "有害内容", "LLM", "过滤"]
category: "llm"
icon: "🚫"
---

LLM毒性:检测和过滤有害内容

毒性概述

毒性检测是识别和过滤LLM生成的有害、冒犯性或不适当内容的技术。

毒性检测方法

1. 基于规则的检测

import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Dict, Optional, Tuple

class ToxicityType(Enum):
    """Categories of toxic content.

    Members are used as the keys of the pattern table in
    ``RuleBasedToxicityDetector``; each member's string value is the label
    that detector writes into the ``"type"`` field of its per-category results.
    """

    # One member per content category; values are lowercase snake_case labels.
    HATE_SPEECH = "hate_speech"
    HARASSMENT = "harassment"
    VIOLENCE = "violence"
    SELF_HARM = "self_harm"
    SEXUAL = "sexual"
    PROFANITY = "profanity"

@dataclass
class ToxicityResult:
    """Outcome of a single toxicity check.

    Field notes below describe how ``RuleBasedToxicityDetector.detect``
    populates this record.
    """

    # The text that was analysed, stored unchanged.
    text: str
    # Overall verdict; the rule-based detector sets it when the score exceeds 0.5.
    is_toxic: bool
    # Overall score; the rule-based detector uses the highest per-category score.
    toxicity_score: float
    # Per-category details shaped like {"type": <label>, "score": <float>}.
    # NOTE(review): the "type" value is a str, so Dict[str, float] is narrower
    # than what is actually stored - consider widening the annotation.
    toxicity_types: List[Dict[str, float]]
    # De-duplicated substrings that matched any toxicity pattern.
    flagged_terms: List[str]

class RuleBasedToxicityDetector:
    """Rule-based toxicity detector driven by per-category regex patterns.

    Every regex match adds ``MATCH_WEIGHT`` to its category's score (capped at
    1.0). The overall score is the highest category score, and a text is
    considered toxic when that score exceeds ``TOXIC_THRESHOLD``.
    """

    # Score contributed by each individual regex match.
    MATCH_WEIGHT = 0.3
    # Overall score above which a text is classified as toxic.
    TOXIC_THRESHOLD = 0.5
    # Category score above which the category is listed in ``toxicity_types``.
    REPORT_THRESHOLD = 0.3

    def __init__(self):
        self.toxicity_patterns = self._load_patterns()

    def _load_patterns(self) -> Dict[ToxicityType, List[str]]:
        """Return the built-in regex patterns, grouped by toxicity category."""
        return {
            ToxicityType.HATE_SPEECH: [
                r"仇恨.*言论",
                r"种族.*歧视",
                r"性别.*歧视"
            ],
            ToxicityType.HARASSMENT: [
                r"骚扰.*他人",
                r"威胁.*人身",
                r"霸凌"
            ],
            ToxicityType.VIOLENCE: [
                r"暴力.*行为",
                r"伤害.*他人",
                r"杀人"
            ],
            ToxicityType.SELF_HARM: [
                r"自杀.*方法",
                r"自残.*行为",
                r"结束.*生命"
            ],
            ToxicityType.SEXUAL: [
                r"色情.*内容",
                r"性.*骚扰",
                r"不当.*性"
            ],
            ToxicityType.PROFANITY: [
                r"脏话",
                r"粗俗.*语言",
                r"侮辱.*性"
            ]
        }

    def detect(self, text: str) -> ToxicityResult:
        """Score ``text`` against every pattern and summarise the findings.

        Args:
            text: The text to analyse.

        Returns:
            A ``ToxicityResult`` holding the overall score and verdict, the
            categories whose score exceeds ``REPORT_THRESHOLD``, and the
            distinct matched substrings in first-seen order.
        """
        # Keyed by the enum member itself so each score stays attached to its
        # own category, even if ``toxicity_patterns`` was customised to hold a
        # subset of the categories or a different ordering.
        toxicity_scores: Dict[ToxicityType, float] = {}
        flagged_terms: List[str] = []

        for toxicity_type, patterns in self.toxicity_patterns.items():
            score = 0.0
            for pattern in patterns:
                matches = re.findall(pattern, text, re.IGNORECASE)
                if matches:
                    score += len(matches) * self.MATCH_WEIGHT
                    flagged_terms.extend(matches)

            toxicity_scores[toxicity_type] = min(score, 1.0)

        # The overall score is the worst (highest) category score.
        toxicity_score = max(toxicity_scores.values()) if toxicity_scores else 0
        is_toxic = toxicity_score > self.TOXIC_THRESHOLD

        toxicity_types = [
            {"type": toxicity_type.value, "score": score}
            for toxicity_type, score in toxicity_scores.items()
            if score > self.REPORT_THRESHOLD
        ]

        return ToxicityResult(
            text=text,
            is_toxic=is_toxic,
            toxicity_score=toxicity_score,
            toxicity_types=toxicity_types,
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the result is deterministic across runs (a set is not).
            flagged_terms=list(dict.fromkeys(flagged_terms))
        )

2. 基于模型的检测

class ModelBasedToxicityDetector:
    """Toxicity detector backed by a classification model and its tokenizer.

    The model is called with the tokenizer's output as keyword arguments and
    must return an object exposing ``.logits``; the logits are soft-maxed over
    the last dimension to obtain class probabilities.
    """

    def __init__(self, model, tokenizer, *, threshold: float = 0.5, toxic_index: int = 1):
        """Store the model, the tokenizer and the decision parameters.

        Args:
            model: Callable model; ``model(**inputs).logits`` must be a tensor.
            tokenizer: Callable invoked as
                ``tokenizer(text, return_tensors="pt", truncation=True, max_length=512)``.
            threshold: Probability above which a text is labelled toxic.
            toxic_index: Index of the "toxic" class in the model output. The
                default assumes index 0 is non-toxic and index 1 is toxic.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.threshold = threshold
        self.toxic_index = toxic_index

    def detect(self, text: str) -> Dict:
        """Classify a single text.

        Args:
            text: The text to analyse.

        Returns:
            A dict with ``text``, ``toxicity_score`` (probability of the toxic
            class), ``is_toxic``, ``confidence`` (highest class probability)
            and ``prediction`` (``"toxic"`` or ``"non-toxic"``).
        """
        # Imported here so the name is always in scope for this method and the
        # rule-based parts of the file do not require torch to be installed.
        import torch

        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

        # Inference only: no gradients are needed.
        with torch.no_grad():
            logits = self.model(**inputs).logits
            probs = torch.softmax(logits, dim=-1)

        class_probs = probs[0].tolist()
        toxic_prob = class_probs[self.toxic_index]
        is_toxic = toxic_prob > self.threshold

        return {
            "text": text,
            "toxicity_score": toxic_prob,
            "is_toxic": is_toxic,
            "confidence": max(class_probs),
            "prediction": "toxic" if is_toxic else "non-toxic"
        }

    def detect_batch(self, texts: List[str]) -> List[Dict]:
        """Classify each text in ``texts`` in order, one ``detect`` call per text."""
        return [self.detect(text) for text in texts]

3. 上下文感知检测

class ContextAwareToxicityDetector:
    """Toxicity detector that blends a rule-based score with a context score."""

    def __init__(self):
        self.conversation_history = []
        # Built once and reused; the rule-based detector is not rebuilt per call.
        self.detector = RuleBasedToxicityDetector()

    def detect_with_context(self, text: str, context: Optional[List[str]] = None) -> Dict:
        """Score ``text``, taking recent conversation turns into account.

        Args:
            text: The text to analyse.
            context: Conversation turns, oldest first. When given (even as an
                empty list) it replaces the stored history, keeping the last
                10 entries. When omitted, the history from the previous call
                is reused.

        Returns:
            A dict with ``text``, the combined ``toxicity_score``, ``is_toxic``,
            ``base_score``, ``context_score`` and ``context_analysis``.
        """
        # ``is not None`` so that an explicit empty context clears history left
        # over from an earlier call.
        if context is not None:
            self.conversation_history = context[-10:]

        base_result = self.detector.detect(text)
        context_score = self._analyze_context(text)

        # Weighted blend: 70% rule-based score, 30% context score.
        # NOTE(review): with these weights a base score of 0.6 (toxic for the
        # rule-based detector) combines to at most 0.51, and to 0.42 without
        # context, so this detector can clear text the base detector flags.
        # Confirm this is intended.
        combined_score = (base_result.toxicity_score * 0.7 + context_score * 0.3)

        return {
            "text": text,
            "toxicity_score": combined_score,
            "is_toxic": combined_score > 0.5,
            "base_score": base_result.toxicity_score,
            "context_score": context_score,
            "context_analysis": self._get_context_analysis(text)
        }

    def _analyze_context(self, text: str) -> float:
        """Return 0.3 if the last three turns mention a conflict, else 0.0.

        ``text`` is accepted for interface symmetry but is not inspected.
        """
        if not self.conversation_history:
            return 0.0

        recent_texts = " ".join(self.conversation_history[-3:])
        conflict_words = ("争论", "吵架", "冲突")
        if any(word in recent_texts for word in conflict_words):
            return 0.3
        return 0.0

    def _get_context_analysis(self, text: str) -> Dict:
        """Summarise the stored history: its length and the last three turns."""
        return {
            "conversation_length": len(self.conversation_history),
            "recent_context": self.conversation_history[-3:] if self.conversation_history else []
        }

内容过滤

1. 输出过滤器

class OutputFilter:
    """Filters model output that the rule-based detector classifies as toxic.

    Three strategies are available: ``"block"`` (replace the whole output with
    a notice), ``"replace"`` (mask only the flagged terms) and ``"refuse"``
    (return a refusal message).
    """

    def __init__(self):
        self.detector = RuleBasedToxicityDetector()
        self.filter_strategies = {
            "block": self._block_output,
            "replace": self._replace_toxic_content,
            "refuse": self._refuse_generation
        }

    def filter_output(self, text: str, strategy: str = "block") -> Dict:
        """Check ``text`` and, if it is toxic, apply the chosen strategy.

        Args:
            text: The generated text to check.
            strategy: One of the keys of ``filter_strategies``. An unknown
                name falls back to blocking, the most conservative option.

        Returns:
            For non-toxic text: ``{"filtered": False, "text", "reason"}``.
            For toxic text: ``{"filtered": True, "text", "original_text",
            "filtered_text", "toxicity_score", "strategy"}``. ``"text"``
            always holds the text that is safe to emit, and ``"strategy"``
            names the strategy that was actually applied.
        """
        detection_result = self.detector.detect(text)

        if not detection_result.is_toxic:
            return {
                "filtered": False,
                "text": text,
                "reason": "无毒性内容"
            }

        # Report the strategy that really ran, not an unrecognised name that
        # was silently replaced by the blocking fallback.
        applied_strategy = strategy if strategy in self.filter_strategies else "block"
        filter_func = self.filter_strategies.get(strategy, self._block_output)
        filtered_text = filter_func(text, detection_result)

        return {
            "filtered": True,
            # Same key as the non-toxic branch so callers can always read it.
            "text": filtered_text,
            "original_text": text,
            "filtered_text": filtered_text,
            "toxicity_score": detection_result.toxicity_score,
            "strategy": applied_strategy
        }

    def _block_output(self, text: str, result: "ToxicityResult") -> str:
        """Return a fixed notice in place of the whole output."""
        return "[内容被过滤:检测到有毒内容]"

    def _replace_toxic_content(self, text: str, result: "ToxicityResult") -> str:
        """Mask every flagged term in ``text`` with ``***``."""
        filtered_text = text
        # Longest terms first: masking a short term first would break up a
        # longer term that contains it and leave part of it visible.
        for term in sorted(result.flagged_terms, key=len, reverse=True):
            if term:  # str.replace("", ...) would insert the mask everywhere
                filtered_text = filtered_text.replace(term, "***")
        return filtered_text

    def _refuse_generation(self, text: str, result: "ToxicityResult") -> str:
        """Return a fixed refusal message in place of the whole output."""
        return "抱歉,我无法生成包含有害内容的响应。"

2. 实时监控

class ToxicityMonitor:
    """Records toxicity checks on generated text and raises alerts.

    Every checked text is appended to ``monitoring_log``; the log is never
    trimmed here, so long-running callers should prune it themselves.
    """

    def __init__(self):
        self.alert_threshold = 0.7
        self.monitoring_log = []
        # Built once and reused for every monitored text.
        self.detector = RuleBasedToxicityDetector()

    def monitor_generation(self, text: str, metadata: Optional[Dict] = None) -> Dict:
        """Check ``text``, log the outcome and alert on high scores.

        Args:
            text: The generated text to check.
            metadata: Optional extra information stored with the log entry.

        Returns:
            A dict with ``monitored`` (always True), ``toxicity_score``,
            ``is_toxic`` and ``requires_review`` (score above
            ``alert_threshold``).
        """
        result = self.detector.detect(text)
        requires_review = result.toxicity_score > self.alert_threshold

        self.monitoring_log.append({
            "timestamp": datetime.now().isoformat(),
            "text": text[:100],  # only the first 100 characters are kept
            "toxicity_score": result.toxicity_score,
            "is_toxic": result.is_toxic,
            "metadata": metadata or {}
        })

        if requires_review:
            self._trigger_alert(result)

        return {
            "monitored": True,
            "toxicity_score": result.toxicity_score,
            "is_toxic": result.is_toxic,
            "requires_review": requires_review
        }

    def _trigger_alert(self, result: "ToxicityResult"):
        """Print a warning line containing the score, to two decimals."""
        print(f"警告:检测到高毒性内容 (分数: {result.toxicity_score:.2f})")

    def get_monitoring_report(self) -> Dict:
        """Aggregate the log into counts, toxicity rate and mean score.

        Returns:
            A dict with ``total_monitored``, ``toxic_count``, ``toxicity_rate``
            and ``average_toxicity_score``; both ratios are 0 for an empty log.
        """
        total = len(self.monitoring_log)
        toxic_count = sum(1 for log in self.monitoring_log if log["is_toxic"])
        # Plain arithmetic keeps this method free of a numpy dependency.
        score_sum = sum(log["toxicity_score"] for log in self.monitoring_log)

        return {
            "total_monitored": total,
            "toxic_count": toxic_count,
            "toxicity_rate": toxic_count / total if total > 0 else 0,
            "average_toxicity_score": score_sum / total if total > 0 else 0
        }

评估工具

class ToxicityEvaluator:
    """Computes binary-classification metrics for a toxicity detector."""

    @staticmethod
    def evaluate_detector(detector, test_data: List[Dict]) -> Dict:
        """Run ``detector`` over labelled samples and compute quality metrics.

        Args:
            detector: Any object with a ``detect(text)`` method whose result
                exposes ``is_toxic`` as an attribute (for example
                ``ToxicityResult``) or as a dict key (for example the dict
                returned by ``ModelBasedToxicityDetector.detect``).
            test_data: Samples shaped like ``{"text": str, "is_toxic": bool}``.

        Returns:
            A dict with ``precision``, ``recall``, ``f1_score``, ``accuracy``
            and ``confusion_matrix``. Any metric whose denominator is zero,
            including every metric for empty ``test_data``, is reported as 0.
        """
        def ratio(numerator, denominator):
            # Zero-safe division: an undefined metric is reported as 0.
            return numerator / denominator if denominator > 0 else 0

        true_positives = 0
        false_positives = 0
        true_negatives = 0
        false_negatives = 0

        for item in test_data:
            expected = bool(item["is_toxic"])
            result = detector.detect(item["text"])
            # The detectors in this file return either a dict or an object.
            if isinstance(result, dict):
                predicted = bool(result["is_toxic"])
            else:
                predicted = bool(result.is_toxic)

            if expected and predicted:
                true_positives += 1
            elif predicted:
                false_positives += 1
            elif expected:
                false_negatives += 1
            else:
                true_negatives += 1

        total = true_positives + false_positives + true_negatives + false_negatives
        precision = ratio(true_positives, true_positives + false_positives)
        recall = ratio(true_positives, true_positives + false_negatives)
        f1 = ratio(2 * precision * recall, precision + recall)

        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "accuracy": ratio(true_positives + true_negatives, total),
            "confusion_matrix": {
                "true_positives": true_positives,
                "false_positives": false_positives,
                "true_negatives": true_negatives,
                "false_negatives": false_negatives
            }
        }

最佳实践

  1. 多层防护:使用多种检测方法组合
  2. 上下文感知:考虑对话上下文进行检测
  3. 持续更新:定期更新毒性词汇和模式
  4. 用户反馈:利用用户反馈改进检测

总结

毒性检测和过滤是确保LLM输出安全的关键环节。通过多层防护和持续监控,可以有效减少有害内容的生成。