← 返回首页
🧠

垃圾邮件检测:过滤垃圾内容

📂 llm ⏱ 4 min 673 words

--- title: "垃圾邮件检测:过滤垃圾内容" description: "使用LLM检测和过滤垃圾邮件、垃圾信息" tags: ["垃圾邮件", "垃圾检测", "内容过滤", "LLM", "安全"] category: "llm" icon: "📧"

垃圾邮件检测:过滤垃圾内容

垃圾邮件概述

垃圾邮件检测是识别和过滤不需要的、批量发送的或恶意的内容。

检测方法

1. 基于规则的检测

import re
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class SpamDetectionResult:
    """垃圾邮件检测结果"""
    text: str
    is_spam: bool
    spam_score: float
    spam_type: str
    flagged_features: List[str]

class RuleBasedSpamDetector:
    """基于规则的垃圾邮件检测器"""
    
    def __init__(self):
        self.spam_patterns = self._load_patterns()
        self.spam_keywords = self._load_keywords()
    
    def _load_patterns(self) -> Dict[str, List[str]]:
        """加载垃圾邮件模式"""
        return {
            "urgency": [
                r"立即",
                r"马上",
                r"限时",
                r"最后机会"
            ],
            "financial": [
                r"赚钱",
                r"投资",
                r"回报",
                r"收益"
            ],
            "scam": [
                r"中奖",
                r"免费",
                r"优惠",
                r"赠送"
            ],
            "phishing": [
                r"验证账户",
                r"点击链接",
                r"确认信息"
            ]
        }
    
    def _load_keywords(self) -> List[str]:
        """加载垃圾邮件关键词"""
        return [
            "免费", "中奖", "优惠", "赚钱", "投资",
            "贷款", "发票", "代开", "推广", "广告"
        ]
    
    def detect(self, text: str) -> SpamDetectionResult:
        """检测垃圾邮件"""
        flagged_features = []
        spam_type = "unknown"
        
        # 检查模式
        for category, patterns in self.spam_patterns.items():
            for pattern in patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    flagged_features.append(f"{category}:{pattern}")
                    spam_type = category
        
        # 检查关键词
        keyword_matches = [kw for kw in self.spam_keywords if kw in text]
        flagged_features.extend([f"keyword:{kw}" for kw in keyword_matches])
        
        # 计算垃圾邮件分数
        spam_score = min(len(flagged_features) * 0.15, 1.0)
        
        return SpamDetectionResult(
            text=text,
            is_spam=spam_score > 0.5,
            spam_score=spam_score,
            spam_type=spam_type,
            flagged_features=flagged_features
        )

2. 基于机器学习的检测

class MLSpanDetector:
    """基于机器学习的垃圾邮件检测器"""
    
    def __init__(self, model=None):
        self.model = model
        self.feature_extractor = self._create_feature_extractor()
    
    def _create_feature_extractor(self):
        """创建特征提取器"""
        def extract_features(text: str) -> Dict:
            return {
                "length": len(text),
                "word_count": len(text.split()),
                "exclamation_count": text.count("!"),
                "question_count": text.count("?"),
                "uppercase_ratio": sum(1 for c in text if c.isupper()) / max(len(text), 1),
                "url_count": len(re.findall(r'http[s]?://', text)),
                "email_count": len(re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', text))
            }
        return extract_features
    
    def detect(self, text: str) -> Dict:
        """检测垃圾邮件"""
        features = self.feature_extractor(text)
        
        # 简化的规则-based分类
        score = 0
        
        if features["length"] < 10:
            score += 0.2
        if features["exclamation_count"] > 3:
            score += 0.3
        if features["uppercase_ratio"] > 0.5:
            score += 0.2
        if features["url_count"] > 2:
            score += 0.3
        if features["email_count"] > 1:
            score += 0.2
        
        return {
            "text": text,
            "is_spam": score > 0.5,
            "spam_score": min(score, 1.0),
            "features": features
        }

3. 深度学习检测

class DeepLearningSpamDetector:
    """深度学习垃圾邮件检测器"""
    
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def detect(self, text: str) -> Dict:
        """检测垃圾邮件"""
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            probs = torch.softmax(logits, dim=-1)
        
        # 假设[0]=正常, [1]=垃圾邮件
        spam_prob = probs[0][1].item()
        
        return {
            "text": text,
            "is_spam": spam_prob > 0.5,
            "spam_probability": spam_prob,
            "confidence": max(probs[0].tolist())
        }

过滤策略

1. 内容过滤器

class SpamContentFilter:
    """垃圾邮件内容过滤器"""
    
    def __init__(self):
        self.detector = RuleBasedSpamDetector()
        self.filter_actions = {
            "block": self._block_content,
            "quarantine": self._quarantine_content,
            "mark": self._mark_content,
            "redirect": self._redirect_content
        }
    
    def filter_content(self, text: str, action: str = "mark") -> Dict:
        """过滤内容"""
        detection = self.detector.detect(text)
        
        if not detection.is_spam:
            return {
                "filtered": False,
                "text": text,
                "reason": "内容正常"
            }
        
        filter_func = self.filter_actions.get(action, self._mark_content)
        result = filter_func(text, detection)
        
        return {
            "filtered": True,
            "original_text": text,
            "action": action,
            "spam_score": detection.spam_score,
            "spam_type": detection.spam_type,
            **result
        }
    
    def _block_content(self, text: str, detection: SpamDetectionResult) -> Dict:
        """阻止内容"""
        return {"filtered_text": "", "message": "垃圾邮件已阻止"}
    
    def _quarantine_content(self, text: str, detection: SpamDetectionResult) -> Dict:
        """隔离内容"""
        return {"filtered_text": text, "quarantined": True, "message": "内容已隔离"}
    
    def _mark_content(self, text: str, detection: SpamDetectionResult) -> Dict:
        """标记内容"""
        marked_text = f"[垃圾邮件 - 置信度: {detection.spam_score:.2f}]\n{text}"
        return {"filtered_text": marked_text, "message": "内容已标记"}
    
    def _redirect_content(self, text: str, detection: SpamDetectionResult) -> Dict:
        """重定向内容"""
        return {"filtered_text": text, "redirect_to": "spam_folder", "message": "已移至垃圾邮件文件夹"}

2. 批量过滤

class BatchSpamFilter:
    """批量垃圾邮件过滤器"""
    
    def __init__(self):
        self.detector = RuleBasedSpamDetector()
        self.batch_size = 100
    
    def filter_batch(self, texts: List[str]) -> Dict:
        """批量过滤"""
        results = []
        spam_count = 0
        
        for text in texts:
            detection = self.detector.detect(text)
            results.append({
                "text": text,
                "is_spam": detection.is_spam,
                "spam_score": detection.spam_score
            })
            
            if detection.is_spam:
                spam_count += 1
        
        return {
            "total": len(texts),
            "spam_count": spam_count,
            "normal_count": len(texts) - spam_count,
            "spam_rate": spam_count / len(texts) if texts else 0,
            "results": results
        }

监控和报告

class SpamMonitor:
    """垃圾邮件监控"""
    
    def __init__(self):
        self.detection_log = []
        self.metrics = {
            "total_checks": 0,
            "spam_detected": 0,
            "by_type": {},
            "by_hour": {}
        }
    
    def log_detection(self, text: str, result: Dict):
        """记录检测"""
        self.detection_log.append({
            "timestamp": datetime.now().isoformat(),
            "text_preview": text[:100],
            "is_spam": result.get("is_spam", False),
            "spam_type": result.get("spam_type", "unknown"),
            "spam_score": result.get("spam_score", 0)
        })
        
        self.metrics["total_checks"] += 1
        
        if result.get("is_spam", False):
            self.metrics["spam_detected"] += 1
            
            spam_type = result.get("spam_type", "unknown")
            self.metrics["by_type"][spam_type] = \
                self.metrics["by_type"].get(spam_type, 0) + 1
            
            hour = datetime.now().hour
            self.metrics["by_hour"][hour] = self.metrics["by_hour"].get(hour, 0) + 1
    
    def get_statistics(self) -> Dict:
        """获取统计"""
        total = self.metrics["total_checks"]
        
        return {
            **self.metrics,
            "spam_rate": self.metrics["spam_detected"] / total if total > 0 else 0,
            "peak_hour": max(self.metrics["by_hour"], key=self.metrics["by_hour"].get) if self.metrics["by_hour"] else None
        }
    
    def generate_report(self) -> str:
        """生成报告"""
        stats = self.get_statistics()
        
        report = f"""
垃圾邮件检测报告
{'='*50}

总检测数: {stats['total_checks']}
垃圾邮件数: {stats['spam_detected']}
垃圾邮件率: {stats['spam_rate']*100:.1f}%

按类型分布:
"""
        for spam_type, count in stats["by_type"].items():
            report += f"  {spam_type}: {count}\n"
        
        if stats["peak_hour"]:
            report += f"\n高峰时段: {stats['peak_hour']}:00\n"
        
        return report

最佳实践

  1. 多层检测:结合规则和机器学习检测
  2. 持续更新:定期更新垃圾邮件模式
  3. 用户反馈:利用用户反馈改进检测
  4. 性能优化:优化检测性能

总结

垃圾邮件检测是保护用户免受不需要内容干扰的重要环节。通过多层检测和过滤机制,可以有效识别和处理垃圾内容。