← 返回首页
🧠

GPU调度系统

📂 llm ⏱ 3 min 435 words

---
title: "GPU调度系统"
description: "介绍GPU调度系统的设计与实现，包括Kubernetes调度、资源调度策略和多租户GPU资源管理方案。"
tags: ["GPU调度", "资源调度", "Kubernetes", "调度策略"]
category: "llm"
icon: "🧠"
---

GPU调度系统

GPU调度的挑战

GPU资源昂贵且异构，调度系统需要在多个用户和任务之间高效分配GPU资源。核心挑战包括：资源碎片化、抢占与恢复、公平性保障以及异构GPU的统一管理。

Kubernetes GPU调度

GPU资源声明

apiVersion: v1
kind: Pod
metadata:
  name: llm-inference
spec:
  containers:
  - name: inference
    image: vllm/vllm-openai:latest
    resources:
      # Extended resources such as nvidia.com/gpu take whole-number quantities,
      # and each key may appear only once. The GPU model is NOT selected here;
      # it is selected with the nodeSelector below.
      limits:
        nvidia.com/gpu: 2  # request 2 GPUs
      requests:
        nvidia.com/gpu: 2  # extended resources cannot be overcommitted: requests must equal limits
  # Pin the Pod to A100-80GB nodes. The value must match the
  # nvidia.com/gpu.product label actually present on the target nodes
  # (set by NVIDIA GPU Feature Discovery) — check with `kubectl get nodes --show-labels`.
  nodeSelector:
    nvidia.com/gpu.product: "NVIDIA-A100-80GB-HBM2e"

GPU资源分区

# Use MIG (Multi-Instance GPU) to partition a physical GPU
apiVersion: v1
kind: Pod
metadata:
  name: small-inference
spec:
  containers:
  - name: inference
    image: vllm/vllm-openai:latest  # every Pod container must specify an image
    resources:
      limits:
        # One 1g.5gb MIG slice (1/7 of an A100-40GB's compute). This resource
        # name is advertised when the NVIDIA device plugin runs with the
        # "mixed" MIG strategy.
        nvidia.com/mig-1g.5gb: 1

自定义调度器

基于优先级的调度

from dataclasses import dataclass
from typing import List
import heapq

@dataclass
class GPURequest:
    """A request for a number of GPUs, queued by priority.

    ``heapq`` is a min-heap, so ``__lt__`` is deliberately inverted: the
    request with the larger ``priority`` compares as "smaller" and is
    therefore popped (scheduled) first.
    """

    priority: int
    gpu_count: int
    job_id: str
    model_name: str

    def __lt__(self, other):
        # Inverted comparison: a larger priority value means "schedule earlier".
        return other.priority < self.priority

class GPUScheduler:
    """Strict-priority GPU scheduler over a fixed pool of interchangeable GPUs.

    Pending requests live in a heap ordered by the requests' ``__lt__``
    (higher priority first). Scheduling stops at the first queued request
    that does not fit, so a large high-priority job is never overtaken by
    smaller, lower-priority ones.
    """

    def __init__(self, total_gpus: int):
        self.total_gpus = total_gpus
        self.available_gpus = total_gpus
        self.pending_queue: List[GPURequest] = []
        self.running_jobs = {}  # job_id -> request currently holding GPUs

    def submit(self, request: "GPURequest"):
        """Queue *request* and start every queued request that now fits.

        Raises:
            ValueError: if ``gpu_count`` is negative or larger than
                ``total_gpus`` (such a request can never run and, once at the
                head of the heap, would block everything queued behind it),
                or if ``job_id`` is already pending or running (a second entry
                would overwrite the first in ``running_jobs`` and leak its GPUs).
        """
        if not 0 <= request.gpu_count <= self.total_gpus:
            raise ValueError(
                f"gpu_count must be in [0, {self.total_gpus}], got {request.gpu_count}"
            )
        if request.job_id in self.running_jobs or any(
            queued.job_id == request.job_id for queued in self.pending_queue
        ):
            raise ValueError(f"job_id {request.job_id!r} is already pending or running")
        heapq.heappush(self.pending_queue, request)
        self._try_schedule()

    def _try_schedule(self):
        """Pop and start requests from the heap head while the head fits."""
        while self.pending_queue and self.pending_queue[0].gpu_count <= self.available_gpus:
            request = heapq.heappop(self.pending_queue)
            self.available_gpus -= request.gpu_count
            self.running_jobs[request.job_id] = request
            print(f"调度任务 {request.job_id}: {request.gpu_count}张GPU")

    def complete(self, job_id: str):
        """Return the GPUs held by running job *job_id*; unknown ids are ignored."""
        request = self.running_jobs.pop(job_id, None)
        if request is None:
            return
        self.available_gpus += request.gpu_count
        # Freed GPUs may let queued requests start.
        self._try_schedule()

# Usage example: an 8-GPU pool receiving a low- and a high-priority job
scheduler = GPUScheduler(total_gpus=8)
for demo_request in (
    GPURequest(priority=1, gpu_count=2, job_id="job1", model_name="7B"),
    GPURequest(priority=10, gpu_count=4, job_id="critical", model_name="70B"),
):
    scheduler.submit(demo_request)

GPU共享调度

class SharedGPUScheduler:
    """Scheduler that lets several tasks share one GPU.

    Each GPU can hand out at most ``max_shares_per_gpu`` shares; a task
    receives one share on each of the distinct GPUs it is allocated.
    """

    def __init__(self, total_gpus: int, max_shares_per_gpu: int = 4):
        self.total_gpus = total_gpus
        self.max_shares = max_shares_per_gpu
        self.gpu_shares = [0] * total_gpus  # shares currently handed out, per GPU index

    def allocate(self, request) -> list:
        """Give *request* one share on each of ``request.min_gpus`` distinct GPUs.

        GPUs are scanned in index order (first fit). Allocation is
        all-or-nothing: if fewer than ``min_gpus`` GPUs have a free share,
        nothing is reserved, so a failed request never leaves shares stranded.

        Returns:
            The allocated GPU indices, or ``[]`` if the request cannot be met
            or asks for no GPUs.
        """
        wanted = request.min_gpus
        if wanted <= 0:
            return []
        candidates = [
            gpu_id for gpu_id in range(self.total_gpus)
            if self.gpu_shares[gpu_id] < self.max_shares
        ][:wanted]
        if len(candidates) < wanted:
            return []
        # Commit only after we know the whole request can be satisfied.
        for gpu_id in candidates:
            self.gpu_shares[gpu_id] += 1
        return candidates

    def release(self, gpu_ids: list):
        """Return one share on each GPU listed in *gpu_ids*.

        Raises:
            ValueError: if a GPU would end up with fewer than zero shares
                (e.g. a double release); no shares are changed in that case.
        """
        from collections import Counter

        counts = Counter(gpu_ids)
        # Validate everything first so a bad call leaves the state untouched.
        for gpu_id, n in counts.items():
            if self.gpu_shares[gpu_id] < n:
                raise ValueError(
                    f"GPU {gpu_id} has {self.gpu_shares[gpu_id]} share(s), cannot release {n}"
                )
        for gpu_id, n in counts.items():
            self.gpu_shares[gpu_id] -= n

    def get_utilization(self):
        """Return the fraction of all shares in use (0.0 when there is no capacity)."""
        capacity = self.total_gpus * self.max_shares
        if capacity <= 0:
            return 0.0
        return sum(self.gpu_shares) / capacity

调度策略

公平调度

class FairScheduler:
    """Cap each user's GPU grant at a fixed fraction (quota) of the cluster.

    The cap is applied per request: ``allocate`` does not track what a user
    already holds, so repeated calls can together exceed the quota. Users
    without an explicit quota default to 1.0 (the whole cluster).
    """

    def __init__(self, total_gpus: int):
        self.total_gpus = total_gpus
        self.quotas = {}  # user_id -> fraction of total_gpus, in [0, 1]

    def set_quota(self, user_id: str, quota: float):
        """Set *user_id*'s share of the cluster as a fraction in [0, 1].

        Raises:
            ValueError: if *quota* is outside [0, 1] (or NaN).
        """
        if not 0.0 <= quota <= 1.0:
            raise ValueError(f"quota must be within [0, 1], got {quota}")
        self.quotas[user_id] = quota

    def allocate(self, user_id: str, requested: int) -> int:
        """Return how many of *requested* GPUs *user_id* may get under its quota.

        The result is never negative and never above ``total_gpus * quota``
        (rounded down).
        """
        quota = self.quotas.get(user_id, 1.0)
        # The epsilon keeps float error (e.g. 100 * 0.29 == 28.999999999999996)
        # from truncating the cap one GPU below the intended value.
        max_allowed = int(self.total_gpus * quota + 1e-9)
        return max(0, min(requested, max_allowed))

抢占调度

class PreemptiveScheduler:
    """Scheduler that can suspend (preempt) low-priority running jobs and resume them.

    Jobs are tracked as ``(priority, job_id, gpus)`` tuples in ``running``
    and ``preempted``.
    """

    def __init__(self, preempt_below: int = 5):
        """
        Args:
            preempt_below: only jobs whose priority is strictly below this
                value may be preempted (defaults to the previous fixed value 5).
        """
        self.running = []    # (priority, job_id, gpus) tuples currently running
        self.preempted = []  # (priority, job_id, gpus) tuples suspended by preempt()
        self.preempt_below = preempt_below

    def preempt(self, job_id: str):
        """Move running job *job_id* to ``preempted`` if its priority allows it.

        Returns:
            True if a matching running job with priority below
            ``preempt_below`` was preempted, otherwise False.
        """
        for i, (priority, jid, gpus) in enumerate(self.running):
            if jid == job_id and priority < self.preempt_below:
                # Safe to mutate here: we return right after the pop.
                self.running.pop(i)
                self.preempted.append((priority, jid, gpus))
                print(f"任务 {job_id} 被抢占")
                return True
        return False

    def restore(self, job_id: str):
        """Move preempted job *job_id* back to ``running``.

        Returns:
            True if the job was found among preempted jobs, otherwise False.
        """
        for i, (priority, jid, gpus) in enumerate(self.preempted):
            if jid == job_id:
                self.preempted.pop(i)
                self.running.append((priority, jid, gpus))
                print(f"任务 {job_id} 已恢复")
                return True
        return False

监控与弹性伸缩

class GPUMonitor:
    """Collect per-GPU utilization/memory metrics via ``nvidia-smi`` and decide on scale-out."""

    def __init__(self):
        self.metrics = []

    def collect_metrics(self) -> dict:
        """Query ``nvidia-smi`` once and return ``{gpu_index: {metric: value}}``.

        Each value dict has ``utilization`` (percent), ``memory_used`` and
        ``memory_total`` (as reported by nvidia-smi with ``nounits``).

        Raises:
            FileNotFoundError: if ``nvidia-smi`` is not on PATH.
            subprocess.CalledProcessError: if ``nvidia-smi`` exits non-zero
                (instead of failing later on empty output).
        """
        import subprocess

        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=index,utilization.gpu,memory.used,memory.total',
             '--format=csv,noheader,nounits'],
            capture_output=True, text=True, check=True,
        )
        return self._parse_nvidia_smi_csv(result.stdout)

    @staticmethod
    def _parse_nvidia_smi_csv(text: str) -> dict:
        """Parse ``index, util, mem_used, mem_total`` CSV rows into a metrics dict.

        Blank lines are skipped. Non-numeric fields such as ``[N/A]`` or
        ``[Not Supported]`` (which nvidia-smi can print, e.g. for utilization
        on MIG-enabled GPUs) become ``nan`` instead of raising.
        """
        def to_float(field: str) -> float:
            try:
                return float(field)
            except ValueError:
                return float('nan')

        metrics = {}
        for line in text.splitlines():
            if not line.strip():
                continue
            idx, util, mem_used, mem_total = (part.strip() for part in line.split(','))
            metrics[int(idx)] = {
                'utilization': to_float(util),
                'memory_used': to_float(mem_used),
                'memory_total': to_float(mem_total),
            }
        return metrics

    def should_scale_out(self, metrics: dict, threshold: float = 0.8) -> bool:
        """Return True when mean GPU utilization exceeds *threshold* (a 0-1 fraction).

        ``utilization`` values are percentages (0-100), as produced by
        ``collect_metrics``. GPUs with unknown (NaN) utilization are ignored;
        with no usable readings the answer is False rather than a
        ZeroDivisionError.
        """
        import math

        readings = [
            m['utilization'] for m in metrics.values() if not math.isnan(m['utilization'])
        ]
        if not readings:
            return False
        avg_util = sum(readings) / len(readings)
        return avg_util > threshold * 100

最佳实践