Skip to Content

21e - 成本优化策略

本文是《AI Agent 实战手册》第 21 章第 5 节。 上一节:21d-Langfuse设置指南 | 下一节:21f-生产告警与质量指标

⏱ 阅读时间:80 分钟 | 难度:⭐⭐⭐⭐ 中高级 | 前置知识:LLM API 使用经验、Python/TypeScript 开发经验

概述

LLM 应用的运营成本是制约规模化的核心瓶颈。一个日处理 10 万次请求的 AI 客服系统,如果全部使用旗舰模型,月度 API 账单可能高达 $5,000-$50,000。但通过系统化的成本优化——Token 监控与预算管理、语义缓存、智能模型路由、批处理、Prompt 缓存和 Prompt 压缩——大多数团队可以在不牺牲质量的前提下降低 60-80% 的 LLM 开支。本节将逐一拆解这些策略,提供完整的代码示例和实战案例。


1. 主流模型价格对比(2025-2026)

在优化之前,先了解当前的价格格局。以下是主流模型的 API 定价(截至 2025 年中):

工具推荐

模型提供商输入价格(/1M tokens)输出价格(/1M tokens)上下文窗口适用场景
GPT-4.1OpenAI$2.00$8.001M复杂推理、长上下文
GPT-4.1 miniOpenAI$0.40$1.601M日常任务、高性价比
GPT-4.1 nanoOpenAI$0.10$0.401M分类、提取、简单任务
GPT-4oOpenAI$2.50$10.00128K多模态、通用
GPT-4o miniOpenAI$0.15$0.60128K轻量任务
Claude Sonnet 4Anthropic$3.00$15.00200K编码、复杂分析
Claude Haiku 3.5Anthropic$0.80$4.00200K快速响应、分类
Gemini 2.5 ProGoogle$1.25-$2.50$10.00-$15.001M长上下文、多模态
Gemini 2.5 FlashGoogle$0.15$0.601M高速、低成本
Mistral MediumMistral$0.40$2.00128K欧洲合规、性价比
Llama 3.1 70BMeta (自托管)~$0.20~$0.20128K数据主权、自托管
DeepSeek V3DeepSeek$0.27$1.10128K极致性价比

💡 关键洞察:旗舰模型(GPT-4.1、Claude Sonnet 4)与轻量模型(GPT-4.1 nano、Gemini Flash)之间存在 10-30 倍的价格差距。成本优化的核心就是让合适的模型处理合适的任务。


2. Token 监控与预算管理

Token 用量是 LLM 成本的直接驱动因素。没有监控就没有优化——你需要知道钱花在了哪里。

工具推荐

工具用途价格适用场景
Langfuse开源 LLM 可观测性,含成本追踪免费(自托管)数据主权要求高的团队
LangSmithLangChain 官方追踪平台免费(开发者版)LangChain 生态用户
HeliconeLLM 代理网关,一行代码集成免费(10K 请求/月)快速集成、小团队
LiteLLM统一 API 网关 + 成本追踪免费(开源)多模型统一管理
OpenAI Usage Dashboard官方用量仪表板免费OpenAI 用户基础监控

操作步骤

步骤 1:使用 LiteLLM 统一追踪多模型成本

LiteLLM 是一个开源的 LLM API 网关,支持 100+ 模型的统一调用和成本追踪:

# pip install litellm import litellm from litellm import completion # 开启成本追踪回调 litellm.success_callback = ["langfuse"] # 或 "helicone", "lunary" # 调用任意模型——自动追踪成本 response = completion( model="gpt-4.1-mini", messages=[{"role": "user", "content": "总结这段文本"}], ) # 获取本次调用的成本信息 cost = litellm.completion_cost(completion_response=response) print(f"本次调用成本: ${cost:.6f}") print(f"输入 Token: {response.usage.prompt_tokens}") print(f"输出 Token: {response.usage.completion_tokens}")

步骤 2:构建 Token 预算管理系统

import redis import json from datetime import datetime, timedelta from typing import Optional class TokenBudgetManager: """Token 预算管理器——按用户/功能/模型追踪和限制用量""" def __init__(self, redis_url: str = "redis://localhost:6379"): self.redis = redis.from_url(redis_url) def _budget_key(self, entity_type: str, entity_id: str, period: str) -> str: """生成预算键:budget:{type}:{id}:{period}""" if period == "daily": date_str = datetime.utcnow().strftime("%Y-%m-%d") elif period == "monthly": date_str = datetime.utcnow().strftime("%Y-%m") return f"budget:{entity_type}:{entity_id}:{date_str}" def record_usage( self, user_id: str, feature: str, model: str, input_tokens: int, output_tokens: int, cost_usd: float ): """记录一次 LLM 调用的用量""" pipe = self.redis.pipeline() now = datetime.utcnow() # 按用户记录日/月用量 for period in ["daily", "monthly"]: key = self._budget_key("user", user_id, period) pipe.hincrbyfloat(key, "cost", cost_usd) pipe.hincrby(key, "input_tokens", input_tokens) pipe.hincrby(key, "output_tokens", output_tokens) pipe.hincrby(key, "requests", 1) pipe.expire(key, 86400 * 35) # 35 天过期 # 按功能记录 feature_key = self._budget_key("feature", feature, "daily") pipe.hincrbyfloat(feature_key, "cost", cost_usd) pipe.hincrby(feature_key, "requests", 1) pipe.expire(feature_key, 86400 * 7) # 按模型记录 model_key = self._budget_key("model", model, "daily") pipe.hincrbyfloat(model_key, "cost", cost_usd) pipe.hincrby(model_key, "input_tokens", input_tokens) pipe.expire(model_key, 86400 * 7) pipe.execute() def check_budget( self, user_id: str, monthly_limit_usd: float = 10.0 ) -> dict: """检查用户是否超出预算""" key = self._budget_key("user", user_id, "monthly") data = self.redis.hgetall(key) current_cost = float(data.get(b"cost", 0)) remaining = monthly_limit_usd - current_cost return { "user_id": user_id, "current_cost": round(current_cost, 4), "monthly_limit": monthly_limit_usd, "remaining": round(max(0, remaining), 4), "usage_percent": round(current_cost / monthly_limit_usd * 100, 1), "is_over_budget": current_cost >= monthly_limit_usd } def get_cost_breakdown(self, entity_type: str, period: str = "daily") -> dict: """获取成本分布(按用户/功能/模型)""" pattern = f"budget:{entity_type}:*:{datetime.utcnow().strftime('%Y-%m-%d' if period == 'daily' else '%Y-%m')}" breakdown = {} for key in self.redis.scan_iter(match=pattern): entity_id = key.decode().split(":")[2] data = self.redis.hgetall(key) breakdown[entity_id] = { "cost": round(float(data.get(b"cost", 0)), 4), "requests": int(data.get(b"requests", 0)) } return breakdown # 使用示例 budget = TokenBudgetManager() # 在每次 LLM 调用前检查预算 status = budget.check_budget("user_123", monthly_limit_usd=50.0) if status["is_over_budget"]: raise Exception(f"用户 {status['user_id']} 已超出月度预算 (${status['current_cost']}/${status['monthly_limit']})") # 调用后记录用量 budget.record_usage( user_id="user_123", feature="customer-service", model="gpt-4.1-mini", input_tokens=500, output_tokens=200, cost_usd=0.00052 )

步骤 3:设置成本告警

import smtplib from email.mime.text import MIMEText class CostAlertManager: """成本告警管理器""" def __init__(self, budget_manager: TokenBudgetManager): self.budget = budget_manager self.alert_thresholds = [0.5, 0.8, 0.95, 1.0] # 50%, 80%, 95%, 100% def check_and_alert(self, user_id: str, monthly_limit: float): status = self.budget.check_budget(user_id, monthly_limit) usage_ratio = status["current_cost"] / monthly_limit for threshold in self.alert_thresholds: if usage_ratio >= threshold: alert_key = f"alert:{user_id}:{threshold}" # 每个阈值只告警一次 if not self.budget.redis.exists(alert_key): self._send_alert(user_id, status, threshold) self.budget.redis.setex(alert_key, 86400 * 30, "sent") break def _send_alert(self, user_id: str, status: dict, threshold: float): """发送告警(可替换为 Slack/钉钉/企业微信)""" level = "🔴 严重" if threshold >= 0.95 else "🟡 警告" if threshold >= 0.8 else "🟢 提醒" message = ( f"{level} LLM 成本告警\n" f"用户: {user_id}\n" f"当前花费: ${status['current_cost']}\n" f"月度预算: ${status['monthly_limit']}\n" f"使用率: {status['usage_percent']}%\n" f"剩余额度: ${status['remaining']}" ) print(message) # 替换为实际的通知发送

提示词模板

你是一个 LLM 成本分析师。请根据以下用量数据生成优化报告: ## 当前用量数据 - 月度总成本:$[金额] - 日均请求量:[数量] - 模型使用分布:[GPT-4.1: X%, GPT-4.1-mini: Y%, Claude Sonnet: Z%] - 平均输入 Token:[N] / 平均输出 Token:[M] - 成本最高的功能模块:[模块名称] ($[金额]/月) - 缓存命中率:[百分比] ## 请输出 1. 成本异常点分析(哪些调用的单次成本异常高?) 2. 模型降级建议(哪些场景可以用更便宜的模型?) 3. Token 优化建议(如何减少输入/输出 Token?) 4. 缓存策略建议(哪些查询模式适合缓存?) 5. 预计优化后的月度成本和节省比例

3. 语义缓存(Semantic Caching)

语义缓存是 LLM 成本优化中投入产出比最高的策略之一。传统缓存要求查询完全匹配,而语义缓存通过向量相似度匹配语义相近的查询,复用已有的 LLM 响应。研究表明,语义缓存可以减少 50-90% 的 API 调用,同时将响应延迟从秒级降至毫秒级。

3.1 语义缓存原理

传统缓存(精确匹配): "北京天气怎么样?" → 缓存命中 ✅ "北京今天天气如何?" → 缓存未命中 ❌(不同的字符串) 语义缓存(向量相似度匹配): "北京天气怎么样?" → 缓存命中 ✅ "北京今天天气如何?" → 缓存命中 ✅(语义相似度 > 阈值) "上海天气怎么样?" → 缓存未命中 ❌(语义不同)
┌──────────────────────────────────────────────────────┐ │ 语义缓存工作流程 │ │ │ │ 用户查询 ──→ Embedding ──→ 向量相似度搜索 │ │ │ │ │ ┌─────────┴─────────┐ │ │ │ │ │ │ 相似度 ≥ 阈值 相似度 < 阈值 │ │ │ │ │ │ 返回缓存响应 调用 LLM API │ │ (延迟 ~5ms) (延迟 ~2000ms) │ │ │ │ │ 存入缓存(向量+响应) │ └──────────────────────────────────────────────────────┘

工具推荐

工具用途价格适用场景
Redis LangCacheRedis 官方语义缓存方案免费(开源)/ Cloud 起步 $7/月生产级语义缓存
GPTCache开源 LLM 缓存库免费(开源)快速原型、自定义缓存策略
LangChain CacheLangChain 内置缓存免费LangChain 生态用户
Momento无服务器缓存免费(5 GB)Serverless 架构

操作步骤

步骤 1:使用 Redis 构建语义缓存

# pip install redis langchain-openai numpy import json import hashlib import numpy as np from typing import Optional import redis import openai class SemanticCache: """基于 Redis 的语义缓存实现""" def __init__( self, redis_url: str = "redis://localhost:6379", similarity_threshold: float = 0.92, ttl_seconds: int = 3600, embedding_model: str = "text-embedding-3-small" ): self.redis = redis.from_url(redis_url) self.threshold = similarity_threshold self.ttl = ttl_seconds self.embedding_model = embedding_model self.client = openai.OpenAI() def _get_embedding(self, text: str) -> list[float]: """获取文本的向量嵌入""" response = self.client.embeddings.create( model=self.embedding_model, input=text ) return response.data[0].embedding def _cosine_similarity(self, a: list[float], b: list[float]) -> float: """计算余弦相似度""" a, b = np.array(a), np.array(b) return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) def get(self, query: str) -> Optional[str]: """查询语义缓存""" query_embedding = self._get_embedding(query) # 遍历缓存中的所有条目(生产环境应使用 Redis Vector Search) for key in self.redis.scan_iter(match="sem_cache:*"): cached = json.loads(self.redis.get(key)) similarity = self._cosine_similarity( query_embedding, cached["embedding"] ) if similarity >= self.threshold: # 更新访问时间(LRU) self.redis.expire(key, self.ttl) return cached["response"] return None def set(self, query: str, response: str): """存入语义缓存""" embedding = self._get_embedding(query) cache_key = f"sem_cache:{hashlib.md5(query.encode()).hexdigest()}" self.redis.setex( cache_key, self.ttl, json.dumps({ "query": query, "embedding": embedding, "response": response }) ) # 使用示例 cache = SemanticCache(similarity_threshold=0.92) def cached_llm_call(query: str) -> str: # 1. 先查缓存 cached = cache.get(query) if cached: print("✅ 缓存命中!节省一次 API 调用") return cached # 2. 缓存未命中,调用 LLM print("❌ 缓存未命中,调用 API...") client = openai.OpenAI() response = client.chat.completions.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": query}] ) result = response.choices[0].message.content # 3. 存入缓存 cache.set(query, result) return result # 第一次调用——缓存未命中 result1 = cached_llm_call("Python 的 GIL 是什么?") # 第二次调用——语义相似,缓存命中 result2 = cached_llm_call("请解释一下 Python 中的全局解释器锁")

步骤 2:使用 Redis LangCache(生产推荐)

Redis 官方在 2025 年推出了 LangCache,专为 LLM 语义缓存设计:

# pip install redis redisvl from redisvl.extensions.llmcache import SemanticCache as RedisSemanticCache # 初始化 Redis 语义缓存 cache = RedisSemanticCache( name="llm_cache", redis_url="redis://localhost:6379", distance_threshold=0.08, # 越小越严格(L2 距离) ttl=3600 ) # 检查缓存 results = cache.check(prompt="什么是机器学习?") if results: print(f"缓存命中: {results[0]['response']}") else: # 调用 LLM 并存入缓存 response = call_llm("什么是机器学习?") cache.store( prompt="什么是机器学习?", response=response )

步骤 3:使用 GPTCache

# pip install gptcache from gptcache import cache as gpt_cache from gptcache.adapter import openai as cached_openai from gptcache.embedding import Onnx from gptcache.manager import CacheBase, VectorBase, get_data_manager from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation # 初始化 GPTCache onnx = Onnx() cache_base = CacheBase("sqlite") vector_base = VectorBase("faiss", dimension=onnx.dimension) data_manager = get_data_manager(cache_base, vector_base) gpt_cache.init( embedding_func=onnx.to_embeddings, data_manager=data_manager, similarity_evaluation=SearchDistanceEvaluation() ) # 使用缓存的 OpenAI 调用 response = cached_openai.ChatCompletion.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": "解释量子计算"}] )

缓存失效策略

class CacheInvalidationStrategy: """缓存失效策略管理""" def __init__(self, cache: SemanticCache): self.cache = cache def invalidate_by_pattern(self, pattern: str): """按模式失效(如某个功能的所有缓存)""" for key in self.cache.redis.scan_iter(match=f"sem_cache:{pattern}*"): self.cache.redis.delete(key) def invalidate_stale(self, max_age_hours: int = 24): """失效过期缓存""" for key in self.cache.redis.scan_iter(match="sem_cache:*"): ttl = self.cache.redis.ttl(key) if ttl < 0: # 无过期时间 self.cache.redis.expire(key, max_age_hours * 3600) def warm_cache(self, common_queries: list[str]): """预热缓存——用高频查询预先填充""" for query in common_queries: if not self.cache.get(query): response = call_llm(query) self.cache.set(query, response) print(f"预热: {query[:50]}...")

4. Prompt 缓存(Provider-Level Caching)

Prompt 缓存是由模型提供商在服务端实现的缓存机制,与语义缓存不同——它缓存的是 Prompt 前缀的内部计算结果(KV Cache),而非最终响应。对于包含大量系统提示、文档上下文的应用,Prompt 缓存可以降低 50-90% 的输入 Token 成本。

4.1 各提供商 Prompt 缓存对比

特性AnthropicOpenAIGoogle
缓存折扣90%(缓存 Token 仅收 10%)50%(缓存 Token 收 50%)75%(缓存 Token 收 25%)
最小前缀长度1,024 tokens1,024 tokens自动
缓存 TTL5 分钟(可延长)自动管理自动管理
启用方式手动标记 cache_control自动(默认开启)自动
延迟改善最高 85%最高 50%最高 70%

操作步骤

步骤 1:Anthropic Prompt 缓存

Anthropic 的 Prompt 缓存需要手动标记缓存断点,缓存命中时输入 Token 成本降低 90%:

import anthropic client = anthropic.Anthropic() # 大型系统提示 + 文档上下文(适合缓存) system_prompt = """你是一个专业的法律顾问 AI 助手。 以下是公司的完整法律政策文档(约 50,000 字): ... [大量文档内容] ... """ response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1024, system=[ { "type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"} # 标记为可缓存 } ], messages=[ {"role": "user", "content": "员工离职需要提前多久通知?"} ] ) # 查看缓存状态 print(f"缓存创建 Token: {response.usage.cache_creation_input_tokens}") print(f"缓存命中 Token: {response.usage.cache_read_input_tokens}") print(f"普通输入 Token: {response.usage.input_tokens}") # 第一次调用:cache_creation_input_tokens = 15000(创建缓存,收 25% 溢价) # 后续调用:cache_read_input_tokens = 15000(命中缓存,仅收 10%)

步骤 2:OpenAI 自动 Prompt 缓存

OpenAI 的 Prompt 缓存是自动的,无需额外代码。当连续请求共享相同的 Prompt 前缀(≥1,024 tokens)时,自动触发缓存:

import openai client = openai.OpenAI() # 大型系统提示(自动缓存) large_system_prompt = """你是一个代码审查助手。 以下是项目的编码规范(约 10,000 字): ... [大量规范内容] ... """ # 多次调用共享相同的系统提示——自动触发缓存 for code_snippet in code_snippets_to_review: response = client.chat.completions.create( model="gpt-4.1", messages=[ {"role": "system", "content": large_system_prompt}, {"role": "user", "content": f"请审查以下代码:\n{code_snippet}"} ] ) # 查看缓存状态 usage = response.usage if hasattr(usage, "prompt_tokens_details"): cached = usage.prompt_tokens_details.cached_tokens print(f"缓存命中 Token: {cached} / 总输入: {usage.prompt_tokens}") # 缓存命中的 Token 按 50% 价格计费

步骤 3:最大化 Prompt 缓存命中率

# ✅ 好的做法:将不变的内容放在前面,变化的内容放在后面 messages = [ {"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 不变 → 缓存 {"role": "system", "content": STATIC_KNOWLEDGE_BASE}, # 不变 → 缓存 {"role": "user", "content": dynamic_user_query} # 变化 → 不缓存 ] # ❌ 坏的做法:将动态内容插入到静态内容中间 messages = [ {"role": "system", "content": f"当前时间:{datetime.now()}"}, # 每次变化 → 破坏缓存 {"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 前缀已变 → 无法缓存 {"role": "user", "content": dynamic_user_query} ] # ✅ 优化后:动态内容移到最后 messages = [ {"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 不变 → 缓存 {"role": "system", "content": f"当前时间:{datetime.now()}"}, # 变化但在后面 {"role": "user", "content": dynamic_user_query} ]

5. 智能模型路由(Model Routing)

智能模型路由是成本优化中效果最显著的策略之一。核心思想:不是所有请求都需要旗舰模型。将 70% 的简单查询路由到廉价模型,只让 30% 的复杂查询使用旗舰模型,可以降低 60%+ 的成本。

工具推荐

工具用途价格适用场景
LiteLLM开源 LLM 网关 + 路由免费(开源)自托管、完全控制
OpenRouter统一 API 网关按模型定价(无额外费用)快速接入 300+ 模型
MartianAI 驱动的智能路由按用量计费自动质量-成本优化
Unify AI模型路由优化平台免费(基础版)基准测试驱动的路由

操作步骤

步骤 1:基于规则的模型路由

import re from enum import Enum from typing import Optional import litellm class TaskComplexity(Enum): SIMPLE = "simple" # 分类、提取、格式化 MEDIUM = "medium" # 摘要、翻译、一般问答 COMPLEX = "complex" # 推理、编码、创意写作 CRITICAL = "critical" # 高风险决策、法律/医疗 class ModelRouter: """基于规则的智能模型路由器""" # 模型配置:按复杂度映射到不同模型 MODEL_MAP = { TaskComplexity.SIMPLE: { "model": "gpt-4.1-nano", "cost_per_1m_input": 0.10, "cost_per_1m_output": 0.40, }, TaskComplexity.MEDIUM: { "model": "gpt-4.1-mini", "cost_per_1m_input": 0.40, "cost_per_1m_output": 1.60, }, TaskComplexity.COMPLEX: { "model": "gpt-4.1", "cost_per_1m_input": 2.00, "cost_per_1m_output": 8.00, }, TaskComplexity.CRITICAL: { "model": "claude-sonnet-4-20250514", "cost_per_1m_input": 3.00, "cost_per_1m_output": 15.00, }, } # 任务类型到复杂度的映射规则 TASK_RULES = { "classify": TaskComplexity.SIMPLE, "extract": TaskComplexity.SIMPLE, "format": TaskComplexity.SIMPLE, "translate": TaskComplexity.MEDIUM, "summarize": TaskComplexity.MEDIUM, "qa": TaskComplexity.MEDIUM, "code_generate": TaskComplexity.COMPLEX, "reason": TaskComplexity.COMPLEX, "creative": TaskComplexity.COMPLEX, "legal": TaskComplexity.CRITICAL, "medical": TaskComplexity.CRITICAL, "financial": TaskComplexity.CRITICAL, } def classify_complexity( self, query: str, task_type: Optional[str] = None, token_count: int = 0 ) -> TaskComplexity: """判断查询的复杂度""" # 1. 如果明确指定了任务类型 if task_type and task_type in self.TASK_RULES: return self.TASK_RULES[task_type] # 2. 基于启发式规则判断 query_lower = query.lower() # 简单任务特征 simple_patterns = [ r"分类|归类|标签", r"提取|抽取|找出", r"格式化|转换格式", r"是否|是不是|对不对", ] for pattern in simple_patterns: if re.search(pattern, query_lower): return TaskComplexity.SIMPLE # 复杂任务特征 complex_patterns = [ r"写代码|编程|实现.*功能", r"分析.*原因|为什么.*", r"设计.*架构|方案", r"比较.*优缺点", ] for pattern in complex_patterns: if re.search(pattern, query_lower): return TaskComplexity.COMPLEX # 默认中等复杂度 return TaskComplexity.MEDIUM def route( self, messages: list[dict], task_type: Optional[str] = None, force_model: Optional[str] = None ) -> dict: """路由请求到合适的模型""" if force_model: return {"model": force_model, "complexity": "forced"} # 获取用户查询 user_query = "" for msg in reversed(messages): if msg["role"] == "user": user_query = msg["content"] break complexity = self.classify_complexity(user_query, task_type) config = self.MODEL_MAP[complexity] return { "model": config["model"], "complexity": complexity.value, "estimated_cost_per_1m_input": config["cost_per_1m_input"] } def call( self, messages: list[dict], task_type: Optional[str] = None, **kwargs ) -> dict: """路由并调用模型""" route_info = self.route(messages, task_type) response = litellm.completion( model=route_info["model"], messages=messages, **kwargs ) # 记录路由决策(用于后续优化) cost = litellm.completion_cost(completion_response=response) print(f"路由: {route_info['complexity']}{route_info['model']} | 成本: ${cost:.6f}") return { "response": response, "route_info": route_info, "cost": cost } # 使用示例 router = ModelRouter() # 简单分类任务 → 自动路由到 GPT-4.1 nano ($0.10/1M) result = router.call( messages=[{"role": "user", "content": "这条评论是正面还是负面的:'产品质量很好'"}], task_type="classify" ) # 复杂编码任务 → 自动路由到 GPT-4.1 ($2.00/1M) result = router.call( messages=[{"role": "user", "content": "用 Python 实现一个带重试机制的异步 HTTP 客户端"}], task_type="code_generate" )

步骤 2:使用 LiteLLM Router(带负载均衡和降级)

from litellm import Router # 配置 LiteLLM Router router = Router( model_list=[ { "model_name": "fast-model", # 逻辑名称 "litellm_params": { "model": "gpt-4.1-mini", "api_key": "sk-xxx", }, "model_info": {"id": "gpt-4.1-mini-1"} }, { "model_name": "fast-model", # 同名 = 负载均衡 "litellm_params": { "model": "gemini/gemini-2.5-flash", "api_key": "xxx", }, "model_info": {"id": "gemini-flash-1"} }, { "model_name": "powerful-model", "litellm_params": { "model": "gpt-4.1", "api_key": "sk-xxx", }, }, ], # 路由策略 routing_strategy="least-busy", # 或 "simple-shuffle", "latency-based-routing" # 降级配置 fallbacks=[ {"fast-model": ["powerful-model"]}, # fast-model 失败时降级到 powerful-model ], # 重试配置 num_retries=2, retry_after=5, # 超时配置 timeout=30, ) # 使用路由器调用 response = await router.acompletion( model="fast-model", messages=[{"role": "user", "content": "你好"}] )

步骤 3:级联路由(Cascading)——先用便宜模型,不满意再升级

import json class CascadingRouter: """级联路由:先用便宜模型,质量不达标时自动升级""" MODEL_CASCADE = [ {"model": "gpt-4.1-nano", "cost_factor": 1}, {"model": "gpt-4.1-mini", "cost_factor": 4}, {"model": "gpt-4.1", "cost_factor": 20}, ] def __init__(self, quality_threshold: float = 0.7): self.quality_threshold = quality_threshold def _assess_quality(self, response: str, query: str) -> float: """快速评估响应质量(使用启发式规则)""" score = 0.5 # 基础分 # 长度合理性 if 50 < len(response) < 5000: score += 0.1 # 包含结构化内容 if any(marker in response for marker in ["1.", "- ", "```", "**"]): score += 0.1 # 不包含不确定表述 uncertain = ["我不确定", "我不知道", "可能是", "也许"] if not any(u in response for u in uncertain): score += 0.1 # 与查询相关性(简单关键词匹配) query_words = set(query[:100].split()) response_words = set(response[:200].split()) overlap = len(query_words & response_words) / max(len(query_words), 1) score += overlap * 0.2 return min(score, 1.0) async def call(self, messages: list[dict]) -> dict: """级联调用:从最便宜的模型开始""" query = messages[-1]["content"] if messages else "" for i, config in enumerate(self.MODEL_CASCADE): response = await litellm.acompletion( model=config["model"], messages=messages ) result = response.choices[0].message.content quality = self._assess_quality(result, query) if quality >= self.quality_threshold or i == len(self.MODEL_CASCADE) - 1: return { "response": result, "model_used": config["model"], "cascade_level": i, "quality_score": quality, "cost": litellm.completion_cost(completion_response=response) } return {"response": result, "model_used": config["model"]}

6. 批处理(Batch Processing)

对于非实时场景(数据分析、内容生成、批量分类),批处理可以显著降低成本。OpenAI Batch API 提供 50% 的价格折扣,且支持大规模并行处理。

工具推荐

工具用途价格适用场景
OpenAI Batch API官方批处理接口50% 折扣非实时大批量处理
Anthropic Message BatchesAnthropic 批处理50% 折扣Claude 模型批量调用
Celery + Redis异步任务队列免费(开源)自建批处理管线
AWS SQS + Lambda云原生队列按用量(极低)Serverless 批处理

操作步骤

步骤 1:使用 OpenAI Batch API

OpenAI Batch API 接受 JSONL 文件作为输入,异步处理后返回结果,价格为标准 API 的 50%:

import json import time from openai import OpenAI client = OpenAI() # 1. 准备批处理输入文件(JSONL 格式) batch_requests = [] products = [ "iPhone 16 Pro Max 256GB 深空黑", "MacBook Air M4 15寸 16GB", "AirPods Pro 3 带听力健康功能", # ... 假设有 10,000 个产品 ] for i, product in enumerate(products): batch_requests.append({ "custom_id": f"product-{i}", "method": "POST", "url": "/v1/chat/completions", "body": { "model": "gpt-4.1-mini", "messages": [ {"role": "system", "content": "你是一个电商文案专家。为产品生成 50 字以内的营销描述。"}, {"role": "user", "content": f"产品:{product}"} ], "max_tokens": 200 } }) # 写入 JSONL 文件 with open("batch_input.jsonl", "w") as f: for req in batch_requests: f.write(json.dumps(req, ensure_ascii=False) + "\n") # 2. 上传文件 batch_file = client.files.create( file=open("batch_input.jsonl", "rb"), purpose="batch" ) # 3. 创建批处理任务 batch_job = client.batches.create( input_file_id=batch_file.id, endpoint="/v1/chat/completions", completion_window="24h" # 24 小时内完成 ) print(f"批处理任务已创建: {batch_job.id}") # 4. 轮询检查状态 while True: status = client.batches.retrieve(batch_job.id) print(f"状态: {status.status} | 完成: {status.request_counts.completed}/{status.request_counts.total}") if status.status in ["completed", "failed", "expired"]: break time.sleep(60) # 5. 下载结果 if status.status == "completed": result_file = client.files.content(status.output_file_id) results = [] for line in result_file.text.strip().split("\n"): result = json.loads(line) results.append({ "id": result["custom_id"], "description": result["response"]["body"]["choices"][0]["message"]["content"] }) print(f"成功处理 {len(results)} 个产品描述")

步骤 2:构建异步批处理队列

对于需要更灵活控制的场景,使用 Celery + Redis 构建自定义批处理管线:

# tasks.py from celery import Celery import litellm app = Celery("llm_tasks", broker="redis://localhost:6379/0") # 配置并发限制(避免触发 API 速率限制) app.conf.worker_concurrency = 10 app.conf.task_rate_limit = "50/m" # 每分钟最多 50 个任务 @app.task( bind=True, max_retries=3, default_retry_delay=60, rate_limit="50/m" ) def process_llm_request(self, request_id: str, messages: list, model: str = "gpt-4.1-mini"): """异步处理单个 LLM 请求""" try: response = litellm.completion( model=model, messages=messages, timeout=30 ) return { "request_id": request_id, "status": "success", "result": response.choices[0].message.content, "cost": litellm.completion_cost(completion_response=response) } except Exception as e: # 自动重试(指数退避) raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries)) # 批量提交任务 def submit_batch(items: list[dict]): """提交批量 LLM 任务""" from celery import group tasks = [] for item in items: task = process_llm_request.s( request_id=item["id"], messages=item["messages"], model=item.get("model", "gpt-4.1-mini") ) tasks.append(task) # 并行执行所有任务 job = group(tasks)() return job

步骤 3:批处理成本对比

场景:处理 10,000 个产品描述生成任务 模型:GPT-4.1 mini 平均输入:200 tokens / 平均输出:100 tokens 标准 API: 输入成本:10,000 × 200 / 1,000,000 × $0.40 = $0.80 输出成本:10,000 × 100 / 1,000,000 × $1.60 = $1.60 总成本:$2.40 Batch API(50% 折扣): 输入成本:$0.80 × 0.5 = $0.40 输出成本:$1.60 × 0.5 = $0.80 总成本:$1.20 节省:$1.20(50%)

7. Prompt 优化与压缩

减少输入 Token 数量是最直接的成本优化手段。通过 Prompt 压缩、结构优化和输出约束,可以在不影响质量的前提下减少 30-50% 的 Token 消耗。

操作步骤

步骤 1:Prompt 压缩技术

# 方法 1:移除冗余指令 # ❌ 冗余 Prompt(约 150 tokens) verbose_prompt = """ 我希望你能够帮助我完成以下任务。这个任务非常重要,请你认真对待。 我需要你分析一段文本,然后给出你的分析结果。 请确保你的分析是准确的、全面的、有深度的。 在分析的时候,请注意以下几点: 1. 请仔细阅读文本 2. 请从多个角度进行分析 3. 请给出具体的例子来支持你的观点 4. 请用清晰的语言表达你的分析结果 以下是需要分析的文本: """ # ✅ 精简 Prompt(约 30 tokens) concise_prompt = """分析以下文本,从多角度给出带具体例子的深度分析: """ # 方法 2:使用缩写和符号 # ❌ 冗长 long_format = """ 请按照以下格式输出: - 第一部分:总结(不超过100字) - 第二部分:关键发现(列出3-5个要点) - 第三部分:建议(列出2-3个可行建议) """ # ✅ 精简 short_format = """输出格式: 1) 总结(≤100字) 2) 关键发现(3-5点) 3) 建议(2-3条)""" # 方法 3:使用 JSON Schema 约束输出(减少输出 Token) response = client.chat.completions.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": "分析这段评论的情感"}], response_format={ "type": "json_schema", "json_schema": { "name": "sentiment", "schema": { "type": "object", "properties": { "sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]}, "confidence": {"type": "number"}, "keywords": {"type": "array", "items": {"type": "string"}, "maxItems": 3} }, "required": ["sentiment", "confidence"] } } } )

步骤 2:上下文窗口优化

def optimize_context( documents: list[str], query: str, max_context_tokens: int = 4000, model: str = "gpt-4.1-mini" ) -> str: """优化上下文:只保留与查询最相关的内容""" import tiktoken encoder = tiktoken.encoding_for_model(model) # 1. 按相关性排序文档(简化版,生产环境用向量检索) scored_docs = [] query_words = set(query.lower().split()) for doc in documents: doc_words = set(doc.lower().split()) relevance = len(query_words & doc_words) / max(len(query_words), 1) scored_docs.append((relevance, doc)) scored_docs.sort(reverse=True) # 2. 贪心选择:按相关性依次加入,直到达到 Token 限制 selected = [] current_tokens = 0 for _, doc in scored_docs: doc_tokens = len(encoder.encode(doc)) if current_tokens + doc_tokens <= max_context_tokens: selected.append(doc) current_tokens += doc_tokens else: break return "\n---\n".join(selected)

步骤 3:输出 Token 控制

# 策略 1:明确限制输出长度 response = client.chat.completions.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": "用一句话总结这篇文章"}], max_tokens=100 # 硬限制输出长度 ) # 策略 2:在 Prompt 中约束输出格式 constrained_prompt = """ 分类以下文本的主题。只输出类别名称,不要解释。 可选类别:技术、商业、娱乐、体育、科学 文本:{text} 类别:""" # 策略 3:使用 stop 序列提前终止 response = client.chat.completions.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": "列出前3个要点:"}], stop=["4.", "4、"] # 在第4点之前停止 )

8. 速率限制与流量控制

速率限制不仅是防止超支的安全网,也是优化成本的重要手段。合理的限流可以平滑流量峰值、避免触发提供商的速率限制(导致重试成本),并为批处理留出配额。

操作步骤

步骤 1:实现令牌桶限流器

import time import asyncio from dataclasses import dataclass @dataclass class RateLimitConfig: requests_per_minute: int = 60 tokens_per_minute: int = 100_000 max_concurrent: int = 10 class TokenBucketRateLimiter: """令牌桶限流器——支持请求数和 Token 数双重限制""" def __init__(self, config: RateLimitConfig): self.config = config self.request_tokens = config.requests_per_minute self.token_tokens = config.tokens_per_minute self.last_refill = time.monotonic() self.semaphore = asyncio.Semaphore(config.max_concurrent) self._lock = asyncio.Lock() async def _refill(self): """补充令牌""" now = time.monotonic() elapsed = now - self.last_refill # 按时间比例补充 self.request_tokens = min( self.config.requests_per_minute, self.request_tokens + elapsed * self.config.requests_per_minute / 60 ) self.token_tokens = min( self.config.tokens_per_minute, self.token_tokens + elapsed * self.config.tokens_per_minute / 60 ) self.last_refill = now async def acquire(self, estimated_tokens: int = 1000): """获取调用许可""" while True: async with self._lock: await self._refill() if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens: self.request_tokens -= 1 self.token_tokens -= estimated_tokens return True # 等待令牌补充 await asyncio.sleep(0.5) async def call_with_limit(self, func, *args, estimated_tokens=1000, **kwargs): """带限流的调用""" await self.acquire(estimated_tokens) async with self.semaphore: return await func(*args, **kwargs) # 使用示例 limiter = TokenBucketRateLimiter(RateLimitConfig( requests_per_minute=50, tokens_per_minute=80_000, max_concurrent=5 )) async def rate_limited_call(messages): await limiter.acquire(estimated_tokens=500) return await litellm.acompletion( model="gpt-4.1-mini", messages=messages )

步骤 2:分层限流策略

class TieredRateLimiter: """分层限流:不同用户等级不同配额""" TIER_LIMITS = { "free": RateLimitConfig( requests_per_minute=10, tokens_per_minute=10_000, max_concurrent=2 ), "pro": RateLimitConfig( requests_per_minute=60, tokens_per_minute=100_000, max_concurrent=10 ), "enterprise": RateLimitConfig( requests_per_minute=300, tokens_per_minute=500_000, max_concurrent=50 ), } def __init__(self): self.limiters: dict[str, TokenBucketRateLimiter] = {} def get_limiter(self, user_id: str, tier: str = "free") -> TokenBucketRateLimiter: key = f"{user_id}:{tier}" if key not in self.limiters: config = self.TIER_LIMITS.get(tier, self.TIER_LIMITS["free"]) self.limiters[key] = TokenBucketRateLimiter(config) return self.limiters[key]

9. 综合成本优化架构

将以上所有策略组合成一个完整的成本优化管线:

┌─────────────────────────────────────────────────────────────┐ │ LLM 成本优化管线 │ │ │ │ 用户请求 │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 1. 速率限制 │ ← 令牌桶 + 用户配额 │ │ └──────┬───────┘ │ │ ▼ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 2. 语义缓存 │────→│ 缓存命中? │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ 是 │ │ │ 否 ▼ │ │ │ 返回缓存响应(~5ms) │ │ ▼ │ │ ┌──────────────┐ │ │ │ 3. Prompt 优化│ ← 压缩 + 上下文裁剪 │ │ └──────┬───────┘ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 4. 模型路由 │ ← 复杂度分类 → 选择模型 │ │ └──────┬───────┘ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 5. Prompt 缓存│ ← 提供商级别前缀缓存 │ │ └──────┬───────┘ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 6. API 调用 │ → 记录成本 → 更新预算 │ │ └──────┬───────┘ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 7. 存入缓存 │ → 更新语义缓存 │ │ └──────┬───────┘ │ │ ▼ │ │ 返回响应 │ └─────────────────────────────────────────────────────────────┘
class CostOptimizedLLMPipeline: """综合成本优化管线""" def __init__(self): self.rate_limiter = TokenBucketRateLimiter(RateLimitConfig()) self.semantic_cache = SemanticCache(similarity_threshold=0.92) self.model_router = ModelRouter() self.budget_manager = TokenBudgetManager() async def call( self, messages: list[dict], user_id: str, task_type: str = None, use_cache: bool = True ) -> dict: """完整的成本优化调用流程""" # 1. 预算检查 budget = self.budget_manager.check_budget(user_id) if budget["is_over_budget"]: return {"error": "月度预算已用尽", "budget": budget} # 2. 速率限制 await self.rate_limiter.acquire() # 3. 语义缓存查询 user_query = messages[-1]["content"] if messages else "" if use_cache: cached = self.semantic_cache.get(user_query) if cached: return { "response": cached, "source": "cache", "cost": 0.0 } # 4. 模型路由 route = self.model_router.route(messages, task_type) # 5. 调用 LLM import litellm response = await litellm.acompletion( model=route["model"], messages=messages ) result = response.choices[0].message.content cost = litellm.completion_cost(completion_response=response) # 6. 记录用量 self.budget_manager.record_usage( user_id=user_id, feature=task_type or "general", model=route["model"], input_tokens=response.usage.prompt_tokens, output_tokens=response.usage.completion_tokens, cost_usd=cost ) # 7. 存入缓存 if use_cache: self.semantic_cache.set(user_query, result) return { "response": result, "source": "api", "model": route["model"], "complexity": route["complexity"], "cost": cost }

实战案例:AI 客服系统从 $47,000/月 降至 $8,200/月

场景描述

一家 SaaS 公司的 AI 客服系统,日均处理 50,000 次对话,全部使用 GPT-4o($2.50/$10.00 per 1M tokens)。月度 API 账单 $47,000,单次交互成本 $0.031。CEO 要求在不降低客户满意度的前提下,将成本降至 $15,000/月以内。

案例的具体操作流程

第 1 步:成本分析(使用 Langfuse)

# 接入 Langfuse 追踪,分析成本分布 # 发现: # - 45% 的查询是重复性问题(退货政策、配送时间、账户问题) # - 30% 的查询是简单分类/路由(判断问题类型后转人工) # - 15% 的查询需要知识库检索 + 生成 # - 10% 的查询涉及复杂推理(投诉处理、特殊情况)

第 2 步:实施语义缓存(节省 40%)

# 针对 45% 的重复性查询,部署语义缓存 cache = SemanticCache( similarity_threshold=0.90, # 客服场景可以稍微宽松 ttl_seconds=86400 # 24 小时缓存 ) # 预热缓存:用 Top 500 高频问题预填充 common_questions = [ "怎么退货?", "退款多久到账?", "配送需要几天?", "如何修改收货地址?", "会员有什么优惠?", ... ] invalidation = CacheInvalidationStrategy(cache) invalidation.warm_cache(common_questions) # 效果:缓存命中率 42%,月度节省 ~$19,740

第 3 步:实施模型路由(再节省 35%)

# 对未命中缓存的 58% 请求进行智能路由 router = ModelRouter() # 路由规则: # - 简单分类/路由(30%)→ GPT-4.1 nano ($0.10/$0.40) # - 知识库问答(15%)→ GPT-4.1 mini ($0.40/$1.60) # - 复杂推理(10%)→ GPT-4.1 ($2.00/$8.00) # - 投诉/敏感(3%)→ Claude Sonnet 4 ($3.00/$15.00) # 效果:平均模型成本从 $6.25/1M tokens 降至 $1.20/1M tokens

第 4 步:Prompt 优化(再节省 25%)

# 优化前:系统提示 2,000 tokens + 对话历史平均 1,500 tokens # 优化后: # 1. 精简系统提示:2,000 → 800 tokens # 2. 对话历史只保留最近 3 轮:1,500 → 600 tokens # 3. 使用 JSON Schema 约束输出格式 # 4. 利用 Anthropic Prompt 缓存(系统提示部分) # 效果:平均输入 Token 减少 45%

第 5 步:批处理非实时任务(再节省 10%)

# 将以下任务改为批处理: # - 每日对话质量评估(LLM-as-Judge) # - 每周客户反馈摘要生成 # - 知识库内容更新和索引 # 使用 OpenAI Batch API,享受 50% 折扣

案例分析

优化措施月度节省累计成本节省比例
优化前基线$47,000
+ 语义缓存-$19,740$27,26042%
+ 模型路由-$12,800$14,46069%
+ Prompt 优化-$3,600$10,86077%
+ 批处理-$1,200$9,66079%
+ Prompt 缓存-$1,460$8,20083%

最终结果:月度成本从 $47,000 降至 $8,200,节省 83%。单次交互成本从 $0.031 降至 $0.0054。客户满意度评分从 4.2 微升至 4.3(因为缓存命中时响应更快)。


避坑指南

❌ 常见错误

  1. 只关注模型价格,忽略总体架构

    • 问题:盲目选择最便宜的模型,导致质量下降、用户投诉增加、人工介入成本上升
    • 正确做法:先分析流量分布和任务类型,用模型路由让合适的模型处理合适的任务。便宜模型处理简单任务,旗舰模型处理复杂任务
  2. 语义缓存阈值设置不当

    • 问题:阈值太低(如 0.80)导致返回不相关的缓存结果,用户体验差;阈值太高(如 0.99)导致几乎没有缓存命中
    • 正确做法:从 0.92 开始,用真实查询日志做 A/B 测试,逐步调整。不同场景用不同阈值——FAQ 可以宽松(0.88),专业问答要严格(0.95)
  3. 忽略 Prompt 缓存的前缀顺序

    • 问题:将动态内容(时间戳、用户 ID)放在系统提示开头,导致每次请求的前缀都不同,Prompt 缓存完全失效
    • 正确做法:静态内容(系统提示、知识库)放前面,动态内容(用户查询、时间)放后面。Anthropic 的 cache_control 标记要放在静态内容的末尾
  4. 没有设置成本告警和预算上限

    • 问题:一个 bug 导致无限循环调用 API,一夜之间产生数千美元账单
    • 正确做法:在 OpenAI/Anthropic 后台设置月度硬限制。在应用层实现用户级和功能级预算管理,设置 50%/80%/95% 三级告警
  5. 批处理任务使用实时 API

    • 问题:数据分析、内容批量生成等非实时任务使用标准 API,白白多花 50% 的钱
    • 正确做法:所有不需要实时响应的任务(报告生成、数据标注、批量分类)都使用 Batch API。OpenAI 和 Anthropic 的 Batch API 都提供 50% 折扣
  6. 缓存没有失效策略

    • 问题:知识库更新后,缓存仍然返回旧信息,导致用户获得过时的回答
    • 正确做法:实现基于事件的缓存失效(知识库更新时清除相关缓存)、基于时间的 TTL(FAQ 缓存 24 小时、实时数据缓存 5 分钟)、定期预热高频查询

✅ 最佳实践

  1. 先监控,再优化:接入 Langfuse/Helicone 等工具,至少收集 1 周的成本数据,找到成本热点后再针对性优化
  2. 分层优化:按”缓存 → 路由 → Prompt 优化 → 批处理”的顺序实施,每步验证效果后再进入下一步
  3. 保持质量监控:每次优化后都要监控输出质量指标(用户满意度、任务完成率),确保成本优化不以质量为代价
  4. 定期审查模型定价:LLM 市场价格变化快(2024-2025 年间主流模型价格下降了 5-10 倍),每季度审查一次模型选择
  5. 为成本优化建立 Dashboard:在 Grafana 或 Langfuse 中建立成本仪表板,追踪日/周/月成本趋势、缓存命中率、模型分布

相关资源与延伸阅读


参考来源


📖 返回 总览与导航 | 上一节:21d-Langfuse设置指南 | 下一节:21f-生产告警与质量指标

Last updated on