循环神经网络详解
循环神经网络(Recurrent Neural Network,RNN)是专门用于处理序列数据的神经网络,具有记忆能力。
RNN原理
序列数据特点
序列数据具有时间依赖性,如文本、时间序列、语音等。
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')
# Build a reproducible synthetic dataset of fixed-length sequences.
np.random.seed(42)
n_samples = 1000
seq_length = 10
n_features = 5
# Features are standard-normal noise laid out as
# (samples, time steps, features).
X = np.random.randn(n_samples, seq_length, n_features)
# Binary class labels, drawn independently of X.
y = np.random.randint(low=0, high=2, size=n_samples)
# Report the dataset dimensions.
print(f"序列数据形状: {X.shape}")
print(f"标签形状: {y.shape}")
print(f"序列长度: {seq_length}")
print(f"特征数量: {n_features}")
RNN单元实现
基本RNN单元
class SimpleRNNCell:
    """Single-step vanilla RNN cell with a linear read-out, written in NumPy.

    The cell owns three weight matrices (input-to-hidden, hidden-to-hidden,
    hidden-to-output) initialised to small random values, and two
    zero-initialised bias rows.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the cell's parameters.

        Args:
            input_size: Dimension of the input features.
            hidden_size: Dimension of the hidden state.
            output_size: Dimension of the output.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # The three matrices are drawn from the global NumPy RNG in this
        # fixed order (W_xh, W_hh, W_hy), so seeding reproduces them.
        init_scale = 0.01
        self.W_xh = init_scale * np.random.randn(input_size, hidden_size)
        self.W_hh = init_scale * np.random.randn(hidden_size, hidden_size)
        self.W_hy = init_scale * np.random.randn(hidden_size, output_size)

        # Biases are row vectors so they broadcast across the batch.
        self.b_h = np.zeros((1, hidden_size))
        self.b_y = np.zeros((1, output_size))

    def forward(self, x, h_prev):
        """Advance the cell by one time step.

        Args:
            x: Input for the current step, shape (batch_size, input_size).
            h_prev: Hidden state from the previous step,
                shape (batch_size, hidden_size).

        Returns:
            Tuple ``(y, h_current)``: the linear read-out of the new hidden
            state, and the new hidden state itself.
        """
        input_term = np.dot(x, self.W_xh)
        recurrent_term = np.dot(h_prev, self.W_hh)
        h_current = np.tanh(input_term + recurrent_term + self.b_h)
        y = np.dot(h_current, self.W_hy) + self.b_y
        return y, h_current

    def init_hidden(self, batch_size):
        """Return an all-zero hidden state of shape (batch_size, hidden_size)."""
        state_shape = (batch_size, self.hidden_size)
        return np.zeros(state_shape)
# Smoke-test the cell on a single time step.
batch_size = 32
rnn_cell = SimpleRNNCell(input_size=5, hidden_size=10, output_size=2)
# Start from the all-zero hidden state.
h_prev = rnn_cell.init_hidden(batch_size)
# One random input batch whose width matches the cell's input size (5).
x_t = np.random.randn(batch_size, rnn_cell.input_size)
step_result = rnn_cell.forward(x_t, h_prev)
y_t, h_t = step_result
print(f"输入形状: {x_t.shape}")
print(f"隐藏状态形状: {h_t.shape}")
print(f"输出形状: {y_t.shape}")
完整RNN网络
class SimpleRNN:
    """Vanilla RNN over whole sequences, written in NumPy (forward pass only).

    The network applies a tanh recurrence over the time axis and reads the
    final hidden state out through a linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """Create the network's parameters.

        Args:
            input_size: Dimension of the input features.
            hidden_size: Dimension of the hidden state.
            output_size: Dimension of the output.
            learning_rate: Stored as ``self.lr``; no method of this class
                reads it.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate
        # Small random weights, drawn in a fixed order from the global RNG.
        self.W_xh = np.random.randn(input_size, hidden_size) * 0.01
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01
        # Bias rows broadcast across the batch.
        self.b_h = np.zeros((1, hidden_size))
        self.b_y = np.zeros((1, output_size))

    def forward(self, X):
        """Run the recurrence over a whole batch of sequences.

        Args:
            X: Input sequences, shape (batch_size, seq_length, input_size).

        Returns:
            Tuple ``(output, h)``: the read-out at the last time step, shape
            (batch_size, output_size), and the final hidden state, shape
            (batch_size, hidden_size). For a zero-length sequence the
            read-out of the all-zero initial state is returned.
        """
        batch_size, seq_length, _ = X.shape
        # The hidden state starts at zero.
        h = np.zeros((batch_size, self.hidden_size))
        for t in range(seq_length):
            x_t = X[:, t, :]
            h = np.tanh(np.dot(x_t, self.W_xh) + np.dot(h, self.W_hh) + self.b_h)
        # Only the last step's output is returned, so project once after the
        # loop rather than at every step. This also keeps zero-length
        # sequences from failing on an empty output list.
        output = np.dot(h, self.W_hy) + self.b_y
        return output, h

    def predict(self, X):
        """Return the arg-max class index per sample for the sequences in X."""
        output, _ = self.forward(X)
        return np.argmax(output, axis=1)
# Instantiate the NumPy RNN for the 5-feature, 2-class toy problem.
rnn = SimpleRNN(5, 20, 2, learning_rate=0.01)
# Forward pass over the first 32 samples of the dataset.
X_batch = X[:32]
forward_result = rnn.forward(X_batch)
output, h_final = forward_result
print(f"输入形状: {X_batch.shape}")
print(f"输出形状: {output.shape}")
print(f"最终隐藏状态形状: {h_final.shape}")
# Class predictions for the same batch.
predictions = rnn.predict(X_batch)
print(f"预测类别: {predictions}")
使用PyTorch实现RNN
PyTorch RNN
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# Convert the NumPy dataset to tensors: float32 features, int64 class labels.
# torch.tensor with an explicit dtype replaces the legacy FloatTensor /
# LongTensor constructors; it copies, so the tensors do not alias the arrays.
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)
# Hold out 20% of the samples for testing.
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, y_tensor, test_size=0.2, random_state=42
)
# Mini-batch loader over the training split (batches of 32, shuffled).
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
# 定义RNN模型
class PyTorchRNN(nn.Module):
    """Sequence classifier: ``nn.RNN`` followed by a linear layer.

    The linear layer is applied to the RNN output at the last time step.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        """Build the recurrent layer and the linear head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the hidden state.
            output_size: Number of output units of the linear head.
            num_layers: Number of stacked recurrent layers.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are (batch, seq, feature).
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the head's output for the last time step of each sequence.

        Args:
            x: Batched input of shape (batch, seq, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Create the zero initial state directly with the input's dtype and
        # device, so the model also works after .double()/.half() and no
        # CPU-to-device copy is needed.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.rnn(x, h0)
        # Keep only the last time step.
        out = out[:, -1, :]
        return self.fc(out)
# Build the model, loss and optimizer.
model = PyTorchRNN(input_size=5, hidden_size=20, output_size=2, num_layers=1)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Train for 10 epochs, reporting mean batch loss and training accuracy.
print("训练PyTorch RNN:")
for epoch in range(10):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for batch_X, batch_y in train_loader:
        # Forward pass.
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Running statistics. argmax returns integer class indices, so the
        # discouraged `.data` access is not needed here.
        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    # Per-epoch summary.
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(train_loader):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')
# Evaluate on the held-out split with gradients disabled.
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    predicted = test_outputs.argmax(dim=1)
    test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
print(f'\n测试准确率: {test_accuracy:.2f}%')
LSTM和GRU
# LSTM模型
class PyTorchLSTM(nn.Module):
    """Sequence classifier: ``nn.LSTM`` followed by a linear layer.

    The linear layer is applied to the LSTM output at the last time step.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        """Build the LSTM layer and the linear head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the hidden and cell states.
            output_size: Number of output units of the linear head.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are (batch, seq, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the head's output for the last time step of each sequence.

        Args:
            x: Batched input of shape (batch, seq, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Zero hidden and cell states, created with the input's dtype and
        # device so non-float32 inputs do not hit a dtype mismatch.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        out, _ = self.lstm(x, (h0, c0))
        # Keep only the last time step.
        out = out[:, -1, :]
        return self.fc(out)
# GRU模型
class PyTorchGRU(nn.Module):
    """Sequence classifier: ``nn.GRU`` followed by a linear layer.

    The linear layer is applied to the GRU output at the last time step.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        """Build the GRU layer and the linear head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the hidden state.
            output_size: Number of output units of the linear head.
            num_layers: Number of stacked GRU layers.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are (batch, seq, feature).
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the head's output for the last time step of each sequence.

        Args:
            x: Batched input of shape (batch, seq, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Zero initial state, created with the input's dtype and device so
        # non-float32 inputs do not hit a dtype mismatch.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.gru(x, h0)
        # Keep only the last time step.
        out = out[:, -1, :]
        return self.fc(out)
# Train and evaluate each recurrent variant with the same settings.
models = {
    'RNN': PyTorchRNN(input_size=5, hidden_size=20, output_size=2),
    'LSTM': PyTorchLSTM(input_size=5, hidden_size=20, output_size=2),
    'GRU': PyTorchGRU(input_size=5, hidden_size=20, output_size=2),
}
print("\n比较不同RNN变体:")
for name, model in models.items():
    # Each model gets its own fresh optimizer.
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Train for 5 epochs.
    for epoch in range(5):
        model.train()
        for batch_X, batch_y in train_loader:
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    # Evaluate on the held-out split. argmax returns class indices
    # directly, so the discouraged `.data` access is not needed.
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        predicted = test_outputs.argmax(dim=1)
        test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
    print(f"{name} 测试准确率: {test_accuracy:.2f}%")
实际应用
时间序列预测
# Synthesize a noisy sine wave: 1000 points on [0, 100] with Gaussian noise
# of standard deviation 0.1, reproducible via the fixed seed.
np.random.seed(42)
t = np.linspace(start=0, stop=100, num=1000)
series = 0.1 * np.random.randn(t.size) + np.sin(t)
# 创建序列数据
def create_sequences(data, seq_length):
    """Slice a sequence into sliding windows and their next-step targets.

    Window ``i`` is ``data[i:i + seq_length]`` and its target is
    ``data[i + seq_length]``.

    Args:
        data: Indexable, sliceable sequence with a length.
        seq_length: Number of consecutive elements per window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays holding the windows and the targets.
        Both are empty when ``data`` has no more than ``seq_length`` elements.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    targets = [data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(targets)
# Window the series: 20 past values predict the next one.
seq_length = 20
X_ts, y_ts = create_sequences(series, seq_length)
# Chronological 80/20 split. The windows overlap, so shuffling would leak
# future values into training and scramble the time axis of the test set.
X_train_ts, X_test_ts, y_train_ts, y_test_ts = train_test_split(
    X_ts, y_ts, test_size=0.2, shuffle=False
)
# Convert to float32 tensors. The inputs gain a trailing feature dimension of
# size 1, giving (samples, seq_length, 1).
X_train_ts = torch.tensor(X_train_ts, dtype=torch.float32).unsqueeze(-1)
y_train_ts = torch.tensor(y_train_ts, dtype=torch.float32)
X_test_ts = torch.tensor(X_test_ts, dtype=torch.float32).unsqueeze(-1)
y_test_ts = torch.tensor(y_test_ts, dtype=torch.float32)
# 定义时间序列预测模型
class TimeSeriesRNN(nn.Module):
    """Forecaster: an ``nn.LSTM`` whose last-step output feeds a linear layer."""

    def __init__(self, input_size=1, hidden_size=50, output_size=1):
        """Build the LSTM layer and the linear head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            output_size: Number of output units of the linear head.
        """
        super().__init__()
        # batch_first=True: inputs are (batch, seq, feature).
        self.rnn = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return the head's output for the last time step of each sequence."""
        sequence_out, _state = self.rnn(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)
# Build the forecasting model with an MSE objective.
ts_model = TimeSeriesRNN(input_size=1, hidden_size=50, output_size=1)
criterion_ts = nn.MSELoss()
optimizer_ts = optim.Adam(ts_model.parameters(), lr=0.001)
# Full-batch training for 20 epochs; test loss is reported every 5 epochs.
print("训练时间序列预测模型:")
for epoch in range(20):
    ts_model.train()
    outputs = ts_model(X_train_ts)
    # squeeze(-1) drops only the trailing output dimension. A bare squeeze()
    # would also collapse the batch dimension for a single-sample batch.
    loss = criterion_ts(outputs.squeeze(-1), y_train_ts)
    optimizer_ts.zero_grad()
    loss.backward()
    optimizer_ts.step()
    if (epoch + 1) % 5 == 0:
        ts_model.eval()
        with torch.no_grad():
            test_outputs = ts_model(X_test_ts)
            test_loss = criterion_ts(test_outputs.squeeze(-1), y_test_ts)
        print(f'Epoch [{epoch+1}/20], Train Loss: {loss.item():.6f}, Test Loss: {test_loss.item():.6f}')
# Plot the test-set predictions against the true values.
ts_model.eval()
with torch.no_grad():
    predictions = ts_model(X_test_ts).numpy()
plt.figure(figsize=(12, 6))
plt.plot(y_test_ts.numpy(), label='真实值', alpha=0.7)
plt.plot(predictions, label='预测值', alpha=0.7)
plt.xlabel('时间步')
plt.ylabel('值')
plt.title('时间序列预测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
RNN最佳实践
- 梯度裁剪:防止梯度爆炸
- 双向RNN:同时考虑过去和未来信息
- 注意力机制:提高长序列处理能力
- 层归一化:稳定训练过程
- 残差连接:缓解梯度消失问题
RNN是处理序列数据的重要技术,掌握RNN对于自然语言处理、时间序列分析等任务至关重要。