← 返回首页
🤖

反向传播算法详解

📂 ai ⏱ 6 min 1076 words

反向传播算法详解

反向传播(Backpropagation)是训练神经网络的核心算法,通过链式法则计算损失函数对每个参数的梯度。

反向传播原理

链式法则

反向传播基于微积分中的链式法则,从输出层向输入层逐层计算梯度。

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')

# 创建示例数据
X, y = make_moons(n_samples=1000, noise=0.2, random_state=42)

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"训练集大小: {X_train_scaled.shape[0]}")
print(f"测试集大小: {X_test_scaled.shape[0]}")

反向传播实现

手动实现反向传播

class NeuralNetworkBackprop:
    def __init__(self, layer_sizes, learning_rate=0.01):
        """
        初始化神经网络
        
        参数:
            layer_sizes: 各层神经元数量列表
            learning_rate: 学习率
        """
        self.layer_sizes = layer_sizes
        self.lr = learning_rate
        
        # 初始化权重和偏置
        self.weights = []
        self.biases = []
        
        for i in range(len(layer_sizes) - 1):
            # He初始化
            w = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * np.sqrt(2.0 / layer_sizes[i])
            b = np.zeros((1, layer_sizes[i+1]))
            self.weights.append(w)
            self.biases.append(b)
        
        # 存储中间结果
        self.z_values = []  # 线性变换结果
        self.a_values = []  # 激活函数结果
    
    def _relu(self, x):
        """ReLU激活函数"""
        return np.maximum(0, x)
    
    def _relu_derivative(self, x):
        """ReLU导数"""
        return (x > 0).astype(float)
    
    def _sigmoid(self, x):
        """Sigmoid激活函数"""
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))
    
    def _sigmoid_derivative(self, x):
        """Sigmoid导数"""
        return x * (1 - x)
    
    def forward(self, X):
        """前向传播"""
        self.z_values = []
        self.a_values = [X]
        
        current_input = X
        
        for i in range(len(self.weights) - 1):
            # 线性变换
            z = np.dot(current_input, self.weights[i]) + self.biases[i]
            self.z_values.append(z)
            
            # 激活函数(隐藏层使用ReLU)
            a = self._relu(z)
            self.a_values.append(a)
            
            current_input = a
        
        # 输出层(使用Sigmoid进行二分类)
        z = np.dot(current_input, self.weights[-1]) + self.biases[-1]
        self.z_values.append(z)
        a = self._sigmoid(z)
        self.a_values.append(a)
        
        return a
    
    def compute_loss(self, y_true, y_pred):
        """计算二元交叉熵损失"""
        m = y_true.shape[0]
        y_true = y_true.reshape(-1, 1)
        
        # 避免log(0)
        y_pred = np.clip(y_pred, 1e-8, 1 - 1e-8)
        
        loss = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
        return loss
    
    def backward(self, X, y):
        """
        反向传播算法
        
        计算损失函数对每个参数的梯度
        """
        m = X.shape[0]
        y = y.reshape(-1, 1)
        
        # 存储梯度
        d_weights = [None] * len(self.weights)
        d_biases = [None] * len(self.biases)
        
        # 从输出层开始反向传播
        # 输出层误差
        output = self.a_values[-1]
        delta = output - y  # 对于Sigmoid输出和交叉熵损失
        
        # 计算输出层梯度
        d_weights[-1] = np.dot(self.a_values[-2].T, delta) / m
        d_biases[-1] = np.sum(delta, axis=0, keepdims=True) / m
        
        # 反向传播到隐藏层
        for i in range(len(self.weights) - 2, -1, -1):
            # 计算当前层的误差
            delta = np.dot(delta, self.weights[i+1].T) * self._relu_derivative(self.z_values[i])
            
            # 计算梯度
            d_weights[i] = np.dot(self.a_values[i].T, delta) / m
            d_biases[i] = np.sum(delta, axis=0, keepdims=True) / m
        
        # 更新权重和偏置
        for i in range(len(self.weights)):
            self.weights[i] -= self.lr * d_weights[i]
            self.biases[i] -= self.lr * d_biases[i]
        
        return d_weights, d_biases
    
    def fit(self, X, y, epochs=1000, verbose=True):
        """训练模型"""
        losses = []
        accuracies = []
        
        for epoch in range(epochs):
            # 前向传播
            output = self.forward(X)
            
            # 计算损失
            loss = self.compute_loss(y, output)
            losses.append(loss)
            
            # 计算准确率
            y_pred = (output > 0.5).astype(int).flatten()
            accuracy = accuracy_score(y, y_pred)
            accuracies.append(accuracy)
            
            # 反向传播
            self.backward(X, y)
            
            if verbose and (epoch + 1) % 100 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")
        
        return losses, accuracies
    
    def predict(self, X):
        """预测"""
        output = self.forward(X)
        return (output > 0.5).astype(int).flatten()

# 创建神经网络
nn = NeuralNetworkBackprop(layer_sizes=[2, 10, 8, 1], learning_rate=0.1)

# 训练模型
print("训练神经网络(使用反向传播):")
losses, accuracies = nn.fit(X_train_scaled, y_train, epochs=1000, verbose=True)

# 测试集评估
y_pred = nn.predict(X_test_scaled)
test_accuracy = accuracy_score(y_test, y_pred)
print(f"\n测试集准确率: {test_accuracy:.4f}")

可视化训练过程

# 可视化训练过程
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# 损失曲线
ax1.plot(losses, 'b-', linewidth=2)
ax1.set_xlabel('Epoch')
ax1.set_ylabel('损失值')
ax1.set_title('训练损失曲线')
ax1.grid(True, alpha=0.3)

# 准确率曲线
ax2.plot(accuracies, 'r-', linewidth=2)
ax2.set_xlabel('Epoch')
ax2.set_ylabel('准确率')
ax2.set_title('训练准确率曲线')
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 可视化决策边界
def plot_decision_boundary(model, X, y, title="决策边界"):
    """绘制决策边界"""
    h = 0.02  # 步长
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                          np.arange(y_min, y_max, h))
    
    Z = model.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)
    
    plt.figure(figsize=(10, 6))
    plt.contourf(xx, yy, Z, alpha=0.8, cmap=plt.cm.RdYlBu)
    plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.RdYlBu, edgecolors='black')
    plt.xlabel('特征1')
    plt.ylabel('特征2')
    plt.title(title)
    plt.grid(True, alpha=0.3)
    plt.show()

plot_decision_boundary(nn, X_test_scaled, y_test, "反向传播神经网络决策边界")

梯度下降变体

不同优化器比较

class GradientDescentVariants:
    """比较不同的梯度下降变体"""
    
    def __init__(self, layer_sizes):
        self.layer_sizes = layer_sizes
    
    def _init_network(self):
        """初始化网络参数"""
        weights = []
        biases = []
        
        for i in range(len(self.layer_sizes) - 1):
            w = np.random.randn(self.layer_sizes[i], self.layer_sizes[i+1]) * np.sqrt(2.0 / self.layer_sizes[i])
            b = np.zeros((1, self.layer_sizes[i+1]))
            weights.append(w)
            biases.append(b)
        
        return weights, biases
    
    def _forward(self, X, weights, biases):
        """前向传播"""
        current_input = X
        
        for i in range(len(weights) - 1):
            z = np.dot(current_input, weights[i]) + biases[i]
            current_input = np.maximum(0, z)  # ReLU
        
        # 输出层
        z = np.dot(current_input, weights[-1]) + biases[-1]
        output = 1 / (1 + np.exp(-np.clip(z, -250, 250)))  # Sigmoid
        
        return output
    
    def compare_optimizers(self, X_train, y_train, X_test, y_test, epochs=500):
        """比较不同优化器"""
        
        optimizers = {
            'SGD': {'lr': 0.01},
            'Momentum': {'lr': 0.01, 'momentum': 0.9},
            'RMSprop': {'lr': 0.001, 'decay': 0.99},
            'Adam': {'lr': 0.001, 'beta1': 0.9, 'beta2': 0.999}
        }
        
        results = {}
        
        for opt_name, opt_params in optimizers.items():
            print(f"\n训练 {opt_name}:")
            
            # 初始化网络
            weights, biases = self._init_network()
            
            # 初始化动量等参数(如果需要)
            if opt_name == 'Momentum':
                v_weights = [np.zeros_like(w) for w in weights]
                v_biases = [np.zeros_like(b) for b in biases]
            elif opt_name == 'Adam':
                m_weights = [np.zeros_like(w) for w in weights]
                v_weights = [np.zeros_like(w) for w in weights]
                m_biases = [np.zeros_like(b) for b in biases]
                v_biases = [np.zeros_like(b) for b in biases]
            
            losses = []
            
            for epoch in range(epochs):
                # 前向传播
                output = self._forward(X_train, weights, biases)
                
                # 计算损失
                y_train_reshaped = y_train.reshape(-1, 1)
                output_clipped = np.clip(output, 1e-8, 1 - 1e-8)
                loss = -np.mean(y_train_reshaped * np.log(output_clipped) + 
                               (1 - y_train_reshaped) * np.log(1 - output_clipped))
                losses.append(loss)
                
                # 简化的反向传播(这里只更新最后一层)
                m = X_train.shape[0]
                delta = output - y_train_reshaped
                
                # 计算梯度
                dW = np.dot(self._forward(X_train, weights[:-1], biases[:-1]).T, delta) / m
                dB = np.sum(delta, axis=0, keepdims=True) / m
                
                # 更新权重
                if opt_name == 'SGD':
                    weights[-1] -= opt_params['lr'] * dW
                    biases[-1] -= opt_params['lr'] * dB
                elif opt_name == 'Momentum':
                    v_weights[-1] = opt_params['momentum'] * v_weights[-1] - opt_params['lr'] * dW
                    v_biases[-1] = opt_params['momentum'] * v_biases[-1] - opt_params['lr'] * dB
                    weights[-1] += v_weights[-1]
                    biases[-1] += v_biases[-1]
                elif opt_name == 'Adam':
                    # 简化的Adam实现
                    m_weights[-1] = opt_params['beta1'] * m_weights[-1] + (1 - opt_params['beta1']) * dW
                    v_weights[-1] = opt_params['beta2'] * v_weights[-1] + (1 - opt_params['beta2']) * (dW ** 2)
                    m_hat = m_weights[-1] / (1 - opt_params['beta1'] ** (epoch + 1))
                    v_hat = v_weights[-1] / (1 - opt_params['beta2'] ** (epoch + 1))
                    weights[-1] -= opt_params['lr'] * m_hat / (np.sqrt(v_hat) + 1e-8)
                    
                    m_biases[-1] = opt_params['beta1'] * m_biases[-1] + (1 - opt_params['beta1']) * dB
                    v_biases[-1] = opt_params['beta2'] * v_biases[-1] + (1 - opt_params['beta2']) * (dB ** 2)
                    m_hat_b = m_biases[-1] / (1 - opt_params['beta1'] ** (epoch + 1))
                    v_hat_b = v_biases[-1] / (1 - opt_params['beta2'] ** (epoch + 1))
                    biases[-1] -= opt_params['lr'] * m_hat_b / (np.sqrt(v_hat_b) + 1e-8)
            
            # 测试集评估
            test_output = self._forward(X_test, weights, biases)
            test_pred = (test_output > 0.5).astype(int).flatten()
            test_accuracy = accuracy_score(y_test, test_pred)
            
            results[opt_name] = {
                'losses': losses,
                'accuracy': test_accuracy
            }
            
            print(f"最终损失: {losses[-1]:.4f}")
            print(f"测试准确率: {test_accuracy:.4f}")
        
        return results

# 比较不同优化器
gd_variants = GradientDescentVariants(layer_sizes=[2, 10, 1])
results = gd_variants.compare_optimizers(X_train_scaled, y_train, X_test_scaled, y_test, epochs=300)

# 可视化比较
plt.figure(figsize=(10, 6))
for opt_name, result in results.items():
    plt.plot(result['losses'], label=f"{opt_name} (Acc: {result['accuracy']:.3f})", linewidth=2)

plt.xlabel('Epoch')
plt.ylabel('损失值')
plt.title('不同优化器的训练损失比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

实际应用

使用PyTorch实现反向传播

import torch
import torch.nn as nn
import torch.optim as optim

# 转换为PyTorch张量
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1)

# 定义模型
model = nn.Sequential(
    nn.Linear(2, 10),
    nn.ReLU(),
    nn.Linear(10, 8),
    nn.ReLU(),
    nn.Linear(8, 1),
    nn.Sigmoid()
)

# 定义损失函数和优化器
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
print("使用PyTorch训练神经网络:")
losses = []
for epoch in range(1000):
    # 前向传播
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    
    # 反向传播和优化
    optimizer.zero_grad()
    loss.backward()  # PyTorch自动计算梯度
    optimizer.step()
    
    losses.append(loss.item())
    
    if (epoch + 1) % 200 == 0:
        with torch.no_grad():
            train_outputs = model(X_train_tensor)
            train_accuracy = ((train_outputs > 0.5).float() == y_train_tensor).float().mean()
            test_outputs = model(X_test_tensor)
            test_accuracy = ((test_outputs > 0.5).float() == y_test_tensor).float().mean()
            print(f'Epoch [{epoch+1}/1000], Loss: {loss.item():.4f}, '
                  f'训练准确率: {train_accuracy:.4f}, 测试准确率: {test_accuracy:.4f}')

# 可视化损失曲线
plt.figure(figsize=(10, 6))
plt.plot(losses, 'b-', linewidth=2)
plt.xlabel('Epoch')
plt.ylabel('损失值')
plt.title('PyTorch训练损失曲线')
plt.grid(True, alpha=0.3)
plt.show()

反向传播最佳实践

  1. 梯度检查:验证梯度计算的正确性
  2. 梯度裁剪:防止梯度爆炸
  3. 学习率调度:动态调整学习率
  4. 批量归一化:加速训练和稳定梯度
  5. 权重初始化:使用合适的初始化方法

反向传播是神经网络训练的核心,掌握反向传播算法对于理解深度学习至关重要。