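---
title: "GPU Scheduling System"
description: "An introduction to the design and implementation of GPU scheduling systems, covering Kubernetes scheduling, resource scheduling policies, and multi-tenant GPU resource management."
tags: ["GPU scheduling", "resource scheduling", "Kubernetes", "scheduling policies"]
category: "llm"
icon: "🧠"
---
GPU Scheduling System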
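GPU Scheduling Challenges
GPUs are expensive and heterogeneous, so a scheduling system has to allocate them efficiently across many users and jobs. The core challenges are resource fragmentation, preemption and recovery, fairness guarantees, and unified management of heterogeneous GPUs.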
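
To make fragmentation concrete, here is a minimal sketch. It assumes a job must run entirely on one node; the node names, the `find_node` helper, and the best-fit rule are illustrative assumptions, not part of any real scheduler API.

```python
from typing import Dict, Optional


def find_node(free_gpus_per_node: Dict[str, int], gpus_needed: int) -> Optional[str]:
    """Return a node that can host the whole job (best fit), or None."""
    fitting = {n: f for n, f in free_gpus_per_node.items() if f >= gpus_needed}
    if not fitting:
        return None
    # Best fit: pick the node that leaves the fewest GPUs stranded.
    # Ties are broken by node name so the result is deterministic.
    return min(fitting, key=lambda n: (fitting[n], n))


# Hypothetical cluster: 6 GPUs are free in total, but spread over three nodes.
free = {"node-a": 2, "node-b": 2, "node-c": 2}

print(sum(free.values()))   # 6
print(find_node(free, 4))   # None: no single node has 4 free GPUs
print(find_node(free, 2))   # node-a
```

Although six GPUs are idle, the 4-GPU job cannot start. Best-fit placement is one common way to keep large contiguous blocks available for such jobs.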
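Kubernetes GPU Scheduling
Declaring GPU Resources

apiVersion: v1
kind: Pod
metadata:
  name: llm-inference
spec:
  containers:
    - name: inference
      image: vllm/vllm-openai:latest
      resources:
        limits:
          nvidia.com/gpu: 2  # request 2 GPUs
        requests:
          nvidia.com/gpu: 2  # for extended resources, requests must equal limits
  nodeSelector:
    # Select the GPU model with a node label, not with a resource limit.
    # The value must match the label that GPU Feature Discovery publishes on your nodes.
    nvidia.com/gpu.product: "NVIDIA-A100-80GB-HBM2e"
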
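GPU Partitioning

# Partition a GPU with MIG (Multi-Instance GPU)
apiVersion: v1
kind: Pod
metadata:
  name: small-inference
spec:
  containers:
    - name: inference
      image: vllm/vllm-openai:latest
      resources:
        limits:
          # One 1g.5gb slice, i.e. 1/7 of the compute of an A100-40GB.
          # This resource name is exposed when the device plugin uses the "mixed" MIG strategy.
          nvidia.com/mig-1g.5gb: 1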
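
Before requesting MIG slices it helps to know how many fit on one card. The following is a rough capacity check only. It assumes an A100-40GB with 7 compute slices and 40 GB of memory, and reads both numbers from profile names of the form `<N>g.<M>gb`. The helper names are hypothetical, and real MIG placement rules are stricter than this sum.

```python
import re
from typing import List, Tuple

# Assumed capacity of one A100-40GB in MIG mode.
A100_40GB_COMPUTE_SLICES = 7
A100_40GB_MEMORY_GB = 40


def parse_profile(name: str) -> Tuple[int, int]:
    """Split a MIG profile name such as '1g.5gb' into (compute slices, memory GB)."""
    match = re.fullmatch(r"(\d+)g\.(\d+)gb", name)
    if match is None:
        raise ValueError(f"unrecognized MIG profile: {name!r}")
    return int(match.group(1)), int(match.group(2))


def fits_on_one_gpu(profiles: List[str]) -> bool:
    """Necessary condition only: total slices and memory must not exceed the card."""
    compute = sum(parse_profile(p)[0] for p in profiles)
    memory = sum(parse_profile(p)[1] for p in profiles)
    return compute <= A100_40GB_COMPUTE_SLICES and memory <= A100_40GB_MEMORY_GB


print(fits_on_one_gpu(["1g.5gb"] * 7))           # True
print(fits_on_one_gpu(["3g.20gb", "3g.20gb"]))   # True
print(fits_on_one_gpu(["4g.20gb", "4g.20gb"]))   # False: needs 8 compute slices
```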
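Custom Scheduler
Priority-Based Scheduling

from dataclasses import dataclass
from typing import Dict, List
import heapq


@dataclass
class GPURequest:
    priority: int
    gpu_count: int
    job_id: str
    model_name: str

    def __lt__(self, other):
        # heapq is a min-heap, so invert the comparison to pop the highest priority first.
        return self.priority > other.priority


class GPUScheduler:
    def __init__(self, total_gpus: int):
        self.total_gpus = total_gpus
        self.available_gpus = total_gpus
        self.pending_queue: List[GPURequest] = []  # heap ordered by priority
        self.running_jobs: Dict[str, GPURequest] = {}

    def submit(self, request: GPURequest):
        # A request larger than the cluster could never run and would block the queue forever.
        if request.gpu_count > self.total_gpus:
            raise ValueError(f"job {request.job_id} needs more GPUs than the cluster has")
        heapq.heappush(self.pending_queue, request)
        self._try_schedule()

    def _try_schedule(self):
        # Strict priority order: if the head of the queue does not fit,
        # lower-priority jobs behind it wait as well (no backfilling).
        while self.pending_queue and self.pending_queue[0].gpu_count <= self.available_gpus:
            request = heapq.heappop(self.pending_queue)
            self.available_gpus -= request.gpu_count
            self.running_jobs[request.job_id] = request
            print(f"Scheduled job {request.job_id}: {request.gpu_count} GPU(s)")

    def complete(self, job_id: str):
        # Return the job's GPUs to the pool and retry the pending queue.
        if job_id in self.running_jobs:
            request = self.running_jobs.pop(job_id)
            self.available_gpus += request.gpu_count
            self._try_schedule()


# Usage example: both jobs fit into 8 GPUs, so each starts as soon as it is submitted.
scheduler = GPUScheduler(total_gpus=8)
scheduler.submit(GPURequest(priority=1, gpu_count=2, job_id="job1", model_name="7B"))
scheduler.submit(GPURequest(priority=10, gpu_count=4, job_id="critical", model_name="70B"))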
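
The scheduler above only ever looks at the head of the queue, so one large high-priority job can leave GPUs idle while smaller jobs wait behind it. A common remedy is backfilling. The sketch below is a simplified variant under stated assumptions: the `Job` type and `schedule_with_backfill` function are hypothetical names, and it ignores job run times, which production backfill schedulers use to avoid delaying the blocked job.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Job:
    job_id: str
    priority: int
    gpu_count: int


def schedule_with_backfill(pending: List[Job], free_gpus: int) -> Tuple[List[Job], List[Job]]:
    """Start jobs in priority order, skipping (not stopping at) jobs that do not fit."""
    started, waiting = [], []
    for job in sorted(pending, key=lambda j: -j.priority):
        if job.gpu_count <= free_gpus:
            free_gpus -= job.gpu_count
            started.append(job)
        else:
            waiting.append(job)  # stays queued; smaller jobs may still run
    return started, waiting


pending = [
    Job("big", priority=10, gpu_count=8),
    Job("small", priority=1, gpu_count=2),
    Job("medium", priority=5, gpu_count=2),
]
started, waiting = schedule_with_backfill(pending, free_gpus=4)
print([j.job_id for j in started])   # ['medium', 'small']
print([j.job_id for j in waiting])   # ['big']
```

The trade-off is starvation: if small jobs keep arriving, the large job may never see enough free GPUs at once, so this approach is usually paired with reservations or priority aging.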
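GPU Sharing

class SharedGPUScheduler:
    """Scheduler with GPU sharing: several jobs may share one physical GPU."""

    def __init__(self, total_gpus: int, max_shares_per_gpu: int = 4):
        self.total_gpus = total_gpus
        self.max_shares = max_shares_per_gpu
        self.gpu_shares = [0] * total_gpus  # shares currently in use on each GPU

    def allocate(self, request) -> list:
        """Reserve one share on each of request.gpu_count GPUs.

        `request` is a GPURequest as defined earlier. Returns the chosen GPU ids,
        or an empty list (reserving nothing) if the request cannot be met in full.
        """
        candidates = [g for g in range(self.total_gpus)
                      if self.gpu_shares[g] < self.max_shares]
        if len(candidates) < request.gpu_count:
            return []
        allocated = candidates[:request.gpu_count]
        for gpu_id in allocated:
            self.gpu_shares[gpu_id] += 1
        return allocated

    def release(self, gpu_ids: list):
        for gpu_id in gpu_ids:
            # Never drop below zero, even if release is called twice.
            self.gpu_shares[gpu_id] = max(0, self.gpu_shares[gpu_id] - 1)

    def get_utilization(self):
        """Fraction of all shares that are allocated (share occupancy, not SM utilization)."""
        total_shares = sum(self.gpu_shares)
        return total_shares / (self.total_gpus * self.max_shares)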
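
The allocator above fills GPUs in index order, which packs jobs onto the first few cards. When interference between co-located jobs matters more than keeping GPUs empty, a spreading policy may work better. The sketch below shows one such policy; `pick_least_loaded` is a hypothetical helper that takes the same per-GPU share counts as `gpu_shares`.

```python
from typing import List


def pick_least_loaded(gpu_shares: List[int], max_shares: int, gpus_needed: int) -> List[int]:
    """Choose the GPUs with the fewest shares in use; return [] if too few have room."""
    candidates = [g for g, used in enumerate(gpu_shares) if used < max_shares]
    if len(candidates) < gpus_needed:
        return []
    # Sort by current load, then by index so ties resolve deterministically.
    candidates.sort(key=lambda g: (gpu_shares[g], g))
    return candidates[:gpus_needed]


shares = [3, 0, 4, 1]  # GPU 2 is full when max_shares is 4
print(pick_least_loaded(shares, max_shares=4, gpus_needed=2))   # [1, 3]
print(pick_least_loaded(shares, max_shares=4, gpus_needed=4))   # []
```

Packing and spreading pull in opposite directions: packing keeps whole GPUs free for exclusive jobs, while spreading reduces contention among the jobs that share.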
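Scheduling Policies
Fair Scheduling

class FairScheduler:
    """Caps each user's GPUs at a fixed fraction of the cluster."""

    def __init__(self, total_gpus: int):
        self.total_gpus = total_gpus
        self.quotas = {}  # user_id -> fraction of the cluster, in [0, 1]
        self.used = {}    # user_id -> GPUs currently held

    def set_quota(self, user_id: str, quota: float):
        if not 0.0 <= quota <= 1.0:
            raise ValueError("quota must be a fraction between 0 and 1")
        self.quotas[user_id] = quota

    def allocate(self, user_id: str, requested: int) -> int:
        """Grant up to `requested` GPUs without exceeding the user's quota."""
        quota = self.quotas.get(user_id, 1.0)  # users without a quota are uncapped
        max_allowed = int(self.total_gpus * quota)
        held = self.used.get(user_id, 0)
        # Count GPUs the user already holds, so repeated calls cannot exceed the quota.
        granted = max(0, min(requested, max_allowed - held))
        self.used[user_id] = held + granted
        return granted

    def release(self, user_id: str, count: int):
        self.used[user_id] = max(0, self.used.get(user_id, 0) - count)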
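
Fixed quotas leave GPUs idle when a user asks for less than their share. Max-min fairness is a common alternative: small demands are satisfied in full and the remainder is split evenly among the users who still want more. The sketch below is an integer version that hands out one GPU at a time; `max_min_fair` is a hypothetical name, and ties are broken by user id purely to keep the result deterministic.

```python
from typing import Dict


def max_min_fair(demands: Dict[str, int], total_gpus: int) -> Dict[str, int]:
    """Integer max-min fair allocation, one GPU per unsatisfied user per round."""
    alloc = {user: 0 for user in demands}
    remaining = total_gpus
    while remaining > 0:
        unsatisfied = [u for u in sorted(demands) if alloc[u] < demands[u]]
        if not unsatisfied:
            break  # every demand is met; leftover GPUs stay free
        for user in unsatisfied:
            if remaining == 0:
                break
            alloc[user] += 1
            remaining -= 1
    return alloc


print(max_min_fair({"alice": 2, "bob": 4, "carol": 10}, total_gpus=12))
# {'alice': 2, 'bob': 4, 'carol': 6}
```

Here alice and bob receive everything they asked for, and carol gets the six GPUs that are left rather than being held to a fixed third of the cluster.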
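Preemptive Scheduling

class PreemptiveScheduler:
    """Scheduler that supports job preemption."""

    def __init__(self, preemptible_below: int = 5):
        self.running = []    # list of (priority, job_id, gpus)
        self.preempted = []  # same tuple layout as self.running
        # Only jobs with a priority below this value may be preempted.
        self.preemptible_below = preemptible_below

    def preempt(self, job_id: str):
        """Preempt a running job if its priority is low enough."""
        for i, (priority, jid, gpus) in enumerate(self.running):
            if jid == job_id and priority < self.preemptible_below:
                self.running.pop(i)
                self.preempted.append((priority, jid, gpus))
                print(f"Job {job_id} was preempted")
                return True
        return False

    def restore(self, job_id: str):
        """Resume a previously preempted job."""
        for i, (priority, jid, gpus) in enumerate(self.preempted):
            if jid == job_id:
                self.preempted.pop(i)
                self.running.append((priority, jid, gpus))
                print(f"Job {job_id} was restored")
                return True
        return False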
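
`preempt` above needs to be told which job to evict. Choosing the victims is a separate step, and one simple greedy rule is sketched below: evict the lowest-priority jobs first until enough GPUs are free. The `select_victims` function is a hypothetical helper that works on the same `(priority, job_id, gpus)` tuples as `PreemptiveScheduler.running`.

```python
from typing import List, Optional, Tuple

RunningJob = Tuple[int, str, int]  # (priority, job_id, gpus)


def select_victims(running: List[RunningJob], incoming_priority: int,
                   gpus_needed: int, free_gpus: int) -> Optional[List[str]]:
    """Return job ids to preempt, [] if none are needed, or None if it cannot be done."""
    shortfall = gpus_needed - free_gpus
    if shortfall <= 0:
        return []
    victims = []
    # Lowest priority first; among equals, prefer the job holding more GPUs.
    for priority, job_id, gpus in sorted(running, key=lambda j: (j[0], -j[2])):
        if priority >= incoming_priority:
            break  # never preempt a job of equal or higher priority
        victims.append(job_id)
        shortfall -= gpus
        if shortfall <= 0:
            return victims
    return None  # preempting every eligible job would still not free enough GPUs


running = [(1, "batch-a", 2), (3, "batch-b", 4), (8, "serving", 2)]
print(select_victims(running, incoming_priority=5, gpus_needed=4, free_gpus=0))
# ['batch-a', 'batch-b']
print(select_victims(running, incoming_priority=2, gpus_needed=4, free_gpus=0))
# None
```

The greedy order can evict more than strictly necessary: in the first call `batch-b` alone would have freed four GPUs. Minimizing the number of evictions or the amount of lost work requires a more careful search.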
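Monitoring and Autoscaling

import subprocess


class GPUMonitor:
    """GPU resource monitoring."""

    def __init__(self):
        self.metrics = []  # history of collected snapshots

    def collect_metrics(self) -> dict:
        """Query per-GPU utilization (%) and memory (MiB) through nvidia-smi."""
        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=index,utilization.gpu,memory.used,memory.total',
             '--format=csv,noheader,nounits'],
            capture_output=True, text=True, check=True,  # raise if nvidia-smi fails
        )
        metrics = {}
        for line in result.stdout.strip().split('\n'):
            idx, util, mem_used, mem_total = (field.strip() for field in line.split(','))
            metrics[int(idx)] = {
                'utilization': float(util),
                'memory_used': float(mem_used),
                'memory_total': float(mem_total),
            }
        self.metrics.append(metrics)
        return metrics

    def should_scale_out(self, metrics: dict, threshold: float = 0.8) -> bool:
        """Return True when average utilization exceeds `threshold` (a fraction of 100%)."""
        if not metrics:
            return False  # no GPUs reported; avoid dividing by zero
        avg_util = sum(m['utilization'] for m in metrics.values()) / len(metrics)
        return avg_util > threshold * 100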
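
`should_scale_out` reacts to a single snapshot, so a short spike can trigger scaling. A common refinement is to require several consecutive samples on the same side of a threshold, with separate high and low thresholds. The sketch below illustrates this idea on canned input so it runs without a GPU. The sample values are made up, and `parse_gpu_csv` and `scale_decision` are hypothetical helper names.

```python
from typing import Dict, List

# Made-up text in the "index, utilization" CSV shape, for illustration only.
SAMPLE_OUTPUT = "0, 95\n1, 85\n"


def parse_gpu_csv(text: str) -> Dict[int, float]:
    """Map GPU index to utilization in percent."""
    utilization = {}
    for line in text.strip().splitlines():
        fields = [field.strip() for field in line.split(",")]
        utilization[int(fields[0])] = float(fields[1])
    return utilization


def scale_decision(samples: List[float], high: float = 80.0, low: float = 30.0) -> str:
    """Decide from recent average-utilization samples (percent, oldest first)."""
    if not samples:
        return "hold"
    if all(s > high for s in samples):
        return "scale_out"   # sustained load above the high threshold
    if all(s < low for s in samples):
        return "scale_in"    # sustained idleness below the low threshold
    return "hold"            # mixed signal: do nothing


per_gpu = parse_gpu_csv(SAMPLE_OUTPUT)
average = sum(per_gpu.values()) / len(per_gpu)
print(average)                               # 90.0
print(scale_decision([average, 85.0, 88.0])) # scale_out
print(scale_decision([average, 40.0, 88.0])) # hold
print(scale_decision([10.0, 20.0, 5.0]))     # scale_in
```

The gap between `high` and `low` is what prevents the cluster from oscillating between scaling out and scaling in when load hovers near a single threshold.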
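Best Practices
- Manage GPU resources through a Kubernetes device plugin
- Set resource quotas for jobs of different priorities
- Monitor GPU utilization and drive autoscaling from it
- Use MIG to split large GPUs into smaller instances
- Prefer tensor parallelism when GPUs are linked by a high-bandwidth interconnect such as NVLink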