← 返回首页
🧠

特征归因在LLM中的应用

📂 llm ⏱ 8 min 1420 words

---
title: "特征归因在LLM中的应用"
description: "介绍特征归因技术在大型语言模型解释和理解中的应用。"
tags: ["特征归因", "llm", "模型解释", "可解释性", "归因方法"]
category: "llm"
icon: "🧠"
---

特征归因在LLM中的应用

什么是特征归因?

特征归因是一类量化每个输入特征(在LLM中通常是token)对模型输出贡献程度的技术,用于判断哪些输入元素对模型预测的影响最大。

特征归因原理

1. 基本特征归因

import torch
import numpy as np
import matplotlib.pyplot as plt

class FeatureAttributor:
    """Compute token-level attributions for a language model.

    The gradient-based methods require the model to expose
    ``get_input_embeddings()`` and to accept an ``inputs_embeds`` keyword
    argument. Model outputs may be a raw logits tensor or an object with a
    ``logits`` attribute; logits are indexed as ``[batch, position, class]``
    and only batch element 0 is explained.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    @staticmethod
    def _extract_logits(outputs):
        """Return ``outputs.logits`` when present, otherwise ``outputs``."""
        return outputs.logits if hasattr(outputs, 'logits') else outputs

    def compute_attention_attribution(self, input_ids):
        """Collect attention tensors emitted during one forward pass.

        A forward hook is attached to every module that has an ``attention``
        attribute or whose qualified name contains "attention". Whenever such
        a module returns a tuple whose second element is a tensor, that tensor
        is recorded.

        Args:
            input_ids: Token ids passed positionally to the model.

        Returns:
            A list of detached tensors in the order the hooks fired. The list
            is empty when no hooked module returned a tensor in slot 1 (for
            example when the model does not emit attention weights by default).
            Nested modules that both match may contribute duplicate entries.
        """
        self.model.eval()

        attention_weights = []

        def attention_hook(module, inputs, output):
            # Slot 1 may be None when the module was not asked for attention
            # weights, so only record real tensors.
            if (isinstance(output, tuple) and len(output) > 1
                    and torch.is_tensor(output[1])):
                attention_weights.append(output[1].detach())

        hooks = []
        try:
            for name, module in self.model.named_modules():
                if hasattr(module, 'attention') or 'attention' in name.lower():
                    hooks.append(module.register_forward_hook(attention_hook))

            with torch.no_grad():
                self.model(input_ids)
        finally:
            # Always detach the hooks, even if the forward pass raised, so the
            # model is not left permanently instrumented.
            for hook in hooks:
                hook.remove()

        return attention_weights

    def compute_gradient_attribution(self, input_ids, target_token_idx):
        """Compute gradient x input attribution for one output position.

        The explained scalar is the largest logit at ``target_token_idx``.

        Args:
            input_ids: Token ids; batch element 0 is explained.
            target_token_idx: Position along the sequence axis of the logits.

        Returns:
            NumPy array with the embedding axis summed out (one score per
            input token, batch axis preserved).
        """
        # Evaluation mode keeps dropout from injecting noise into gradients.
        self.model.eval()

        # Detach so the embeddings become a leaf tensor that the gradient can
        # be taken with respect to; the same tensor is fed to the model.
        inputs_embeds = self.model.get_input_embeddings()(input_ids)
        inputs_embeds = inputs_embeds.detach().requires_grad_(True)

        logits = self._extract_logits(self.model(inputs_embeds=inputs_embeds))
        target_logits = logits[0, target_token_idx, :]
        target_class = target_logits.argmax()

        # autograd.grad returns the gradient directly and leaves the
        # parameters' .grad buffers untouched.
        (gradients,) = torch.autograd.grad(
            target_logits[target_class], inputs_embeds
        )

        attribution = (gradients * inputs_embeds).sum(dim=-1)
        return attribution.detach().cpu().numpy()

    def compute_integrated_gradients(self, input_ids, target_token_idx,
                                     n_steps=50):
        """Compute integrated gradients against an all-zero embedding baseline.

        The target class is the argmax logit at ``target_token_idx`` for the
        real input and is held fixed along the whole interpolation path. The
        path integral is approximated with a midpoint Riemann sum.

        Args:
            input_ids: Token ids; batch element 0 is explained.
            target_token_idx: Position along the sequence axis of the logits.
            n_steps: Number of interpolation points (must be >= 1).

        Returns:
            NumPy array with the embedding axis summed out (one score per
            input token, batch axis preserved).

        Raises:
            ValueError: If ``n_steps`` is smaller than 1.
        """
        if n_steps < 1:
            raise ValueError("n_steps must be a positive integer")

        self.model.eval()

        with torch.no_grad():
            inputs_embeds = self.model.get_input_embeddings()(input_ids)
            logits = self._extract_logits(
                self.model(inputs_embeds=inputs_embeds)
            )
            # Fix the explained class once; re-picking it per step would
            # integrate a different function at every point of the path.
            target_class = logits[0, target_token_idx, :].argmax()

        baseline = torch.zeros_like(inputs_embeds)
        delta = inputs_embeds - baseline
        total_gradients = torch.zeros_like(inputs_embeds)

        for step in range(n_steps):
            alpha = (step + 0.5) / n_steps
            # Built under no-grad inputs, so this is a leaf tensor.
            point = (baseline + alpha * delta).requires_grad_(True)

            step_logits = self._extract_logits(
                self.model(inputs_embeds=point)
            )
            score = step_logits[0, target_token_idx, target_class]
            (step_gradient,) = torch.autograd.grad(score, point)
            total_gradients += step_gradient

        avg_gradients = total_gradients / n_steps

        attribution = (delta * avg_gradients).sum(dim=-1)
        return attribution.detach().cpu().numpy()

2. 归因可视化

class AttributionVisualizer:
    """Matplotlib helpers for plotting per-token attribution scores."""

    def __init__(self):
        self.figures = {}

    @staticmethod
    def _normalize(attributions):
        """Scale scores so the largest magnitude becomes 1.

        Empty and all-zero inputs are returned unscaled. The result is always
        a new list; the argument is not modified.
        """
        values = list(attributions)
        if not values:
            return values
        max_abs = max(abs(v) for v in values)
        if max_abs > 0:
            values = [v / max_abs for v in values]
        return values

    @staticmethod
    def _draw_bars(ax, tokens, scores):
        """Draw signed bars (red below zero, blue otherwise) with a zero line."""
        colors = ['red' if s < 0 else 'blue' for s in scores]
        ax.bar(range(len(tokens)), scores, color=colors, alpha=0.7)
        ax.set_xticks(range(len(tokens)))
        ax.axhline(y=0, color='black', linestyle='-', linewidth=0.5)

    def plot_token_attribution(self, tokens, attributions,
                               title="Token Attribution",
                               figsize=(12, 4)):
        """Plot one normalized attribution bar per token.

        Args:
            tokens: Token labels for the x axis.
            attributions: One score per token.
            title: Axes title.
            figsize: Figure size passed to matplotlib.

        Returns:
            The created matplotlib figure.
        """
        fig, ax = plt.subplots(figsize=figsize)

        self._draw_bars(ax, tokens, self._normalize(attributions))
        ax.set_xticklabels(tokens, rotation=45, ha='right')

        ax.set_ylabel('Attribution Score')
        ax.set_title(title)

        # Operate on the figure itself instead of pyplot's "current figure",
        # which may have changed if the caller created other figures.
        fig.tight_layout()
        return fig

    def plot_heatmap_attribution(self, tokens, attributions_matrix,
                                 title="Heatmap Attribution",
                                 figsize=(12, 8)):
        """Plot a 2-D attribution matrix as a heatmap with a colorbar.

        Args:
            tokens: Token labels for the x axis (one per matrix column).
            attributions_matrix: 2-D array-like passed to ``imshow``.
            title: Axes title.
            figsize: Figure size passed to matplotlib.

        Returns:
            The created matplotlib figure.
        """
        fig, ax = plt.subplots(figsize=figsize)

        im = ax.imshow(attributions_matrix, cmap='RdBu_r', aspect='auto')

        ax.set_xticks(range(len(tokens)))
        ax.set_xticklabels(tokens, rotation=45, ha='right')

        ax.set_title(title)
        fig.colorbar(im, ax=ax)

        fig.tight_layout()
        return fig

    def plot_attribution_comparison(self, tokens, attributions_dict,
                                    title="Attribution Comparison",
                                    figsize=(14, 6)):
        """Plot several attribution methods side by side.

        Each method is normalized independently, so bar heights are comparable
        within a panel but not across panels.

        Args:
            tokens: Token labels shared by all panels.
            attributions_dict: Mapping of method name to per-token scores.
            title: Figure-level title.
            figsize: Figure size passed to matplotlib.

        Returns:
            The created matplotlib figure. With no methods, an empty titled
            figure is returned.
        """
        if not attributions_dict:
            # subplots() rejects a zero-column grid, so build a bare figure.
            fig = plt.figure(figsize=figsize)
            fig.suptitle(title)
            return fig

        # squeeze=False always yields a 2-D axes array, so a single method
        # needs no special casing.
        fig, axes = plt.subplots(1, len(attributions_dict), figsize=figsize,
                                 squeeze=False)

        for ax, (method_name, attributions) in zip(axes[0],
                                                   attributions_dict.items()):
            self._draw_bars(ax, tokens, self._normalize(attributions))
            ax.set_xticklabels(tokens, rotation=45, ha='right', fontsize=8)
            ax.set_ylabel('Score')
            ax.set_title(method_name)

        fig.suptitle(title)
        fig.tight_layout()
        return fig

3. 归因聚合

class AttributionAggregator:
    """Combine per-token attribution scores produced by several methods."""

    # Supported reductions over the scores collected for one token.
    _REDUCERS = {
        'average': np.mean,
        'max': lambda values: max(values, key=abs),  # keeps the sign
        'product': np.prod,
    }

    def __init__(self):
        self.attributions = {}

    def add_attribution(self, method_name, tokens, attributions):
        """Register the scores of one method, replacing any earlier entry.

        Args:
            method_name: Key under which the result is stored.
            tokens: Sequence of token strings.
            attributions: Sequence of scores, one per token.

        Raises:
            ValueError: If ``tokens`` and ``attributions`` differ in length.
        """
        if len(tokens) != len(attributions):
            raise ValueError(
                f"tokens and attributions must have the same length "
                f"(got {len(tokens)} and {len(attributions)})"
            )
        self.attributions[method_name] = {
            'tokens': tokens,
            'attributions': attributions
        }

    def aggregate_attributions(self, method='average'):
        """Reduce the scores of every distinct token across methods.

        For each method only the FIRST occurrence of a token contributes;
        later repeats of the same token string within one method are ignored.

        Args:
            method: ``'average'``, ``'max'`` (largest magnitude, sign kept)
                or ``'product'``.

        Returns:
            ``None`` when nothing has been added, otherwise a dict mapping
            each token to its aggregated score, in sorted token order.

        Raises:
            ValueError: If ``method`` is not a supported reduction.
        """
        try:
            reduce_scores = self._REDUCERS[method]
        except KeyError:
            raise ValueError(
                f"Unknown aggregation method {method!r}; "
                f"expected one of {sorted(self._REDUCERS)}"
            ) from None

        if not self.attributions:
            return None

        # One token -> first-index map per method replaces repeated linear
        # scans (``in`` + ``list.index``) for every token.
        per_method = []
        for data in self.attributions.values():
            first_index = {}
            for idx, token in enumerate(data['tokens']):
                first_index.setdefault(token, idx)
            per_method.append((first_index, data['attributions']))

        all_tokens = sorted(set().union(*(index for index, _ in per_method)))

        aggregated = {}
        for token in all_tokens:
            scores = [
                values[index[token]]
                for index, values in per_method
                if token in index
            ]
            aggregated[token] = reduce_scores(scores)

        return aggregated

    def rank_tokens(self, top_k=10, method='average'):
        """Return the ``top_k`` tokens with the largest absolute score.

        Args:
            top_k: Maximum number of entries to return.
            method: Aggregation method forwarded to
                ``aggregate_attributions``.

        Returns:
            A list of ``(token, score)`` pairs sorted by descending magnitude;
            empty when there is nothing to rank.
        """
        aggregated = self.aggregate_attributions(method)

        if not aggregated:
            return []

        ranked = sorted(aggregated.items(),
                        key=lambda item: abs(item[1]),
                        reverse=True)

        return ranked[:top_k]

LLM特征归因实践

1. 文本分类归因

class TextClassificationAttributor:
    """Explain the prediction of a sequence-classification model.

    NOTE: the three ``_compute_*`` methods are placeholders that return
    random noise of the right length; they do not inspect the model yet.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def explain_prediction(self, text, target_class=None):
        """Classify ``text`` and attach per-token attribution scores.

        Args:
            text: Input string to tokenize and classify.
            target_class: Class index to explain; defaults to the predicted
                (argmax) class.

        Returns:
            Dict with ``tokens``, ``attributions`` (method name -> scores),
            ``prediction`` (the explained class) and ``confidence`` (softmax
            probability of that class for the first batch element).
        """
        encoded = self.tokenizer(text, return_tensors="pt")
        input_ids = encoded['input_ids']
        token_strings = self.tokenizer.convert_ids_to_tokens(input_ids[0])

        # Inference only: no gradients are needed for the prediction itself.
        with torch.no_grad():
            class_logits = self.model(**encoded).logits

        if target_class is None:
            target_class = class_logits.argmax(dim=-1).item()

        # Evaluated in this order: attention, gradient, integrated gradients.
        scores_by_method = {
            'attention': self._compute_attention_attribution(input_ids),
            'gradient': self._compute_gradient_attribution(
                input_ids, target_class
            ),
            'integrated_gradients': self._compute_integrated_gradients(
                input_ids, target_class
            ),
        }

        probabilities = torch.softmax(class_logits, dim=-1)

        return {
            'tokens': token_strings,
            'attributions': scores_by_method,
            'prediction': target_class,
            'confidence': probabilities[0][target_class].item()
        }

    def _compute_attention_attribution(self, input_ids):
        """Placeholder: random scores, one per input token."""
        sequence_length = input_ids.shape[1]
        return np.random.randn(sequence_length)

    def _compute_gradient_attribution(self, input_ids, target_class):
        """Placeholder: random scores, one per input token."""
        sequence_length = input_ids.shape[1]
        return np.random.randn(sequence_length)

    def _compute_integrated_gradients(self, input_ids, target_class):
        """Placeholder: random scores, one per input token."""
        sequence_length = input_ids.shape[1]
        return np.random.randn(sequence_length)

    def visualize_explanation(self, explanation):
        """Render one bar chart per method plus a side-by-side comparison.

        Args:
            explanation: Dict as returned by ``explain_prediction``.

        Returns:
            Tuple ``(figs, comparison_fig)`` where ``figs`` maps each method
            name to its figure.
        """
        plotter = AttributionVisualizer()
        token_strings = explanation['tokens']
        scores_by_method = explanation['attributions']

        per_method_figures = {
            method: plotter.plot_token_attribution(
                token_strings,
                scores,
                title=f"Attribution Method: {method}"
            )
            for method, scores in scores_by_method.items()
        }

        comparison_figure = plotter.plot_attribution_comparison(
            token_strings,
            scores_by_method,
            title="Attribution Methods Comparison"
        )

        return per_method_figures, comparison_figure

2. 问答系统归因

class QASystemAttributor:
    """Explain the answer span predicted by an extractive QA model.

    The model output is expected to expose ``start_logits`` and
    ``end_logits``; only batch element 0 is explained.

    NOTE: ``_attention_attribution`` and ``_gradient_attribution`` are
    placeholders that return random noise; only ``_lime_attribution``
    queries the model.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def explain_qa(self, question, context):
        """Predict the answer span and attach per-token attributions.

        Args:
            question: Question text.
            context: Passage the answer is extracted from.

        Returns:
            Dict with the inputs, the decoded ``answer``, the ``tokens``,
            the ``attributions`` per method and the predicted
            ``start_idx`` / ``end_idx``. The answer is empty when the
            predicted end precedes the predicted start.
        """
        inputs = self.tokenizer(
            question, context,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True
        )

        tokens = self.tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

        with torch.no_grad():
            outputs = self.model(**inputs)
            start_logits = outputs.start_logits
            end_logits = outputs.end_logits

        start_idx = start_logits.argmax(dim=-1).item()
        end_idx = end_logits.argmax(dim=-1).item()

        attributions = self._compute_qa_attributions(
            inputs, start_idx, end_idx
        )

        answer_tokens = tokens[start_idx:end_idx + 1]
        answer = self.tokenizer.convert_tokens_to_string(answer_tokens)

        return {
            'question': question,
            'context': context,
            'answer': answer,
            'tokens': tokens,
            'attributions': attributions,
            'start_idx': start_idx,
            'end_idx': end_idx
        }

    def _compute_qa_attributions(self, inputs, start_idx, end_idx):
        """Run every attribution method and collect results by name."""
        return {
            'attention': self._attention_attribution(inputs),
            'gradient': self._gradient_attribution(
                inputs, start_idx, end_idx
            ),
            'lime': self._lime_attribution(inputs, start_idx, end_idx),
        }

    def _attention_attribution(self, inputs):
        """Placeholder: random scores, one per input token."""
        return np.random.randn(inputs['input_ids'].shape[1])

    def _gradient_attribution(self, inputs, start_idx, end_idx):
        """Placeholder: random scores, one per input token."""
        return np.random.randn(inputs['input_ids'].shape[1])

    def _masking_token_id(self):
        """Return the token id used to blank out perturbed positions.

        Prefers the pad token, then the mask token, then the unknown token.

        Raises:
            ValueError: If the tokenizer defines none of them.
        """
        for attr in ('pad_token_id', 'mask_token_id', 'unk_token_id'):
            token_id = getattr(self.tokenizer, attr, None)
            if token_id is not None:
                return token_id
        raise ValueError(
            "tokenizer defines no pad, mask or unk token id to mask with"
        )

    def _lime_attribution(self, inputs, start_idx, end_idx, n_samples=100):
        """Estimate token importance with a simplified LIME surrogate.

        Random subsets of tokens are replaced by a filler token, the model is
        scored on each perturbed input, and a locality-weighted ridge
        regression is fitted from "token kept" indicators to the score. The
        regression coefficients are the attributions.

        The score of a perturbed input is
        ``start_logits[0, start_idx] + end_logits[0, end_idx]``.

        Args:
            inputs: Tokenizer output (mapping) containing ``input_ids``; all
                other entries are forwarded unchanged to the model.
            start_idx: Predicted answer start position.
            end_idx: Predicted answer end position.
            n_samples: Number of perturbed inputs to evaluate.

        Returns:
            NumPy array with one coefficient per token, scaled so the
            absolute values sum to (approximately) 1.
        """
        input_ids = inputs['input_ids']
        n_features = input_ids.shape[1]
        if n_features == 0 or n_samples <= 0:
            return np.zeros(n_features)

        fill_id = self._masking_token_id()
        # Keep attention_mask / token_type_ids etc. so perturbed inputs are
        # processed the same way as the original one.
        other_inputs = {
            key: value for key, value in inputs.items() if key != 'input_ids'
        }

        # 1 = token kept, 0 = token replaced by the filler.
        masks = np.random.binomial(1, 0.5, size=(n_samples, n_features))
        masks[0, :] = 1  # anchor the surrogate on the unperturbed input

        scores = np.empty(n_samples)
        for row, mask in enumerate(masks):
            perturbed = input_ids.clone()
            dropped = torch.as_tensor(mask == 0, device=input_ids.device)
            perturbed[0, dropped] = fill_id

            with torch.no_grad():
                outputs = self.model(input_ids=perturbed, **other_inputs)

            scores[row] = (
                outputs.start_logits[0, start_idx]
                + outputs.end_logits[0, end_idx]
            ).item()

        # Locality kernel on the FRACTION of removed tokens, so the weights
        # stay in a usable range regardless of sequence length.
        removed_fraction = 1.0 - masks.mean(axis=1)
        weights = np.exp(-(removed_fraction ** 2) / (2 * 0.25 ** 2))

        # Weighted ridge regression with an implicit intercept (weighted
        # centring of features and targets).
        features = masks.astype(float)
        total_weight = weights.sum()
        feature_mean = (weights[:, None] * features).sum(axis=0) / total_weight
        score_mean = (weights * scores).sum() / total_weight
        centred_features = features - feature_mean
        centred_scores = scores - score_mean

        ridge = 1e-3  # keeps the system solvable when n_features > n_samples
        gram = centred_features.T @ (weights[:, None] * centred_features)
        gram += ridge * np.eye(n_features)
        moment = centred_features.T @ (weights * centred_scores)
        attributions = np.linalg.solve(gram, moment)

        attributions /= np.sum(np.abs(attributions)) + 1e-10

        return attributions

    def visualize_qa_explanation(self, explanation):
        """Plot every attribution method and build the highlighted answer.

        Args:
            explanation: Dict as returned by ``explain_qa``.

        Returns:
            Tuple ``(figs, answer_highlight)`` where ``figs`` maps method
            name to figure and ``answer_highlight`` is the token string with
            the answer span marked.
        """
        visualizer = AttributionVisualizer()

        figs = {}
        for method, attributions in explanation['attributions'].items():
            figs[method] = visualizer.plot_token_attribution(
                explanation['tokens'],
                attributions,
                title=f"QA Attribution: {method}"
            )

        answer_highlight = self._highlight_answer(explanation)

        return figs, answer_highlight

    def _highlight_answer(self, explanation):
        """Join the tokens with spaces, wrapping answer tokens in ``**``."""
        start_idx = explanation['start_idx']
        end_idx = explanation['end_idx']

        return ' '.join(
            f"**{token}**" if start_idx <= i <= end_idx else token
            for i, token in enumerate(explanation['tokens'])
        )

3. 生成模型归因

class GenerationAttributor:
    """Attribute generated tokens of a language model to its input tokens.

    The gradient method requires the model to expose
    ``get_input_embeddings()`` and to accept an ``inputs_embeds`` keyword.

    NOTE: ``_attention_attribution`` is a placeholder returning random noise.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def explain_generation(self, prompt, generated_text):
        """Compute attributions for every token following the prompt.

        The prompt and ``prompt + generated_text`` are tokenized separately
        and every position past the prompt's token count is treated as
        generated.
        NOTE(review): this assumes the prompt tokens form a prefix of the
        full tokenization - confirm for the tokenizer in use.

        Args:
            prompt: Text the model was conditioned on.
            generated_text: Continuation produced by the model.

        Returns:
            Dict with the inputs, ``prompt_tokens``, ``generated_tokens``,
            ``attributions`` (token string -> method scores; repeated tokens
            keep only the last occurrence) and ``attributions_by_position``
            (one entry per generated position, so repeats are preserved).
        """
        prompt_inputs = self.tokenizer(prompt, return_tensors="pt")
        prompt_tokens = self.tokenizer.convert_ids_to_tokens(
            prompt_inputs['input_ids'][0]
        )

        full_text = prompt + generated_text
        full_inputs = self.tokenizer(full_text, return_tensors="pt")
        full_tokens = self.tokenizer.convert_ids_to_tokens(
            full_inputs['input_ids'][0]
        )

        prompt_len = len(prompt_tokens)

        attributions = {}
        attributions_by_position = []

        for i in range(prompt_len, len(full_tokens)):
            token = full_tokens[i]
            token_attributions = self._compute_token_attribution(
                full_inputs['input_ids'], i, prompt_len
            )
            # Kept for existing callers; a token string that is generated
            # more than once overwrites its earlier entry here.
            attributions[token] = token_attributions
            attributions_by_position.append({
                'position': i,
                'token': token,
                'attributions': token_attributions
            })

        return {
            'prompt': prompt,
            'generated_text': generated_text,
            'prompt_tokens': prompt_tokens,
            'generated_tokens': full_tokens[prompt_len:],
            'attributions': attributions,
            'attributions_by_position': attributions_by_position
        }

    def _compute_token_attribution(self, input_ids, target_idx, prompt_len):
        """Run every attribution method for the token at ``target_idx``.

        ``prompt_len`` is accepted for interface compatibility and is not
        used.
        """
        return {
            'gradient': self._gradient_attribution(input_ids, target_idx),
            'attention': self._attention_attribution(input_ids, target_idx),
        }

    def _gradient_attribution(self, input_ids, target_idx):
        """Gradient x input attribution for the token at ``target_idx``.

        In next-token prediction the token at position ``i`` is scored by
        the logits at position ``i - 1``, so the explained scalar is
        ``logits[0, target_idx - 1, input_ids[0, target_idx]]``.

        Args:
            input_ids: Token ids of prompt plus generation; batch element 0
                is explained.
            target_idx: Position of the generated token to explain.

        Returns:
            1-D NumPy array of scores with the embedding axis summed out.
            All zeros when ``target_idx`` is 0, because the first token has
            no preceding context to attribute to.
        """
        if target_idx == 0:
            return np.zeros(input_ids.shape[1])

        # Evaluation mode keeps dropout from injecting noise into gradients.
        self.model.eval()

        # Detach so the embeddings become a leaf tensor that the gradient can
        # be taken with respect to; the same tensor is fed to the model.
        inputs_embeds = self.model.get_input_embeddings()(input_ids)
        inputs_embeds = inputs_embeds.detach().requires_grad_(True)

        outputs = self.model(inputs_embeds=inputs_embeds)
        logits = outputs.logits if hasattr(outputs, 'logits') else outputs

        target_token = input_ids[0, target_idx]
        score = logits[0, target_idx - 1, target_token]

        # autograd.grad leaves the parameters' .grad buffers untouched.
        (gradients,) = torch.autograd.grad(score, inputs_embeds)

        attribution = (gradients * inputs_embeds).sum(dim=-1)

        return attribution.detach().cpu().numpy().flatten()

    def _attention_attribution(self, input_ids, target_idx):
        """Placeholder: random scores, one per input token."""
        return np.random.randn(input_ids.shape[1])

    def visualize_generation_attribution(self, explanation):
        """Plot the mean attribution of the prompt tokens.

        Scores of all methods and all generated positions are averaged and
        the part covering the prompt tokens is drawn as a bar chart.

        Args:
            explanation: Dict as returned by ``explain_generation``.

        Returns:
            The matplotlib figure, or ``None`` when there is nothing to plot.
        """
        visualizer = AttributionVisualizer()

        prompt_tokens = explanation['prompt_tokens']

        # Prefer the per-position list so repeated tokens are all counted;
        # fall back to the token-keyed dict for older explanation dicts.
        per_position = explanation.get('attributions_by_position')
        if per_position is not None:
            method_maps = [entry['attributions'] for entry in per_position]
        else:
            method_maps = list(explanation['attributions'].values())

        all_attributions = [
            scores
            for method_map in method_maps
            for scores in method_map.values()
        ]

        if not all_attributions:
            return None

        avg_attributions = np.mean(all_attributions, axis=0)

        return visualizer.plot_token_attribution(
            prompt_tokens,
            avg_attributions[:len(prompt_tokens)],
            title="Prompt Attribution for Generation"
        )

实际应用案例

案例:LLM特征归因分析系统

# LLM特征归因分析系统
class LLMFeatureAttributionSystem:
    """Facade bundling the task-specific attributors and a shared aggregator.

    Classification and QA analyses feed their per-method scores into one
    ``AttributionAggregator``, from which rankings and the report are built.
    """

    # Characters that make a token count as punctuation in recommendations.
    _PUNCTUATION_CHARS = frozenset('.,!?;:')
    _STOPWORDS = frozenset(
        {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'in', 'on', 'at'}
    )

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.classification_attributor = TextClassificationAttributor(model, tokenizer)
        self.qa_attributor = QASystemAttributor(model, tokenizer)
        self.generation_attributor = GenerationAttributor(model, tokenizer)
        self.aggregator = AttributionAggregator()

    def analyze_text_classification(self, text, target_class=None):
        """Explain a classification and record its scores in the aggregator.

        Returns:
            Dict with ``explanation``, ``visualizations`` (method -> figure)
            and the ``comparison`` figure.
        """
        explanation = self.classification_attributor.explain_prediction(
            text, target_class
        )

        for method, attributions in explanation['attributions'].items():
            self.aggregator.add_attribution(
                f"classification_{method}",
                explanation['tokens'],
                attributions
            )

        figs, comparison_fig = self.classification_attributor.visualize_explanation(
            explanation
        )

        return {
            'explanation': explanation,
            'visualizations': figs,
            'comparison': comparison_fig
        }

    def analyze_qa(self, question, context):
        """Explain a QA prediction and record its scores in the aggregator.

        Returns:
            Dict with ``explanation``, ``visualizations`` (method -> figure)
            and ``answer_highlight`` (token string with the answer marked).
        """
        explanation = self.qa_attributor.explain_qa(question, context)

        for method, attributions in explanation['attributions'].items():
            self.aggregator.add_attribution(
                f"qa_{method}",
                explanation['tokens'],
                attributions
            )

        figs, answer_highlight = self.qa_attributor.visualize_qa_explanation(
            explanation
        )

        return {
            'explanation': explanation,
            'visualizations': figs,
            'answer_highlight': answer_highlight
        }

    def analyze_generation(self, prompt, generated_text):
        """Explain a generation; results are not added to the aggregator.

        Returns:
            Dict with ``explanation`` and the ``visualization`` figure
            (``None`` when nothing could be plotted).
        """
        explanation = self.generation_attributor.explain_generation(
            prompt, generated_text
        )

        fig = self.generation_attributor.visualize_generation_attribution(
            explanation
        )

        return {
            'explanation': explanation,
            'visualization': fig
        }

    def get_aggregated_ranking(self, top_k=10):
        """Return the ``top_k`` aggregated ``(token, score)`` pairs."""
        return self.aggregator.rank_tokens(top_k)

    def generate_comprehensive_report(self):
        """Summarize everything recorded in the aggregator so far.

        Returns:
            Dict with ``total_analyses`` (number of recorded method results),
            ``methods_used``, ``top_tokens`` (top 20) and ``recommendations``.
        """
        return {
            'total_analyses': len(self.aggregator.attributions),
            'methods_used': list(self.aggregator.attributions.keys()),
            'top_tokens': self.get_aggregated_ranking(20),
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self):
        """Flag rankings dominated by punctuation or stopwords.

        Looks at the five highest-ranked tokens and emits a recommendation
        when more than half of them are punctuation, and another when more
        than half are stopwords.

        Returns:
            List of recommendation strings (possibly empty).
        """
        recommendations = []

        top_tokens = self.get_aggregated_ranking(5)
        if not top_tokens:
            return recommendations

        token_texts = [token for token, _ in top_tokens]
        total = len(token_texts)

        # A token is punctuation when it is non-empty and consists only of
        # punctuation characters. A substring test would wrongly accept the
        # empty string and reject tokens such as "!!".
        punctuation_count = sum(
            1 for t in token_texts
            if t and all(ch in self._PUNCTUATION_CHARS for ch in t)
        )
        if punctuation_count / total > 0.5:
            recommendations.append("归因主要集中在标点符号,可能需要调整模型或归因方法")

        stopword_count = sum(
            1 for t in token_texts if t.lower() in self._STOPWORDS
        )
        if stopword_count / total > 0.5:
            recommendations.append("归因主要集中在停用词,可能需要更好的特征选择")

        return recommendations

# 使用示例
# system = LLMFeatureAttributionSystem(model, tokenizer)
# 
# # 分析文本分类
# classification_result = system.analyze_text_classification(
#     "This movie was really great!"
# )
# 
# # 分析问答
# qa_result = system.analyze_qa(
#     "What is machine learning?",
#     "Machine learning is a subset of artificial intelligence..."
# )
# 
# # 生成报告
# report = system.generate_comprehensive_report()

总结

特征归因是理解和解释LLM的重要技术,主要价值体现在以下几个方面:

  1. 模型理解 - 帮助理解模型决策依据
  2. 调试工具 - 诊断模型行为异常
  3. 信任建立 - 增强用户对模型的信任
  4. 改进指导 - 指导模型和数据改进
  5. 合规要求 - 满足可解释性法规要求

通过特征归因分析,我们可以更好地理解LLM如何做出决策,从而提高模型的透明度和可信度。