LLM治理:企业级LLM使用的治理框架
---
title: "LLM治理:企业级LLM使用的治理框架"
description: "探讨企业级LLM使用的治理框架,包括使用策略、审批流程、风险管控、责任划分等组织管理实践"
tags: ["LLM治理", "企业治理", "风险管控", "使用策略"]
category: "llm"
icon: "🧠"
---
LLM治理:企业级LLM使用的治理框架
为什么需要LLM治理
随着LLM在企业中的广泛应用,缺乏治理框架可能导致数据泄露、合规风险、成本失控等问题。系统化的治理框架确保LLM使用既高效又安全。
治理框架设计
1. 使用策略定义
import re
import uuid
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
class AccessLevel(Enum):
    """Access tier, named after the kind of information being processed."""

    PUBLIC = "public"  # processing of public information
    INTERNAL = "internal"  # processing of internal information
    CONFIDENTIAL = "confidential"  # processing of confidential information
    RESTRICTED = "restricted"  # processing of restricted information
class DataClassification(Enum):
    """Classification label attached to data; members carry the integers 1-4."""

    PUBLIC = 1
    INTERNAL = 2
    CONFIDENTIAL = 3
    SECRET = 4
@dataclass
class LLMUsagePolicy:
    """Usage policy record for a single LLM-backed feature."""

    # Human-readable policy name.
    name: str
    # Data classifications a request under this policy may carry.
    allowed_data_classifications: list[DataClassification]
    # Approver identifiers that must all have signed off.
    required_approvals: list[str]
    max_tokens_per_request: int
    allowed_models: list[str]
    data_retention_days: int
    # Defaults to True when not specified.
    logging_required: bool = True
# Policy configuration: feature key -> usage policy for that feature.
# The token limit must be passed as ``max_tokens_per_request`` (the dataclass
# field name); the dataclass-generated __init__ rejects unknown keywords such
# as ``max_tokens`` with a TypeError.
POLICIES = {
    "customer_support": LLMUsagePolicy(
        name="客服对话",
        allowed_data_classifications=[
            DataClassification.PUBLIC,
            DataClassification.INTERNAL,
        ],
        required_approvals=["team_lead"],
        max_tokens_per_request=4096,
        allowed_models=["gpt-4o-mini", "claude-sonnet-4-20250514"],
        data_retention_days=90,
    ),
    "code_review": LLMUsagePolicy(
        name="代码审查",
        allowed_data_classifications=[
            DataClassification.INTERNAL,
            DataClassification.CONFIDENTIAL,
        ],
        required_approvals=["security_team"],
        max_tokens_per_request=8192,
        allowed_models=["gpt-4o"],
        data_retention_days=30,
    ),
    "document_analysis": LLMUsagePolicy(
        name="文档分析",
        allowed_data_classifications=[DataClassification.INTERNAL],
        required_approvals=["compliance"],
        max_tokens_per_request=16384,
        allowed_models=["gpt-4o"],
        data_retention_days=7,
    ),
}
2. 访问控制体系
class AccessControlManager:
    """Decide whether a user may use an LLM feature on data of a given class.

    ``policies`` maps a feature key to a policy object exposing
    ``allowed_data_classifications`` and ``required_approvals``.
    ``user_permissions`` maps a user id to
    ``{"allowed_features": [...], "approvals": {approver: granted_at}}``.

    NOTE(review): approvals are keyed by approver only, so an approval
    recorded for one feature also satisfies the same approver requirement of
    any other feature the user has been granted -- confirm this is intended.
    """

    def __init__(self, policies: dict):
        self.policies = policies
        self.user_permissions = {}

    def check_access(
        self,
        user_id: str,
        feature: str,
        # Forward reference: keeps this class definable independently of
        # where DataClassification is declared.
        data_classification: "DataClassification",
    ) -> dict:
        """Return ``{"allowed": True}`` or ``{"allowed": False, "reason": ...}``.

        Checks run in order: the user's feature grant, existence of a policy
        for the feature, the data classification, then every required
        approval. The first failing check determines the reason.
        """
        user_perms = self.user_permissions.get(user_id, {})

        # Feature-level permission.
        if feature not in user_perms.get("allowed_features", []):
            return {
                "allowed": False,
                "reason": f"用户无权使用 {feature} 功能",
            }

        # Data classification permission.
        policy = self.policies.get(feature)
        if not policy:
            return {"allowed": False, "reason": "未知功能"}
        if data_classification not in policy.allowed_data_classifications:
            return {
                "allowed": False,
                "reason": f"数据级别 {data_classification.name} 超出允许范围",
            }

        # Approval status: every required approver must be on record.
        approvals = user_perms.get("approvals", {})
        for approval in policy.required_approvals:
            if not approvals.get(approval):
                return {
                    "allowed": False,
                    "reason": f"缺少 {approval} 审批",
                }

        return {"allowed": True}

    def grant_permission(self, user_id: str, feature: str, approver: str):
        """Record that ``approver`` granted ``user_id`` access to ``feature``.

        Calling this repeatedly (e.g. once per approver) lists the feature
        only once; the approver's timestamp is refreshed on every call.
        """
        perms = self.user_permissions.setdefault(
            user_id, {"allowed_features": [], "approvals": {}}
        )
        # Avoid piling up duplicate entries when several approvers sign off
        # on the same feature.
        if feature not in perms["allowed_features"]:
            perms["allowed_features"].append(feature)
        perms["approvals"][approver] = datetime.now()
3. 审批工作流
class ApprovalWorkflow:
    """Track access requests and approver sign-offs until a request is approved.

    Requests are kept in ``pending_requests`` as dicts with the keys ``id``,
    ``user_id``, ``feature``, ``justification``, ``data_samples``, ``status``,
    ``created_at`` and ``approvals``.

    Args:
        policies: Optional mapping of feature key to a policy object exposing
            ``required_approvals``. When omitted, the module-level
            ``POLICIES`` mapping is used.

    Integrations must supply ``_notify_approvers`` and ``_grant_access``
    (by subclassing or assignment); the defaults raise NotImplementedError.
    """

    def __init__(self, policies: Optional[dict] = None):
        self.pending_requests = []
        self.policies = policies

    async def submit_request(
        self,
        user_id: str,
        feature: str,
        justification: str,
        data_samples: Optional[list[str]] = None,
    ) -> str:
        """Register a new pending request, notify approvers, return its id."""
        request_id = str(uuid.uuid4())
        request = {
            "id": request_id,
            "user_id": user_id,
            "feature": feature,
            "justification": justification,
            "data_samples": data_samples or [],
            "status": "pending",
            "created_at": datetime.now(),
            "approvals": [],
        }
        self.pending_requests.append(request)

        # Notify approvers.
        await self._notify_approvers(request)
        return request_id

    async def approve_request(
        self, request_id: str, approver: str, comments: str = ""
    ) -> bool:
        """Record one approval; return True once the request is approved.

        Returns False when the request id is unknown, when the request's
        feature has no policy, or while required approvals are still missing.
        A request that is already approved returns True without recording
        anything or granting access a second time.
        """
        request = next(
            (r for r in self.pending_requests if r["id"] == request_id), None
        )
        if request is None:
            return False

        # Already decided: do not append more approvals or re-grant access.
        if request["status"] != "pending":
            return request["status"] == "approved"

        policies = POLICIES if self.policies is None else self.policies
        policy = policies.get(request["feature"])
        if policy is None:
            # No policy means the approval requirements are undefined, so the
            # request can never be satisfied.
            return False

        request["approvals"].append({
            "approver": approver,
            "timestamp": datetime.now(),
            "comments": comments,
        })

        # Check whether every required approver has now signed off.
        required = set(policy.required_approvals)
        obtained = {entry["approver"] for entry in request["approvals"]}
        if not required.issubset(obtained):
            return False

        request["status"] = "approved"
        await self._grant_access(request)
        return True

    async def _notify_approvers(self, request: dict) -> None:
        """Hook: tell approvers about a newly submitted request."""
        raise NotImplementedError(
            "ApprovalWorkflow._notify_approvers must be provided by the integration"
        )

    async def _grant_access(self, request: dict) -> None:
        """Hook: apply the access described by a fully approved request."""
        raise NotImplementedError(
            "ApprovalWorkflow._grant_access must be provided by the integration"
        )
4. 数据安全管控
class DataSecurityManager:
    """Detect and redact sensitive values in text using regular expressions.

    ``sensitive_patterns`` is a list of regex strings; each is applied to the
    whole input independently, in list order.
    """

    # Pattern fragment -> finding type. A pattern is labelled with the first
    # fragment (in this order) that occurs in it as a substring.
    _TYPE_FRAGMENTS = {
        r'\d{4}[-\s]?\d{4}': "credit_card",
        r'\d{3}[-\s]?\d{2}': "ssn",
        r'@': "email",
        r'1[3-9]\d{9}': "phone",
    }

    # Finding types rated "high" by the default risk heuristic.
    _HIGH_RISK_TYPES = frozenset({"credit_card", "ssn"})

    def __init__(self):
        self.sensitive_patterns = [
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # credit card number
            r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',  # social security number
            # TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not alternation, and must not be accepted here.
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
            r'\b1[3-9]\d{9}\b',  # mobile phone number
        ]

    def scan_for_sensitive_data(self, text: str) -> dict:
        """Report which sensitive patterns occur in ``text``.

        Returns a dict with ``has_sensitive_data`` (bool), ``findings`` (one
        entry per matching pattern with ``type``, ``count`` and up to three
        ``samples``) and ``risk_level``.

        NOTE(review): ``samples`` contains the raw matched values; avoid
        writing the scan result to logs unredacted.
        """
        findings = []
        for pattern in self.sensitive_patterns:
            # finditer + group(0) always yields the full match, even if a
            # pattern contains capture groups.
            matches = [m.group(0) for m in re.finditer(pattern, text)]
            if matches:
                findings.append({
                    "type": self._identify_type(pattern),
                    "count": len(matches),
                    "samples": matches[:3],
                })
        return {
            "has_sensitive_data": bool(findings),
            "findings": findings,
            "risk_level": self._assess_risk(findings),
        }

    def sanitize_input(self, text: str) -> str:
        """Return ``text`` with every sensitive match replaced by ``[REDACTED]``."""
        sanitized = text
        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern, "[REDACTED]", sanitized)
        return sanitized

    def _identify_type(self, pattern: str) -> str:
        """Map a regex string to a finding type label, or ``"unknown"``."""
        return next(
            (
                label
                for fragment, label in self._TYPE_FRAGMENTS.items()
                if fragment in pattern
            ),
            "unknown",
        )

    def _assess_risk(self, findings: list) -> str:
        """Default risk heuristic over scan findings.

        ``"none"`` without findings, ``"high"`` if any finding is a credit
        card or social security number, otherwise ``"medium"``. Override to
        apply an organisation-specific rating.
        """
        if not findings:
            return "none"
        if any(f["type"] in self._HIGH_RISK_TYPES for f in findings):
            return "high"
        return "medium"
5. 日志与审计
class GovernanceAuditLogger:
    """Write LLM interaction audit records and build compliance reports.

    ``storage`` must provide awaitable ``write(table, record)`` and
    ``query(table, start_date=..., end_date=...)``; records live in the
    ``"audit_llm"`` table.
    """

    def __init__(self, storage):
        self.storage = storage

    async def log_llm_interaction(
        self,
        user_id: str,
        feature: str,
        request: dict,
        response: dict,
        policy_check: dict,
    ):
        """Persist a summary record of one LLM request/response pair.

        Only summary fields are stored: the model, the number of entries in
        ``request["messages"]``, total tokens, and the first choice's finish
        reason. Missing, ``None`` or empty ``usage`` / ``choices`` yield
        ``None`` for the dependent fields instead of raising.
        """
        usage = response.get("usage") or {}
        # An empty or None "choices" would otherwise break the [0] lookup.
        choices = response.get("choices") or [{}]
        audit_record = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "feature": feature,
            "request_summary": {
                "model": request.get("model"),
                # Counts messages, not characters or tokens.
                "input_length": len(request.get("messages", [])),
            },
            "response_summary": {
                "tokens_used": usage.get("total_tokens"),
                "finish_reason": choices[0].get("finish_reason"),
            },
            "policy_compliance": policy_check,
            "data_classification": request.get("data_classification", "unknown"),
        }
        await self.storage.write("audit_llm", audit_record)

    async def generate_compliance_report(self, start_date, end_date) -> dict:
        """Aggregate audit records between ``start_date`` and ``end_date``.

        Returns the total record count, per-user / per-feature /
        per-classification counts, and the records whose policy check has
        ``allowed`` set to a falsy value.
        """
        records = await self.storage.query(
            "audit_llm",
            start_date=start_date,
            end_date=end_date,
        )
        return {
            "total_interactions": len(records),
            "by_user": self._group_by(records, "user_id"),
            "by_feature": self._group_by(records, "feature"),
            "policy_violations": [
                r for r in records
                if not r["policy_compliance"].get("allowed", True)
            ],
            "data_classification_distribution": self._group_by(
                records, "data_classification"
            ),
        }

    @staticmethod
    def _group_by(records, field: str) -> dict:
        """Count records per value of ``field``; a missing field counts as ``"unknown"``."""
        return dict(Counter(record.get(field, "unknown") for record in records))
治理仪表板
class GovernanceDashboard:
    """Condense the audit logger's compliance report into an overview dict."""

    def __init__(self, access_control, audit_logger):
        self.access_control = access_control
        self.audit_logger = audit_logger

    async def get_governance_overview(self, days: int = 30) -> dict:
        """Build an overview of the last ``days`` days, ending now.

        The result has four sections: ``summary`` (interaction, user and
        feature counts), ``compliance`` (violation count and rate),
        ``security`` (data classification distribution) and ``top_users``
        (up to ten ``(user, value)`` pairs, highest value first).
        """
        window_end = datetime.now()
        window_start = window_end - timedelta(days=days)
        report = await self.audit_logger.generate_compliance_report(
            window_start, window_end
        )

        interactions = report["total_interactions"]
        per_user = report["by_user"]
        violation_count = len(report["policy_violations"])
        ranked_users = sorted(
            per_user.items(), key=lambda item: item[1], reverse=True
        )

        summary = {
            "total_interactions": interactions,
            "unique_users": len(per_user),
            "features_used": len(report["by_feature"]),
        }
        compliance = {
            "policy_violations": violation_count,
            # Denominator is clamped to 1 so an empty window yields 0.0.
            "violation_rate": violation_count / max(interactions, 1),
        }
        security = {
            "data_classification": report["data_classification_distribution"],
        }
        return {
            "summary": summary,
            "compliance": compliance,
            "security": security,
            "top_users": ranked_users[:10],
        }
总结
企业级LLM治理需要覆盖策略定义、访问控制、审批流程、数据安全、审计追踪等多个维度。通过系统化的治理框架,企业可以在享受LLM能力的同时,有效管控风险,确保合规使用。