← 返回首页
🧠

LLM数据管道:端到端的数据处理流程

📂 llm ⏱ 3 min 443 words

---
title: "LLM数据管道:端到端的数据处理流程"
description: "构建高效的LLM数据管道,实现从数据采集到模型训练的自动化流程"
tags: ["LLM", "数据管道", "Pipeline", "数据处理", "自动化"]
category: "llm"
icon: "🔗"
---

LLM数据管道:端到端的数据处理流程

什么是LLM数据管道

LLM数据管道是将数据处理、模型训练和推理部署等环节串联成自动化流程的系统架构。它确保数据从源头到最终应用的每一步都可靠、可追踪、可重复。

数据管道架构

1. 数据采集管道

import json
import hashlib
from pathlib import Path
from datetime import datetime

class DataCollectionPipeline:
    """Collect raw data from a source and persist it as a JSON snapshot.

    Every collected payload is written to ``raw_dir`` under a file name made
    of a logical name, a timestamp and a short content hash, so repeated
    collections do not overwrite each other.
    """

    def __init__(self, raw_dir="data/raw"):
        """Create the collector and make sure ``raw_dir`` exists.

        Args:
            raw_dir: Directory that receives the raw JSON snapshots.
        """
        self.raw_dir = Path(raw_dir)
        self.raw_dir.mkdir(parents=True, exist_ok=True)

    def collect_from_source(self, source_type, config):
        """Collect data from the source selected by ``source_type``.

        Args:
            source_type: One of ``"api"``, ``"file"`` or ``"database"``.
            config: Source-specific settings (``url``/``params``/``timeout``
                for ``"api"``, ``path`` for ``"file"``); the optional
                ``name`` key sets the snapshot file-name prefix.

        Returns:
            Path of the saved raw snapshot.

        Raises:
            KeyError: If ``source_type`` is not supported.
            NotImplementedError: For ``"database"``, which has no collector yet.
        """
        collectors = {
            "api": self._collect_from_api,
            "file": self._collect_from_file,
            "database": self._collect_from_database,
        }
        try:
            collector = collectors[source_type]
        except KeyError:
            raise KeyError(
                f"unsupported source_type {source_type!r}; "
                f"expected one of {sorted(collectors)}"
            ) from None
        return collector(config)

    def _collect_from_api(self, config):
        """Fetch JSON from ``config["url"]`` and save it as a raw snapshot.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        import requests  # only needed for API sources, so imported lazily

        response = requests.get(
            config["url"],
            params=config.get("params", {}),
            # Without a timeout a stalled server would hang the pipeline forever.
            timeout=config.get("timeout", 30),
        )
        # Fail loudly instead of saving an error response as training data.
        response.raise_for_status()
        data = response.json()
        return self._save_raw(data, config.get("name", "api_data"))

    def _collect_from_file(self, config):
        """Load a UTF-8 JSON file from ``config["path"]`` and snapshot it."""
        with open(config["path"], "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._save_raw(data, config.get("name", "file_data"))

    def _collect_from_database(self, config):
        """Placeholder for database collection; not implemented yet.

        It must exist because ``collect_from_source`` references it when
        building its dispatch table. Without it, every call fails with
        AttributeError, whatever the source type.
        """
        raise NotImplementedError(
            "database collection is not implemented; override "
            "_collect_from_database in a subclass"
        )

    def _save_raw(self, data, name):
        """Write ``data`` as pretty-printed JSON into ``raw_dir``.

        Returns:
            Path of the written file, named ``{name}_{timestamp}_{hash8}.json``.
        """
        # The short MD5 of the serialized payload identifies content only; it
        # is not used for security.
        hash_id = hashlib.md5(json.dumps(data, ensure_ascii=False).encode()).hexdigest()[:8]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{name}_{timestamp}_{hash_id}.json"
        filepath = self.raw_dir / filename

        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print(f"保存原始数据: {filepath}")
        return filepath

# Usage example: pull user feedback from an HTTP API into the raw data directory.
feedback_source = {
    "url": "https://api.example.com/data",
    "name": "user_feedback",
}
pipeline = DataCollectionPipeline()
pipeline.collect_from_source("api", feedback_source)

2. 数据清洗管道

import re
from typing import List, Dict

class DataCleaningPipeline:
    """Run an ordered sequence of cleaning functions over a list of records.

    Each cleaner takes a list of dicts and returns a (possibly shorter) list
    of dicts. Cleaners run in the order they were added. The bundled static
    cleaners operate on each record's ``"content"`` field.
    """

    # Compiled once rather than looked up for every record.
    _WHITESPACE_RE = re.compile(r'\s+')
    # Anything that is not a word character, whitespace, a CJK ideograph or
    # basic ASCII / full-width sentence punctuation gets stripped.
    _DISALLOWED_CHARS_RE = re.compile(r'[^\w\s\u4e00-\u9fff.,!?,。!?]')

    def __init__(self):
        self.cleaners = []

    def add_cleaner(self, cleaner_func):
        """Append ``cleaner_func`` to the pipeline; returns ``self`` for chaining."""
        self.cleaners.append(cleaner_func)
        return self

    def clean(self, data: List[Dict]) -> List[Dict]:
        """Apply every registered cleaner in order and return the result."""
        cleaned = data
        for cleaner in self.cleaners:
            cleaned = cleaner(cleaned)
            # functools.partial objects and callable instances have no __name__.
            name = getattr(cleaner, "__name__", repr(cleaner))
            print(f"执行清洗: {name}, 剩余 {len(cleaned)} 条")
        return cleaned

    @staticmethod
    def _content(item):
        """Return the record's content, treating a missing or None value as ''."""
        content = item.get("content")
        return "" if content is None else content

    @staticmethod
    def remove_duplicates(data):
        """Keep the first record for each exact ``content`` value, preserving order."""
        seen = set()
        unique_data = []
        for item in data:
            content = item.get("content", "")
            if content not in seen:
                seen.add(content)
                unique_data.append(item)
        return unique_data

    @staticmethod
    def remove_empty(data):
        """Drop records whose content is missing, None or whitespace-only."""
        return [item for item in data if DataCleaningPipeline._content(item).strip()]

    @staticmethod
    def normalize_text(data):
        """Return copies of the records with normalized ``"content"``.

        Disallowed characters are removed first and whitespace is collapsed
        afterwards, so deleting a symbol between two spaces does not leave a
        double space behind. The input records are not modified.
        """
        normalized = []
        for item in data:
            text = DataCleaningPipeline._content(item)
            text = DataCleaningPipeline._DISALLOWED_CHARS_RE.sub('', text)
            text = DataCleaningPipeline._WHITESPACE_RE.sub(' ', text)
            normalized.append({**item, "content": text.strip()})
        return normalized

# Build the cleaning pipeline. Normalize first so the emptiness check and
# de-duplication see the final text. Otherwise records that differ only in
# whitespace or symbols survive as duplicates, and records that normalize to
# "" (e.g. symbols only) are never removed.
cleaner = (
    DataCleaningPipeline()
    .add_cleaner(DataCleaningPipeline.normalize_text)
    .add_cleaner(DataCleaningPipeline.remove_empty)
    .add_cleaner(DataCleaningPipeline.remove_duplicates)
)

3. 数据转换管道

class DataTransformPipeline:
    """Apply an ordered sequence of transform functions to a list of records."""

    def __init__(self):
        self.transforms = []

    def add_transform(self, transform_func):
        """Append ``transform_func`` to the pipeline; returns ``self`` for chaining."""
        self.transforms.append(transform_func)
        return self

    def transform(self, data):
        """Feed ``data`` through every registered transform in order."""
        result = data
        for transform in self.transforms:
            result = transform(result)
        return result

    @staticmethod
    def format_for_training(data, format_type="alpaca"):
        """Convert question/answer records into a fine-tuning data format.

        Each input record's ``"question"`` and ``"answer"`` fields (default
        ``""``) are mapped into new records. Other fields are not carried
        over, so run metadata-adding transforms after this one.

        Args:
            data: Iterable of dicts.
            format_type: ``"alpaca"`` or ``"sharegpt"``.

        Returns:
            A new list of records in the requested format.

        Raises:
            ValueError: If ``format_type`` is not supported. Previously this
                silently returned None, which broke later stages.
        """
        if format_type == "alpaca":
            return [{
                "instruction": item.get("question", ""),
                "input": "",
                "output": item.get("answer", "")
            } for item in data]
        if format_type == "sharegpt":
            return [{
                "conversations": [
                    {"from": "human", "value": item.get("question", "")},
                    {"from": "gpt", "value": item.get("answer", "")}
                ]
            } for item in data]
        raise ValueError(
            f"unsupported format_type {format_type!r}; expected 'alpaca' or 'sharegpt'"
        )

    @staticmethod
    def add_metadata(data):
        """Return copies of the records with ``id``, ``source`` and ``created_at``.

        IDs are sequential (``sample_000000``, ...). All records from one call
        share a single local-time ISO timestamp, so a batch is stamped
        consistently. The input records are not modified.
        """
        created_at = datetime.now().isoformat()
        return [
            {
                **item,
                "id": f"sample_{i:06d}",
                "source": "pipeline",
                "created_at": created_at,
            }
            for i, item in enumerate(data)
        ]

# Usage example. Format first, then attach metadata: format_for_training
# builds brand-new records containing only instruction/input/output, so any
# metadata added before it would be thrown away.
transformer = (
    DataTransformPipeline()
    .add_transform(lambda d: DataTransformPipeline.format_for_training(d, "alpaca"))
    .add_transform(DataTransformPipeline.add_metadata)
)

完整管道示例

class LLMPipeline:
    """End-to-end pipeline: collect -> clean -> transform -> save as JSON.

    When no stage objects are passed, fresh default instances are created.
    The default cleaner and transformer have no registered steps, so data
    passes through those stages unchanged. Pass pre-configured stages (or
    assign the attributes) to make them do real work.
    """

    def __init__(self, collector=None, cleaner=None, transformer=None,
                 output_dir="data/processed", output_name="training_data.json"):
        """Wire up the pipeline stages and create the output directory.

        Args:
            collector: Object with ``collect_from_source(source_type, config)``
                returning a path to a JSON file; defaults to a new
                ``DataCollectionPipeline``.
            cleaner: Object with ``clean(data)``; defaults to an empty
                ``DataCleaningPipeline``.
            transformer: Object with ``transform(data)``; defaults to an empty
                ``DataTransformPipeline``.
            output_dir: Directory that receives the processed output.
            output_name: File name of the processed output. It is overwritten
                on every run.
        """
        self.collector = collector if collector is not None else DataCollectionPipeline()
        self.cleaner = cleaner if cleaner is not None else DataCleaningPipeline()
        self.transformer = transformer if transformer is not None else DataTransformPipeline()
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.output_name = output_name

    def run(self, source_config):
        """Run every stage for one source and write the result.

        Args:
            source_config: Passed to the collector; its ``"type"`` key selects
                the source.

        Returns:
            Path of the written JSON output file.
        """
        # Collect: the collector persists a raw snapshot; read it back so the
        # later stages always work from what was actually saved.
        raw_path = self.collector.collect_from_source(
            source_config["type"], source_config
        )
        with open(raw_path, "r", encoding="utf-8") as f:
            raw_data = json.load(f)

        # Clean
        cleaned_data = self.cleaner.clean(raw_data)

        # Transform
        final_data = self.transformer.transform(cleaned_data)

        # Save
        output_path = self.output_dir / self.output_name
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(final_data, f, ensure_ascii=False, indent=2)

        print(f"管道完成: {len(final_data)} 条数据已保存")
        return output_path

# Run the full pipeline. A bare LLMPipeline() has no cleaning or transform
# steps registered, so reuse the stages configured in the sections above.
# NOTE(review): assumes input.json records carry "content" (used by the
# cleaners) as well as "question"/"answer" (used by the formatter) — confirm.
pipeline = LLMPipeline()
pipeline.cleaner = cleaner
pipeline.transformer = transformer
pipeline.run({"type": "file", "path": "data/raw/input.json", "name": "dataset"})

最佳实践

  1. 版本控制:为原始数据和处理结果记录版本(例如上文按时间戳和内容哈希命名原始数据文件),保证每次训练都能追溯到所用的数据
  2. 数据验证:在每个管道节点添加数据质量检查(如字段完整性、空值、重复率)
  3. 监控告警:监控管道执行状态和数据质量指标,出现异常时及时告警
  4. 增量处理:只处理新增或变更的数据,避免重复计算

总结

LLM数据管道是数据驱动应用的基础设施。通过模块化设计,我们可以灵活组合各种数据处理组件,构建可靠、高效的数据流水线。