← 返回首页
🧠

数据工程管道:构建LLM训练数据的基础

📂 llm ⏱ 3 min 529 words

---
title: "数据工程管道:构建LLM训练数据的基础"
description: "从数据采集到特征工程,构建完整的LLM数据处理管道"
tags: ["数据工程", "数据管道", "特征工程", "数据处理", "ETL"]
category: "llm"
icon: "📊"
---

数据工程管道:构建LLM训练数据的基础

数据工程在LLM中的角色

数据工程是LLM开发的基础环节,负责构建可靠的数据基础设施,确保训练数据的质量、一致性和可用性。

数据采集与存储

1. 多源数据采集

import json
from pathlib import Path
from datetime import datetime

class MultiSourceCollector:
    """Collect records from several named sources and snapshot each to JSON.

    Sources are zero-argument callables registered under a name. Each call to
    ``collect_all()`` invokes every source once and writes its result to
    ``<output_dir>/<name>_<YYYYMMDD>.json``.
    """

    def __init__(self):
        # source name -> zero-argument callable returning JSON-serializable data
        self.sources = {}

    def register_source(self, name, fetcher):
        """Register *fetcher* under *name*, replacing any previous one.

        Returns the collector itself so registrations can be chained.
        """
        self.sources[name] = fetcher
        return self

    def collect_all(self, output_dir="data/raw"):
        """Invoke every registered source and persist its output as JSON.

        Args:
            output_dir: directory for the snapshots; created if missing.

        Returns:
            dict mapping source name -> fetched data, containing only the
            sources whose data was fetched and saved successfully.

        A failing source is reported and skipped so the remaining sources
        still run.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        # One date stamp per run, so every snapshot of a run shares the same
        # suffix even if the run crosses midnight.
        stamp = datetime.now().strftime('%Y%m%d')

        all_data = {}
        for name, fetcher in self.sources.items():
            try:
                data = fetcher()
                # Serialize before touching the file: a non-serializable payload
                # then fails without leaving a truncated snapshot behind.
                payload = json.dumps(data, ensure_ascii=False, indent=2)
                (output_path / f"{name}_{stamp}.json").write_text(payload, encoding="utf-8")
                all_data[name] = data
                print(f"✓ 采集 {name}: {len(data)} 条")
            except Exception as e:  # best effort: one bad source must not abort the rest
                print(f"✗ 采集 {name} 失败: {e}")

        return all_data

# Usage example: register three demo sources, each returning a one-record list.
collector = MultiSourceCollector()
demo_sources = {
    "web": lambda: [{"text": "网页数据"}],
    "api": lambda: [{"text": "API数据"}],
    "db": lambda: [{"text": "数据库数据"}],
}
for source_name, source_fetcher in demo_sources.items():
    collector.register_source(source_name, source_fetcher)

2. 数据存储架构

class DataStore:
    """JSON file store organized into layer subdirectories.

    Each record set lives at ``<base_path>/<layer>/<name>.json``.
    """

    def __init__(self, base_path="data"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, data, layer, name):
        """Save *data* as JSON in *layer* under *name* and return the file path.

        *layer* may be nested (e.g. ``"processed/v1"``); missing directories
        are created. Serialization errors from ``json.dumps`` (e.g. TypeError
        for unsupported objects) propagate, and any existing file is left
        untouched in that case.
        """
        layer_path = self.base_path / layer
        # parents=True: nested layer names (or a base directory removed after
        # __init__) would otherwise raise FileNotFoundError.
        layer_path.mkdir(parents=True, exist_ok=True)

        # Serialize first so a failure cannot truncate an existing file.
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        filepath = layer_path / f"{name}.json"
        filepath.write_text(payload, encoding="utf-8")
        return filepath

    def load(self, layer, name):
        """Load and return the JSON stored in *layer* under *name*.

        Raises FileNotFoundError if nothing was saved there.
        """
        filepath = self.base_path / layer / f"{name}.json"
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)

# Data layer architecture: raw -> cleaned -> processed -> training
# Small stand-in records so this example runs on its own; in a real pipeline
# they come from the collection, cleaning and processing steps.
raw_data = [{"text": "<p>原始网页数据</p>"}]
cleaned_data = [{"text": "原始网页数据"}]
processed_data = [{"text": "原始网页数据", "features": {"length": 6}}]

store = DataStore()
store.save(raw_data, "raw", "web_crawl")
store.save(cleaned_data, "cleaned", "web_crawl")
store.save(processed_data, "processed", "web_crawl")

数据清洗流程

1. 文本清洗

import re

class TextCleaner:
    """Pass text through an ordered chain of ``str -> str`` cleaning rules."""

    def __init__(self):
        # rules run in the order they were added
        self.rules = []

    def add_rule(self, rule_func):
        """Append *rule_func* to the chain; returns self so calls can be chained."""
        self.rules.append(rule_func)
        return self

    def clean(self, text):
        """Return *text* after feeding it through every rule in turn."""
        result = text
        for step in self.rules:
            result = step(result)
        return result

cleaner = TextCleaner()
cleaner.add_rule(lambda t: re.sub(r'<[^>]+>', '', t))  # strip HTML tags
# Drop stray symbols but keep word characters (in Python 3, \w already matches
# CJK ideographs), whitespace, CJK / full-width punctuation and basic ASCII
# sentence punctuation. Sentence boundaries such as "。" must survive because
# the quality filter and sentence-count feature rely on them.
cleaner.add_rule(lambda t: re.sub(r'[^\w\s\u4e00-\u9fff\u3000-\u303f\uff00-\uffef.,!?;:]', '', t))
# Normalize whitespace *after* removing characters, so removals do not leave
# double spaces behind.
cleaner.add_rule(lambda t: re.sub(r'\s+', ' ', t))
cleaner.add_rule(lambda t: t.strip())

# Batch cleaning: apply one cleaner to every text, preserving order.
def batch_clean(texts, cleaner):
    """Return a new list holding ``cleaner.clean(t)`` for each *t* in *texts*."""
    return list(map(cleaner.clean, texts))

2. 质量过滤

class QualityFilter:
    """Apply an ordered sequence of record predicates, reporting survivors."""

    def __init__(self):
        # predicates run in registration order; a record survives only if
        # every predicate returns a truthy value for it
        self.filters = []

    def add_filter(self, filter_func):
        """Append *filter_func* (record -> bool); returns self for chaining."""
        self.filters.append(filter_func)
        return self

    def filter(self, data):
        """Return a new list of the records in *data* that pass every filter.

        *data* may be any iterable, including a generator. It is materialized
        once up front because each stage prints before/after counts.
        """
        filtered = list(data)
        for predicate in self.filters:
            before_count = len(filtered)
            filtered = list(filter(predicate, filtered))
            after_count = len(filtered)
            print(f"过滤: {before_count} -> {after_count}")
        return filtered

# add_filter returns the filter itself, so the rules can be chained.
quality_filter = (
    QualityFilter()
    .add_filter(lambda x: len(x.get("text", "")) > 50)        # minimum length
    .add_filter(lambda x: len(x.get("text", "")) < 10000)     # maximum length
    .add_filter(lambda x: x.get("text", "").count("。") > 2)  # more than two "。" sentence ends
)

特征工程

1. 文本特征提取

class TextFeatureExtractor:
    """Compute a dict of named features for a text using registered callables."""

    def __init__(self):
        # feature name -> callable(text); insertion order is output order
        self.features = {}

    def register_feature(self, name, extractor):
        """Register *extractor* under *name* (replacing any previous one).

        Returns self so registrations can be chained.
        """
        self.features[name] = extractor
        return self

    def extract(self, text):
        """Return ``{name: extractor(text)}`` for every registered feature."""
        return {name: fn(text) for name, fn in self.features.items()}

extractor = TextFeatureExtractor()
extractor.register_feature("length", len)
# NOTE: whitespace splitting only approximates words for space-delimited text;
# unsegmented Chinese text counts as a single "word".
extractor.register_feature("word_count", lambda t: len(t.split()))
# Count the non-empty segments between "。" terminators. The old "count + 1"
# over-counted by one when the text ends with "。" and reported 1 for "".
extractor.register_feature(
    "sentence_count",
    lambda t: sum(1 for s in t.split("。") if s.strip()),
)
extractor.register_feature(
    "avg_word_length",
    lambda t: sum(len(w) for w in t.split()) / max(len(t.split()), 1),
)

# Extract features
text = "这是一段用于测试的中文文本,包含多个句子。"
features = extractor.extract(text)
print(features)

2. 数据集划分

def split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=42):
    """Shuffle *data* reproducibly and split it into train/val/test lists.

    Args:
        data: any iterable of records; it is copied, never modified.
        train_ratio: fraction of records for ``"train"`` (truncated to int).
        val_ratio: fraction of records for ``"val"`` (truncated to int).
        test_ratio: nominal test fraction. The ``"test"`` split always
            receives every remaining record, so rounding never drops data.
        seed: seed for the shuffle; equal seeds give equal splits.

    Returns:
        ``{"train": [...], "val": [...], "test": [...]}``.

    Raises:
        ValueError: if a ratio is negative or train_ratio + val_ratio > 1.
    """
    import random

    if min(train_ratio, val_ratio, test_ratio) < 0:
        raise ValueError("split ratios must be non-negative")
    if train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError("train_ratio + val_ratio must not exceed 1")

    shuffled = list(data)
    # A private RNG yields the same permutation as random.seed(seed) followed
    # by random.shuffle, but leaves the global random state untouched.
    random.Random(seed).shuffle(shuffled)

    n = len(shuffled)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)

    return {
        "train": shuffled[:train_end],
        "val": shuffled[train_end:val_end],
        "test": shuffled[val_end:],
    }

# Usage example. `records` stands in for the processed records produced by the
# earlier steps, so this snippet runs on its own.
records = [{"text": f"样本 {i}"} for i in range(100)]
splits = split_dataset(records)
print(f"训练集: {len(splits['train'])}, 验证集: {len(splits['val'])}, 测试集: {len(splits['test'])}")

完整数据管道

class DataEngineeringPipeline:
    """End-to-end pipeline: collect -> clean -> filter -> featurize -> split -> save.

    Each stage component may be passed in already configured. Any component
    left out defaults to a fresh instance with nothing registered on it (no
    sources, rules, filters or features).
    """

    def __init__(self, *, collector=None, cleaner=None, quality_filter=None,
                 extractor=None, store=None):
        # Accept configured components; freshly constructed defaults have
        # nothing registered and so do no useful work on their own.
        self.collector = collector if collector is not None else MultiSourceCollector()
        self.cleaner = cleaner if cleaner is not None else TextCleaner()
        self.quality_filter = (
            quality_filter if quality_filter is not None else QualityFilter()
        )
        self.extractor = extractor if extractor is not None else TextFeatureExtractor()
        self.store = store if store is not None else DataStore()

    def run(self):
        """Run every stage and return the ``train``/``val``/``test`` splits.

        Each split is also saved to the store's ``processed`` layer under its
        split name.
        """
        # 1. Collect
        raw_data = self.collector.collect_all()

        # 2. Clean copies of the records, so data owned by fetchers is never mutated
        cleaned = self._clean(raw_data)

        # 3. Quality filter
        filtered = self.quality_filter.filter(cleaned)

        # 4. Feature extraction (mutates only the cleaned copies)
        for item in filtered:
            item["features"] = self.extractor.extract(item["text"])

        # 5. Split
        splits = split_dataset(filtered)

        # 6. Persist
        for split_name, data in splits.items():
            self.store.save(data, "processed", split_name)

        print(f"管道完成: 总计 {len(filtered)} 条数据")
        return splits

    def _clean(self, raw_data):
        """Return cleaned shallow copies of every record from every source."""
        return [
            {**item, "text": self.cleaner.clean(item.get("text", ""))}
            for items in raw_data.values()
            for item in items
        ]

最佳实践

  1. 数据血缘追踪:记录数据的来源和处理历史
  2. 质量监控:实时监控数据质量指标
  3. 增量更新:支持增量数据处理
  4. 版本管理:对数据集进行版本控制

总结

数据工程管道是LLM开发的基石。通过系统化的数据采集、清洗、转换和存储,我们可以构建高质量的训练数据集,为模型训练奠定坚实基础。