← 返回首页
🧠

LLM成本优化:降本增效的实战策略

📂 llm ⏱ 4 min 757 words

--- title: "LLM成本优化:降本增效的实战策略" description: "系统介绍LLM应用的成本优化方法,包括Token效率、缓存策略和资源调度" tags: ["成本优化", "Token效率", "缓存策略"] category: "llm" icon: "🧠"

LLM成本优化:降本增效的实战策略

LLM成本构成分析

大语言模型的使用成本主要包括以下几个部分:

以GPT-4为例,处理100万Token的成本约为$30-60,对于高频应用来说是一笔不小的开支。通过系统化的成本优化,可以将总成本降低50%以上。

Token效率优化

Prompt压缩

减少不必要的Token消耗:

class PromptOptimizer:
    def __init__(self):
        self.stop_words = {"的", "了", "是", "在", "有", "和", "与"}

    def compress_prompt(self, prompt: str) -> str:
        """移除冗余信息,压缩Prompt"""
        # 移除多余空白
        prompt = " ".join(prompt.split())

        # 移除冗余标点
        prompt = prompt.replace(",,", ",").replace("。。", "。")

        return prompt

    def optimize_system_prompt(self, system_prompt: str) -> str:
        """优化系统提示词"""
        # 使用更简洁的表达
        optimizations = {
            "请你帮我": "帮我",
            "请回答以下问题": "回答",
            "请用中文回答": "用中文回答",
        }
        for old, new in optimizations.items():
            system_prompt = system_prompt.replace(old, new)
        return system_prompt

    def count_tokens(self, text: str) -> int:
        """估算Token数量"""
        # 简单估算:中文每字符约1.5Token,英文每单词约1Token
        chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        english_words = len(text.split()) - chinese_chars
        return int(chinese_chars * 1.5 + english_words)

响应长度控制

class ResponseLengthController:
    def __init__(self, max_tokens: int = 1000):
        self.max_tokens = max_tokens

    def generate_with_limit(self, prompt: str, target_length: str = "medium"):
        length_map = {
            "short": {"max_tokens": 200, "temperature": 0.3},
            "medium": {"max_tokens": 500, "temperature": 0.7},
            "long": {"max_tokens": 1000, "temperature": 0.8}
        }

        params = length_map.get(target_length, length_map["medium"])

        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=params["max_tokens"],
            temperature=params["temperature"]
        )

        return response

缓存策略

语义缓存

import hashlib
import json
from typing import Dict, Optional
from datetime import datetime, timedelta

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache = {}
        self.similarity_threshold = similarity_threshold

    def get_cache_key(self, prompt: str) -> str:
        """生成缓存键"""
        normalized = prompt.strip().lower()
        return hashlib.sha256(normalized.encode()).hexdigest()

    def get(self, prompt: str) -> Optional[Dict]:
        """查询缓存"""
        cache_key = self.get_cache_key(prompt)

        if cache_key in self.cache:
            entry = self.cache[cache_key]
            if datetime.now() < entry["expires_at"]:
                return entry["response"]

        # 语义相似度匹配
        for key, entry in self.cache.items():
            if self._semantic_similarity(prompt, entry["prompt"]) > self.similarity_threshold:
                if datetime.now() < entry["expires_at"]:
                    return entry["response"]

        return None

    def set(self, prompt: str, response: Dict, ttl_hours: int = 24):
        """设置缓存"""
        cache_key = self.get_cache_key(prompt)
        self.cache[cache_key] = {
            "prompt": prompt,
            "response": response,
            "created_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(hours=ttl_hours)
        }

    def _semantic_similarity(self, text1: str, text2: str) -> float:
        """计算语义相似度"""
        # 简化的相似度计算
        words1 = set(text1.split())
        words2 = set(text2.split())
        intersection = words1 & words2
        union = words1 | words2
        return len(intersection) / len(union) if union else 0

批量请求合并

from typing import List, Dict
import asyncio

class RequestBatcher:
    def __init__(self, max_batch_size: int = 10, max_wait_time: float = 0.1):
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = []

    async def add_request(self, prompt: str) -> Dict:
        """添加请求到批次"""
        future = asyncio.Future()
        self.pending_requests.append({
            "prompt": prompt,
            "future": future
        })

        if len(self.pending_requests) >= self.max_batch_size:
            await self._process_batch()

        return await future

    async def _process_batch(self):
        """处理批次请求"""
        batch = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]

        # 批量调用API
        prompts = [req["prompt"] for req in batch]
        responses = await self._batch_inference(prompts)

        # 设置结果
        for req, response in zip(batch, responses):
            req["future"].set_result(response)

    async def _batch_inference(self, prompts: List[str]) -> List[Dict]:
        """批量推理"""
        # 这里可以使用支持批量推理的模型服务
        responses = []
        for prompt in prompts:
            response = await self._call_model(prompt)
            responses.append(response)
        return responses

模型选择策略

智能模型路由

class ModelRouter:
    def __init__(self):
        self.models = {
            "gpt-4": {"cost_per_1k": 0.06, "quality": 0.95},
            "gpt-3.5-turbo": {"cost_per_1k": 0.002, "quality": 0.8},
            "claude-3-haiku": {"cost_per_1k": 0.0025, "quality": 0.85},
            "local-7b": {"cost_per_1k": 0.0001, "quality": 0.7}
        }

    def route_request(self, prompt: str, task_type: str) -> str:
        """根据任务类型选择合适的模型"""
        if task_type == "simple_qa":
            return "local-7b"  # 简单问题使用本地小模型
        elif task_type == "code_generation":
            return "gpt-4"  # 代码生成需要高质量模型
        elif task_type == "summarization":
            return "gpt-3.5-turbo"  # 摘要任务可以使用中等模型
        else:
            return "gpt-3.5-turbo"

    def estimate_cost(self, model: str, tokens: int) -> float:
        """估算成本"""
        return self.models[model]["cost_per_1k"] * tokens / 1000

资源调度优化

GPU资源池化

class GPUPool:
    def __init__(self, total_gpus: int = 8):
        self.total_gpus = total_gpus
        self.available_gpus = total_gpus
        self.gpu_usage = {}

    def allocate(self, model_name: str, required_gpus: int) -> bool:
        """分配GPU资源"""
        if self.available_gpus >= required_gpus:
            self.available_gpus -= required_gpus
            self.gpu_usage[model_name] = required_gpus
            return True
        return False

    def release(self, model_name: str):
        """释放GPU资源"""
        if model_name in self.gpu_usage:
            self.available_gpus += self.gpu_usage[model_name]
            del self.gpu_usage[model_name]

    def get_utilization(self) -> float:
        """获取GPU利用率"""
        return (self.total_gpus - self.available_gpus) / self.total_gpus

自动扩缩容

class AutoScaler:
    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances

    def scale_based_on_load(self, metrics: Dict):
        """根据负载自动扩缩容"""
        cpu_usage = metrics.get("cpu_usage", 0)
        gpu_usage = metrics.get("gpu_usage", 0)
        request_queue = metrics.get("request_queue", 0)

        # 扩容条件
        if (cpu_usage > 0.7 or gpu_usage > 0.8 or request_queue > 100):
            self._scale_up()

        # 缩容条件
        if (cpu_usage < 0.3 and gpu_usage < 0.4 and request_queue < 10):
            self._scale_down()

    def _scale_up(self):
        if self.current_instances < self.max_instances:
            self.current_instances += 1
            self._deploy_instance()

    def _scale_down(self):
        if self.current_instances > self.min_instances:
            self.current_instances -= 1
            self._remove_instance()

成本监控与报告

class CostMonitor:
    def __init__(self):
        self.costs = []

    def record_cost(self, category: str, amount: float, details: Dict):
        """记录成本"""
        self.costs.append({
            "category": category,
            "amount": amount,
            "details": details,
            "timestamp": datetime.now()
        })

    def generate_report(self, period: str = "daily") -> Dict:
        """生成成本报告"""
        from collections import defaultdict
        category_costs = defaultdict(float)
        total_cost = 0

        for cost in self.costs:
            category_costs[cost["category"]] += cost["amount"]
            total_cost += cost["amount"]

        return {
            "period": period,
            "total_cost": total_cost,
            "by_category": dict(category_costs),
            "cost_per_request": total_cost / max(len(self.costs), 1)
        }

    def set_budget_alert(self, monthly_budget: float, alert_threshold: float = 0.8):
        """设置预算告警"""
        current_month_cost = sum(
            c["amount"] for c in self.costs
            if c["timestamp"].month == datetime.now().month
        )
        if current_month_cost > monthly_budget * alert_threshold:
            self._send_alert(f"月度成本已达预算的{current_month_cost/monthly_budget*100:.1f}%")

成本优化清单

优化方向 预期节省 实施难度
Prompt压缩 10-20%
语义缓存 30-50%
批量请求 15-25%
模型路由 40-60%
自动扩缩容 20-30%

通过系统化的成本优化策略,可以显著降低LLM应用的运营成本,同时保持服务质量。建议从简单的优化开始,逐步实施更复杂的策略。