← 返回首页
🧠

NSFW检测:过滤不当内容

📂 llm ⏱ 4 min 656 words

---
title: "NSFW检测:过滤不当内容"
description: "检测和过滤LLM生成的不适合工作场合的内容"
tags: ["NSFW", "不当内容", "内容过滤", "LLM", "安全"]
category: "llm"
icon: "🔞"
---

NSFW检测:过滤不当内容

NSFW概述

NSFW(Not Safe For Work)检测是指识别并过滤不适合在工作场合或公开场合查看的内容。

检测方法

1. 基于规则的检测

import copy
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class NSFWDetectionResult:
    """Result of one NSFW detection pass over a piece of text."""
    # The text that was checked.
    text: str
    # True when the text was flagged as NSFW.
    is_nsfw: bool
    # Confidence score reported by the detector that produced this result.
    confidence: float
    # Names of the categories that were flagged.
    categories: List[str]
    # Terms found in the text that caused it to be flagged.
    flagged_terms: List[str]

class RuleBasedNSFWDetector:
    """Rule-based NSFW detector driven by per-category regex patterns."""

    def __init__(self):
        # Mapping of category name -> list of regex pattern strings.
        self.nsfw_patterns = self._load_patterns()

    def _load_patterns(self) -> Dict[str, List[str]]:
        """Return the built-in NSFW patterns, keyed by category name."""
        return {
            "sexual": [
                r"色情",
                r"性行为",
                r"裸体",
                r"淫秽"
            ],
            "violence": [
                r"暴力",
                r"血腥",
                r"伤害",
                r"死亡"
            ],
            "drugs": [
                r"毒品",
                r"吸毒",
                r"贩毒",
                r"大麻"
            ],
            "gambling": [
                r"赌博",
                r"赌局",
                r"赌场",
                r"博彩"
            ]
        }

    def detect(self, text: str) -> NSFWDetectionResult:
        """Scan ``text`` against every category's patterns.

        Args:
            text: The text to check.

        Returns:
            An ``NSFWDetectionResult`` in which ``categories`` holds each
            category that produced at least one match (in pattern-table
            order), ``flagged_terms`` holds each distinct match once in
            first-seen order, and ``confidence`` is 0.2 per match
            (repeated occurrences included), capped at 1.0.
        """
        flagged_terms = []
        categories = []

        for category, patterns in self.nsfw_patterns.items():
            category_terms = []
            for pattern in patterns:
                category_terms.extend(re.findall(pattern, text, re.IGNORECASE))

            if category_terms:
                categories.append(category)
                flagged_terms.extend(category_terms)

        # Every occurrence counts toward confidence, so this is computed
        # before duplicates are removed.
        confidence = min(len(flagged_terms) * 0.2, 1.0)

        # dict.fromkeys de-duplicates while keeping first-seen order; a set
        # would yield a different order from run to run because string
        # hashing is randomised per process.
        unique_terms = list(dict.fromkeys(flagged_terms))

        return NSFWDetectionResult(
            text=text,
            is_nsfw=bool(flagged_terms),
            confidence=confidence,
            categories=categories,
            flagged_terms=unique_terms
        )

2. 基于模型的检测

class ModelBasedNSFWDetector:
    """Model-based NSFW detector wrapping a classifier and its tokenizer."""

    def __init__(self, model, tokenizer):
        """Store the classifier and tokenizer used by ``detect``.

        Args:
            model: Callable invoked as ``model(**inputs)``; its return value
                must expose a ``logits`` tensor.
            tokenizer: Callable invoked as ``tokenizer(text,
                return_tensors="pt", truncation=True, max_length=512)``
                whose result can be unpacked as keyword arguments.
        """
        self.model = model
        self.tokenizer = tokenizer

    def detect(self, text: str) -> Dict:
        """Classify ``text`` and return the NSFW probability.

        Args:
            text: The text to check.

        Returns:
            A dict with ``text``, ``is_nsfw`` (NSFW probability above 0.5),
            ``confidence`` (largest class probability),
            ``nsfw_probability`` and ``safe_probability``.
        """
        # torch is imported here because this code never imports it at
        # module level; a local import also keeps torch optional for code
        # that only uses the rule-based detectors.
        import torch

        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

        # Inference only: no gradients are needed.
        with torch.no_grad():
            logits = self.model(**inputs).logits
            probs = torch.softmax(logits, dim=-1)

        # Assumes class index 0 = safe and index 1 = NSFW — TODO confirm
        # against the label mapping of the model actually used.
        nsfw_prob = probs[0][1].item()

        return {
            "text": text,
            "is_nsfw": nsfw_prob > 0.5,
            "confidence": max(probs[0].tolist()),
            "nsfw_probability": nsfw_prob,
            "safe_probability": 1 - nsfw_prob
        }

3. 多标签检测

class MultiLabelNSFWDetector:
    """Multi-label NSFW detector that scores text separately per label."""

    def __init__(self):
        # label name -> regex patterns that count as evidence for that label
        self.label_patterns = {
            "explicit_sexual": [r"性行为", r"裸体", r"色情"],
            "graphic_violence": [r"暴力", r"血腥", r"酷刑"],
            "drug_use": [r"吸毒", r"药物滥用"],
            "gambling": [r"赌博", r"博彩"],
            "hate_speech": [r"仇恨", r"歧视", r"种族主义"],
            "self_harm": [r"自残", r"自杀", r"自我伤害"]
        }

    def _score_patterns(self, patterns, text):
        """Return 0.3 for every pattern in ``patterns`` found in ``text``."""
        total = 0
        for pattern in patterns:
            if re.search(pattern, text, re.IGNORECASE) is not None:
                total += 0.3
        return total

    def detect_labels(self, text: str) -> Dict:
        """Score ``text`` against every label and report the ones that hit.

        Each label's score is 0.3 per matching pattern, capped at 1.0;
        labels with no matching pattern are left out of the result.
        """
        scored = (
            (label, self._score_patterns(patterns, text))
            for label, patterns in self.label_patterns.items()
        )
        hits = {label: min(score, 1.0) for label, score in scored if score > 0}

        return {
            "text": text,
            "detected_labels": hits,
            "is_nsfw": bool(hits),
            "max_score": max(hits.values(), default=0),
            "nsfw_categories": list(hits)
        }

过滤策略

1. 内容过滤器

class NSFWContentFilter:
    """Filters NSFW text using a rule-based detector and a chosen strategy."""

    def __init__(self):
        self.detector = RuleBasedNSFWDetector()
        # Strategy name -> method that produces the filtered text.
        self.filter_strategies = {
            "block": self._block_content,
            "blur": self._blur_content,
            "censor": self._censor_content,
            "refuse": self._refuse_content
        }

    def filter_content(self, text: str, strategy: str = "censor") -> Dict:
        """Detect NSFW content in ``text`` and filter it if anything is found.

        Args:
            text: The text to check.
            strategy: One of the keys of ``filter_strategies``. An unknown
                name falls back to ``"censor"``.

        Returns:
            For clean text: ``{"filtered": False, "text", "reason"}``.
            For flagged text: a dict with ``filtered`` (True),
            ``original_text``, ``filtered_text``, ``nsfw_categories``,
            ``flagged_terms`` and ``strategy`` (the strategy that was
            actually applied).
        """
        detection = self.detector.detect(text)

        if not detection.is_nsfw:
            return {
                "filtered": False,
                "text": text,
                "reason": "内容安全"
            }

        filter_func = self.filter_strategies.get(strategy)
        if filter_func is None:
            # Unknown strategy: fall back to censoring, and report that
            # fallback rather than echoing a name that was never applied.
            filter_func = self._censor_content
            strategy = "censor"
        filtered_text = filter_func(text, detection)

        return {
            "filtered": True,
            "original_text": text,
            "filtered_text": filtered_text,
            "nsfw_categories": detection.categories,
            "flagged_terms": detection.flagged_terms,
            "strategy": strategy
        }

    def _block_content(self, text: str, detection: NSFWDetectionResult) -> str:
        """Replace the whole text with a fixed placeholder."""
        return "[内容不适合显示]"

    def _blur_content(self, text: str, detection: NSFWDetectionResult) -> str:
        """Replace every flagged term with asterisks of the same length."""
        blurred = text
        for term in detection.flagged_terms:
            blurred = blurred.replace(term, "*" * len(term))
        return blurred

    def _censor_content(self, text: str, detection: NSFWDetectionResult) -> str:
        """Mask flagged terms, keeping the first and last character.

        Terms of one or two characters have no middle to mask, so they are
        replaced by ``"**"`` entirely.
        """
        censored = text
        for term in detection.flagged_terms:
            if len(term) > 2:
                censored_term = term[0] + "*" * (len(term) - 2) + term[-1]
            else:
                censored_term = "**"
            censored = censored.replace(term, censored_term)
        return censored

    def _refuse_content(self, text: str, detection: NSFWDetectionResult) -> str:
        """Replace the whole text with a fixed refusal message."""
        return "抱歉,我无法生成或处理包含不当内容的文本。"

2. 实时过滤

class RealTimeNSFWFilter:
    """Token-by-token NSFW filter for streamed text.

    NOTE: ``_is_safe_to_output`` currently always returns True, so the
    buffer is emitted after every token. A flagged term that is split
    across two tokens is therefore not caught.
    """

    def __init__(self, detector=None):
        """Create the filter.

        Args:
            detector: Optional object with a ``detect(text)`` method whose
                result exposes ``is_nsfw`` and ``flagged_terms``. Defaults
                to a new ``RuleBasedNSFWDetector``.
        """
        self.detector = detector if detector is not None else RuleBasedNSFWDetector()
        # These must be instance attributes: without the dot, ``self缓冲区``
        # is just a local variable and process_token()/flush() would raise
        # AttributeError on first use.
        self.缓冲区 = ""
        self.过滤队列 = []

    def process_token(self, token: str) -> str:
        """Append ``token`` to the buffer and return whatever can be emitted.

        Returns:
            The buffer with flagged terms replaced when it is flagged, the
            buffer unchanged when it is considered safe to output, or an
            empty string when output is being held back.
        """
        self.缓冲区 += token

        detection = self.detector.detect(self.缓冲区)

        if detection.is_nsfw:
            # Mask the flagged terms and start over with an empty buffer.
            filtered = self._filter_buffer(self.缓冲区, detection)
            self.缓冲区 = ""
            return filtered

        if self._is_safe_to_output(self.缓冲区):
            output = self.缓冲区
            self.缓冲区 = ""
            return output

        return ""

    def _filter_buffer(self, buffer: str, detection) -> str:
        """Replace every flagged term in ``buffer`` with ``[NSFW]``."""
        filtered = buffer
        for term in detection.flagged_terms:
            filtered = filtered.replace(term, "[NSFW]")
        return filtered

    def _is_safe_to_output(self, text: str) -> bool:
        """Report whether ``text`` may be emitted now (placeholder: always True)."""
        return True

    def flush(self) -> str:
        """Return the buffered text as-is and empty the buffer."""
        output = self.缓冲区
        self.缓冲区 = ""
        return output

配置和监控

class NSFWFilterConfiguration:
    """Holds the settings of the NSFW filter in a plain dict."""

    def __init__(self):
        self.config = {
            "enabled": True,
            "strict_mode": False,
            "default_strategy": "censor",
            "threshold": 0.5,
            "categories": ["sexual", "violence", "drugs", "gambling"],
            "custom_patterns": []
        }

    def update_config(self, key: str, value):
        """Set ``key`` to ``value``; a key that does not exist yet is created."""
        self.config[key] = value

    def get_config(self) -> Dict:
        """Return an independent snapshot of the configuration.

        A deep copy is returned because the configuration contains nested
        lists (``categories``, ``custom_patterns``); with a shallow copy,
        mutating those lists in the snapshot would silently change the
        live configuration.
        """
        return copy.deepcopy(self.config)

class NSFWMonitor:
    """Keeps a log and running counters of NSFW detection results."""

    def __init__(self):
        # One entry per log_detection() call.
        self.detection_log = []
        self.metrics = {
            "total_checks": 0,
            "nsfw_detected": 0,
            "by_category": {}
        }

    @staticmethod
    def _summarize(result):
        """Return ``(is_nsfw, categories)`` for a dict or attribute-style result.

        Dict results may carry their categories under ``"categories"`` or
        ``"nsfw_categories"``; other objects are read through their
        ``is_nsfw`` / ``categories`` attributes.
        """
        if isinstance(result, dict):
            is_nsfw = result.get("is_nsfw", False)
            categories = result.get("categories") or result.get("nsfw_categories") or []
        else:
            is_nsfw = getattr(result, "is_nsfw", False)
            categories = getattr(result, "categories", None) or []
        return is_nsfw, list(categories)

    def log_detection(self, text: str, result: Any):
        """Record one detection result and update the counters.

        Args:
            text: The checked text; only its first 100 characters are kept
                in the log.
            result: A detection result, either a dict or an object with
                ``is_nsfw`` and ``categories`` attributes.
        """
        is_nsfw, categories = self._summarize(result)

        self.detection_log.append({
            "timestamp": datetime.now().isoformat(),
            "text_preview": text[:100],
            "is_nsfw": is_nsfw,
            "categories": categories
        })

        self.metrics["total_checks"] += 1

        if is_nsfw:
            self.metrics["nsfw_detected"] += 1

            by_category = self.metrics["by_category"]
            for category in categories:
                by_category[category] = by_category.get(category, 0) + 1

    def get_statistics(self) -> Dict:
        """Return the counters plus ``nsfw_rate`` (0 when nothing was checked).

        ``by_category`` is copied so that callers cannot alter the
        monitor's own counters through the returned dict.
        """
        total = self.metrics["total_checks"]

        return {
            **self.metrics,
            "by_category": dict(self.metrics["by_category"]),
            "nsfw_rate": self.metrics["nsfw_detected"] / total if total > 0 else 0
        }

最佳实践

  1. 多层检测:结合规则和模型检测
  2. 可配置策略:提供多种过滤策略
  3. 持续更新:定期更新检测模式
  4. 用户反馈:利用用户反馈改进检测

总结

NSFW检测是确保LLM输出内容安全的重要环节。通过多层检测和过滤机制,可以有效防止不当内容的生成和传播。