LSTM网络详解
LSTM网络详解
长短期记忆网络(Long Short-Term Memory,LSTM)是一种特殊的RNN,能够学习长期依赖关系,在很大程度上缓解了传统RNN的梯度消失问题。
LSTM原理
门控机制
LSTM通过三个门控单元控制信息流动:
- 遗忘门:决定丢弃哪些信息
- 输入门:决定存储哪些新信息
- 输出门:决定输出哪些信息
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')
# Build a synthetic sequence-classification dataset.
np.random.seed(42)
n_samples = 1000
seq_length = 20
n_features = 10

# Each sample is one period of a sine wave (shared by all features) plus
# independent Gaussian noise per feature.
#
# A per-sample random phase is required for the labels to vary: with a fixed
# phase the last five steps of the sine are always negative, so the tail sum
# used for the label is never positive and every sample ends up in class 0.
phase = np.random.uniform(0.0, 2 * np.pi, size=(n_samples, 1, 1))
steps = np.arange(seq_length).reshape(1, seq_length, 1)
signal = np.sin(2 * np.pi * steps / seq_length + phase)
X = signal + 0.1 * np.random.randn(n_samples, seq_length, n_features)

# Label is 1 when feature 0 summed over the last five time steps is positive.
y = (X[:, -5:, 0].sum(axis=1) > 0).astype(float)

print(f"序列数据形状: {X.shape}")
print(f"标签分布: {np.bincount(y.astype(int))}")
LSTM单元实现
基本LSTM单元
class LSTMCell:
    """A single LSTM cell implemented with NumPy (forward pass only).

    Each of the four transforms (forget gate, input gate, candidate memory,
    output gate) owns a weight matrix of shape
    ``(input_size + hidden_size, hidden_size)`` and a bias of shape
    ``(1, hidden_size)``.
    """

    def __init__(self, input_size, hidden_size):
        """Create the cell parameters.

        Args:
            input_size: Dimension of the input features.
            hidden_size: Dimension of the hidden state.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size

        weight_shape = (input_size + hidden_size, hidden_size)
        bias_shape = (1, hidden_size)

        # Weights are small Gaussian values, biases start at zero.
        # Forget gate.
        self.W_f = 0.01 * np.random.randn(*weight_shape)
        self.b_f = np.zeros(bias_shape)
        # Input gate.
        self.W_i = 0.01 * np.random.randn(*weight_shape)
        self.b_i = np.zeros(bias_shape)
        # Candidate memory.
        self.W_c = 0.01 * np.random.randn(*weight_shape)
        self.b_c = np.zeros(bias_shape)
        # Output gate.
        self.W_o = 0.01 * np.random.randn(*weight_shape)
        self.b_o = np.zeros(bias_shape)

    def _sigmoid(self, x):
        """Logistic sigmoid; the input is clipped to [-250, 250] before exp."""
        clipped = np.clip(x, -250, 250)
        return 1 / (1 + np.exp(-clipped))

    def _tanh(self, x):
        """Hyperbolic tangent."""
        return np.tanh(x)

    def _affine(self, combined, weight, bias):
        """Return ``combined @ weight + bias`` for one gate."""
        return np.dot(combined, weight) + bias

    def forward(self, x, h_prev, c_prev):
        """Advance the cell by one time step.

        Args:
            x: Input at the current step, shape (batch_size, input_size).
            h_prev: Previous hidden state, shape (batch_size, hidden_size).
            c_prev: Previous cell state, shape (batch_size, hidden_size).

        Returns:
            Tuple ``(h, c)`` with the new hidden state and cell state.
        """
        # All gates read the input concatenated with the previous hidden state.
        combined = np.concatenate([x, h_prev], axis=1)

        forget_gate = self._sigmoid(self._affine(combined, self.W_f, self.b_f))
        input_gate = self._sigmoid(self._affine(combined, self.W_i, self.b_i))
        candidate = self._tanh(self._affine(combined, self.W_c, self.b_c))

        # Keep part of the old memory and add the gated candidate.
        c = forget_gate * c_prev + input_gate * candidate

        output_gate = self._sigmoid(self._affine(combined, self.W_o, self.b_o))

        # The hidden state is the gated, squashed cell state.
        h = output_gate * self._tanh(c)
        return h, c

    def init_states(self, batch_size):
        """Return zero-filled ``(h, c)`` states for a batch of the given size."""
        state_shape = (batch_size, self.hidden_size)
        return np.zeros(state_shape), np.zeros(state_shape)
# Smoke-test the LSTM cell on a single time step.
lstm_cell = LSTMCell(
    input_size=10,
    hidden_size=20,
)
batch_size = 32

# Start from all-zero hidden and cell states.
h_prev, c_prev = lstm_cell.init_states(batch_size)

# A random batch stands in for the input at one time step.
x_t = np.random.randn(batch_size, 10)
h_t, c_t = lstm_cell.forward(x_t, h_prev, c_prev)

# Report the shapes of the input and of both returned states.
print(f"输入形状: {x_t.shape}")
print(f"隐藏状态形状: {h_t.shape}")
print(f"细胞状态形状: {c_t.shape}")
完整LSTM网络
class SimpleLSTM:
    """Minimal NumPy LSTM classifier: one LSTM cell plus a linear output layer.

    Only the forward pass is implemented; ``learning_rate`` is stored on the
    instance as ``lr`` but is not used by any method of this class.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """Create the recurrent cell and the output-layer parameters.

        Args:
            input_size: Dimension of the input features.
            hidden_size: Dimension of the hidden state.
            output_size: Number of output units.
            learning_rate: Value stored as ``self.lr``.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate

        # Recurrent part.
        self.lstm_cell = LSTMCell(input_size, hidden_size)

        # Output projection: small Gaussian weights, zero bias.
        self.W_hy = 0.01 * np.random.randn(hidden_size, output_size)
        self.b_y = np.zeros((1, output_size))

    def forward(self, X):
        """Run the cell over every time step and project the final hidden state.

        Args:
            X: Array of shape (batch_size, seq_length, input_size).

        Returns:
            Tuple ``(output, h, c)``: the output-layer activations computed
            from the last hidden state, and the final hidden and cell states.
        """
        batch_size, seq_length, _ = X.shape

        # Zero initial states, then feed the sequence one step at a time.
        h, c = self.lstm_cell.init_states(batch_size)
        for step in range(seq_length):
            h, c = self.lstm_cell.forward(X[:, step, :], h, c)

        # Linear output layer on the final hidden state.
        output = np.dot(h, self.W_hy) + self.b_y
        return output, h, c

    def predict(self, X):
        """Return, per sample, the index of the largest output unit."""
        scores = self.forward(X)[0]
        return np.argmax(scores, axis=1)
# Build the NumPy LSTM model.
lstm = SimpleLSTM(
    input_size=10,
    hidden_size=30,
    output_size=2,
    learning_rate=0.01,
)

# Forward pass over the first 32 sequences of the dataset.
X_batch = X[:32]
output, h_final, c_final = lstm.forward(X_batch)

# Report the shapes of the batch, the output and the final states.
print(f"输入形状: {X_batch.shape}")
print(f"输出形状: {output.shape}")
print(f"最终隐藏状态形状: {h_final.shape}")
print(f"最终细胞状态形状: {c_final.shape}")
使用PyTorch实现LSTM
PyTorch LSTM
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# Convert the NumPy data to tensors: float32 features, int64 class labels.
X_tensor = torch.as_tensor(X, dtype=torch.float32)
y_tensor = torch.as_tensor(y, dtype=torch.long)

# Hold out 20% of the samples for testing (fixed seed for reproducibility).
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor,
    y_tensor,
    test_size=0.2,
    random_state=42,
)

# Shuffled mini-batches of 32 over the training split.
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# 定义LSTM模型
class PyTorchLSTM(nn.Module):
    """Sequence classifier: a (multi-layer, optionally bidirectional) LSTM
    followed by a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden state size of each LSTM direction.
        output_size: Number of output units (classes).
        num_layers: Number of stacked LSTM layers.
        bidirectional: Whether to run the LSTM in both directions.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, bidirectional=False):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # Recurrent layer; inputs are (batch, seq, feature).
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers,
            batch_first=True, bidirectional=bidirectional
        )
        # Linear head over the concatenated per-direction summaries.
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size) with unnormalized scores.
        """
        # Omitting (h0, c0) lets nn.LSTM start from zero states that match
        # the dtype and device of the input.
        _, (h_n, _) = self.lstm(x)

        if self.bidirectional:
            # h_n is ordered layer by layer, forward direction first, so the
            # last two entries belong to the top layer. Taking the output at
            # the last time step instead would pair the forward state with a
            # backward state that has processed only a single step.
            summary = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            # Identical to the LSTM output at the last time step.
            summary = h_n[-1]

        return self.fc(summary)
# Instantiate a two-layer bidirectional LSTM classifier.
model = PyTorchLSTM(
    input_size=10,
    hidden_size=50,
    output_size=2,
    num_layers=2,
    bidirectional=True,
)

# Cross-entropy loss, optimized with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop: 10 passes over the training loader.
print("训练PyTorch LSTM:")
for epoch in range(10):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_X, batch_y in train_loader:
        # Clear gradients from the previous step, then run the forward pass.
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

        # Accumulate the loss and the number of correct predictions.
        running_loss += loss.item()
        predicted = torch.max(outputs.detach(), dim=1).indices
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    # Per-epoch summary: mean batch loss and training accuracy.
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(train_loader):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')

# Evaluate on the held-out split without tracking gradients.
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    predicted = torch.max(test_outputs, dim=1).indices
    test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
print(f'\n测试准确率: {test_accuracy:.2f}%')
LSTM变体比较
# Compare several LSTM configurations that differ only in depth and direction.
variant_configs = {
    '标准LSTM': {'num_layers': 1},
    '双向LSTM': {'num_layers': 1, 'bidirectional': True},
    '多层LSTM': {'num_layers': 3},
    '双向多层LSTM': {'num_layers': 2, 'bidirectional': True},
}
lstm_variants = {
    label: PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, **config)
    for label, config in variant_configs.items()
}

print("\n比较不同LSTM变体:")
for name, model in lstm_variants.items():
    # Each variant gets its own fresh Adam optimizer.
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train for 5 epochs.
    for epoch in range(5):
        model.train()
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

    # Evaluate on the held-out split.
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        predicted = torch.max(test_outputs, dim=1).indices
        test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
    print(f"{name} 测试准确率: {test_accuracy:.2f}%")
实际应用
文本分类
# Simulated text data: random token ids with random binary labels.
np.random.seed(42)
vocab_size = 1000
max_seq_length = 50
n_samples = 2000

# Token sequences are drawn uniformly from [0, vocab_size).
# NOTE: the labels are drawn independently of the tokens, so there is no
# signal to learn and test accuracy near 50% is the expected outcome.
X_text = np.random.randint(0, vocab_size, size=(n_samples, max_seq_length))
y_text = np.random.randint(0, 2, size=n_samples)

# Convert to int64 tensors (token ids for the embedding, class labels).
X_text_tensor = torch.as_tensor(X_text, dtype=torch.long)
y_text_tensor = torch.as_tensor(y_text, dtype=torch.long)

# Hold out 20% of the samples for testing.
X_train_text, X_test_text, y_train_text, y_test_text = train_test_split(
    X_text_tensor,
    y_text_tensor,
    test_size=0.2,
    random_state=42,
)
# 定义文本分类模型
class TextLSTM(nn.Module):
    """Text classifier: embedding -> single-layer LSTM -> linear layer.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_size: Dimension of each token embedding.
        hidden_size: Hidden state size of the LSTM.
        output_size: Number of output units (classes).
    """

    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch of token-id sequences to class scores.

        Args:
            x: Integer tensor of token ids, shape (batch, seq_len).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Look up embeddings, then run the LSTM over the whole sequence.
        embedded = self.embedding(x)
        hidden_seq, _ = self.lstm(embedded)

        # Classify from the LSTM output at the last time step.
        last_step = hidden_seq[:, -1, :]
        return self.fc(last_step)
# Instantiate the text classifier.
text_model = TextLSTM(
    vocab_size=1000,
    embed_size=64,
    hidden_size=128,
    output_size=2,
)

# Cross-entropy loss, optimized with Adam.
criterion_text = nn.CrossEntropyLoss()
optimizer_text = optim.Adam(text_model.parameters(), lr=0.001)

# Shuffled mini-batches of 32 over the training split.
train_dataset_text = TensorDataset(X_train_text, y_train_text)
train_loader_text = DataLoader(train_dataset_text, batch_size=32, shuffle=True)

# Training loop: 5 passes over the training loader.
print("\n训练文本分类LSTM:")
for epoch in range(5):
    text_model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_X, batch_y in train_loader_text:
        # Clear old gradients, forward, backward, update.
        optimizer_text.zero_grad()
        outputs = text_model(batch_X)
        loss = criterion_text(outputs, batch_y)
        loss.backward()
        optimizer_text.step()

        # Accumulate the loss and the number of correct predictions.
        running_loss += loss.item()
        predicted = torch.max(outputs.detach(), dim=1).indices
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    # Per-epoch summary: mean batch loss and training accuracy.
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/5], Loss: {running_loss/len(train_loader_text):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')

# Evaluate on the held-out split without tracking gradients.
text_model.eval()
with torch.no_grad():
    test_outputs = text_model(X_test_text)
    predicted = torch.max(test_outputs, dim=1).indices
    test_accuracy = 100 * (predicted == y_test_text).sum().item() / y_test_text.size(0)
print(f'\n测试准确率: {test_accuracy:.2f}%')
LSTM最佳实践
- 梯度裁剪:防止梯度爆炸
- 层归一化:稳定训练过程
- Dropout:防止过拟合
- 学习率调度:动态调整学习率
- 双向LSTM:同时考虑过去和未来信息(仅适用于预测时可以获得完整序列的任务)
LSTM是处理序列数据的核心技术,掌握LSTM对于自然语言处理、时间序列预测等任务至关重要。