异常检测技术详解
异常检测技术详解
异常检测是识别数据中异常模式或异常值的技术,在欺诈检测、网络入侵检测等领域有广泛应用。
异常检测原理
异常类型
- 点异常:单个数据点明显偏离正常模式
- 上下文异常:在特定上下文中异常
- 集体异常:一组数据点的组合异常
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.datasets import make_blobs, make_moons
from sklearn.preprocessing import StandardScaler
import pandas as pd
# 创建示例数据(包含异常值)
np.random.seed(42)
n_normal = 200
n_anomaly = 20
# 正常数据
X_normal, y_normal = make_blobs(n_samples=n_normal, centers=2,
cluster_std=0.5, random_state=42)
# 异常数据
X_anomaly = np.random.uniform(low=-6, high=6, size=(n_anomaly, 2))
# 合并数据
X = np.vstack([X_normal, X_anomaly])
y_true = np.hstack([np.zeros(n_normal), np.ones(n_anomaly)])
print(f"数据集大小: {X.shape}")
print(f"正常样本: {n_normal}")
print(f"异常样本: {n_anomaly}")
异常检测算法
1. 孤立森林(Isolation Forest)
# 孤立森林
iso_forest = IsolationForest(contamination=0.1, random_state=42)
y_pred_if = iso_forest.fit_predict(X)
# 获取异常分数
anomaly_scores_if = iso_forest.decision_function(X)
print("孤立森林检测结果:")
print(f"预测异常数量: {np.sum(y_pred_if == -1)}")
print(f"实际异常数量: {np.sum(y_true == 1)}")
# 可视化
plt.figure(figsize=(12, 5))
# 原始数据
plt.subplot(1, 2, 1)
plt.scatter(X[y_true == 0, 0], X[y_true == 0, 1], c='blue', label='正常', alpha=0.6)
plt.scatter(X[y_true == 1, 0], X[y_true == 1, 1], c='red', label='异常', alpha=0.8)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('原始数据')
plt.legend()
plt.grid(True, alpha=0.3)
# 检测结果
plt.subplot(1, 2, 2)
plt.scatter(X[y_pred_if == 1, 0], X[y_pred_if == 1, 1], c='blue', label='正常', alpha=0.6)
plt.scatter(X[y_pred_if == -1, 0], X[y_pred_if == -1, 1], c='red', label='异常', alpha=0.8)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('孤立森林检测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
2. 局部异常因子(LOF)
# 局部异常因子
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred_lof = lof.fit_predict(X)
# 获取异常分数
anomaly_scores_lof = lof.negative_outlier_factor_
print("\nLOF检测结果:")
print(f"预测异常数量: {np.sum(y_pred_lof == -1)}")
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred_lof == 1, 0], X[y_pred_lof == 1, 1],
c='blue', label='正常', alpha=0.6)
plt.scatter(X[y_pred_lof == -1, 0], X[y_pred_lof == -1, 1],
c='red', label='异常', alpha=0.8)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('LOF检测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
3. 单类SVM
# 单类SVM
oc_svm = OneClassSVM(kernel='rbf', gamma='auto', contamination=0.1)
y_pred_svm = oc_svm.fit_predict(X)
print("\n单类SVM检测结果:")
print(f"预测异常数量: {np.sum(y_pred_svm == -1)}")
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred_svm == 1, 0], X[y_pred_svm == 1, 1],
c='blue', label='正常', alpha=0.6)
plt.scatter(X[y_pred_svm == -1, 0], X[y_pred_svm == -1, 1],
c='red', label='异常', alpha=0.8)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('单类SVM检测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
4. 椭圆包络(基于高斯分布)
# 椭圆包络
ee = EllipticEnvelope(contamination=0.1, random_state=42)
y_pred_ee = ee.fit_predict(X)
print("\n椭圆包络检测结果:")
print(f"预测异常数量: {np.sum(y_pred_ee == -1)}")
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred_ee == 1, 0], X[y_pred_ee == 1, 1],
c='blue', label='正常', alpha=0.6)
plt.scatter(X[y_pred_ee == -1, 0], X[y_pred_ee == -1, 1],
c='red', label='异常', alpha=0.8)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('椭圆包络检测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
算法比较
性能评估
from sklearn.metrics import precision_score, recall_score, f1_score
# 比较不同算法
algorithms = {
'孤立森林': y_pred_if,
'LOF': y_pred_lof,
'单类SVM': y_pred_svm,
'椭圆包络': y_pred_ee
}
print("算法性能比较:")
print("-" * 60)
print(f"{'算法':<15} {'精确率':<10} {'召回率':<10} {'F1分数':<10}")
print("-" * 60)
for name, y_pred in algorithms.items():
# 转换标签:异常=1,正常=0
y_pred_binary = (y_pred == -1).astype(int)
y_true_binary = y_true.astype(int)
precision = precision_score(y_true_binary, y_pred_binary)
recall = recall_score(y_true_binary, y_pred_binary)
f1 = f1_score(y_true_binary, y_pred_binary)
print(f"{name:<15} {precision:<10.4f} {recall:<10.4f} {f1:<10.4f}")
可视化比较
# 比较不同算法的检测结果
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for idx, (name, y_pred) in enumerate(algorithms.items()):
ax = axes[idx // 2, idx % 2]
ax.scatter(X[y_pred == 1, 0], X[y_pred == 1, 1],
c='blue', label='正常', alpha=0.6, s=30)
ax.scatter(X[y_pred == -1, 0], X[y_pred == -1, 1],
c='red', label='异常', alpha=0.8, s=50)
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
ax.set_title(name)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
实际应用
金融欺诈检测
# 模拟金融交易数据
np.random.seed(42)
n_transactions = 1000
# 正常交易
normal_amount = np.random.lognormal(mean=4, sigma=1, size=n_transactions-50)
normal_time = np.random.uniform(0, 24, n_transactions-50) # 24小时制
# 欺诈交易
fraud_amount = np.random.lognormal(mean=7, sigma=1, size=50) # 金额更大
fraud_time = np.random.choice([0, 1, 2, 3, 22, 23], size=50) # 异常时间
# 合并数据
amounts = np.concatenate([normal_amount, fraud_amount])
times = np.concatenate([normal_time, fraud_time])
is_fraud = np.concatenate([np.zeros(n_transactions-50), np.ones(50)])
# 创建特征矩阵
X_fraud = np.column_stack([amounts, times])
# 使用孤立森林检测
iso_forest_fraud = IsolationForest(contamination=0.05, random_state=42)
predictions = iso_forest_fraud.fit_predict(X_fraud)
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X_fraud[is_fraud == 0, 0], X_fraud[is_fraud == 0, 1],
c='blue', label='正常交易', alpha=0.6)
plt.scatter(X_fraud[is_fraud == 1, 0], X_fraud[is_fraud == 1, 1],
c='red', label='欺诈交易', alpha=0.8)
plt.xlabel('交易金额')
plt.ylabel('交易时间')
plt.title('原始交易数据')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.scatter(X_fraud[predictions == 1, 0], X_fraud[predictions == 1, 1],
c='blue', label='正常', alpha=0.6)
plt.scatter(X_fraud[predictions == -1, 0], X_fraud[predictions == -1, 1],
c='red', label='异常', alpha=0.8)
plt.xlabel('交易金额')
plt.ylabel('交易时间')
plt.title('异常检测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 评估
print(f"检测到的异常交易: {np.sum(predictions == -1)}")
print(f"实际欺诈交易: {np.sum(is_fraud == 1)}")
异常检测最佳实践
- 数据预处理:标准化数据,处理缺失值
- 特征工程:创建有意义的特征
- 算法选择:根据数据特点选择合适的算法
- 参数调优:调整污染率等参数
- 结果验证:使用领域知识验证检测结果
异常检测是数据科学中的重要技术,掌握异常检测对于数据质量控制和安全监控至关重要。