← 返回首页
🧠

欺诈检测:识别LLM欺诈行为

📂 llm ⏱ 5 min 806 words

---
title: "欺诈检测:识别LLM欺诈行为"
description: "检测和防范利用LLM进行的欺诈行为"
tags: ["欺诈检测", "反欺诈", "LLM安全", "风险控制", "安全"]
category: "llm"
icon: "🚨"
---

欺诈检测:识别LLM欺诈行为

欺诈概述

欺诈检测是指识别和防范利用LLM进行的各种欺诈行为,以保护用户和系统的安全。

欺诈类型

1. 欺诈分类

import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List

class FraudType(Enum):
    """Fraud categories; each member's value is the string reported in results."""
    PHISHING = "phishing"           # Phishing
    SOCIAL_ENGINEERING = "social_engineering"  # Social engineering
    IDENTITY_THEFT = "identity_theft"  # Identity theft
    FINANCIAL_FRAUD = "financial_fraud"  # Financial fraud
    CONTENT_FRAUD = "content_fraud"  # Content fraud (false or fabricated content)
    MANIPULATION = "manipulation"   # Manipulation (pressure tactics)

@dataclass
class FraudIndicator:
    """A set of regex patterns that signal one category of fraud."""
    # Category reported when any of the patterns matches.
    fraud_type: FraudType
    # Human-readable label copied into each match record.
    description: str
    # Regular expressions searched for in the analysed text.
    patterns: List[str]
    # One of "low", "medium", "high" or "critical"; decides the score weight.
    severity: str

class FraudClassifier:
    """Rule-based classifier that flags text matching known fraud patterns."""

    # Score contributed by one matched pattern, keyed by indicator severity.
    SEVERITY_WEIGHTS = {"low": 0.1, "medium": 0.3, "high": 0.6, "critical": 1.0}
    # Weight used when an indicator carries an unrecognised severity label.
    DEFAULT_SEVERITY_WEIGHT = 0.3

    def __init__(self):
        self.fraud_indicators = self._load_indicators()

    def _load_indicators(self) -> "List[FraudIndicator]":
        """Return the built-in indicator set, one entry per fraud category."""
        return [
            FraudIndicator(
                fraud_type=FraudType.PHISHING,
                description="钓鱼欺诈",
                patterns=[
                    r"验证.*账户",
                    r"点击.*链接",
                    r"确认.*信息",
                    r"紧急.*通知",
                ],
                severity="high",
            ),
            FraudIndicator(
                fraud_type=FraudType.SOCIAL_ENGINEERING,
                description="社会工程",
                patterns=[
                    r"假装.*身份",
                    r"冒充.*客服",
                    r"紧急.*帮助",
                    r"机密.*信息",
                ],
                severity="high",
            ),
            FraudIndicator(
                fraud_type=FraudType.FINANCIAL_FRAUD,
                description="金融欺诈",
                patterns=[
                    r"投资.*回报",
                    r"保证.*收益",
                    r"快速.*致富",
                    r"免费.*赚钱",
                ],
                severity="critical",
            ),
            FraudIndicator(
                fraud_type=FraudType.CONTENT_FRAUD,
                description="内容欺诈",
                patterns=[
                    r"虚假.*信息",
                    r"误导.*内容",
                    r"伪造.*证据",
                    r"假新闻",
                ],
                severity="medium",
            ),
            FraudIndicator(
                fraud_type=FraudType.MANIPULATION,
                description="操纵欺诈",
                patterns=[
                    r"限时.*优惠",
                    r"最后.*机会",
                    r"不容.*错过",
                    r"立即.*行动",
                ],
                severity="medium",
            ),
        ]

    def _weight(self, match: Dict) -> float:
        """Return the score weight for one match record."""
        return self.SEVERITY_WEIGHTS.get(
            match["severity"], self.DEFAULT_SEVERITY_WEIGHT
        )

    def classify(self, text: str) -> Dict:
        """Search *text* for every indicator pattern and summarise the matches.

        Returns a dict with:
            text: the input text.
            is_fraud: True when at least one pattern matched.
            fraud_score: sum of the matched patterns' severity weights,
                capped at 1.0.
            detected_indicators: one record per matched pattern, in
                indicator order, with "type", "description", "pattern"
                and "severity".
            primary_fraud_type: type of the most severe match (the earliest
                match wins a tie), or None when nothing matched.
        """
        detected_indicators = []
        for indicator in self.fraud_indicators:
            for pattern in indicator.patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    detected_indicators.append({
                        "type": indicator.fraud_type.value,
                        "description": indicator.description,
                        "pattern": pattern,
                        "severity": indicator.severity,
                    })

        fraud_score = min(sum(self._weight(m) for m in detected_indicators), 1.0)

        # Pick the primary type by severity rather than by list position, so a
        # critical match is not hidden behind an earlier, milder one. max()
        # returns the first maximal item, which keeps ties deterministic.
        primary_fraud_type = None
        if detected_indicators:
            primary_fraud_type = max(detected_indicators, key=self._weight)["type"]

        return {
            "text": text,
            "is_fraud": len(detected_indicators) > 0,
            "fraud_score": fraud_score,
            "detected_indicators": detected_indicators,
            "primary_fraud_type": primary_fraud_type,
        }

2. 欺诈检测器

class FraudDetector:
    """Combine the rule-based classifier with a scan for persuasion cues."""

    def __init__(self):
        self.classifier = FraudClassifier()
        self.pattern_detector = self._create_pattern_detector()

    def _create_pattern_detector(self):
        """Build the cue scanner.

        The returned callable maps a text to ``{category: [matched regexes]}``
        and omits categories that had no match.
        """
        cue_table = {
            "urgency": [r"立即", r"马上", r"限时", r"最后"],
            "pressure": [r"必须", r"一定", r"肯定", r"保证"],
            "secrecy": [r"保密", r"机密", r"私下", r"不要告诉"],
            "too_good": [r"免费", r"中奖", r"赠送", r"优惠"],
        }

        def scan(text: str) -> Dict:
            found = {}
            for label, regexes in cue_table.items():
                matched = [
                    rx for rx in regexes if re.search(rx, text, re.IGNORECASE)
                ]
                if matched:
                    found[label] = matched
            return found

        return scan

    def detect_fraud(self, text: str) -> Dict:
        """Classify *text*, scan it for cues, and return the merged assessment.

        The result holds every key of the classifier output plus
        "detected_patterns", "overall_risk" and "recommendation".
        """
        verdict = self.classifier.classify(text)
        cues = self.pattern_detector(text)

        # Each triggered cue combination raises the risk; the total stays
        # capped at 1.0 after every step.
        boosts = (
            ("urgency" in cues and "pressure" in cues, 0.2),
            ("too_good" in cues, 0.15),
        )
        risk = verdict["fraud_score"]
        for triggered, bump in boosts:
            if triggered:
                risk = min(risk + bump, 1.0)

        assessment = dict(verdict)
        assessment["detected_patterns"] = cues
        assessment["overall_risk"] = risk
        assessment["recommendation"] = self._generate_recommendation(risk, verdict)
        return assessment

    def _generate_recommendation(self, risk_score: float, classification: Dict) -> str:
        """Map a risk score to an advice string; *classification* is not used."""
        tiers = (
            (0.7, "高风险:强烈建议阻止该内容"),
            (0.4, "中风险:建议人工审核"),
            (0.2, "低风险:可继续但需监控"),
        )
        for floor, advice in tiers:
            if risk_score > floor:
                return advice
        return "安全:内容看起来正常"

3. 用户行为分析

class UserBehaviorAnalyzer:
    """Track per-user action counts and flag action types that dominate them."""

    # Risk added to a call's score for every action flagged as anomalous.
    RISK_PER_ANOMALY = 0.2
    # Share of a user's history that a single action type is expected to take.
    EXPECTED_TYPE_SHARE = 0.1
    # A type is anomalous once its count exceeds this multiple of the
    # expected count.
    ANOMALY_FACTOR = 3

    def __init__(self, anomaly_threshold: float = 0.7, min_history: int = 10):
        """Create an analyzer with empty profiles.

        Args:
            anomaly_threshold: risk score above which a call is suspicious.
            min_history: number of recorded actions a user needs before the
                frequency check is applied.
        """
        self.user_profiles = {}
        self.anomaly_threshold = anomaly_threshold
        self.min_history = min_history

    def analyze_behavior(self, user_id: str, actions: List[Dict]) -> Dict:
        """Record *actions* in the user's profile and score this batch.

        Args:
            user_id: key of the profile to update (created on first use).
            actions: dicts whose optional "type" key names the action;
                a missing key counts as "unknown".

        Returns:
            A dict with "user_id", "risk_score" (0.2 per anomalous action,
            capped at 1.0), "is_suspicious", "anomalies" (the flagged
            actions) and "profile_summary".
        """
        profile = self.user_profiles.setdefault(
            user_id,
            {"total_actions": 0, "action_types": {}, "risk_scores": []},
        )

        anomalies = []
        for action in actions:
            action_type = action.get("type", "unknown")

            # Update the profile first so the check sees the current action.
            profile["total_actions"] += 1
            profile["action_types"][action_type] = (
                profile["action_types"].get(action_type, 0) + 1
            )

            if self._is_anomalous(action, profile):
                anomalies.append(action)

        # Cap the score so it stays on the same 0..1 scale as the other
        # risk scores it is compared with.
        risk_score = min(len(anomalies) * self.RISK_PER_ANOMALY, 1.0)
        profile["risk_scores"].append(risk_score)

        history = profile["risk_scores"]
        return {
            "user_id": user_id,
            "risk_score": risk_score,
            "is_suspicious": risk_score > self.anomaly_threshold,
            "anomalies": anomalies,
            "profile_summary": {
                "total_actions": profile["total_actions"],
                "action_diversity": len(profile["action_types"]),
                "avg_risk": sum(history) / len(history) if history else 0,
            },
        }

    def _is_anomalous(self, action: Dict, profile: Dict) -> bool:
        """Return True when the action's type is over-represented in *profile*.

        Simplified frequency check: a type is anomalous when its count exceeds
        three times the expected 10% share of all recorded actions.
        """
        total = profile["total_actions"]
        # Without a baseline the ratio is meaningless: a single recorded
        # action is always 100% of the history and would be flagged.
        if total < self.min_history:
            return False

        action_type = action.get("type", "unknown")
        expected_count = total * self.EXPECTED_TYPE_SHARE
        actual_count = profile["action_types"].get(action_type, 0)
        return actual_count > expected_count * self.ANOMALY_FACTOR

防护策略

class FraudPreventionSystem:
    """Combine content detection, behaviour analysis and custom rules."""

    # Combined risk above which a request is blocked outright.
    BLOCK_THRESHOLD = 0.7

    def __init__(self):
        self.detector = FraudDetector()
        self.behavior_analyzer = UserBehaviorAnalyzer()
        self.prevention_rules = []

    def add_prevention_rule(self, name: str, rule_func):
        """Register a rule.

        *rule_func* is called as ``rule_func(text, fraud_result,
        behavior_result)`` and must return a dict; its optional "passed"
        (default True) and "message" (default "") keys are reported.
        """
        self.prevention_rules.append({"name": name, "rule": rule_func})

    def _run_rule(self, rule: Dict, text: str, fraud_result: Dict,
                  behavior_result: Dict) -> Dict:
        """Run one registered rule and normalise its outcome."""
        try:
            result = rule["rule"](text, fraud_result, behavior_result)
            return {
                "rule": rule["name"],
                "passed": result.get("passed", True),
                "message": result.get("message", ""),
            }
        except Exception as e:
            # Rules are caller-supplied code: a crashing rule is reported as
            # failed (which blocks the request) instead of aborting analysis.
            return {
                "rule": rule["name"],
                "passed": False,
                "message": f"规则执行失败: {str(e)}",
            }

    def analyze_request(self, user_id: str, text: str, context: Dict = None) -> Dict:
        """Assess one request and decide whether it should be blocked.

        Args:
            user_id: user whose behaviour profile is updated.
            text: content to scan for fraud.
            context: optional dict; its "actions" list (default empty) is
                passed to the behaviour analyzer.

        Returns:
            A dict with the detector and analyzer outputs, per-rule results,
            "overall_risk", "should_block" and "recommendation".
        """
        fraud_result = self.detector.detect_fraud(text)

        actions = context.get("actions", []) if context else []
        behavior_result = self.behavior_analyzer.analyze_behavior(user_id, actions)

        rule_results = [
            self._run_rule(rule, text, fraud_result, behavior_result)
            for rule in self.prevention_rules
        ]

        # Use the detector's combined risk, which includes the boosts for
        # urgency/pressure/too-good cues; fall back to the raw classifier
        # score when a detector does not report one.
        content_risk = fraud_result.get("overall_risk", fraud_result["fraud_score"])
        overall_risk = max(content_risk, behavior_result["risk_score"])
        should_block = overall_risk > self.BLOCK_THRESHOLD or any(
            not r["passed"] for r in rule_results
        )

        return {
            "user_id": user_id,
            "text": text,
            "fraud_detection": fraud_result,
            "behavior_analysis": behavior_result,
            "rule_results": rule_results,
            "overall_risk": overall_risk,
            "should_block": should_block,
            "recommendation": self._generate_recommendation(overall_risk, should_block),
        }

    def _generate_recommendation(self, risk_score: float, should_block: bool) -> str:
        """Return the advice string for a block decision and risk score."""
        if should_block:
            return "建议阻止该请求"
        elif risk_score > 0.5:
            return "建议人工审核"
        elif risk_score > 0.3:
            return "建议监控后续行为"
        else:
            return "请求看起来正常"

监控和告警

class FraudMonitor:
    """Keep a log and counters of detection results and raise alerts."""

    # Risk score above which a logged detection counts as high risk; this is
    # the same cut-off the detector and prevention system use for blocking.
    HIGH_RISK_SCORE = 0.7

    def __init__(self):
        self.detection_log = []
        self.alert_thresholds = {
            "fraud_rate": 0.1,
            "high_risk_count": 10
        }
        self.metrics = {
            "total_checks": 0,
            "fraud_detected": 0,
            "by_type": {},
            "blocked_requests": 0
        }

    def log_detection(self, user_id: str, text: str, result: Dict):
        """Append one detection result to the log and update the counters.

        Args:
            user_id: user the request belongs to.
            text: analysed content; only the first 100 characters are stored.
            result: analysis dict; reads "fraud_detection" (with "is_fraud"
                and "primary_fraud_type"), "overall_risk" and "should_block",
                each optional.
        """
        fraud_detection = result.get("fraud_detection") or {}
        is_fraud = fraud_detection.get("is_fraud", False)
        blocked = result.get("should_block", False)

        self.detection_log.append({
            # Naive local time, as produced by datetime.now().
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "text_preview": text[:100],
            "is_fraud": is_fraud,
            "risk_score": result.get("overall_risk", 0),
            "blocked": blocked
        })

        self.metrics["total_checks"] += 1

        if is_fraud:
            self.metrics["fraud_detected"] += 1

            # A missing or None type is counted under "unknown" so the
            # per-type table never gets a None key.
            fraud_type = fraud_detection.get("primary_fraud_type") or "unknown"
            self.metrics["by_type"][fraud_type] = \
                self.metrics["by_type"].get(fraud_type, 0) + 1

        if blocked:
            self.metrics["blocked_requests"] += 1

    def check_alerts(self) -> List[Dict]:
        """Return the alerts whose configured thresholds are exceeded.

        Emits "high_fraud_rate" when the share of fraudulent checks exceeds
        ``alert_thresholds["fraud_rate"]`` and "high_risk_count" when the
        number of logged detections scoring above HIGH_RISK_SCORE exceeds
        ``alert_thresholds["high_risk_count"]``.
        """
        alerts = []
        total = self.metrics["total_checks"]

        if total > 0:
            fraud_rate = self.metrics["fraud_detected"] / total
            if fraud_rate > self.alert_thresholds["fraud_rate"]:
                alerts.append({
                    "type": "high_fraud_rate",
                    "message": f"欺诈率过高: {fraud_rate*100:.1f}%",
                    "severity": "high"
                })

        high_risk_count = sum(
            1 for entry in self.detection_log
            if entry["risk_score"] > self.HIGH_RISK_SCORE
        )
        if high_risk_count > self.alert_thresholds["high_risk_count"]:
            alerts.append({
                "type": "high_risk_count",
                "message": f"高风险请求过多: {high_risk_count}",
                "severity": "high"
            })

        return alerts

    def generate_report(self) -> str:
        """Return a plain-text summary of the counters and per-type totals."""
        stats = self.metrics
        total = stats["total_checks"]

        header = (
            "\n"
            "欺诈检测报告\n"
            f"{'=' * 50}\n"
            "\n"
            f"总检测数: {total}\n"
            f"欺诈检测数: {stats['fraud_detected']}\n"
            f"阻止请求数: {stats['blocked_requests']}\n"
            "\n"
            "按类型分布:\n"
        )
        rows = "".join(
            f"  {fraud_type}: {count}\n"
            for fraud_type, count in stats["by_type"].items()
        )
        return header + rows

最佳实践

  1. 多层防护:结合内容检测和行为分析
  2. 实时监控:持续记录检测结果,并在欺诈率超过阈值时及时告警
  3. 持续更新:定期更新欺诈模式
  4. 用户教育:教育用户识别欺诈

总结

欺诈检测是保护用户和系统安全的重要环节。通过多层检测和防护机制,可以有效识别和防范各种欺诈行为。