迁移学习详解
迁移学习详解
迁移学习是将一个任务上学到的知识应用到另一个相关任务上的技术,是深度学习中的重要方法。
迁移学习原理
为什么使用迁移学习
- 数据不足:当目标域数据量较少时
- 训练时间:减少训练时间和计算资源
- 性能提升:利用预训练模型的特征提取能力
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import models, transforms, datasets
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')
# 检查CUDA是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
预训练模型
使用PyTorch预训练模型
# 加载预训练的ResNet模型
resnet18 = models.resnet18(pretrained=True)
# 查看模型结构
print("ResNet18模型结构:")
print(resnet18)
# 修改最后的全连接层
num_ftrs = resnet18.fc.in_features
resnet18.fc = nn.Linear(num_ftrs, 10) # 假设10个类别
# 将模型移动到设备
resnet18 = resnet18.to(device)
print(f"\n修改后的模型最后一层: {resnet18.fc}")
冻结和解冻层
# 冻结所有层
for param in resnet18.parameters():
param.requires_grad = False
# 只训练最后一层
for param in resnet18.fc.parameters():
param.requires_grad = True
# 统计参数
total_params = sum(p.numel() for p in resnet18.parameters())
trainable_params = sum(p.numel() for p in resnet18.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")
print(f"冻结参数: {total_params - trainable_params:,}")
迁移学习方法
特征提取
# 特征提取方法
class FeatureExtractor:
def __init__(self, model_name='resnet18'):
self.model_name = model_name
self.model = None
self.transform = None
self._load_model()
self._setup_transform()
def _load_model(self):
"""加载预训练模型"""
if self.model_name == 'resnet18':
self.model = models.resnet18(pretrained=True)
# 移除最后的全连接层
self.model = nn.Sequential(*list(self.model.children())[:-1])
elif self.model_name == 'vgg16':
self.model = models.vgg16(pretrained=True)
# 只使用特征提取部分
self.model = self.model.features
self.model.eval()
self.model.to(device)
def _setup_transform(self):
"""设置图像预处理"""
self.transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def extract_features(self, images):
"""提取特征"""
with torch.no_grad():
features = self.model(images.to(device))
return features.cpu().numpy()
# 使用特征提取器
feature_extractor = FeatureExtractor('resnet18')
# 模拟图像数据
dummy_images = torch.randn(5, 3, 224, 224) # 批次大小, 通道, 高度, 宽度
features = feature_extractor.extract_features(dummy_images)
print(f"输入形状: {dummy_images.shape}")
print(f"提取的特征形状: {features.shape}")
微调
# 微调方法
class FineTuner:
def __init__(self, num_classes, model_name='resnet18'):
self.num_classes = num_classes
self.model_name = model_name
self.model = None
self._load_model()
def _load_model(self):
"""加载预训练模型并修改"""
if self.model_name == 'resnet18':
self.model = models.resnet18(pretrained=True)
num_ftrs = self.model.fc.in_features
self.model.fc = nn.Linear(num_ftrs, self.num_classes)
elif self.model_name == 'vgg16':
self.model = models.vgg16(pretrained=True)
num_ftrs = self.model.classifier[6].in_features
self.model.classifier[6] = nn.Linear(num_ftrs, self.num_classes)
self.model = self.model.to(device)
def freeze_layers(self, num_layers_to_freeze):
"""冻结前N层"""
children = list(self.model.children())
# 冻结指定层
for i, child in enumerate(children):
if i < num_layers_to_freeze:
for param in child.parameters():
param.requires_grad = False
# 统计参数
total_params = sum(p.numel() for p in self.model.parameters())
trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
print(f"冻结前{num_layers_to_freeze}层后:")
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")
def get_optimizer(self, lr=0.001):
"""获取优化器(只优化可训练参数)"""
params_to_update = []
for param in self.model.parameters():
if param.requires_grad:
params_to_update.append(param)
return optim.Adam(params_to_update, lr=lr)
def train(self, train_loader, val_loader, epochs=10):
"""训练模型"""
criterion = nn.CrossEntropyLoss()
optimizer = self.get_optimizer()
train_losses = []
val_accs = []
for epoch in range(epochs):
# 训练阶段
self.model.train()
running_loss = 0.0
correct = 0
total = 0
for batch_X, batch_y in train_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
optimizer.zero_grad()
outputs = self.model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += batch_y.size(0)
correct += (predicted == batch_y).sum().item()
train_loss = running_loss / len(train_loader)
train_acc = 100 * correct / total
train_losses.append(train_loss)
# 验证阶段
val_acc = self._validate(val_loader)
val_accs.append(val_acc)
print(f'Epoch [{epoch+1}/{epochs}], '
f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, '
f'Val Acc: {val_acc:.2f}%')
return train_losses, val_accs
def _validate(self, val_loader):
"""验证模型"""
self.model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch_X, batch_y in val_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
outputs = self.model(batch_X)
_, predicted = torch.max(outputs.data, 1)
total += batch_y.size(0)
correct += (predicted == batch_y).sum().item()
return 100 * correct / total
# 创建微调器
fine_tuner = FineTuner(num_classes=10, model_name='resnet18')
fine_tuner.freeze_layers(6) # 冻结前6层
实际应用
图像分类迁移学习
# 创建模拟数据
np.random.seed(42)
n_samples = 500
n_classes = 10
# 模拟图像数据(真实场景中应使用实际图像)
X = np.random.randn(n_samples, 3, 224, 224).astype(np.float32)
y = np.random.randint(0, n_classes, n_samples)
# 转换为PyTorch张量
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)
# 划分数据集
X_train, X_val, y_train, y_val = train_test_split(
X_tensor, y_tensor, test_size=0.2, random_state=42
)
# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# 训练模型
print("训练迁移学习模型:")
train_losses, val_accs = fine_tuner.train(train_loader, val_loader, epochs=10)
# 可视化训练过程
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(train_losses, 'b-', label='训练损失')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('损失')
ax1.set_title('训练损失曲线')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax2.plot(val_accs, 'r-', label='验证准确率')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('准确率')
ax2.set_title('验证准确率曲线')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
使用TensorFlow的迁移学习
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import VGG16
# 加载预训练的VGG16模型
base_model = VGG16(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
# 冻结基础模型
base_model.trainable = False
# 构建迁移学习模型
model = keras.Sequential([
base_model,
layers.GlobalAveragePooling2D(),
layers.Dense(256, activation='relu'),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
# 编译模型
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
model.summary()
# 统计参数
total_params = model.count_params()
trainable_params = sum([tf.keras.backend.count_params(w) for w in model.trainable_weights])
print(f"\n总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")
迁移学习策略
逐步解冻
# 逐步解冻策略
def gradual_unfreezing(model, unfreeze_from_epoch=5, layers_per_epoch=2):
"""逐步解冻层"""
children = list(model.children())
n_layers = len(children)
for epoch in range(10):
# 计算要解冻的层数
layers_to_unfreeze = min(unfreeze_from_epoch + epoch * layers_per_epoch, n_layers)
# 解冻指定层
for i, child in enumerate(children):
if i >= n_layers - layers_to_unfreeze:
for param in child.parameters():
param.requires_grad = True
# 统计可训练参数
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Epoch {epoch+1}: 解冻{layers_to_unfreeze}层, 可训练参数: {trainable_params:,}")
# 使用逐步解冻
gradual_unfreezing(resnet18)
迁移学习最佳实践
- 选择合适的预训练模型:根据任务选择模型架构
- 数据预处理:使用与预训练模型相同的预处理
- 冻结策略:先冻结底层,再逐步解冻
- 学习率调整:使用较小的学习率微调
- 数据增强:增加训练数据多样性
迁移学习是深度学习中的重要技术,掌握迁移学习可以在数据有限的情况下构建高性能模型。