← 返回首页
🧠

LLM治理:企业级LLM使用的治理框架

📂 llm ⏱ 4 min 620 words

---
title: "LLM治理:企业级LLM使用的治理框架"
description: "探讨企业级LLM使用的治理框架,包括使用策略、审批流程、风险管控、责任划分等组织管理实践"
tags: ["LLM治理", "企业治理", "风险管控", "使用策略"]
category: "llm"
icon: "🧠"
---

LLM治理:企业级LLM使用的治理框架

为什么需要LLM治理

随着LLM在企业中的广泛应用,缺乏治理框架可能导致数据泄露、合规风险、成本失控等问题。系统化的治理框架确保LLM使用既高效又安全。

治理框架设计

1. 使用策略定义

import uuid
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

class AccessLevel(Enum):
    """Named access tiers, one per category of information being handled.

    Each member's value is its lowercase name as a string. No other code
    visible in this file references this enum.
    """

    PUBLIC = "public"         # handling of public information
    INTERNAL = "internal"     # handling of internal information
    CONFIDENTIAL = "confidential"  # handling of confidential information
    RESTRICTED = "restricted"      # handling of restricted information

class DataClassification(Enum):
    """Data classification labels used by usage policies and access checks.

    Values run from 1 (PUBLIC) to 4 (SECRET). The visible code only tests
    membership in a policy's allowed list and never compares the numbers;
    presumably a higher value means more sensitive data -- confirm before
    relying on ordering.
    """

    PUBLIC = 1
    INTERNAL = 2
    CONFIDENTIAL = 3
    SECRET = 4

@dataclass
class LLMUsagePolicy:
    """Governance rules attached to one LLM-backed feature.

    Only ``allowed_data_classifications`` and ``required_approvals`` are read
    by the access-control and approval code visible in this file; the other
    fields are carried as configuration.
    """

    # Human-readable policy name.
    name: str
    # Classifications a request may carry; anything else is rejected.
    allowed_data_classifications: list[DataClassification]
    # Approval keys that must all be present before access is allowed.
    required_approvals: list[str]
    # Presumably the per-request token ceiling; no visible code enforces it.
    max_tokens_per_request: int
    # Model identifiers permitted under this policy (not enforced in view).
    allowed_models: list[str]
    # Presumably how long interaction data is kept, in days -- confirm.
    data_retention_days: int
    # Whether interactions under this policy must be logged; defaults to True.
    logging_required: bool = True

# Policy configuration: maps a feature key to the usage policy governing it.
# The keyword must be ``max_tokens_per_request`` (the dataclass field name);
# passing ``max_tokens`` raises TypeError when the module is imported.
POLICIES = {
    "customer_support": LLMUsagePolicy(
        name="客服对话",
        allowed_data_classifications=[DataClassification.PUBLIC, DataClassification.INTERNAL],
        required_approvals=["team_lead"],
        max_tokens_per_request=4096,
        allowed_models=["gpt-4o-mini", "claude-sonnet-4-20250514"],
        data_retention_days=90,
    ),
    "code_review": LLMUsagePolicy(
        name="代码审查",
        allowed_data_classifications=[DataClassification.INTERNAL, DataClassification.CONFIDENTIAL],
        required_approvals=["security_team"],
        max_tokens_per_request=8192,
        allowed_models=["gpt-4o"],
        data_retention_days=30,
    ),
    "document_analysis": LLMUsagePolicy(
        name="文档分析",
        allowed_data_classifications=[DataClassification.INTERNAL],
        required_approvals=["compliance"],
        max_tokens_per_request=16384,
        allowed_models=["gpt-4o"],
        data_retention_days=7,
    ),
}

2. 访问控制体系

class AccessControlManager:
    """Decide whether a user may use a feature with data of a given class.

    Permissions are held in memory in ``user_permissions``, keyed by user id.
    Each entry has an ``"allowed_features"`` list and an ``"approvals"`` dict
    mapping an approval key to the time it was granted.
    """

    def __init__(self, policies: dict):
        """Store the feature-key -> policy mapping and start with no users."""
        self.policies = policies
        self.user_permissions = {}

    def check_access(
        self,
        user_id: str,
        feature: str,
        data_classification: "DataClassification",
    ) -> dict:
        """Run the feature, classification and approval checks in order.

        Returns:
            ``{"allowed": True}`` when every check passes, otherwise
            ``{"allowed": False, "reason": <message>}`` for the first check
            that fails.
        """
        # Unknown users get an empty record, so they fail the feature check.
        user_perms = self.user_permissions.get(user_id, {})

        # Feature-level permission.
        if feature not in user_perms.get("allowed_features", []):
            return {
                "allowed": False,
                "reason": f"用户无权使用 {feature} 功能",
            }

        # Data-classification permission, taken from the feature's policy.
        policy = self.policies.get(feature)
        if not policy:
            return {"allowed": False, "reason": "未知功能"}

        if data_classification not in policy.allowed_data_classifications:
            return {
                "allowed": False,
                "reason": f"数据级别 {data_classification.name} 超出允许范围",
            }

        # Every approval the policy requires must have been recorded.
        granted = user_perms.get("approvals", {})
        for approval in policy.required_approvals:
            if not granted.get(approval):
                return {
                    "allowed": False,
                    "reason": f"缺少 {approval} 审批",
                }

        return {"allowed": True}

    def grant_permission(self, user_id: str, feature: str, approver: str):
        """Allow ``feature`` for the user and record ``approver``'s sign-off.

        ``approver`` is stored as the approval key, so it has to match an
        entry in the policy's ``required_approvals`` to satisfy
        ``check_access``. Granting the same feature again refreshes the
        approval timestamp without adding a duplicate feature entry.
        """
        perms = self.user_permissions.setdefault(
            user_id, {"allowed_features": [], "approvals": {}}
        )

        if feature not in perms["allowed_features"]:
            perms["allowed_features"].append(feature)
        perms["approvals"][approver] = datetime.now()

3. 审批工作流

class ApprovalWorkflow:
    """Collect access requests and the approvals each one needs.

    Requests are plain dicts held in ``pending_requests``. They stay in that
    list after approval, with ``"status"`` changed from ``"pending"`` to
    ``"approved"``.

    Notification and access granting are delegated to the overridable hooks
    ``_notify_approvers`` and ``_grant_access``, which do nothing by default.
    """

    def __init__(self):
        self.pending_requests = []

    async def submit_request(
        self,
        user_id: str,
        feature: str,
        justification: str,
        data_samples: Optional[list[str]] = None,
    ) -> str:
        """Register a new pending request and notify approvers.

        Returns:
            The generated request id (a UUID4 string).
        """
        request_id = str(uuid.uuid4())

        request = {
            "id": request_id,
            "user_id": user_id,
            "feature": feature,
            "justification": justification,
            "data_samples": data_samples or [],
            "status": "pending",
            "created_at": datetime.now(),
            "approvals": [],
        }

        self.pending_requests.append(request)

        # Let approvers know there is something to review.
        await self._notify_approvers(request)

        return request_id

    async def approve_request(self, request_id: str, approver: str, comments: str = ""):
        """Record one approval and finalize the request once all are present.

        Returns:
            ``True`` only when this approval completes the set required by
            the feature's policy. ``False`` when approvals are still missing,
            the request id is unknown, the request is no longer pending, or
            the feature has no policy.
        """
        request = next(
            (r for r in self.pending_requests if r["id"] == request_id),
            None,
        )
        # Only pending requests are processed, so access is never granted twice.
        if request is None or request["status"] != "pending":
            return False

        # A feature without a policy can never be approved.
        policy = POLICIES.get(request["feature"])
        if policy is None:
            return False

        request["approvals"].append({
            "approver": approver,
            "timestamp": datetime.now(),
            "comments": comments,
        })

        # Approved once every required approver appears at least once.
        required = set(policy.required_approvals)
        obtained = {a["approver"] for a in request["approvals"]}
        if not required.issubset(obtained):
            return False

        request["status"] = "approved"
        await self._grant_access(request)
        return True

    async def _notify_approvers(self, request: dict) -> None:
        """Hook called after a request is submitted; no-op by default."""

    async def _grant_access(self, request: dict) -> None:
        """Hook called once a request is fully approved; no-op by default."""

4. 数据安全管控

class DataSecurityManager:
    """Regex-based detection and redaction of sensitive values in text."""

    # Fragment -> type label. A pattern gets the label of the first fragment
    # it contains, so order matters.
    _TYPE_MARKERS = {
        r'\d{4}[-\s]?\d{4}': "credit_card",
        r'\d{3}[-\s]?\d{2}': "ssn",
        r'@': "email",
        r'1[3-9]\d{9}': "phone",
    }

    # Finding types that make a scan high risk.
    _HIGH_RISK_TYPES = frozenset({"credit_card", "ssn"})

    def __init__(self):
        self.sensitive_patterns = [
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # credit card number
            r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',              # social security number
            # The TLD class is [A-Za-z]; "[A-Z|a-z]" also accepted a literal "|".
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
            r'\b1[3-9]\d{9}\b',                                 # mobile phone number
        ]

    def scan_for_sensitive_data(self, text: str) -> dict:
        """Report which sensitive patterns occur in ``text``.

        Returns:
            A dict with ``"has_sensitive_data"`` (bool), ``"findings"`` (one
            entry per matching pattern, with ``"type"``, ``"count"`` and up
            to three ``"samples"``) and ``"risk_level"``.
        """
        import re
        findings = []

        for pattern in self.sensitive_patterns:
            matches = re.findall(pattern, text)
            if matches:
                findings.append({
                    "type": self._identify_type(pattern),
                    "count": len(matches),
                    # NOTE(review): samples are the raw matched values, so
                    # the result itself contains sensitive data.
                    "samples": matches[:3],
                })

        return {
            "has_sensitive_data": len(findings) > 0,
            "findings": findings,
            "risk_level": self._assess_risk(findings),
        }

    def sanitize_input(self, text: str) -> str:
        """Return ``text`` with every sensitive match replaced by ``[REDACTED]``."""
        import re
        sanitized = text

        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern, "[REDACTED]", sanitized)

        return sanitized

    def _identify_type(self, pattern: str) -> str:
        """Map a regex source string to a type label, or ``"unknown"``."""
        for marker, label in self._TYPE_MARKERS.items():
            if marker in pattern:
                return label
        return "unknown"

    def _assess_risk(self, findings: list) -> str:
        """Summarize findings as a coarse risk level.

        Returns ``"none"`` with no findings, ``"high"`` when any finding is
        a credit card or SSN, and ``"medium"`` otherwise.
        """
        if not findings:
            return "none"
        if any(f["type"] in self._HIGH_RISK_TYPES for f in findings):
            return "high"
        return "medium"

5. 日志与审计

class GovernanceAuditLogger:
    """Write LLM interaction audit records and summarize them into reports.

    ``storage`` must provide awaitable ``write(collection, record)`` and
    ``query(collection, start_date=..., end_date=...)`` methods; all records
    go to the ``"audit_llm"`` collection.
    """

    def __init__(self, storage):
        self.storage = storage

    async def log_llm_interaction(
        self,
        user_id: str,
        feature: str,
        request: dict,
        response: dict,
        policy_check: dict,
    ):
        """Persist a summary of one LLM call; message content is not stored."""
        audit_record = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "feature": feature,
            "request_summary": {
                "model": request.get("model"),
                # Number of messages in the request, not a character count.
                "input_length": len(request.get("messages", [])),
            },
            "response_summary": {
                "tokens_used": response.get("usage", {}).get("total_tokens"),
                "finish_reason": self._extract_finish_reason(response),
            },
            "policy_compliance": policy_check,
            "data_classification": request.get("data_classification", "unknown"),
        }

        await self.storage.write("audit_llm", audit_record)

    async def generate_compliance_report(self, start_date, end_date) -> dict:
        """Aggregate the audit records stored between the two dates.

        Returns:
            A dict with the total record count, per-user, per-feature and
            per-classification counts, and the records whose policy check
            was not allowed.
        """
        records = await self.storage.query(
            "audit_llm",
            start_date=start_date,
            end_date=end_date,
        )

        return {
            "total_interactions": len(records),
            "by_user": self._group_by(records, "user_id"),
            "by_feature": self._group_by(records, "feature"),
            "policy_violations": [
                r for r in records
                # A record with no "allowed" key counts as compliant.
                if not r["policy_compliance"].get("allowed", True)
            ],
            "data_classification_distribution": self._group_by(
                records, "data_classification"
            ),
        }

    @staticmethod
    def _extract_finish_reason(response: dict):
        """Return the first choice's finish reason, or ``None`` if absent.

        A missing or empty ``"choices"`` list gives ``None`` rather than
        raising ``IndexError``.
        """
        choices = response.get("choices") or [{}]
        return choices[0].get("finish_reason")

    @staticmethod
    def _group_by(records, key: str) -> dict:
        """Count records per value of ``key``.

        Records lacking ``key`` are counted under ``"unknown"``.
        """
        return dict(Counter(record.get(key, "unknown") for record in records))

治理仪表板

class GovernanceDashboard:
    """Build a governance overview from the audit logger's compliance report."""

    def __init__(self, access_control, audit_logger):
        self.access_control = access_control
        self.audit_logger = audit_logger

    async def get_governance_overview(self, days: int = 30) -> dict:
        """Summarize the last ``days`` days of audited LLM activity.

        Returns:
            A dict with ``"summary"``, ``"compliance"``, ``"security"`` and
            ``"top_users"`` (up to ten ``(user, value)`` pairs, largest
            value first).
        """
        window_end = datetime.now()
        window_start = window_end - timedelta(days=days)
        report = await self.audit_logger.generate_compliance_report(
            window_start, window_end
        )

        interaction_total = report["total_interactions"]
        per_user = report["by_user"]
        violation_count = len(report["policy_violations"])

        summary = {
            "total_interactions": interaction_total,
            "unique_users": len(per_user),
            "features_used": len(report["by_feature"]),
        }
        compliance = {
            "policy_violations": violation_count,
            # Divide by at least 1 so an empty window gives a rate of 0.
            "violation_rate": violation_count / max(interaction_total, 1),
        }
        security = {
            "data_classification": report["data_classification_distribution"],
        }
        ranked_users = sorted(
            per_user.items(),
            key=lambda x: x[1],
            reverse=True,
        )

        return {
            "summary": summary,
            "compliance": compliance,
            "security": security,
            "top_users": ranked_users[:10],
        }

总结

企业级LLM治理需要覆盖策略定义、访问控制、审批流程、数据安全、审计追踪等多个维度。通过系统化的治理框架,企业可以在享受LLM能力的同时,有效管控风险,确保合规使用。