LLM数据管道:端到端的数据处理流程
---
title: "LLM数据管道:端到端的数据处理流程"
description: "构建高效的LLM数据管道,实现从数据采集到模型训练的自动化流程"
tags: ["LLM", "数据管道", "Pipeline", "数据处理", "自动化"]
category: "llm"
icon: "🔗"
---
LLM数据管道:端到端的数据处理流程
什么是LLM数据管道
LLM数据管道是将数据处理、模型训练和推理部署等环节串联成自动化流程的系统架构。它确保数据从源头到最终应用的每一步都可靠、可追踪、可重复。
数据管道架构
1. 数据采集管道
import json
import hashlib
from pathlib import Path
from datetime import datetime
class DataCollectionPipeline:
    """Collect raw data from a source and snapshot it as a JSON file.

    Every successful collection is written to ``raw_dir`` under a name built
    from the dataset name, a timestamp and a short content hash.
    """

    def __init__(self, raw_dir="data/raw"):
        """Create the collector and make sure ``raw_dir`` exists.

        Args:
            raw_dir: Directory that receives the raw JSON snapshots.
        """
        self.raw_dir = Path(raw_dir)
        self.raw_dir.mkdir(parents=True, exist_ok=True)

    def collect_from_source(self, source_type, config):
        """Collect data from the given kind of source.

        Args:
            source_type: One of ``"api"``, ``"file"`` or ``"database"``.
            config: Source-specific settings; see the ``_collect_from_*``
                methods for the keys each one reads.

        Returns:
            Path of the raw JSON snapshot that was written.

        Raises:
            ValueError: If ``source_type`` is not a known source.
            NotImplementedError: If ``source_type`` is ``"database"``.
        """
        collectors = {
            "api": self._collect_from_api,
            "file": self._collect_from_file,
            "database": self._collect_from_database,
        }
        collector = collectors.get(source_type)
        if collector is None:
            supported = ", ".join(sorted(collectors))
            raise ValueError(
                f"Unsupported source type {source_type!r}; expected one of: {supported}"
            )
        return collector(config)

    def _collect_from_api(self, config):
        """Fetch JSON from ``config["url"]`` and snapshot it.

        Reads the optional keys ``params`` (query parameters), ``timeout``
        (seconds, default 30) and ``name`` (snapshot name prefix).
        """
        import requests

        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(
            config["url"],
            params=config.get("params", {}),
            timeout=config.get("timeout", 30),
        )
        # Fail loudly instead of snapshotting an HTTP error payload as data.
        response.raise_for_status()
        data = response.json()
        return self._save_raw(data, config.get("name", "api_data"))

    def _collect_from_file(self, config):
        """Load JSON from the file at ``config["path"]`` and snapshot it."""
        with open(config["path"], "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._save_raw(data, config.get("name", "file_data"))

    def _collect_from_database(self, config):
        """Placeholder for database collection.

        The dispatch table in ``collect_from_source`` refers to this method,
        so it has to exist for any source type to work at all.

        Raises:
            NotImplementedError: Always; no database collector is provided.
        """
        raise NotImplementedError(
            "Database collection is not implemented; override "
            "_collect_from_database in a subclass."
        )

    def _save_raw(self, data, name):
        """Write ``data`` as pretty-printed JSON into ``raw_dir``.

        Args:
            data: JSON-serializable payload.
            name: Prefix of the snapshot file name.

        Returns:
            Path of the file that was written.
        """
        # A short content hash keeps snapshots of different payloads taken in
        # the same second from overwriting each other.
        hash_id = hashlib.md5(json.dumps(data, ensure_ascii=False).encode()).hexdigest()[:8]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{name}_{timestamp}_{hash_id}.json"
        filepath = self.raw_dir / filename
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"保存原始数据: {filepath}")
        return filepath
# Usage example: fetch from an HTTP API and snapshot the response.
pipeline = DataCollectionPipeline()
pipeline.collect_from_source(
    "api",
    dict(url="https://api.example.com/data", name="user_feedback"),
)
2. 数据清洗管道
import re
from typing import List, Dict
class DataCleaningPipeline:
    """Run a configurable sequence of cleaning steps over a list of records.

    Each step is a callable that takes the list of records and returns the
    cleaned list. The built-in steps read the ``"content"`` key of each record.
    """

    # Compiled once at class creation instead of being looked up per record.
    _WHITESPACE_RE = re.compile(r'\s+')
    _DISALLOWED_RE = re.compile(r'[^\w\s\u4e00-\u9fff.,!?,。!?]')

    def __init__(self):
        """Create a pipeline with no cleaning steps registered."""
        self.cleaners = []

    def add_cleaner(self, cleaner_func):
        """Append a cleaning step and return ``self`` so calls can be chained."""
        self.cleaners.append(cleaner_func)
        return self

    def clean(self, data: List[Dict]) -> List[Dict]:
        """Apply every registered step in order and return the result.

        Prints the step name and the remaining record count after each step.
        """
        cleaned = data
        for cleaner in self.cleaners:
            cleaned = cleaner(cleaned)
            # functools.partial objects and callable instances have no
            # __name__, so fall back to repr() instead of crashing.
            step_name = getattr(cleaner, "__name__", None) or repr(cleaner)
            print(f"执行清洗: {step_name}, 剩余 {len(cleaned)} 条")
        return cleaned

    @staticmethod
    def remove_duplicates(data):
        """Keep the first record for each distinct ``"content"`` value."""
        seen = set()
        unique_data = []
        for item in data:
            content = item.get("content", "")
            if content not in seen:
                seen.add(content)
                unique_data.append(item)
        return unique_data

    @staticmethod
    def remove_empty(data):
        """Drop records whose ``"content"`` is missing, null or blank."""
        # `or ""` makes an explicit null count as empty rather than raising
        # AttributeError on None.strip().
        return [item for item in data if (item.get("content") or "").strip()]

    @staticmethod
    def normalize_text(data):
        """Return copies of the records with normalized ``"content"``.

        Whitespace runs are collapsed to a single space, characters other than
        word characters, whitespace and the listed ASCII/CJK punctuation are
        removed, and the result is stripped. The input records are not
        modified. A missing or null ``"content"`` becomes ``""``.
        """
        normalized = []
        for item in data:
            text = item.get("content")
            if text is None:
                text = ""
            text = DataCleaningPipeline._WHITESPACE_RE.sub(' ', text)
            text = DataCleaningPipeline._DISALLOWED_RE.sub('', text)
            # Build a new dict so the caller's records stay untouched.
            normalized.append({**item, "content": text.strip()})
        return normalized
# Build the cleaning pipeline.
# Normalization runs first: collapsing whitespace and stripping symbols can
# leave a record empty or make two records identical, so the empty and
# duplicate filters must see the normalized text to catch those records.
cleaner = DataCleaningPipeline()
cleaner.add_cleaner(DataCleaningPipeline.normalize_text)
cleaner.add_cleaner(DataCleaningPipeline.remove_empty)
cleaner.add_cleaner(DataCleaningPipeline.remove_duplicates)
3. 数据转换管道
class DataTransformPipeline:
    """Run a configurable sequence of transforms over a dataset.

    Each transform is a callable that takes the dataset and returns the
    transformed dataset.
    """

    def __init__(self):
        """Create a pipeline with no transforms registered."""
        self.transforms = []

    def add_transform(self, transform_func):
        """Append a transform and return ``self`` so calls can be chained."""
        self.transforms.append(transform_func)
        return self

    def transform(self, data):
        """Apply every registered transform in order and return the result."""
        result = data
        for transform in self.transforms:
            result = transform(result)
        return result

    @staticmethod
    def format_for_training(data, format_type="alpaca"):
        """Convert question/answer records into a training format.

        Args:
            data: Iterable of dicts; the ``"question"`` and ``"answer"`` keys
                are read, each defaulting to ``""``.
            format_type: ``"alpaca"`` or ``"sharegpt"``.

        Returns:
            A new list of records containing only the fields of the chosen
            format; any other keys of the input records are not carried over.

        Raises:
            ValueError: If ``format_type`` is not supported.
        """
        if format_type == "alpaca":
            return [{
                "instruction": item.get("question", ""),
                "input": "",
                "output": item.get("answer", "")
            } for item in data]
        if format_type == "sharegpt":
            return [{
                "conversations": [
                    {"from": "human", "value": item.get("question", "")},
                    {"from": "gpt", "value": item.get("answer", "")}
                ]
            } for item in data]
        # Falling through would return None and fail later, far from the cause.
        raise ValueError(
            f"Unsupported format_type {format_type!r}; expected 'alpaca' or 'sharegpt'"
        )

    @staticmethod
    def add_metadata(data):
        """Return copies of the records with id, source and timestamp added.

        ``"id"`` is ``sample_`` plus the zero-padded position in ``data``,
        ``"source"`` is ``"pipeline"`` and ``"created_at"`` is the current
        local time in ISO format. The input records are not modified.
        """
        return [
            {
                **item,
                "id": f"sample_{i:06d}",
                "source": "pipeline",
                "created_at": datetime.now().isoformat(),
            }
            for i, item in enumerate(data)
        ]
# Usage example.
# Format first, then attach metadata: format_for_training builds brand-new
# records holding only the training fields, so metadata added before it
# would be thrown away.
transformer = DataTransformPipeline()
transformer.add_transform(lambda d: DataTransformPipeline.format_for_training(d, "alpaca"))
transformer.add_transform(DataTransformPipeline.add_metadata)
完整管道示例
class LLMPipeline:
    """End-to-end pipeline: collect, clean, transform, then save as JSON.

    The cleaner and transformer start with no steps registered, so data passes
    through unchanged until steps are added via ``self.cleaner.add_cleaner``
    and ``self.transformer.add_transform``.
    """

    def __init__(self, output_dir="data/processed"):
        """Create the stage objects and make sure the output directory exists.

        Args:
            output_dir: Directory that receives the processed dataset.
        """
        self.collector = DataCollectionPipeline()
        self.cleaner = DataCleaningPipeline()
        self.transformer = DataTransformPipeline()
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def run(self, source_config, output_name="training_data.json"):
        """Run all stages for one source and write the result.

        Args:
            source_config: Dict whose ``"type"`` key selects the collector;
                the whole dict is also passed to the collector as its config.
            output_name: File name of the processed dataset inside the
                output directory.

        Returns:
            Path of the processed JSON file.
        """
        # Collect: the collector writes a raw snapshot and returns its path.
        raw_path = self.collector.collect_from_source(
            source_config["type"], source_config
        )
        with open(raw_path, "r", encoding="utf-8") as f:
            raw_data = json.load(f)

        # Clean
        cleaned_data = self.cleaner.clean(raw_data)

        # Transform
        final_data = self.transformer.transform(cleaned_data)

        # Save
        output_path = self.output_dir / output_name
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(final_data, f, ensure_ascii=False, indent=2)
        print(f"管道完成: {len(final_data)} 条数据已保存")
        return output_path
# Run the pipeline on a local JSON file.
pipeline = LLMPipeline()
pipeline.run(
    dict(type="file", path="data/raw/input.json", name="dataset")
)
最佳实践
- 版本控制:对数据集及其处理配置进行版本管理,确保每次训练所用的数据可追溯、可复现
- 数据验证:在每个管道节点添加数据质量检查
- 监控告警:监控管道执行状态和数据质量指标
- 增量处理:支持增量数据处理,避免重复计算
总结
LLM数据管道是数据驱动应用的基础设施。通过模块化设计,我们可以灵活组合各种数据处理组件,构建可靠、高效的数据流水线。