内容审核:AI生成内容的质量控制
--- title: "内容审核:AI生成内容的质量控制" description: "建立AI生成内容的审核机制,确保内容质量和合规性" tags: ["内容审核", "质量控制", "AI内容", "LLM", "审核"] category: "llm" icon: "📋"
内容审核:AI生成内容的质量控制
审核概述
内容审核是确保AI生成内容质量、安全性和合规性的系统化过程。
审核框架
1. 审核流程
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class ReviewStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
NEEDS_REVISION = "needs_revision"
@dataclass
class ReviewRequest:
"""审核请求"""
request_id: str
content: str
content_type: str
submitted_by: str
submitted_at: datetime
status: ReviewStatus = ReviewStatus.PENDING
reviewer: Optional[str] = None
review_notes: Optional[str] = None
review_score: Optional[float] = None
@dataclass
class ReviewCriteria:
"""审核标准"""
name: str
description: str
weight: float
check_func: callable
class ContentReviewSystem:
"""内容审核系统"""
def __init__(self):
self.review_queue: List[ReviewRequest] = []
self.review_criteria: List[ReviewCriteria] = []
self.review_history: List[Dict] = []
def add_review_criteria(self, criteria: ReviewCriteria):
"""添加审核标准"""
self.review_criteria.append(criteria)
def submit_for_review(self, content: str, content_type: str,
submitted_by: str) -> str:
"""提交审核"""
request_id = f"review_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
request = ReviewRequest(
request_id=request_id,
content=content,
content_type=content_type,
submitted_by=submitted_by,
submitted_at=datetime.now()
)
self.review_queue.append(request)
return request_id
def auto_review(self, request_id: str) -> Dict:
"""自动审核"""
request = self._get_request(request_id)
if not request:
return {"error": "Request not found"}
# 执行审核标准
results = []
total_score = 0
for criteria in self.review_criteria:
try:
result = criteria.check_func(request.content)
results.append({
"criteria": criteria.name,
"passed": result.get("passed", False),
"score": result.get("score", 0),
"message": result.get("message", "")
})
total_score += result.get("score", 0) * criteria.weight
except Exception as e:
results.append({
"criteria": criteria.name,
"passed": False,
"score": 0,
"message": f"检查失败: {str(e)}"
})
# 计算总体分数
max_possible_score = sum(c.weight for c in self.review_criteria)
normalized_score = total_score / max_possible_score if max_possible_score > 0 else 0
# 确定状态
if normalized_score >= 0.8:
status = ReviewStatus.APPROVED
elif normalized_score >= 0.6:
status = ReviewStatus.NEEDS_REVISION
else:
status = ReviewStatus.REJECTED
# 更新请求
request.status = status
request.review_score = normalized_score
# 记录审核历史
self.review_history.append({
"request_id": request_id,
"timestamp": datetime.now().isoformat(),
"status": status.value,
"score": normalized_score,
"results": results
})
return {
"request_id": request_id,
"status": status.value,
"score": normalized_score,
"results": results
}
def manual_review(self, request_id: str, reviewer: str,
approved: bool, notes: str = "") -> Dict:
"""人工审核"""
request = self._get_request(request_id)
if not request:
return {"error": "Request not found"}
request.reviewer = reviewer
request.review_notes = notes
request.status = ReviewStatus.APPROVED if approved else ReviewStatus.REJECTED
# 记录审核历史
self.review_history.append({
"request_id": request_id,
"timestamp": datetime.now().isoformat(),
"reviewer": reviewer,
"status": request.status.value,
"notes": notes
})
return {
"request_id": request_id,
"status": request.status.value,
"reviewer": reviewer
}
def _get_request(self, request_id: str) -> Optional[ReviewRequest]:
"""获取审核请求"""
for request in self.review_queue:
if request.request_id == request_id:
return request
return None
2. 审核标准
class ReviewCriteriaBuilder:
"""审核标准构建器"""
@staticmethod
def create_safety_criteria() -> ReviewCriteria:
"""创建安全标准"""
def check_safety(content: str) -> Dict:
harmful_terms = ["暴力", "仇恨", "歧视", "色情", "非法"]
found_terms = [term for term in harmful_terms if term in content]
passed = len(found_terms) == 0
score = 1.0 - (len(found_terms) * 0.2)
return {
"passed": passed,
"score": max(score, 0),
"message": f"发现不安全内容: {found_terms}" if found_terms else "内容安全"
}
return ReviewCriteria(
name="safety",
description="内容安全性检查",
weight=0.3,
check_func=check_safety
)
@staticmethod
def create_quality_criteria() -> ReviewCriteria:
"""创建质量标准"""
def check_quality(content: str) -> Dict:
# 检查长度
if len(content) < 50:
return {"passed": False, "score": 0.3, "message": "内容过短"}
# 检查重复
words = content.split()
if len(set(words)) < len(words) * 0.5:
return {"passed": False, "score": 0.4, "message": "内容重复度高"}
# 检查标点
if "。" not in content and len(content) > 100:
return {"passed": False, "score": 0.6, "message": "缺乏标点符号"}
return {"passed": True, "score": 0.9, "message": "内容质量良好"}
return ReviewCriteria(
name="quality",
description="内容质量检查",
weight=0.3,
check_func=check_quality
)
@staticmethod
def create_compliance_criteria() -> ReviewCriteria:
"""创建合规标准"""
def check_compliance(content: str) -> Dict:
# 检查是否包含敏感信息
import re
patterns = {
"email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
"phone": r'1[3-9]\d{9}',
"id_card": r'\d{17}[\dXx]'
}
violations = []
for info_type, pattern in patterns.items():
if re.search(pattern, content):
violations.append(info_type)
passed = len(violations) == 0
score = 1.0 - (len(violations) * 0.3)
return {
"passed": passed,
"score": max(score, 0),
"message": f"包含敏感信息: {violations}" if violations else "内容合规"
}
return ReviewCriteria(
name="compliance",
description="内容合规检查",
weight=0.2,
check_func=check_compliance
)
@staticmethod
def create_relevance_criteria(topic: str) -> ReviewCriteria:
"""创建相关性标准"""
def check_relevance(content: str) -> Dict:
topic_keywords = topic.split()
relevant_count = sum(1 for kw in topic_keywords if kw in content)
relevance_score = relevant_count / len(topic_keywords) if topic_keywords else 0
passed = relevance_score > 0.3
return {
"passed": passed,
"score": relevance_score,
"message": f"相关性分数: {relevance_score:.2f}"
}
return ReviewCriteria(
name="relevance",
description=f"与主题'{topic}'的相关性检查",
weight=0.2,
check_func=check_relevance
)
3. 审核报告
class ReviewReportGenerator:
"""审核报告生成器"""
def __init__(self, review_system: ContentReviewSystem):
self.review_system = review_system
def generate_report(self, request_id: str) -> str:
"""生成审核报告"""
request = self.review_system._get_request(request_id)
if not request:
return "审核请求不存在"
# 获取审核历史
history = [h for h in self.review_system.review_history if h["request_id"] == request_id]
report = f"""
内容审核报告
{'='*50}
请求ID: {request.request_id}
内容类型: {request.content_type}
提交者: {request.submitted_by}
提交时间: {request.submitted_at}
当前状态: {request.status.value}
审核结果:
"""
if history:
latest = history[-1]
report += f"审核分数: {latest.get('score', 'N/A')}\n"
if "results" in latest:
report += "\n详细结果:\n"
for result in latest["results"]:
status = "✓" if result["passed"] else "✗"
report += f" {status} {result['criteria']}: {result['message']}\n"
if request.reviewer:
report += f"\n人工审核:\n"
report += f" 审核人: {request.reviewer}\n"
report += f" 备注: {request.review_notes}\n"
return report
def generate_statistics_report(self, days: int = 30) -> str:
"""生成统计报告"""
cutoff_date = datetime.now() - timedelta(days=days)
recent_history = [
h for h in self.review_system.review_history
if datetime.fromisoformat(h["timestamp"]) > cutoff_date
]
if not recent_history:
return "无审核记录"
# 统计
total = len(recent_history)
approved = sum(1 for h in recent_history if h["status"] == "approved")
rejected = sum(1 for h in recent_history if h["status"] == "rejected")
needs_revision = sum(1 for h in recent_history if h["status"] == "needs_revision")
scores = [h.get("score", 0) for h in recent_history if "score" in h]
avg_score = sum(scores) / len(scores) if scores else 0
report = f"""
审核统计报告
{'='*50}
统计期间: 最近{days}天
总审核数: {total}
审核结果分布:
通过: {approved} ({approved/total*100:.1f}%)
拒绝: {rejected} ({rejected/total*100:.1f}%)
需修改: {needs_revision} ({needs_revision/total*100:.1f}%)
平均分数: {avg_score:.2f}
"""
return report
最佳实践
- 多层审核:实施自动审核和人工审核结合
- 明确标准:制定清晰的审核标准
- 持续改进:根据审核结果持续改进
- 透明报告:向用户透明地展示审核结果
总结
内容审核是确保AI生成内容质量的关键环节。通过建立完善的审核机制,可以确保内容的安全性、质量和合规性。