← 返回首页
🧠

数据质量控制

📂 llm ⏱ 5 min 925 words

---
title: "数据质量控制"
description: "详细介绍数据质量控制的方法论,包括质量检测、数据清洗和标注质量保证技术"
tags: ["数据质量", "质量检测", "数据清洗", "标注质量"]
category: "llm"
icon: "🧠"
---

数据质量控制

数据质量的重要性

数据质量是LLM性能的决定性因素。低质量数据会导致:模型学到错误或过时的知识、放大数据中的偏见与有害内容、因重复和噪声样本浪费算力,以及模型泛化能力下降。

数据质量控制应该贯穿数据生命周期的每个阶段。

数据质量维度

质量维度定义

# Catalogue of the data-quality dimensions discussed in this article.
# Each dimension name maps to three fields: its definition ("定义"), how it
# is detected ("检测方法") and a concrete example ("示例").
quality_dimensions = {
    dimension: {"定义": definition, "检测方法": method, "示例": example}
    for dimension, definition, method, example in (
        ("准确性", "数据是否正确反映现实", "事实核查、专家审核", "统计数据是否正确、引用是否准确"),
        ("完整性", "数据是否完整无缺失", "缺失值检测、字段验证", "必填字段是否完整、文本是否截断"),
        ("一致性", "数据格式和标准是否一致", "格式验证、规则检查", "日期格式统一、编码一致"),
        ("时效性", "数据是否及时更新", "时间戳检查、版本对比", "信息是否过时、是否需要更新"),
        ("相关性", "数据是否与任务相关", "相关性评分、任务匹配", "内容是否符合训练目标"),
    )
}

质量检测方法

自动化质量检测

import re
from collections import Counter

class AutomatedQualityDetector:
    """Score a text with a set of cheap, rule-based quality checks.

    Every ``check_*`` method returns ``True`` when the text passes the check.

    The length, repetition and density checks count *tokens* rather than
    whitespace-separated words: a multi-character stopword is one token, any
    other CJK ideograph (U+4E00-U+9FFF) is one token, and every remaining
    whitespace-delimited run is one token. For text without CJK characters
    this is exactly ``str.split()``. For Chinese text, which has no spaces
    between words, it keeps a whole paragraph from being counted as a single
    "word" (which made these checks meaningless for Chinese input).
    """

    # Function words that carry little information on their own.
    _STOPWORDS = frozenset({
        '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一',
        '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着',
        '没有', '看', '好', '自己', '这',
    })

    # Multi-character stopwords come first in the alternation so that e.g.
    # "一个" is emitted as one token instead of "一" followed by "个".
    _TOKEN_PATTERN = re.compile(
        '|'.join(sorted((re.escape(w) for w in _STOPWORDS if len(w) > 1),
                        key=len, reverse=True))
        + r'|[\u4e00-\u9fff]|[^\s\u4e00-\u9fff]+'
    )

    def __init__(self):
        # Checks run by detect_quality(), in reporting order.
        self.checks = [
            self.check_empty,
            self.check_too_short,
            self.check_too_long,
            self.check_encoding,
            self.check_language,
            self.check_special_chars,
            self.check_repetition,
            self.check_information_density
        ]

    @classmethod
    def _tokenize(cls, text):
        """Split text into tokens (see the class docstring for the rules)."""
        return cls._TOKEN_PATTERN.findall(text)

    def check_empty(self, text):
        """Return True when the text contains any non-whitespace character."""
        return len(text.strip()) > 0

    def check_too_short(self, text, min_words=5):
        """Return True when the text has at least ``min_words`` tokens."""
        return len(self._tokenize(text)) >= min_words

    def check_too_long(self, text, max_words=10000):
        """Return True when the text has at most ``max_words`` tokens."""
        return len(self._tokenize(text)) <= max_words

    def check_encoding(self, text):
        """Return True when the text can be encoded as UTF-8.

        Encoding fails with UnicodeEncodeError for strings that contain lone
        surrogate code points.
        """
        try:
            text.encode('utf-8')
            return True
        except UnicodeEncodeError:
            return False

    def check_language(self, text):
        """Return True when one language clearly dominates the text.

        Only CJK ideographs and ASCII letters are counted. The check fails
        when neither is present, or when the more frequent of the two makes
        up 70% or less of the counted characters.
        """
        chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
        english_chars = len(re.findall(r'[a-zA-Z]', text))
        total = chinese_chars + english_chars

        if total == 0:
            return False

        primary_ratio = max(chinese_chars, english_chars) / total
        return primary_ratio > 0.7

    def check_special_chars(self, text, threshold=0.3):
        """Return True when the share of non-word, non-space characters is
        below ``threshold``. Empty text fails the check.
        """
        special_chars = len(re.findall(r'[^\w\s]', text))
        total_chars = len(text)

        if total_chars == 0:
            return False

        return special_chars / total_chars < threshold

    def check_repetition(self, text, threshold=0.2):
        """Return True when few tokens repeat their immediate predecessor.

        Texts with fewer than 10 tokens always pass, because the ratio is too
        noisy on such short input.
        """
        tokens = self._tokenize(text)
        if len(tokens) < 10:
            return True

        consecutive_repeats = sum(
            1 for previous, current in zip(tokens, tokens[1:])
            if previous == current
        )

        return consecutive_repeats / len(tokens) < threshold

    def check_information_density(self, text, threshold=0.3):
        """Return True when the share of non-stopword tokens exceeds
        ``threshold``. Text without any token fails the check.
        """
        tokens = self._tokenize(text)
        if not tokens:
            return False

        meaningful = sum(1 for token in tokens if token not in self._STOPWORDS)

        return meaningful / len(tokens) > threshold

    def detect_quality(self, text):
        """Run every registered check and summarise the outcome.

        Returns a dict with:
          * ``"checks"``: mapping of check method name to its boolean result,
          * ``"quality_score"``: fraction of checks that passed,
          * ``"passed"``: True when ``quality_score`` is greater than 0.7.
        """
        results = {check.__name__: check(text) for check in self.checks}

        # Booleans sum as 0/1, so this is the fraction of passed checks.
        quality_score = sum(results.values()) / len(results)

        return {
            "checks": results,
            "quality_score": quality_score,
            "passed": quality_score > 0.7
        }

深度质量分析

class DeepQualityAnalyzer:
    """Heuristic, deeper-than-surface quality scores for a text.

    Every ``analyze_*`` method returns a score between 0 and 1, where higher
    is better. Sentences are delimited by the Chinese full stop ``。`` and
    words by whitespace.
    """

    # Keyword groups that mark a text as unsafe. The sexual-content group
    # lists explicit terms only: a bare "性" also matches the very common
    # suffix in words such as 性能 / 准确性 / 一致性 and would flag ordinary
    # technical text as unsafe.
    _UNSAFE_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'暴力|伤害|死亡',
        r'歧视|偏见',
        r'色情|性行为|性骚扰|性侵',
        r'违法|犯罪',
    ))

    def __init__(self):
        # Analyzers offered by this class, in a fixed order.
        self.analyzers = [
            self.analyze_coherence,
            self.analyze_readability,
            self.analyze_informativeness,
            self.analyze_safety
        ]

    def analyze_coherence(self, text):
        """Score coherence from how uniform the sentence lengths are.

        Returns 1.0 when the text has no ``。`` separator, 0.0 when every
        sentence is blank, and otherwise ``max(0, 1 - variance / 100)`` where
        ``variance`` is the population variance of the per-sentence word
        counts.
        """
        sentences = text.split('。')
        if len(sentences) < 2:
            return 1.0

        lengths = [len(s.split()) for s in sentences if s.strip()]
        if not lengths:
            return 0.0

        mean = sum(lengths) / len(lengths)
        variance = sum((length - mean) ** 2 for length in lengths) / len(lengths)

        # Lower variance is treated as more coherent.
        return max(0, 1 - variance / 100)

    def analyze_readability(self, text):
        """Score readability from average sentence and word length.

        Sentences averaging more than 20 words and words averaging more than
        5 characters are penalised linearly. Each factor is clamped to
        [0, 1], so the result is in [0, 1] as well. Returns 0.0 when the text
        has no words or no non-blank sentence (e.g. ``""`` or ``"。"``).
        """
        words = text.split()
        sentences = [s for s in text.split('。') if s.strip()]

        # Checking the filtered list avoids dividing by zero for input that
        # consists of sentence separators only.
        if not words or not sentences:
            return 0.0

        avg_sentence_length = len(words) / len(sentences)
        avg_word_length = sum(len(w) for w in words) / len(words)

        sentence_score = min(1.0, max(0.0, 1 - (avg_sentence_length - 20) / 50))
        word_score = min(1.0, max(0.0, 1 - (avg_word_length - 5) / 10))

        return sentence_score * word_score

    def analyze_informativeness(self, text):
        """Score information content as the mean of two ratios.

        * lexical diversity: unique words / total words;
        * density proxy: average word length / 10, capped at 1.0.

        Returns 0.0 for text without words.
        """
        words = text.split()
        if not words:
            return 0.0

        lexical_diversity = len(set(words)) / len(words)

        avg_word_length = sum(len(w) for w in words) / len(words)
        information_density = min(1.0, avg_word_length / 10)

        return (lexical_diversity + information_density) / 2

    def analyze_safety(self, text):
        """Return 0.0 when any unsafe keyword occurs in the text, else 1.0.

        This is a plain keyword match with no context awareness.
        """
        if any(pattern.search(text) for pattern in self._UNSAFE_PATTERNS):
            return 0.0

        return 1.0

数据清洗流程

清洗管道

class DataCleaningPipeline:
    """Clean a dataset of samples shaped like ``{"text": ..., "metadata": ...}``.

    ``self.steps`` lists the steps in execution order. They come in three
    kinds, and ``clean`` applies each according to its kind:

    * ``remove_duplicates`` works on the whole dataset;
    * ``validate_format`` is a per-sample predicate, failing samples are
      dropped;
    * every other step maps a text string to a cleaned text string.
    """

    # Typographic characters mapped to plain ASCII equivalents.
    _CHAR_REPLACEMENTS = str.maketrans({
        '\u2018': "'", '\u2019': "'",
        '\u201c': '"', '\u201d': '"',
        '\u2013': '-', '\u2014': '-',
        '\u2026': '...',
        '\xa0': ' ', '\u3000': ' '
    })

    def __init__(self):
        self.steps = [
            self.remove_duplicates,
            self.fix_encoding,
            self.normalize_text,
            self.remove_noise,
            self.validate_format
        ]

    def remove_duplicates(self, dataset):
        """Return the samples whose ``"text"`` has not been seen before.

        Order is preserved and the first occurrence of each text wins. Texts
        are compared directly rather than by ``hash()`` value, so two
        different texts can never be mistaken for duplicates.
        """
        seen = set()
        unique_dataset = []

        for sample in dataset:
            text = sample["text"]
            if text not in seen:
                seen.add(text)
                unique_dataset.append(sample)

        return unique_dataset

    def fix_encoding(self, text):
        """Replace curly quotes, dashes, the ellipsis character and
        non-breaking / ideographic spaces with plain ASCII equivalents.
        """
        return text.translate(self._CHAR_REPLACEMENTS)

    def normalize_text(self, text):
        """Collapse every whitespace run to one space and trim both ends."""
        # NOTE(review): the original also ran a punctuation substitution that
        # replaced each match with itself (a no-op). It was dropped; if a
        # real punctuation mapping is intended it still has to be defined.
        return re.sub(r'\s+', ' ', text).strip()

    def remove_noise(self, text):
        """Strip HTML tags, URLs and e-mail addresses, then tidy whitespace."""
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'http\S+|www\.\S+', '', text)
        text = re.sub(r'\S+@\S+', '', text)

        # Removing spans can leave double spaces behind.
        return re.sub(r'\s+', ' ', text).strip()

    def validate_format(self, sample):
        """Return True when the sample has both ``"text"`` and ``"metadata"``."""
        required_fields = ["text", "metadata"]
        return all(field in sample for field in required_fields)

    def clean(self, dataset):
        """Run every step in ``self.steps`` and return the cleaned dataset.

        Input samples are never mutated; text steps produce shallow copies
        with a new ``"text"`` value. Every sample must contain a ``"text"``
        key, because the text steps run before ``validate_format`` in the
        default step order.
        """
        cleaned_dataset = list(dataset)

        for step in self.steps:
            if step == self.remove_duplicates:
                # Dataset-level step: receives and returns the whole list.
                cleaned_dataset = step(cleaned_dataset)
            elif step == self.validate_format:
                # Predicate step: keep only the samples that pass.
                cleaned_dataset = [s for s in cleaned_dataset if step(s)]
            else:
                # Text-level step: rewrite each sample's text.
                cleaned_dataset = [
                    {**s, "text": step(s["text"])} for s in cleaned_dataset
                ]

        return cleaned_dataset

标注质量控制

标注一致性检查

class AnnotationQualityController:
    """Collect annotations and report simple quality statistics on them."""

    def __init__(self, num_annotators):
        self.num_annotators = num_annotators
        # One dict per annotation, in insertion order.
        self.annotations = []

    def add_annotation(self, sample_id, annotator_id, label, confidence):
        """Record one label given by one annotator to one sample."""
        self.annotations.append({
            "sample_id": sample_id,
            "annotator_id": annotator_id,
            "label": label,
            "confidence": confidence
        })

    def calculate_agreement(self):
        """Return the mean majority-vote agreement across samples.

        For every sample with at least two labels, agreement is the share of
        its labels that equal the most common label. The result is the mean
        of those shares. This is a simplified agreement rate, not Cohen's
        Kappa: it does not correct for chance agreement.

        Returns 0.0 when fewer than two distinct samples have been annotated
        or when no sample has two or more labels.
        """
        # Group labels by sample.
        labels_by_sample = {}
        for ann in self.annotations:
            labels_by_sample.setdefault(ann["sample_id"], []).append(ann["label"])

        if len(labels_by_sample) < 2:
            return 0.0

        agreements = []
        for labels in labels_by_sample.values():
            if len(labels) >= 2:
                majority_count = Counter(labels).most_common(1)[0][1]
                agreements.append(majority_count / len(labels))

        return sum(agreements) / len(agreements) if agreements else 0.0

    def detect_low_confidence(self, threshold=0.5):
        """Return the annotations whose confidence is below ``threshold``."""
        return [
            ann for ann in self.annotations
            if ann["confidence"] < threshold
        ]

    def get_quality_report(self):
        """Return a summary dict.

        Keys: ``total_annotations``, ``agreement_score`` (see
        ``calculate_agreement``), ``low_confidence_count`` (default
        threshold) and ``annotator_distribution`` (a ``Counter`` of
        annotations per annotator id).
        """
        return {
            "total_annotations": len(self.annotations),
            "agreement_score": self.calculate_agreement(),
            "low_confidence_count": len(self.detect_low_confidence()),
            "annotator_distribution": Counter(ann["annotator_id"] for ann in self.annotations)
        }

质量监控仪表板

class QualityMonitoringDashboard:
    """Keep a history of quality metrics and summarise or plot them."""

    def __init__(self):
        # One dict per log_metrics() call, oldest first.
        self.metrics_history = []

    def log_metrics(self, epoch, metrics):
        """Append ``metrics`` to the history, tagged with epoch and time.

        ``metrics`` is a mapping; its entries are merged into the record
        after ``"epoch"`` and ``"timestamp"``, so a key of the same name in
        ``metrics`` takes precedence.
        """
        # Imported here because the surrounding file does not import datetime.
        from datetime import datetime

        self.metrics_history.append({
            "epoch": epoch,
            "timestamp": datetime.now(),
            **metrics
        })

    def plot_quality_trends(self):
        """Show a line chart of ``quality_score`` per epoch (needs matplotlib).

        Records without a ``quality_score`` are plotted as 0.
        """
        import matplotlib.pyplot as plt

        epochs = [m["epoch"] for m in self.metrics_history]
        quality_scores = [m.get("quality_score", 0) for m in self.metrics_history]

        plt.figure(figsize=(10, 6))
        plt.plot(epochs, quality_scores, marker='o')
        plt.xlabel("Epoch")
        plt.ylabel("Quality Score")
        plt.title("数据质量趋势")
        plt.grid(True)
        plt.show()

    def generate_report(self):
        """Summarise the recorded quality scores.

        Returns the string ``"无数据"`` when nothing has been logged.
        Otherwise returns a dict with the latest score, the historical
        maximum and minimum, and a trend comparing the two most recent
        records: ``"上升"`` (higher), ``"下降"`` (lower) or ``"持平"``
        (unchanged, or only one record so far). Missing scores count as 0.
        """
        if not self.metrics_history:
            return "无数据"

        scores = [m.get("quality_score", 0) for m in self.metrics_history]
        current = scores[-1]

        # A trend needs two data points; a single record or an unchanged
        # score must not be reported as a decline.
        if len(scores) < 2 or current == scores[-2]:
            trend = "持平"
        elif current > scores[-2]:
            trend = "上升"
        else:
            trend = "下降"

        return {
            "当前质量分数": current,
            "历史最高": max(scores),
            "历史最低": min(scores),
            "趋势": trend
        }

总结

数据质量控制是LLM训练成功的关键。通过多维度质量检测、系统化清洗流程和严格的标注质量控制,可以确保训练数据的高质量。持续的质量监控和反馈机制帮助及时发现和解决问题。