垃圾邮件检测:过滤垃圾内容
--- title: "垃圾邮件检测:过滤垃圾内容" description: "使用LLM检测和过滤垃圾邮件、垃圾信息" tags: ["垃圾邮件", "垃圾检测", "内容过滤", "LLM", "安全"] category: "llm" icon: "📧"
垃圾邮件检测:过滤垃圾内容
垃圾邮件概述
垃圾邮件检测是识别和过滤不需要的、批量发送的或恶意的内容。
检测方法
1. 基于规则的检测
import re
from typing import Dict, List
from dataclasses import dataclass
@dataclass
class SpamDetectionResult:
"""垃圾邮件检测结果"""
text: str
is_spam: bool
spam_score: float
spam_type: str
flagged_features: List[str]
class RuleBasedSpamDetector:
"""基于规则的垃圾邮件检测器"""
def __init__(self):
self.spam_patterns = self._load_patterns()
self.spam_keywords = self._load_keywords()
def _load_patterns(self) -> Dict[str, List[str]]:
"""加载垃圾邮件模式"""
return {
"urgency": [
r"立即",
r"马上",
r"限时",
r"最后机会"
],
"financial": [
r"赚钱",
r"投资",
r"回报",
r"收益"
],
"scam": [
r"中奖",
r"免费",
r"优惠",
r"赠送"
],
"phishing": [
r"验证账户",
r"点击链接",
r"确认信息"
]
}
def _load_keywords(self) -> List[str]:
"""加载垃圾邮件关键词"""
return [
"免费", "中奖", "优惠", "赚钱", "投资",
"贷款", "发票", "代开", "推广", "广告"
]
def detect(self, text: str) -> SpamDetectionResult:
"""检测垃圾邮件"""
flagged_features = []
spam_type = "unknown"
# 检查模式
for category, patterns in self.spam_patterns.items():
for pattern in patterns:
if re.search(pattern, text, re.IGNORECASE):
flagged_features.append(f"{category}:{pattern}")
spam_type = category
# 检查关键词
keyword_matches = [kw for kw in self.spam_keywords if kw in text]
flagged_features.extend([f"keyword:{kw}" for kw in keyword_matches])
# 计算垃圾邮件分数
spam_score = min(len(flagged_features) * 0.15, 1.0)
return SpamDetectionResult(
text=text,
is_spam=spam_score > 0.5,
spam_score=spam_score,
spam_type=spam_type,
flagged_features=flagged_features
)
2. 基于机器学习的检测
class MLSpanDetector:
"""基于机器学习的垃圾邮件检测器"""
def __init__(self, model=None):
self.model = model
self.feature_extractor = self._create_feature_extractor()
def _create_feature_extractor(self):
"""创建特征提取器"""
def extract_features(text: str) -> Dict:
return {
"length": len(text),
"word_count": len(text.split()),
"exclamation_count": text.count("!"),
"question_count": text.count("?"),
"uppercase_ratio": sum(1 for c in text if c.isupper()) / max(len(text), 1),
"url_count": len(re.findall(r'http[s]?://', text)),
"email_count": len(re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', text))
}
return extract_features
def detect(self, text: str) -> Dict:
"""检测垃圾邮件"""
features = self.feature_extractor(text)
# 简化的规则-based分类
score = 0
if features["length"] < 10:
score += 0.2
if features["exclamation_count"] > 3:
score += 0.3
if features["uppercase_ratio"] > 0.5:
score += 0.2
if features["url_count"] > 2:
score += 0.3
if features["email_count"] > 1:
score += 0.2
return {
"text": text,
"is_spam": score > 0.5,
"spam_score": min(score, 1.0),
"features": features
}
3. 深度学习检测
class DeepLearningSpamDetector:
"""深度学习垃圾邮件检测器"""
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
def detect(self, text: str) -> Dict:
"""检测垃圾邮件"""
inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=-1)
# 假设[0]=正常, [1]=垃圾邮件
spam_prob = probs[0][1].item()
return {
"text": text,
"is_spam": spam_prob > 0.5,
"spam_probability": spam_prob,
"confidence": max(probs[0].tolist())
}
过滤策略
1. 内容过滤器
class SpamContentFilter:
"""垃圾邮件内容过滤器"""
def __init__(self):
self.detector = RuleBasedSpamDetector()
self.filter_actions = {
"block": self._block_content,
"quarantine": self._quarantine_content,
"mark": self._mark_content,
"redirect": self._redirect_content
}
def filter_content(self, text: str, action: str = "mark") -> Dict:
"""过滤内容"""
detection = self.detector.detect(text)
if not detection.is_spam:
return {
"filtered": False,
"text": text,
"reason": "内容正常"
}
filter_func = self.filter_actions.get(action, self._mark_content)
result = filter_func(text, detection)
return {
"filtered": True,
"original_text": text,
"action": action,
"spam_score": detection.spam_score,
"spam_type": detection.spam_type,
**result
}
def _block_content(self, text: str, detection: SpamDetectionResult) -> Dict:
"""阻止内容"""
return {"filtered_text": "", "message": "垃圾邮件已阻止"}
def _quarantine_content(self, text: str, detection: SpamDetectionResult) -> Dict:
"""隔离内容"""
return {"filtered_text": text, "quarantined": True, "message": "内容已隔离"}
def _mark_content(self, text: str, detection: SpamDetectionResult) -> Dict:
"""标记内容"""
marked_text = f"[垃圾邮件 - 置信度: {detection.spam_score:.2f}]\n{text}"
return {"filtered_text": marked_text, "message": "内容已标记"}
def _redirect_content(self, text: str, detection: SpamDetectionResult) -> Dict:
"""重定向内容"""
return {"filtered_text": text, "redirect_to": "spam_folder", "message": "已移至垃圾邮件文件夹"}
2. 批量过滤
class BatchSpamFilter:
"""批量垃圾邮件过滤器"""
def __init__(self):
self.detector = RuleBasedSpamDetector()
self.batch_size = 100
def filter_batch(self, texts: List[str]) -> Dict:
"""批量过滤"""
results = []
spam_count = 0
for text in texts:
detection = self.detector.detect(text)
results.append({
"text": text,
"is_spam": detection.is_spam,
"spam_score": detection.spam_score
})
if detection.is_spam:
spam_count += 1
return {
"total": len(texts),
"spam_count": spam_count,
"normal_count": len(texts) - spam_count,
"spam_rate": spam_count / len(texts) if texts else 0,
"results": results
}
监控和报告
class SpamMonitor:
"""垃圾邮件监控"""
def __init__(self):
self.detection_log = []
self.metrics = {
"total_checks": 0,
"spam_detected": 0,
"by_type": {},
"by_hour": {}
}
def log_detection(self, text: str, result: Dict):
"""记录检测"""
self.detection_log.append({
"timestamp": datetime.now().isoformat(),
"text_preview": text[:100],
"is_spam": result.get("is_spam", False),
"spam_type": result.get("spam_type", "unknown"),
"spam_score": result.get("spam_score", 0)
})
self.metrics["total_checks"] += 1
if result.get("is_spam", False):
self.metrics["spam_detected"] += 1
spam_type = result.get("spam_type", "unknown")
self.metrics["by_type"][spam_type] = \
self.metrics["by_type"].get(spam_type, 0) + 1
hour = datetime.now().hour
self.metrics["by_hour"][hour] = self.metrics["by_hour"].get(hour, 0) + 1
def get_statistics(self) -> Dict:
"""获取统计"""
total = self.metrics["total_checks"]
return {
**self.metrics,
"spam_rate": self.metrics["spam_detected"] / total if total > 0 else 0,
"peak_hour": max(self.metrics["by_hour"], key=self.metrics["by_hour"].get) if self.metrics["by_hour"] else None
}
def generate_report(self) -> str:
"""生成报告"""
stats = self.get_statistics()
report = f"""
垃圾邮件检测报告
{'='*50}
总检测数: {stats['total_checks']}
垃圾邮件数: {stats['spam_detected']}
垃圾邮件率: {stats['spam_rate']*100:.1f}%
按类型分布:
"""
for spam_type, count in stats["by_type"].items():
report += f" {spam_type}: {count}\n"
if stats["peak_hour"]:
report += f"\n高峰时段: {stats['peak_hour']}:00\n"
return report
最佳实践
- 多层检测:结合规则和机器学习检测
- 持续更新:定期更新垃圾邮件模式
- 用户反馈:利用用户反馈改进检测
- 性能优化:优化检测性能
总结
垃圾邮件检测是保护用户免受不需要内容干扰的重要环节。通过多层检测和过滤机制,可以有效识别和处理垃圾内容。