← 返回首页
🤖

循环神经网络详解

📂 ai ⏱ 5 min 933 words

循环神经网络详解

循环神经网络(Recurrent Neural Network,RNN)是专门用于处理序列数据的神经网络,具有记忆能力。

RNN原理

序列数据特点

序列数据具有时间依赖性,如文本、时间序列、语音等。

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')

# 创建模拟序列数据
np.random.seed(42)
n_samples = 1000
seq_length = 10
n_features = 5

# 生成序列数据
X = np.random.randn(n_samples, seq_length, n_features)
y = np.random.randint(0, 2, n_samples)

print(f"序列数据形状: {X.shape}")
print(f"标签形状: {y.shape}")
print(f"序列长度: {seq_length}")
print(f"特征数量: {n_features}")

RNN单元实现

基本RNN单元

class SimpleRNNCell:
    def __init__(self, input_size, hidden_size, output_size):
        """
        简单RNN单元
        
        参数:
            input_size: 输入特征维度
            hidden_size: 隐藏状态维度
            output_size: 输出维度
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        
        # 初始化权重
        self.W_xh = np.random.randn(input_size, hidden_size) * 0.01  # 输入到隐藏
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01  # 隐藏到隐藏
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01  # 隐藏到输出
        self.b_h = np.zeros((1, hidden_size))  # 隐藏层偏置
        self.b_y = np.zeros((1, output_size))  # 输出层偏置
    
    def forward(self, x, h_prev):
        """
        前向传播
        
        参数:
            x: 当前时间步输入 (batch_size, input_size)
            h_prev: 上一时间步隐藏状态 (batch_size, hidden_size)
        """
        # 计算隐藏状态
        h_current = np.tanh(np.dot(x, self.W_xh) + np.dot(h_prev, self.W_hh) + self.b_h)
        
        # 计算输出
        y = np.dot(h_current, self.W_hy) + self.b_y
        
        return y, h_current
    
    def init_hidden(self, batch_size):
        """初始化隐藏状态"""
        return np.zeros((batch_size, self.hidden_size))

# 测试RNN单元
rnn_cell = SimpleRNNCell(input_size=5, hidden_size=10, output_size=2)
batch_size = 32

# 初始化隐藏状态
h_prev = rnn_cell.init_hidden(batch_size)

# 模拟一个时间步
x_t = np.random.randn(batch_size, 5)
y_t, h_t = rnn_cell.forward(x_t, h_prev)

print(f"输入形状: {x_t.shape}")
print(f"隐藏状态形状: {h_t.shape}")
print(f"输出形状: {y_t.shape}")

完整RNN网络

class SimpleRNN:
    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """
        简单RNN网络
        
        参数:
            input_size: 输入特征维度
            hidden_size: 隐藏状态维度
            output_size: 输出维度
            learning_rate: 学习率
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate
        
        # 初始化权重
        self.W_xh = np.random.randn(input_size, hidden_size) * 0.01
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01
        self.b_h = np.zeros((1, hidden_size))
        self.b_y = np.zeros((1, output_size))
    
    def forward(self, X):
        """
        前向传播(整个序列)
        
        参数:
            X: 输入序列 (batch_size, seq_length, input_size)
        """
        batch_size, seq_length, _ = X.shape
        
        # 初始化隐藏状态
        h = np.zeros((batch_size, self.hidden_size))
        
        # 存储每个时间步的输出
        outputs = []
        
        # 处理每个时间步
        for t in range(seq_length):
            x_t = X[:, t, :]  # 当前时间步输入
            
            # 计算隐藏状态
            h = np.tanh(np.dot(x_t, self.W_xh) + np.dot(h, self.W_hh) + self.b_h)
            
            # 计算输出
            y = np.dot(h, self.W_hy) + self.b_y
            outputs.append(y)
        
        # 返回最后一个时间步的输出
        return outputs[-1], h
    
    def predict(self, X):
        """预测"""
        output, _ = self.forward(X)
        return np.argmax(output, axis=1)

# 创建RNN模型
rnn = SimpleRNN(input_size=5, hidden_size=20, output_size=2, learning_rate=0.01)

# 测试前向传播
X_batch = X[:32]  # 取32个样本
output, h_final = rnn.forward(X_batch)

print(f"输入形状: {X_batch.shape}")
print(f"输出形状: {output.shape}")
print(f"最终隐藏状态形状: {h_final.shape}")

# 预测
predictions = rnn.predict(X_batch)
print(f"预测类别: {predictions}")

使用PyTorch实现RNN

PyTorch RNN

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 转换为PyTorch张量
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, y_tensor, test_size=0.2, random_state=42
)

# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)

# 定义RNN模型
class PyTorchRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(PyTorchRNN, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # RNN层
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        
        # 全连接层
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 初始化隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # 前向传播RNN
        out, _ = self.rnn(x, h0)
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(out)
        
        return out

# 创建模型
model = PyTorchRNN(input_size=5, hidden_size=20, output_size=2, num_layers=1)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
print("训练PyTorch RNN:")
for epoch in range(10):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_X, batch_y in train_loader:
        # 前向传播
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    
    # 打印统计信息
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(train_loader):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')

# 测试模型
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    _, predicted = torch.max(test_outputs.data, 1)
    test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
    print(f'\n测试准确率: {test_accuracy:.2f}%')

LSTM和GRU

# LSTM模型
class PyTorchLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(PyTorchLSTM, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
        # 全连接层
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # 前向传播LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(out)
        
        return out

# GRU模型
class PyTorchGRU(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(PyTorchGRU, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # GRU层
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        
        # 全连接层
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 初始化隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # 前向传播GRU
        out, _ = self.gru(x, h0)
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(out)
        
        return out

# 比较不同RNN变体
models = {
    'RNN': PyTorchRNN(input_size=5, hidden_size=20, output_size=2),
    'LSTM': PyTorchLSTM(input_size=5, hidden_size=20, output_size=2),
    'GRU': PyTorchGRU(input_size=5, hidden_size=20, output_size=2)
}

print("\n比较不同RNN变体:")
for name, model in models.items():
    # 重新初始化优化器
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # 训练
    for epoch in range(5):
        model.train()
        for batch_X, batch_y in train_loader:
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # 测试
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        _, predicted = torch.max(test_outputs.data, 1)
        test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
        print(f"{name} 测试准确率: {test_accuracy:.2f}%")

实际应用

时间序列预测

# 生成时间序列数据
np.random.seed(42)
t = np.linspace(0, 100, 1000)
series = np.sin(t) + 0.1 * np.random.randn(1000)

# 创建序列数据
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)

seq_length = 20
X_ts, y_ts = create_sequences(series, seq_length)

# 划分数据集
X_train_ts, X_test_ts, y_train_ts, y_test_ts = train_test_split(
    X_ts, y_ts, test_size=0.2, random_state=42
)

# 转换为PyTorch张量
X_train_ts = torch.FloatTensor(X_train_ts).unsqueeze(-1)  # 添加特征维度
y_train_ts = torch.FloatTensor(y_train_ts)
X_test_ts = torch.FloatTensor(X_test_ts).unsqueeze(-1)
y_test_ts = torch.FloatTensor(y_test_ts)

# 定义时间序列预测模型
class TimeSeriesRNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, output_size=1):
        super(TimeSeriesRNN, self).__init__()
        
        self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, -1, :])
        return out

# 创建模型
ts_model = TimeSeriesRNN(input_size=1, hidden_size=50, output_size=1)
criterion_ts = nn.MSELoss()
optimizer_ts = optim.Adam(ts_model.parameters(), lr=0.001)

# 训练模型
print("训练时间序列预测模型:")
for epoch in range(20):
    ts_model.train()
    outputs = ts_model(X_train_ts)
    loss = criterion_ts(outputs.squeeze(), y_train_ts)
    
    optimizer_ts.zero_grad()
    loss.backward()
    optimizer_ts.step()
    
    if (epoch + 1) % 5 == 0:
        ts_model.eval()
        with torch.no_grad():
            test_outputs = ts_model(X_test_ts)
            test_loss = criterion_ts(test_outputs.squeeze(), y_test_ts)
            print(f'Epoch [{epoch+1}/20], Train Loss: {loss.item():.6f}, Test Loss: {test_loss.item():.6f}')

# 可视化预测结果
ts_model.eval()
with torch.no_grad():
    predictions = ts_model(X_test_ts).numpy()

plt.figure(figsize=(12, 6))
plt.plot(y_test_ts.numpy(), label='真实值', alpha=0.7)
plt.plot(predictions, label='预测值', alpha=0.7)
plt.xlabel('时间步')
plt.ylabel('值')
plt.title('时间序列预测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

RNN最佳实践

  1. 梯度裁剪:防止梯度爆炸
  2. 双向RNN:同时考虑过去和未来信息
  3. 注意力机制:提高长序列处理能力
  4. 层归一化:稳定训练过程
  5. 残差连接:缓解梯度消失问题

RNN是处理序列数据的重要技术,掌握RNN对于自然语言处理、时间序列分析等任务至关重要。