欺诈检测:识别LLM欺诈行为
---
title: "欺诈检测:识别LLM欺诈行为"
description: "检测和防范利用LLM进行的欺诈行为"
tags: ["欺诈检测", "反欺诈", "LLM安全", "风险控制", "安全"]
category: "llm"
icon: "🚨"
---
欺诈检测:识别LLM欺诈行为
欺诈概述
欺诈检测旨在识别和防范利用LLM进行的各种欺诈行为,以保护用户和系统的安全。
欺诈类型
1. 欺诈分类
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
class FraudType(Enum):
    """Fraud categories that a detection result can be labelled with.

    The string values are what ``FraudClassifier.classify`` reports in its
    ``"type"`` / ``"primary_fraud_type"`` fields.
    """

    PHISHING = "phishing"  # phishing
    SOCIAL_ENGINEERING = "social_engineering"  # social engineering
    # NOTE: FraudClassifier._load_indicators registers no patterns for this
    # member, so it is never reported by the rule-based classifier.
    IDENTITY_THEFT = "identity_theft"  # identity theft
    FINANCIAL_FRAUD = "financial_fraud"  # financial fraud
    CONTENT_FRAUD = "content_fraud"  # content fraud
    MANIPULATION = "manipulation"  # manipulation
@dataclass
class FraudIndicator:
    """A set of regex patterns that signal one fraud type at one severity."""

    fraud_type: FraudType  # category reported when one of the patterns matches
    description: str  # human-readable label copied into each detection entry
    patterns: List[str]  # regular expressions searched for in the input text
    # Severity label: "low", "medium", "high" or "critical" -- the four labels
    # that FraudClassifier.classify assigns a weight to.
    severity: str
class FraudClassifier:
    """Rule-based classifier that flags text matching known fraud phrasings.

    Every indicator pattern that matches the text contributes one detection
    entry and adds its severity weight to the fraud score (capped at 1.0).
    """

    # Score contributed by each matched pattern, keyed by indicator severity.
    _SEVERITY_WEIGHTS = {"low": 0.1, "medium": 0.3, "high": 0.6, "critical": 1.0}
    # Weight applied when an indicator carries an unrecognised severity label.
    _DEFAULT_WEIGHT = 0.3

    def __init__(self):
        self.fraud_indicators = self._load_indicators()

    def _load_indicators(self) -> List[FraudIndicator]:
        """Return the built-in indicator table, one entry per fraud type."""
        return [
            FraudIndicator(
                fraud_type=FraudType.PHISHING,
                description="钓鱼欺诈",
                patterns=[
                    r"验证.*账户",
                    r"点击.*链接",
                    r"确认.*信息",
                    r"紧急.*通知",
                ],
                severity="high",
            ),
            FraudIndicator(
                fraud_type=FraudType.SOCIAL_ENGINEERING,
                description="社会工程",
                patterns=[
                    r"假装.*身份",
                    r"冒充.*客服",
                    r"紧急.*帮助",
                    r"机密.*信息",
                ],
                severity="high",
            ),
            FraudIndicator(
                fraud_type=FraudType.FINANCIAL_FRAUD,
                description="金融欺诈",
                patterns=[
                    r"投资.*回报",
                    r"保证.*收益",
                    r"快速.*致富",
                    r"免费.*赚钱",
                ],
                severity="critical",
            ),
            FraudIndicator(
                fraud_type=FraudType.CONTENT_FRAUD,
                description="内容欺诈",
                patterns=[
                    r"虚假.*信息",
                    r"误导.*内容",
                    r"伪造.*证据",
                    r"假新闻",
                ],
                severity="medium",
            ),
            FraudIndicator(
                fraud_type=FraudType.MANIPULATION,
                description="操纵欺诈",
                patterns=[
                    r"限时.*优惠",
                    r"最后.*机会",
                    r"不容.*错过",
                    r"立即.*行动",
                ],
                severity="medium",
            ),
        ]

    def _weight_of(self, detection: Dict) -> float:
        """Return the score weight for one detection entry's severity."""
        return self._SEVERITY_WEIGHTS.get(detection["severity"], self._DEFAULT_WEIGHT)

    def classify(self, text: str) -> Dict:
        """Match ``text`` against every indicator pattern and score the hits.

        Args:
            text: The content to inspect.

        Returns:
            A dict with the keys ``text``, ``is_fraud`` (True when at least
            one pattern matched), ``fraud_score`` (sum of severity weights,
            capped at 1.0), ``detected_indicators`` (one entry per matched
            pattern, in indicator-table order) and ``primary_fraud_type``
            (type of the most severe hit, or None when nothing matched).
        """
        detected_indicators = []
        for indicator in self.fraud_indicators:
            for pattern in indicator.patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    detected_indicators.append({
                        "type": indicator.fraud_type.value,
                        "description": indicator.description,
                        "pattern": pattern,
                        "severity": indicator.severity,
                    })

        fraud_score = min(sum(self._weight_of(hit) for hit in detected_indicators), 1.0)

        # The primary type is the most severe hit, not merely the first one in
        # table order; max() returns the earliest entry among equal weights,
        # so ties keep the original table-order behaviour.
        primary_fraud_type = None
        if detected_indicators:
            primary_fraud_type = max(detected_indicators, key=self._weight_of)["type"]

        return {
            "text": text,
            "is_fraud": len(detected_indicators) > 0,
            "fraud_score": fraud_score,
            "detected_indicators": detected_indicators,
            "primary_fraud_type": primary_fraud_type,
        }
2. 欺诈检测器
class FraudDetector:
    """Combine classifier scoring with coarse persuasion-cue matching."""

    def __init__(self):
        self.classifier = FraudClassifier()
        self.pattern_detector = self._create_pattern_detector()

    def _create_pattern_detector(self):
        """Build the cue-matching callable.

        The returned function maps a text to ``{category: [matched patterns]}``;
        categories without any match are left out of the result.
        """
        cue_table = {
            "urgency": [r"立即", r"马上", r"限时", r"最后"],
            "pressure": [r"必须", r"一定", r"肯定", r"保证"],
            "secrecy": [r"保密", r"机密", r"私下", r"不要告诉"],
            "too_good": [r"免费", r"中奖", r"赠送", r"优惠"],
        }

        def detect_patterns(text: str) -> Dict:
            hits = {}
            for category, candidates in cue_table.items():
                matched = [
                    candidate
                    for candidate in candidates
                    if re.search(candidate, text, re.IGNORECASE)
                ]
                if matched:
                    hits[category] = matched
            return hits

        return detect_patterns

    def detect_fraud(self, text: str) -> Dict:
        """Classify ``text`` and raise the risk when persuasion cues appear.

        Returns the classifier's result extended with ``detected_patterns``,
        ``overall_risk`` and ``recommendation``.
        """
        classification = self.classifier.classify(text)
        cues = self.pattern_detector(text)

        # Each applicable bonus is added in turn, clamping to 1.0 every time.
        bonuses = (
            ("urgency" in cues and "pressure" in cues, 0.2),
            ("too_good" in cues, 0.15),
        )
        risk = classification["fraud_score"]
        for applies, bump in bonuses:
            if applies:
                risk = min(risk + bump, 1.0)

        report = dict(classification)
        report["detected_patterns"] = cues
        report["overall_risk"] = risk
        report["recommendation"] = self._generate_recommendation(risk, classification)
        return report

    def _generate_recommendation(self, risk_score: float, classification: Dict) -> str:
        """Translate a risk score into an advice string.

        ``classification`` is accepted but not consulted.
        """
        tiers = (
            (0.7, "高风险:强烈建议阻止该内容"),
            (0.4, "中风险:建议人工审核"),
            (0.2, "低风险:可继续但需监控"),
        )
        for floor, advice in tiers:
            if risk_score > floor:
                return advice
        return "安全:内容看起来正常"
3. 用户行为分析
class UserBehaviorAnalyzer:
    """Track per-user action counts and flag disproportionately frequent actions."""

    def __init__(self):
        # user_id -> {"total_actions": int, "action_types": {type: count},
        #             "risk_scores": [score of each analyze_behavior call]}
        self.user_profiles = {}
        # A call's risk score above this value marks the user as suspicious.
        self.anomaly_threshold = 0.7

    def analyze_behavior(self, user_id: str, actions: List[Dict]) -> Dict:
        """Fold ``actions`` into the user's profile and score this batch.

        Args:
            user_id: Key of the profile to update (created on first use).
            actions: Action dicts; the ``"type"`` key is read and defaults to
                ``"unknown"`` when missing.

        Returns:
            A dict with ``user_id``, ``risk_score`` (0.2 per anomalous action,
            capped at 1.0), ``is_suspicious``, ``anomalies`` (the flagged
            action dicts) and ``profile_summary``.
        """
        profile = self.user_profiles.setdefault(
            user_id,
            {"total_actions": 0, "action_types": {}, "risk_scores": []},
        )

        risk_score = 0
        anomalies = []
        for action in actions:
            action_type = action.get("type", "unknown")

            # The profile is updated first, so the anomaly check below already
            # sees the current action in the counts.
            profile["total_actions"] += 1
            type_counts = profile["action_types"]
            type_counts[action_type] = type_counts.get(action_type, 0) + 1

            if self._is_anomalous(action, profile):
                risk_score += 0.2
                anomalies.append(action)

        # Keep the score on the same 0..1 scale as the content fraud scores;
        # without the cap a long batch could report a risk well above 1.0.
        risk_score = min(risk_score, 1.0)
        profile["risk_scores"].append(risk_score)

        history = profile["risk_scores"]
        return {
            "user_id": user_id,
            "risk_score": risk_score,
            "is_suspicious": risk_score > self.anomaly_threshold,
            "anomalies": anomalies,
            "profile_summary": {
                "total_actions": profile["total_actions"],
                "action_diversity": len(profile["action_types"]),
                "avg_risk": sum(history) / len(history) if history else 0,
            },
        }

    def _is_anomalous(self, action: Dict, profile: Dict) -> bool:
        """Return True when the action's type exceeds 30% of all recorded actions.

        Simplified frequency check: each type is expected to make up 10% of
        the user's actions, and a type counts as anomalous once it is seen
        more than three times as often as that.
        """
        # NOTE(review): with a short history almost any action type exceeds
        # the 30% share (the very first action always does) -- confirm whether
        # a minimum history size should be required before flagging.
        action_type = action.get("type", "unknown")
        expected_count = profile["total_actions"] * 0.1
        actual_count = profile["action_types"].get(action_type, 0)
        return actual_count > expected_count * 3
防护策略
class FraudPreventionSystem:
    """Run content detection, behaviour analysis and custom rules on a request."""

    def __init__(self):
        self.detector = FraudDetector()
        self.behavior_analyzer = UserBehaviorAnalyzer()
        self.prevention_rules = []

    def add_prevention_rule(self, name: str, rule_func):
        """Register a custom rule.

        ``rule_func`` is called as ``rule_func(text, fraud_result,
        behavior_result)`` and should return a dict; its ``"passed"``
        (default True) and ``"message"`` (default "") keys are read.
        """
        self.prevention_rules.append({"name": name, "rule": rule_func})

    def _run_rules(self, text: str, fraud_result: Dict, behavior_result: Dict) -> List[Dict]:
        """Evaluate every registered rule and collect one outcome per rule."""
        outcomes = []
        for rule in self.prevention_rules:
            try:
                verdict = rule["rule"](text, fraud_result, behavior_result)
                outcome = {
                    "rule": rule["name"],
                    "passed": verdict.get("passed", True),
                    "message": verdict.get("message", ""),
                }
            except Exception as e:
                # Rules are caller-supplied plugins: a crashing rule (or one
                # returning a non-dict) is recorded as failed, which blocks
                # the request instead of aborting the whole analysis.
                outcome = {
                    "rule": rule["name"],
                    "passed": False,
                    "message": f"规则执行失败: {e}",
                }
            outcomes.append(outcome)
        return outcomes

    def analyze_request(self, user_id: str, text: str, context: Optional[Dict] = None) -> Dict:
        """Assess one request and decide whether it should be blocked.

        Args:
            user_id: User whose behaviour profile is updated.
            text: Request content to inspect.
            context: Optional dict; its ``"actions"`` list (default empty) is
                passed to the behaviour analyzer.

        Returns:
            A dict with ``user_id``, ``text``, ``fraud_detection``,
            ``behavior_analysis``, ``rule_results``, ``overall_risk``,
            ``should_block`` and ``recommendation``.
        """
        fraud_result = self.detector.detect_fraud(text)

        actions = context.get("actions", []) if context else []
        behavior_result = self.behavior_analyzer.analyze_behavior(user_id, actions)

        rule_results = self._run_rules(text, fraud_result, behavior_result)

        # Use the detector's combined risk (classifier score plus cue bonuses)
        # so this verdict agrees with the recommendation nested in
        # fraud_result; fall back to the raw score if it is not provided.
        content_risk = fraud_result.get("overall_risk")
        if content_risk is None:
            content_risk = fraud_result["fraud_score"]

        overall_risk = max(content_risk, behavior_result["risk_score"])
        should_block = overall_risk > 0.7 or any(not r["passed"] for r in rule_results)

        return {
            "user_id": user_id,
            "text": text,
            "fraud_detection": fraud_result,
            "behavior_analysis": behavior_result,
            "rule_results": rule_results,
            "overall_risk": overall_risk,
            "should_block": should_block,
            "recommendation": self._generate_recommendation(overall_risk, should_block),
        }

    def _generate_recommendation(self, risk_score: float, should_block: bool) -> str:
        """Return the advice string for a block decision and risk score."""
        if should_block:
            return "建议阻止该请求"
        elif risk_score > 0.5:
            return "建议人工审核"
        elif risk_score > 0.3:
            return "建议监控后续行为"
        else:
            return "请求看起来正常"
监控和告警
class FraudMonitor:
    """Collect detection outcomes, keep running metrics and raise alerts."""

    def __init__(self):
        self.detection_log = []
        # NOTE(review): "high_risk_count" is defined but check_alerts does not
        # consult it -- confirm what it is meant to count before wiring it in.
        self.alert_thresholds = {
            "fraud_rate": 0.1,
            "high_risk_count": 10,
        }
        self.metrics = {
            "total_checks": 0,
            "fraud_detected": 0,
            "by_type": {},
            "blocked_requests": 0,
        }

    def log_detection(self, user_id: str, text: str, result: Dict):
        """Record one analysis result and update the running metrics.

        Args:
            user_id: User the request belongs to.
            text: Request content; only the first 100 characters are stored.
            result: Analysis result dict; its ``fraud_detection``,
                ``overall_risk`` and ``should_block`` keys are read, each
                with a safe default when missing.
        """
        fraud_info = result.get("fraud_detection") or {}
        is_fraud = fraud_info.get("is_fraud", False)
        blocked = result.get("should_block", False)

        self.detection_log.append({
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "text_preview": text[:100],
            "is_fraud": is_fraud,
            "risk_score": result.get("overall_risk", 0),
            "blocked": blocked,
        })

        self.metrics["total_checks"] += 1
        if is_fraud:
            self.metrics["fraud_detected"] += 1
            # A present-but-None primary type is bucketed as "unknown" too,
            # rather than creating a None key in the per-type counts.
            fraud_type = fraud_info.get("primary_fraud_type") or "unknown"
            by_type = self.metrics["by_type"]
            by_type[fraud_type] = by_type.get(fraud_type, 0) + 1
        if blocked:
            self.metrics["blocked_requests"] += 1

    def check_alerts(self) -> List[Dict]:
        """Return the alerts triggered by the current metrics.

        An alert is produced when the share of checks flagged as fraud
        exceeds the ``fraud_rate`` threshold; no checks means no alerts.
        """
        alerts = []
        total = self.metrics["total_checks"]
        if total > 0:
            fraud_rate = self.metrics["fraud_detected"] / total
            if fraud_rate > self.alert_thresholds["fraud_rate"]:
                alerts.append({
                    "type": "high_fraud_rate",
                    "message": f"欺诈率过高: {fraud_rate*100:.1f}%",
                    "severity": "high",
                })
        return alerts

    def generate_report(self) -> str:
        """Render the running metrics as a plain-text report."""
        stats = self.metrics
        header = (
            "\n"
            "欺诈检测报告\n"
            f"{'=' * 50}\n"
            f"总检测数: {stats['total_checks']}\n"
            f"欺诈检测数: {stats['fraud_detected']}\n"
            f"阻止请求数: {stats['blocked_requests']}\n"
            "按类型分布:\n"
        )
        breakdown = "".join(
            f" {fraud_type}: {count}\n"
            for fraud_type, count in stats["by_type"].items()
        )
        return header + breakdown
最佳实践
- 多层防护:结合内容检测和行为分析
- 实时监控:持续跟踪欺诈率等指标,超过阈值时及时告警
- 持续更新:定期更新欺诈模式
- 用户教育:教育用户识别欺诈
总结
欺诈检测是保护用户和系统安全的重要环节。通过多层检测和防护机制,可以有效识别和防范各种欺诈行为。