LLM毒性:检测和过滤有害内容
---
title: "LLM毒性:检测和过滤有害内容"
description: "检测和过滤LLM生成的有毒、有害内容,确保输出安全"
tags: ["毒性检测", "内容安全", "有害内容", "LLM", "过滤"]
category: "llm"
icon: "🚫"
---
LLM毒性:检测和过滤有害内容
毒性概述
毒性检测是识别和过滤LLM生成的有害、冒犯性或不适当内容的技术。
毒性检测方法
1. 基于规则的检测
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
class ToxicityType(Enum):
    """Closed set of toxicity categories.

    Each member's value is the snake_case string label for that category.
    """

    HATE_SPEECH = "hate_speech"
    HARASSMENT = "harassment"
    VIOLENCE = "violence"
    SELF_HARM = "self_harm"
    SEXUAL = "sexual"
    PROFANITY = "profanity"
@dataclass
class ToxicityResult:
    """Outcome of running a toxicity detector over one piece of text."""

    # The text that was analysed.
    text: str
    # Overall verdict for the text.
    is_toxic: bool
    # Overall toxicity score for the text.
    toxicity_score: float
    # Per-category breakdown. Each entry mixes value types -- a string label
    # under "type" and a float under "score" -- hence ``Any`` rather than
    # ``float`` for the dict values.
    toxicity_types: List[Dict[str, Any]]
    # Substrings of ``text`` that triggered the detection.
    flagged_terms: List[str]
class RuleBasedToxicityDetector:
    """Rule-based toxicity detector driven by per-category regex patterns.

    Every regex hit adds ``match_weight`` to the score of its category
    (capped at 1.0). The overall score is the highest category score, and a
    text is reported as toxic when that score is strictly greater than
    ``toxic_threshold``.

    With the defaults (0.3 per hit, threshold 0.5) a single hit is *not*
    enough to mark a text as toxic; two hits in the same category are.
    """

    def __init__(
        self,
        match_weight: float = 0.3,
        toxic_threshold: float = 0.5,
        type_threshold: float = 0.3,
    ):
        """Create a detector.

        Args:
            match_weight: Score added to a category for each regex hit.
            toxic_threshold: Overall score above which text is toxic.
            type_threshold: Category score above which the category is
                listed in ``ToxicityResult.toxicity_types``.
        """
        self.match_weight = match_weight
        self.toxic_threshold = toxic_threshold
        self.type_threshold = type_threshold
        self.toxicity_patterns = self._load_patterns()

    def _load_patterns(self) -> Dict[ToxicityType, List[str]]:
        """Return the regex patterns to search for, grouped by category.

        NOTE: the ``.*`` in these patterns is greedy and unbounded, so one
        match can stretch from the first keyword to the last occurrence of
        the second keyword on the same line (``.`` does not cross newlines).
        """
        return {
            ToxicityType.HATE_SPEECH: [
                r"仇恨.*言论",
                r"种族.*歧视",
                r"性别.*歧视",
            ],
            ToxicityType.HARASSMENT: [
                r"骚扰.*他人",
                r"威胁.*人身",
                r"霸凌",
            ],
            ToxicityType.VIOLENCE: [
                r"暴力.*行为",
                r"伤害.*他人",
                r"杀人",
            ],
            ToxicityType.SELF_HARM: [
                r"自杀.*方法",
                r"自残.*行为",
                r"结束.*生命",
            ],
            ToxicityType.SEXUAL: [
                r"色情.*内容",
                r"性.*骚扰",
                r"不当.*性",
            ],
            ToxicityType.PROFANITY: [
                r"脏话",
                r"粗俗.*语言",
                r"侮辱.*性",
            ],
        }

    def detect(self, text: str) -> ToxicityResult:
        """Score ``text`` against every pattern and summarise the result.

        Args:
            text: The text to analyse.

        Returns:
            A ``ToxicityResult`` holding the overall score and verdict, the
            categories whose score exceeds ``type_threshold``, and the
            distinct matched substrings in first-seen order.
        """
        toxicity_scores: Dict[str, float] = {}
        flagged_terms: List[str] = []

        for toxicity_type, patterns in self.toxicity_patterns.items():
            score = 0.0
            for pattern in patterns:
                # finditer + group(0) always yields the whole matched
                # substring, even if a pattern later gains capture groups
                # (re.findall would return the groups instead).
                matches = [
                    m.group(0)
                    for m in re.finditer(pattern, text, re.IGNORECASE)
                ]
                score += len(matches) * self.match_weight
                flagged_terms.extend(matches)
            toxicity_scores[toxicity_type.value] = min(score, 1.0)

        # The overall score is the worst (highest) category score.
        toxicity_score = max(toxicity_scores.values(), default=0.0)
        is_toxic = toxicity_score > self.toxic_threshold

        # Pair each label with its own score straight from the dict rather
        # than relying on the enum and the dict iterating in the same order.
        toxicity_types = [
            {"type": type_name, "score": type_score}
            for type_name, type_score in toxicity_scores.items()
            if type_score > self.type_threshold
        ]

        return ToxicityResult(
            text=text,
            is_toxic=is_toxic,
            toxicity_score=toxicity_score,
            toxicity_types=toxicity_types,
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the output is deterministic (set order varies between runs).
            flagged_terms=list(dict.fromkeys(flagged_terms)),
        )
2. 基于模型的检测
class ModelBasedToxicityDetector:
    """Toxicity detector backed by a classification model.

    The model is called as ``model(**tokenizer_output)`` and must return an
    object exposing ``.logits``; the tokenizer is called with
    ``return_tensors="pt"``.
    NOTE(review): this looks like the Hugging Face sequence-classification
    interface -- confirm against the model actually passed in.
    """

    def __init__(self, model, tokenizer, threshold: float = 0.5, toxic_index: int = 1):
        """Create a detector.

        Args:
            model: Callable classifier returning an object with ``.logits``.
            tokenizer: Callable turning text into the model's keyword inputs.
            threshold: Probability above which text is reported as toxic.
            toxic_index: Index of the "toxic" class in the logits. The
                default of 1 assumes index 0 = non-toxic, 1 = toxic.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.threshold = threshold
        self.toxic_index = toxic_index

    def detect(self, text: str) -> Dict:
        """Classify one text.

        Args:
            text: The text to analyse (truncated to 512 tokens).

        Returns:
            Dict with ``text``, ``toxicity_score`` (probability of the toxic
            class), ``is_toxic``, ``confidence`` (highest class probability)
            and ``prediction`` (``"toxic"`` or ``"non-toxic"``).
        """
        # Imported lazily so the rule-based parts of this file stay usable
        # without torch installed.
        import torch

        inputs = self.tokenizer(
            text, return_tensors="pt", truncation=True, max_length=512
        )

        # Inference only: skip gradient bookkeeping.
        with torch.no_grad():
            outputs = self.model(**inputs)
            # Row 0 because exactly one text is passed per call.
            probs = torch.softmax(outputs.logits, dim=-1)[0]

        toxic_prob = probs[self.toxic_index].item()
        is_toxic = toxic_prob > self.threshold

        return {
            "text": text,
            "toxicity_score": toxic_prob,
            "is_toxic": is_toxic,
            "confidence": max(probs.tolist()),
            "prediction": "toxic" if is_toxic else "non-toxic",
        }

    def detect_batch(self, texts: List[str]) -> List[Dict]:
        """Classify several texts, one model call per text.

        Args:
            texts: Texts to analyse; may be empty.

        Returns:
            One ``detect`` result per input text, in input order.
        """
        return [self.detect(text) for text in texts]
3. 上下文感知检测
class ContextAwareToxicityDetector:
    """Toxicity detector that blends a base score with a conversation signal.

    The combined score is ``0.7 * base_score + 0.3 * context_score``, where
    the context score is 0.3 when any of the last three history entries
    contains a conflict word and 0.0 otherwise.
    """

    # Words whose presence in recent history marks the conversation as
    # adversarial.
    _CONFLICT_WORDS = ("争论", "吵架", "冲突")

    def __init__(self, base_detector=None):
        """Create a detector.

        Args:
            base_detector: Object whose ``detect(text)`` returns a result
                with a ``toxicity_score`` attribute. Defaults to a
                ``RuleBasedToxicityDetector``, built once here instead of on
                every call.
        """
        self.conversation_history: List[str] = []
        self.base_detector = (
            base_detector
            if base_detector is not None
            else RuleBasedToxicityDetector()
        )

    def detect_with_context(self, text: str, context: Optional[List[str]] = None) -> Dict:
        """Score ``text``, taking recent conversation history into account.

        Args:
            text: The text to analyse.
            context: Conversation history. When given (even if empty) it
                replaces the stored history, keeping the last 10 entries, so
                an empty list starts a fresh conversation. When ``None`` the
                history from the previous call is reused.

        Returns:
            Dict with ``text``, the combined ``toxicity_score``, ``is_toxic``
            (combined score > 0.5), ``base_score``, ``context_score`` and
            ``context_analysis``.
        """
        if context is not None:
            # Keep only the 10 most recent entries, as our own copy.
            self.conversation_history = list(context[-10:])

        base_result = self.base_detector.detect(text)
        context_score = self._analyze_context(text)

        # Weighted blend: the base detector dominates, context nudges.
        combined_score = base_result.toxicity_score * 0.7 + context_score * 0.3

        return {
            "text": text,
            "toxicity_score": combined_score,
            "is_toxic": combined_score > 0.5,
            "base_score": base_result.toxicity_score,
            "context_score": context_score,
            "context_analysis": self._get_context_analysis(text),
        }

    def _analyze_context(self, text: str) -> float:
        """Return 0.3 if the last three history entries mention conflict.

        ``text`` is accepted for interface symmetry but not inspected.
        """
        if not self.conversation_history:
            return 0.0
        recent_texts = " ".join(self.conversation_history[-3:])
        if any(word in recent_texts for word in self._CONFLICT_WORDS):
            return 0.3
        return 0.0

    def _get_context_analysis(self, text: str) -> Dict:
        """Summarise the stored history: its length and last three entries.

        ``text`` is accepted for interface symmetry but not inspected.
        """
        return {
            "conversation_length": len(self.conversation_history),
            # Slicing an empty list already yields [], so no guard is needed.
            "recent_context": self.conversation_history[-3:],
        }
内容过滤
1. 输出过滤器
class OutputFilter:
    """Filter model output through a toxicity detector.

    Supported strategies: ``"block"`` (replace the whole output with a
    notice), ``"replace"`` (mask flagged substrings) and ``"refuse"``
    (return a refusal message). Unknown names fall back to ``"block"``.
    """

    def __init__(self, detector=None):
        """Create a filter.

        Args:
            detector: Object whose ``detect(text)`` returns a result with
                ``is_toxic``, ``toxicity_score`` and ``flagged_terms``.
                Defaults to a ``RuleBasedToxicityDetector``.
        """
        self.detector = (
            detector if detector is not None else RuleBasedToxicityDetector()
        )
        self.filter_strategies = {
            "block": self._block_output,
            "replace": self._replace_toxic_content,
            "refuse": self._refuse_generation,
        }

    def filter_output(self, text: str, strategy: str = "block") -> Dict:
        """Detect toxicity in ``text`` and apply the chosen strategy.

        Args:
            text: Candidate output to check.
            strategy: One of the keys of ``filter_strategies``.

        Returns:
            A dict that always carries ``filtered`` and ``text`` (the text
            that is safe to show). Non-toxic input adds ``reason``; toxic
            input adds ``original_text``, ``filtered_text``,
            ``toxicity_score`` and the requested ``strategy``.
        """
        detection_result = self.detector.detect(text)

        if not detection_result.is_toxic:
            return {
                "filtered": False,
                "text": text,
                "reason": "无毒性内容",
            }

        # Unknown strategy names fall back to blocking, the safest option.
        filter_func = self.filter_strategies.get(strategy, self._block_output)
        filtered_text = filter_func(text, detection_result)

        return {
            "filtered": True,
            # Same key as the unfiltered branch, so callers can read
            # result["text"] without checking "filtered" first.
            "text": filtered_text,
            "original_text": text,
            "filtered_text": filtered_text,
            "toxicity_score": detection_result.toxicity_score,
            "strategy": strategy,
        }

    def _block_output(self, text: str, result: "ToxicityResult") -> str:
        """Replace the whole output with a fixed notice."""
        return "[内容被过滤:检测到有毒内容]"

    def _replace_toxic_content(self, text: str, result: "ToxicityResult") -> str:
        """Mask every flagged substring of ``text`` with ``***``."""
        # Longest terms first: if one flagged term contains another, masking
        # the short one first would break the long one and leave part of it
        # visible. The secondary sort key keeps the order deterministic.
        ordered_terms = sorted(
            set(result.flagged_terms), key=lambda term: (-len(term), term)
        )
        filtered_text = text
        for term in ordered_terms:
            if term:  # str.replace("") would insert the mask everywhere
                filtered_text = filtered_text.replace(term, "***")
        return filtered_text

    def _refuse_generation(self, text: str, result: "ToxicityResult") -> str:
        """Return a fixed refusal message."""
        return "抱歉,我无法生成包含有害内容的响应。"
2. 实时监控
class ToxicityMonitor:
    """Log the toxicity of generated text and alert on high scores."""

    def __init__(self, alert_threshold: float = 0.7, detector=None):
        """Create a monitor.

        Args:
            alert_threshold: Score above which an alert is triggered and the
                text is marked as requiring review.
            detector: Object whose ``detect(text)`` returns a result with
                ``toxicity_score`` and ``is_toxic``. Defaults to a
                ``RuleBasedToxicityDetector``, built once here instead of on
                every call.
        """
        self.alert_threshold = alert_threshold
        self.monitoring_log = []
        self.detector = (
            detector if detector is not None else RuleBasedToxicityDetector()
        )

    def monitor_generation(self, text: str, metadata: Optional[Dict] = None) -> Dict:
        """Score ``text``, append a log entry and alert if needed.

        Args:
            text: Generated text to check.
            metadata: Optional extra information stored with the log entry.

        Returns:
            Dict with ``monitored`` (always True), ``toxicity_score``,
            ``is_toxic`` and ``requires_review``.
        """
        result = self.detector.detect(text)
        requires_review = result.toxicity_score > self.alert_threshold

        self.monitoring_log.append(
            {
                "timestamp": datetime.now().isoformat(),
                "text": text[:100],  # keep only the first 100 characters
                "toxicity_score": result.toxicity_score,
                "is_toxic": result.is_toxic,
                "metadata": metadata or {},
            }
        )

        if requires_review:
            self._trigger_alert(result)

        return {
            "monitored": True,
            "toxicity_score": result.toxicity_score,
            "is_toxic": result.is_toxic,
            "requires_review": requires_review,
        }

    def _trigger_alert(self, result: "ToxicityResult"):
        """Print a warning with the offending score."""
        print(f"警告:检测到高毒性内容 (分数: {result.toxicity_score:.2f})")

    def get_monitoring_report(self) -> Dict:
        """Aggregate the log into counts and averages.

        Returns:
            Dict with ``total_monitored``, ``toxic_count``,
            ``toxicity_rate`` and ``average_toxicity_score``; both ratios
            are 0.0 when nothing has been logged yet.
        """
        total = len(self.monitoring_log)
        if total == 0:
            return {
                "total_monitored": 0,
                "toxic_count": 0,
                "toxicity_rate": 0.0,
                "average_toxicity_score": 0.0,
            }

        toxic_count = sum(1 for entry in self.monitoring_log if entry["is_toxic"])
        # Plain sum/len keeps the result a built-in float (JSON-serialisable)
        # and needs no third-party import.
        score_total = sum(entry["toxicity_score"] for entry in self.monitoring_log)

        return {
            "total_monitored": total,
            "toxic_count": toxic_count,
            "toxicity_rate": toxic_count / total,
            "average_toxicity_score": score_total / total,
        }
评估工具
class ToxicityEvaluator:
    """Compute classification metrics for a toxicity detector."""

    @staticmethod
    def evaluate_detector(detector, test_data: List[Dict]) -> Dict:
        """Run ``detector`` over labelled samples and report metrics.

        Args:
            detector: Object with a ``detect(text)`` method. The result may
                be an object with an ``is_toxic`` attribute or a dict with
                an ``"is_toxic"`` key, so both detector styles in this file
                can be evaluated.
            test_data: Samples shaped ``{"text": str, "is_toxic": bool}``;
                may be empty.

        Returns:
            Dict with ``precision``, ``recall``, ``f1_score``, ``accuracy``
            and ``confusion_matrix``. Any metric whose denominator is zero
            (including every metric for empty ``test_data``) is reported
            as 0.
        """
        true_positives = 0
        false_positives = 0
        true_negatives = 0
        false_negatives = 0

        for item in test_data:
            expected = bool(item["is_toxic"])
            result = detector.detect(item["text"])
            # Accept both dict-style and attribute-style detector results.
            if isinstance(result, dict):
                predicted = bool(result["is_toxic"])
            else:
                predicted = bool(result.is_toxic)

            if expected and predicted:
                true_positives += 1
            elif predicted:
                false_positives += 1
            elif expected:
                false_negatives += 1
            else:
                true_negatives += 1

        predicted_positive = true_positives + false_positives
        actual_positive = true_positives + false_negatives
        total = len(test_data)

        precision = true_positives / predicted_positive if predicted_positive else 0
        recall = true_positives / actual_positive if actual_positive else 0
        f1 = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0
        )
        # Guard the empty-dataset case instead of dividing by zero.
        accuracy = (true_positives + true_negatives) / total if total else 0

        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "accuracy": accuracy,
            "confusion_matrix": {
                "true_positives": true_positives,
                "false_positives": false_positives,
                "true_negatives": true_negatives,
                "false_negatives": false_negatives,
            },
        }
最佳实践
- 多层防护:使用多种检测方法组合
- 上下文感知:考虑对话上下文进行检测
- 持续更新:定期更新毒性词汇和模式
- 用户反馈:利用用户反馈改进检测
总结
毒性检测和过滤是确保LLM输出安全的关键环节。通过多层防护和持续监控,可以有效减少有害内容的输出。