← 返回首页
🧠

LLM授权策略

📂 llm ⏱ 3 min 444 words

--- title: "LLM授权策略" description: "详细介绍LLM应用中的授权策略,包括角色控制、权限管理、细粒度访问控制等实践方案" tags: ["授权", "权限管理", "访问控制"] category: "llm" icon: "🧠"

LLM授权策略

授权与认证的区别

认证解决"你是谁"的问题,授权解决"你能做什么"的问题。在LLM应用中,即使用户通过认证,也需要限制其对特定模型、功能或数据的访问权限。

基于角色的访问控制(RBAC)

角色定义

为不同用户类型定义明确的角色和权限。

from enum import Enum
from typing import Set, Dict

class LLMRole(Enum):
    VIEWER = "viewer"
    USER = "user"
    DEVELOPER = "developer"
    ADMIN = "admin"

class LLMPermission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    MANAGE_KEYS = "manage_keys"
    VIEW_USAGE = "view_usage"

ROLE_PERMISSIONS: Dict[LLMRole, Set[LLMPermission]] = {
    LLMRole.VIEWER: {LLMPermission.READ},
    LLMRole.USER: {LLMPermission.READ, LLMPermission.WRITE},
    LLMRole.DEVELOPER: {LLMPermission.READ, LLMPermission.WRITE, LLMPermission.VIEW_USAGE},
    LLMRole.ADMIN: {p for p in LLMPermission}
}

权限检查装饰器

使用装饰器简化权限验证逻辑。

from functools import wraps

def require_permission(permission: LLMPermission):
    def decorator(func):
        @wraps(func)
        def wrapper(user_role: LLMRole, *args, **kwargs):
            if permission not in ROLE_PERMISSIONS.get(user_role, set()):
                raise PermissionError(f"角色 {user_role.value} 无权执行此操作")
            return func(user_role, *args, **kwargs)
        return wrapper
    return decorator

@require_permission(LLMPermission.MANAGE_KEYS)
def rotate_api_key(user_role, old_key, new_key):
    # 密钥轮换逻辑
    pass

基于属性的访问控制(ABAC)

ABAC根据用户属性、资源属性和环境条件动态决策。

from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class AccessContext:
    user_id: str
    user_role: LLMRole
    department: str
    resource_type: str
    resource_owner: str
    timestamp: datetime
    ip_address: str

class ABACPolicy:
    def __init__(self):
        self.policies = []
    
    def add_policy(self, condition, effect="allow"):
        self.policies.append({"condition": condition, "effect": effect})
    
    def evaluate(self, context: AccessContext) -> bool:
        for policy in self.policies:
            if policy["condition"](context):
                return policy["effect"] == "allow"
        return False

# 使用示例
policy = ABACPolicy()

# 策略1: 管理员可以访问所有资源
policy.add_policy(
    lambda ctx: ctx.user_role == LLMRole.ADMIN
)

# 策略2: 用户只能访问自己部门的资源
policy.add_policy(
    lambda ctx: ctx.resource_owner == ctx.user_id or 
               ctx.resource_type in ["public", "shared"]
)

# 策略3: 工作时间外禁止访问敏感数据
policy.add_policy(
    lambda ctx: not (ctx.resource_type == "sensitive" and 
                    (ctx.timestamp.hour < 9 or ctx.timestamp.hour > 18)),
    effect="deny"
)

模型级别的授权

不同用户可能有权限使用不同的LLM模型。

class ModelAccessControl:
    def __init__(self):
        self.model_tiers = {
            "gpt-4": {"min_role": LLMRole.DEVELOPER, "quota": 1000},
            "gpt-3.5-turbo": {"min_role": LLMRole.USER, "quota": 5000},
            "gpt-4-turbo": {"min_role": LLMRole.ADMIN, "quota": 100}
        }
    
    def can_access_model(self, user_role: LLMRole, model: str) -> bool:
        if model not in self.model_tiers:
            return False
        
        tier = self.model_tiers[model]
        role_hierarchy = [LLMRole.VIEWER, LLMRole.USER, 
                         LLMRole.DEVELOPER, LLMRole.ADMIN]
        
        user_level = role_hierarchy.index(user_role)
        required_level = role_hierarchy.index(tier["min_role"])
        
        return user_level >= required_level
    
    def get_model_quota(self, model: str) -> int:
        return self.model_tiers.get(model, {}).get("quota", 0)

API密钥授权

为不同用途创建具有不同权限的API密钥。

import hashlib
import secrets
from datetime import datetime, timedelta

class APIKeyAuthorization:
    def __init__(self):
        self.keys = {}
    
    def create_key(self, user_id: str, permissions: Set[LLMPermission],
                   expires_days: int = 90, rate_limit: int = 100) -> str:
        key = f"sk-{secrets.token_hex(32)}"
        key_hash = hashlib.sha256(key.encode()).hexdigest()
        
        self.keys[key_hash] = {
            "user_id": user_id,
            "permissions": permissions,
            "created_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(days=expires_days),
            "rate_limit": rate_limit,
            "is_active": True
        }
        
        return key
    
    def validate_key(self, api_key: str) -> dict:
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
        if key_hash not in self.keys:
            raise ValueError("无效的API密钥")
        
        key_data = self.keys[key_hash]
        
        if not key_data["is_active"]:
            raise ValueError("API密钥已禁用")
        
        if datetime.now() > key_data["expires_at"]:
            raise ValueError("API密钥已过期")
        
        return key_data
    
    def check_permission(self, api_key: str, permission: LLMPermission) -> bool:
        key_data = self.validate_key(api_key)
        return permission in key_data["permissions"]

授权审计日志

记录所有授权决策,便于审计和故障排查。

import logging
from datetime import datetime

audit_logger = logging.getLogger("llm_audit")

def log_authorization_decision(user_id: str, action: str, 
                               resource: str, decision: bool,
                               reason: str = ""):
    audit_logger.info({
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "action": action,
        "resource": resource,
        "decision": "ALLOW" if decision else "DENY",
        "reason": reason
    })

合理的授权策略能在保护系统安全的同时,为不同用户提供灵活的访问控制。