数据工程管道:构建LLM训练数据的基础
---
title: "数据工程管道:构建LLM训练数据的基础"
description: "从数据采集到特征工程,构建完整的LLM数据处理管道"
tags: ["数据工程", "数据管道", "特征工程", "数据处理", "ETL"]
category: "llm"
icon: "📊"
---
数据工程管道:构建LLM训练数据的基础
数据工程在LLM中的角色
数据工程是LLM开发的基础环节,负责构建可靠的数据基础设施,确保训练数据的质量、一致性和可用性。
数据采集与存储
1. 多源数据采集
import json
from pathlib import Path
from datetime import datetime
class MultiSourceCollector:
    """Registry of named data sources that can be fetched and dumped to disk.

    Every source is a zero-argument callable. ``collect_all`` invokes each
    registered callable and writes its result to a dated JSON file.
    """

    def __init__(self):
        # Maps source name -> zero-argument fetcher callable.
        self.sources = dict()

    def register_source(self, name, fetcher):
        """Store ``fetcher`` under ``name``, replacing any previous entry."""
        self.sources[name] = fetcher

    def collect_all(self, output_dir="data/raw"):
        """Fetch every registered source and persist each result as JSON.

        Args:
            output_dir: Directory that receives one ``<name>_<YYYYMMDD>.json``
                file per source; created together with its parents if absent.

        Returns:
            Dict mapping source name to the data its fetcher returned. A
            source whose fetcher raised is absent from the dict; a source
            whose fetch succeeded but whose write failed is still present.
        """
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        collected = {}
        for name, fetcher in self.sources.items():
            try:
                data = fetcher()
                collected[name] = data
                filename = f"{name}_{datetime.now().strftime('%Y%m%d')}.json"
                destination = target_dir / filename
                with destination.open("w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f"✓ 采集 {name}: {len(data)} 条")
            except Exception as e:
                # Best effort: one failing source is reported and skipped so
                # the remaining sources are still collected.
                print(f"✗ 采集 {name} 失败: {e}")
        return collected
# Usage example: register three in-memory stand-ins for real data sources.
collector = MultiSourceCollector()
example_fetchers = {
    "web": lambda: [{"text": "网页数据"}],
    "api": lambda: [{"text": "API数据"}],
    "db": lambda: [{"text": "数据库数据"}],
}
for source_name, source_fetcher in example_fetchers.items():
    collector.register_source(source_name, source_fetcher)
2. 数据存储架构
class DataStore:
    """JSON file store that organizes datasets into named layers.

    Each layer is a sub-directory of ``base_path`` and each dataset is a
    ``<name>.json`` file inside its layer.
    """

    def __init__(self, base_path="data"):
        """Create the store and make sure ``base_path`` exists on disk."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, data, layer, name):
        """Write ``data`` as JSON into the given layer.

        Args:
            data: JSON-serializable object to persist.
            layer: Layer directory relative to ``base_path``; may be nested
                (for example ``"processed/v1"``).
            name: File stem; ``.json`` is appended.

        Returns:
            Path of the file that was written.
        """
        layer_path = self.base_path / layer
        # parents=True lets nested layers work and recreates base_path if it
        # was removed after construction.
        layer_path.mkdir(parents=True, exist_ok=True)
        filepath = layer_path / f"{name}.json"
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return filepath

    def load(self, layer, name):
        """Read and return the JSON dataset ``name`` from the given layer.

        Raises:
            FileNotFoundError: If the dataset file does not exist.
        """
        filepath = self.base_path / layer / f"{name}.json"
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)
# Layered data layout:
# raw -> cleaned -> processed -> training
store = DataStore()

# Placeholder records so this example runs on its own; replace them with the
# real output of each pipeline stage.
raw_data = [{"text": "<p>原始网页内容</p>"}]
cleaned_data = [{"text": "原始网页内容"}]
processed_data = [{"text": "原始网页内容", "features": {"length": 6}}]

store.save(raw_data, "raw", "web_crawl")
store.save(cleaned_data, "cleaned", "web_crawl")
store.save(processed_data, "processed", "web_crawl")
数据清洗流程
1. 文本清洗
import re
class TextCleaner:
    """Ordered chain of text-to-text rules applied one after another."""

    def __init__(self):
        # Rules run in the order they were added.
        self.rules = list()

    def add_rule(self, rule_func):
        """Append a rule and return ``self`` so calls can be chained."""
        self.rules.append(rule_func)
        return self

    def clean(self, text):
        """Feed ``text`` through every rule in order and return the result."""
        cleaned = text
        for apply_rule in self.rules:
            cleaned = apply_rule(cleaned)
        return cleaned
cleaner = TextCleaner()
cleaner.add_rule(lambda t: re.sub(r'<[^>]+>', '', t))  # strip HTML tags
# Drop stray symbols but keep word characters, whitespace and sentence
# punctuation. The quality filter and the sentence_count feature both count
# "。", so removing it here would make every cleaned text look like it has no
# sentences. In a str pattern \w already matches CJK ideographs, so no
# explicit \u4e00-\u9fff range is needed.
cleaner.add_rule(lambda t: re.sub(r'[^\w\s。!?,、;:.!?,;:]', '', t))
# Collapse whitespace only after symbols are gone; doing it earlier leaves a
# double space wherever a symbol sat between two spaces.
cleaner.add_rule(lambda t: re.sub(r'\s+', ' ', t))
cleaner.add_rule(lambda t: t.strip())
# Batch cleaning
def batch_clean(texts, cleaner):
    """Return a list holding ``cleaner.clean(text)`` for each text, in order."""
    results = []
    for text in texts:
        results.append(cleaner.clean(text))
    return results
2. 质量过滤
class QualityFilter:
    """Chain of predicates applied in order to drop unwanted records."""

    def __init__(self):
        # Predicates run in registration order.
        self.filters = list()

    def add_filter(self, filter_func):
        """Append a predicate and return ``self`` so calls can be chained."""
        self.filters.append(filter_func)
        return self

    def filter(self, data):
        """Keep only the records accepted by every registered predicate.

        Prints the record count before and after each predicate. When no
        predicate is registered, ``data`` itself is returned unchanged.
        """
        filtered = data
        for predicate in self.filters:
            before_count = len(filtered)
            filtered = [record for record in filtered if predicate(record)]
            after_count = len(filtered)
            print(f"过滤: {before_count} -> {after_count}")
        return filtered
# Default quality rules, chained through add_filter's fluent return value.
quality_filter = (
    QualityFilter()
    .add_filter(lambda x: len(x.get("text", "")) > 50)  # minimum length
    .add_filter(lambda x: len(x.get("text", "")) < 10000)  # maximum length
    .add_filter(lambda x: x.get("text", "").count("。") > 2)  # at least three "。"
)
特征工程
1. 文本特征提取
class TextFeatureExtractor:
    """Named collection of feature functions evaluated against a text."""

    def __init__(self):
        # Maps feature name -> callable taking the text.
        self.features = dict()

    def register_feature(self, name, extractor):
        """Store ``extractor`` under ``name``, replacing any previous entry."""
        self.features[name] = extractor

    def extract(self, text):
        """Return a dict of feature name -> value computed from ``text``.

        Features are evaluated in registration order.
        """
        return {
            feature_name: compute(text)
            for feature_name, compute in self.features.items()
        }
extractor = TextFeatureExtractor()
extractor.register_feature("length", len)
# NOTE: tokens are whitespace-separated, so Chinese text written without
# spaces counts as a single "word".
extractor.register_feature("word_count", lambda t: len(t.split()))
# Count the non-empty segments between "。" marks. A plain count("。") + 1
# reports one sentence too many for text ending in "。" and reports 1 for
# empty text.
extractor.register_feature(
    "sentence_count",
    lambda t: sum(1 for segment in t.split("。") if segment.strip()),
)


def _avg_word_length(t):
    """Return the mean length of whitespace-separated tokens (0.0 if none)."""
    words = t.split()  # split once and reuse for both the sum and the count
    return sum(len(w) for w in words) / max(len(words), 1)


extractor.register_feature("avg_word_length", _avg_word_length)

# Extract features from a sample sentence.
text = "这是一段用于测试的中文文本,包含多个句子。"
features = extractor.extract(text)
print(features)
2. 数据集划分
def split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=42):
    """Shuffle ``data`` reproducibly and cut it into train/val/test splits.

    Args:
        data: Iterable of records. It is copied into a list and never
            modified in place.
        train_ratio: Fraction of records assigned to the training split.
        val_ratio: Fraction of records assigned to the validation split.
        test_ratio: Accepted for interface compatibility only. The test split
            always receives everything left after train and val, so rounding
            never drops a record.
        seed: Seed for the private random generator used for shuffling.

    Returns:
        Dict with keys ``"train"``, ``"val"`` and ``"test"``, each a list.
    """
    import random

    # A private generator produces the same order as seeding the module-level
    # one, without resetting global random state for the rest of the program.
    rng = random.Random(seed)
    shuffled = list(data)  # accepts tuples and other iterables, not just lists
    rng.shuffle(shuffled)

    n = len(shuffled)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)
    return {
        "train": shuffled[:train_end],
        "val": shuffled[train_end:val_end],
        "test": shuffled[val_end:],
    }
# Usage example with a small self-contained dataset, so the snippet does not
# depend on a variable defined elsewhere.
example_records = [{"id": i} for i in range(10)]
splits = split_dataset(example_records)
print(f"训练集: {len(splits['train'])}, 验证集: {len(splits['val'])}, 测试集: {len(splits['test'])}")
完整数据管道
class DataEngineeringPipeline:
    """End-to-end pipeline: collect, clean, filter, featurize, split, save."""

    def __init__(self, collector=None, cleaner=None, quality_filter=None,
                 extractor=None, store=None):
        """Wire up the pipeline stages.

        Every argument is optional. A stage that is omitted falls back to a
        fresh, unconfigured instance, which matches the zero-argument
        constructor. Pass configured instances so that ``run`` has sources,
        rules, filters and features to work with.

        Args:
            collector: ``MultiSourceCollector`` providing the raw data.
            cleaner: ``TextCleaner`` applied to each record's ``"text"``.
            quality_filter: ``QualityFilter`` applied to the cleaned records.
            extractor: ``TextFeatureExtractor`` computing per-record features.
            store: ``DataStore`` receiving the final splits.
        """
        self.collector = collector if collector is not None else MultiSourceCollector()
        self.cleaner = cleaner if cleaner is not None else TextCleaner()
        self.quality_filter = (
            quality_filter if quality_filter is not None else QualityFilter()
        )
        self.extractor = extractor if extractor is not None else TextFeatureExtractor()
        self.store = store if store is not None else DataStore()

    def run(self):
        """Execute all stages and return the train/val/test splits.

        Returns:
            Dict with keys ``"train"``, ``"val"`` and ``"test"`` as produced
            by ``split_dataset``. Each split is also saved to the store's
            ``"processed"`` layer under the split name.
        """
        # 1. Collect
        raw_data = self.collector.collect_all()

        # 2. Clean. Work on shallow copies so the collector's raw records
        # are not rewritten in place.
        cleaned = []
        for items in raw_data.values():
            for item in items:
                record = dict(item)
                record["text"] = self.cleaner.clean(record.get("text", ""))
                cleaned.append(record)

        # 3. Quality filtering
        filtered = self.quality_filter.filter(cleaned)

        # 4. Feature extraction
        for item in filtered:
            item["features"] = self.extractor.extract(item["text"])

        # 5. Dataset split
        splits = split_dataset(filtered)

        # 6. Persist each split
        for split_name, data in splits.items():
            self.store.save(data, "processed", split_name)

        print(f"管道完成: 总计 {len(filtered)} 条数据")
        return splits
最佳实践
- 数据血缘追踪:记录数据的来源和处理历史
- 质量监控:实时监控数据质量指标
- 增量更新:支持增量数据处理
- 版本管理:对数据集进行版本控制
总结
数据工程管道是LLM开发的基石。通过系统化的数据采集、清洗、转换和存储,我们可以构建高质量的训练数据集,为模型训练奠定坚实基础。