← 返回首页
🤖

LSTM网络详解

📂 ai ⏱ 5 min 939 words

LSTM网络详解

长短期记忆网络(Long Short-Term Memory,LSTM)是一种特殊的RNN,能够学习长期依赖关系,解决了传统RNN的梯度消失问题。

LSTM原理

门控机制

LSTM通过三个门控单元控制信息流动:

  1. 遗忘门:决定丢弃哪些信息
  2. 输入门:决定存储哪些新信息
  3. 输出门:决定输出哪些信息
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')

# 创建模拟序列数据
np.random.seed(42)
n_samples = 1000
seq_length = 20
n_features = 10

# 生成有长期依赖的序列数据
X = np.zeros((n_samples, seq_length, n_features))
y = np.zeros(n_samples)

for i in range(n_samples):
    # 生成序列
    for t in range(seq_length):
        X[i, t, :] = np.sin(2 * np.pi * t / seq_length) + 0.1 * np.random.randn(n_features)
    
    # 标签基于序列的某些特征
    y[i] = 1 if np.sum(X[i, -5:, 0]) > 0 else 0

print(f"序列数据形状: {X.shape}")
print(f"标签分布: {np.bincount(y.astype(int))}")

LSTM单元实现

基本LSTM单元

class LSTMCell:
    def __init__(self, input_size, hidden_size):
        """
        LSTM单元
        
        参数:
            input_size: 输入特征维度
            hidden_size: 隐藏状态维度
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        # 初始化权重
        # 遗忘门
        self.W_f = np.random.randn(input_size + hidden_size, hidden_size) * 0.01
        self.b_f = np.zeros((1, hidden_size))
        
        # 输入门
        self.W_i = np.random.randn(input_size + hidden_size, hidden_size) * 0.01
        self.b_i = np.zeros((1, hidden_size))
        
        # 候选记忆
        self.W_c = np.random.randn(input_size + hidden_size, hidden_size) * 0.01
        self.b_c = np.zeros((1, hidden_size))
        
        # 输出门
        self.W_o = np.random.randn(input_size + hidden_size, hidden_size) * 0.01
        self.b_o = np.zeros((1, hidden_size))
    
    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))
    
    def _tanh(self, x):
        return np.tanh(x)
    
    def forward(self, x, h_prev, c_prev):
        """
        前向传播
        
        参数:
            x: 当前时间步输入 (batch_size, input_size)
            h_prev: 上一时间步隐藏状态 (batch_size, hidden_size)
            c_prev: 上一时间步细胞状态 (batch_size, hidden_size)
        """
        # 拼接输入和隐藏状态
        combined = np.concatenate([x, h_prev], axis=1)
        
        # 遗忘门
        f = self._sigmoid(np.dot(combined, self.W_f) + self.b_f)
        
        # 输入门
        i = self._sigmoid(np.dot(combined, self.W_i) + self.b_i)
        
        # 候选记忆
        c_tilde = self._tanh(np.dot(combined, self.W_c) + self.b_c)
        
        # 更新细胞状态
        c = f * c_prev + i * c_tilde
        
        # 输出门
        o = self._sigmoid(np.dot(combined, self.W_o) + self.b_o)
        
        # 更新隐藏状态
        h = o * self._tanh(c)
        
        return h, c
    
    def init_states(self, batch_size):
        """初始化隐藏状态和细胞状态"""
        h = np.zeros((batch_size, self.hidden_size))
        c = np.zeros((batch_size, self.hidden_size))
        return h, c

# 测试LSTM单元
lstm_cell = LSTMCell(input_size=10, hidden_size=20)
batch_size = 32

# 初始化状态
h_prev, c_prev = lstm_cell.init_states(batch_size)

# 模拟一个时间步
x_t = np.random.randn(batch_size, 10)
h_t, c_t = lstm_cell.forward(x_t, h_prev, c_prev)

print(f"输入形状: {x_t.shape}")
print(f"隐藏状态形状: {h_t.shape}")
print(f"细胞状态形状: {c_t.shape}")

完整LSTM网络

class SimpleLSTM:
    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """
        简单LSTM网络
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate
        
        # LSTM参数
        self.lstm_cell = LSTMCell(input_size, hidden_size)
        
        # 输出层参数
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01
        self.b_y = np.zeros((1, output_size))
    
    def forward(self, X):
        """
        前向传播
        """
        batch_size, seq_length, _ = X.shape
        
        # 初始化状态
        h, c = self.lstm_cell.init_states(batch_size)
        
        # 处理每个时间步
        for t in range(seq_length):
            x_t = X[:, t, :]
            h, c = self.lstm_cell.forward(x_t, h, c)
        
        # 输出层
        output = np.dot(h, self.W_hy) + self.b_y
        
        return output, h, c
    
    def predict(self, X):
        """预测"""
        output, _, _ = self.forward(X)
        return np.argmax(output, axis=1)

# 创建LSTM模型
lstm = SimpleLSTM(input_size=10, hidden_size=30, output_size=2, learning_rate=0.01)

# 测试前向传播
X_batch = X[:32]
output, h_final, c_final = lstm.forward(X_batch)

print(f"输入形状: {X_batch.shape}")
print(f"输出形状: {output.shape}")
print(f"最终隐藏状态形状: {h_final.shape}")
print(f"最终细胞状态形状: {c_final.shape}")

使用PyTorch实现LSTM

PyTorch LSTM

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 转换为PyTorch张量
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, y_tensor, test_size=0.2, random_state=42
)

# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)

# 定义LSTM模型
class PyTorchLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1, bidirectional=False):
        super(PyTorchLSTM, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers, 
            batch_first=True, bidirectional=bidirectional
        )
        
        # 全连接层
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)
    
    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers * self.num_directions, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers * self.num_directions, x.size(0), self.hidden_size).to(x.device)
        
        # 前向传播LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(out)
        
        return out

# 创建模型
model = PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, num_layers=2, bidirectional=True)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
print("训练PyTorch LSTM:")
for epoch in range(10):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_X, batch_y in train_loader:
        # 前向传播
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    
    # 打印统计信息
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(train_loader):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')

# 测试模型
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    _, predicted = torch.max(test_outputs.data, 1)
    test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
    print(f'\n测试准确率: {test_accuracy:.2f}%')

LSTM变体比较

# 比较不同LSTM变体
lstm_variants = {
    '标准LSTM': PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, num_layers=1),
    '双向LSTM': PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, num_layers=1, bidirectional=True),
    '多层LSTM': PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, num_layers=3),
    '双向多层LSTM': PyTorchLSTM(input_size=10, hidden_size=50, output_size=2, num_layers=2, bidirectional=True)
}

print("\n比较不同LSTM变体:")
for name, model in lstm_variants.items():
    # 重新初始化优化器
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # 训练
    for epoch in range(5):
        model.train()
        for batch_X, batch_y in train_loader:
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # 测试
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test)
        _, predicted = torch.max(test_outputs.data, 1)
        test_accuracy = 100 * (predicted == y_test).sum().item() / y_test.size(0)
        print(f"{name} 测试准确率: {test_accuracy:.2f}%")

实际应用

文本分类

# 模拟文本数据
np.random.seed(42)
vocab_size = 1000
max_seq_length = 50
n_samples = 2000

# 生成文本序列
X_text = np.random.randint(0, vocab_size, (n_samples, max_seq_length))
y_text = np.random.randint(0, 2, n_samples)

# 转换为PyTorch张量
X_text_tensor = torch.LongTensor(X_text)
y_text_tensor = torch.LongTensor(y_text)

# 划分数据集
X_train_text, X_test_text, y_train_text, y_test_text = train_test_split(
    X_text_tensor, y_text_tensor, test_size=0.2, random_state=42
)

# 定义文本分类模型
class TextLSTM(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super(TextLSTM, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 嵌入层
        x = self.embedding(x)
        
        # LSTM层
        out, _ = self.lstm(x)
        
        # 取最后一个时间步的输出
        out = out[:, -1, :]
        
        # 全连接层
        out = self.fc(out)
        
        return out

# 创建模型
text_model = TextLSTM(vocab_size=1000, embed_size=64, hidden_size=128, output_size=2)

# 定义损失函数和优化器
criterion_text = nn.CrossEntropyLoss()
optimizer_text = optim.Adam(text_model.parameters(), lr=0.001)

# 创建数据加载器
train_dataset_text = TensorDataset(X_train_text, y_train_text)
train_loader_text = DataLoader(dataset=train_dataset_text, batch_size=32, shuffle=True)

# 训练模型
print("\n训练文本分类LSTM:")
for epoch in range(5):
    text_model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_X, batch_y in train_loader_text:
        outputs = text_model(batch_X)
        loss = criterion_text(outputs, batch_y)
        
        optimizer_text.zero_grad()
        loss.backward()
        optimizer_text.step()
        
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    
    train_accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/5], Loss: {running_loss/len(train_loader_text):.4f}, '
          f'训练准确率: {train_accuracy:.2f}%')

# 测试模型
text_model.eval()
with torch.no_grad():
    test_outputs = text_model(X_test_text)
    _, predicted = torch.max(test_outputs.data, 1)
    test_accuracy = 100 * (predicted == y_test_text).sum().item() / y_test_text.size(0)
    print(f'\n测试准确率: {test_accuracy:.2f}%')

LSTM最佳实践

  1. 梯度裁剪:防止梯度爆炸
  2. 层归一化:稳定训练过程
  3. Dropout:防止过拟合
  4. 学习率调度:动态调整学习率
  5. 双向LSTM:同时考虑过去和未来信息

LSTM是处理序列数据的核心技术,掌握LSTM对于自然语言处理、时间序列预测等任务至关重要。