21e - 成本优化策略
本文是《AI Agent 实战手册》第 21 章第 5 节。 上一节:21d-Langfuse设置指南 | 下一节:21f-生产告警与质量指标
⏱ 阅读时间:80 分钟 | 难度:⭐⭐⭐⭐ 中高级 | 前置知识:LLM API 使用经验、Python/TypeScript 开发经验
概述
LLM 应用的运营成本是制约规模化的核心瓶颈。一个日处理 10 万次请求的 AI 客服系统,如果全部使用旗舰模型,月度 API 账单可能高达 $5,000-$50,000。但通过系统化的成本优化——Token 监控与预算管理、语义缓存、智能模型路由、批处理、Prompt 缓存和 Prompt 压缩——大多数团队可以在不牺牲质量的前提下降低 60-80% 的 LLM 开支。本节将逐一拆解这些策略,提供完整的代码示例和实战案例。
1. 主流模型价格对比(2025-2026)
在优化之前,先了解当前的价格格局。以下是主流模型的 API 定价(截至 2025 年中):
工具推荐
| 模型 | 提供商 | 输入价格(/1M tokens) | 输出价格(/1M tokens) | 上下文窗口 | 适用场景 |
|---|---|---|---|---|---|
| GPT-4.1 | OpenAI | $2.00 | $8.00 | 1M | 复杂推理、长上下文 |
| GPT-4.1 mini | OpenAI | $0.40 | $1.60 | 1M | 日常任务、高性价比 |
| GPT-4.1 nano | OpenAI | $0.10 | $0.40 | 1M | 分类、提取、简单任务 |
| GPT-4o | OpenAI | $2.50 | $10.00 | 128K | 多模态、通用 |
| GPT-4o mini | OpenAI | $0.15 | $0.60 | 128K | 轻量任务 |
| Claude Sonnet 4 | Anthropic | $3.00 | $15.00 | 200K | 编码、复杂分析 |
| Claude Haiku 3.5 | Anthropic | $0.80 | $4.00 | 200K | 快速响应、分类 |
| Gemini 2.5 Pro | $1.25-$2.50 | $10.00-$15.00 | 1M | 长上下文、多模态 | |
| Gemini 2.5 Flash | $0.15 | $0.60 | 1M | 高速、低成本 | |
| Mistral Medium | Mistral | $0.40 | $2.00 | 128K | 欧洲合规、性价比 |
| Llama 3.1 70B | Meta (自托管) | ~$0.20 | ~$0.20 | 128K | 数据主权、自托管 |
| DeepSeek V3 | DeepSeek | $0.27 | $1.10 | 128K | 极致性价比 |
💡 关键洞察:旗舰模型(GPT-4.1、Claude Sonnet 4)与轻量模型(GPT-4.1 nano、Gemini Flash)之间存在 10-30 倍的价格差距。成本优化的核心就是让合适的模型处理合适的任务。
2. Token 监控与预算管理
Token 用量是 LLM 成本的直接驱动因素。没有监控就没有优化——你需要知道钱花在了哪里。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Langfuse | 开源 LLM 可观测性,含成本追踪 | 免费(自托管) | 数据主权要求高的团队 |
| LangSmith | LangChain 官方追踪平台 | 免费(开发者版) | LangChain 生态用户 |
| Helicone | LLM 代理网关,一行代码集成 | 免费(10K 请求/月) | 快速集成、小团队 |
| LiteLLM | 统一 API 网关 + 成本追踪 | 免费(开源) | 多模型统一管理 |
| OpenAI Usage Dashboard | 官方用量仪表板 | 免费 | OpenAI 用户基础监控 |
操作步骤
步骤 1:使用 LiteLLM 统一追踪多模型成本
LiteLLM 是一个开源的 LLM API 网关,支持 100+ 模型的统一调用和成本追踪:
# pip install litellm
import litellm
from litellm import completion
# 开启成本追踪回调
litellm.success_callback = ["langfuse"] # 或 "helicone", "lunary"
# 调用任意模型——自动追踪成本
response = completion(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "总结这段文本"}],
)
# 获取本次调用的成本信息
cost = litellm.completion_cost(completion_response=response)
print(f"本次调用成本: ${cost:.6f}")
print(f"输入 Token: {response.usage.prompt_tokens}")
print(f"输出 Token: {response.usage.completion_tokens}")步骤 2:构建 Token 预算管理系统
import redis
import json
from datetime import datetime, timedelta
from typing import Optional
class TokenBudgetManager:
"""Token 预算管理器——按用户/功能/模型追踪和限制用量"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
def _budget_key(self, entity_type: str, entity_id: str, period: str) -> str:
"""生成预算键:budget:{type}:{id}:{period}"""
if period == "daily":
date_str = datetime.utcnow().strftime("%Y-%m-%d")
elif period == "monthly":
date_str = datetime.utcnow().strftime("%Y-%m")
return f"budget:{entity_type}:{entity_id}:{date_str}"
def record_usage(
self,
user_id: str,
feature: str,
model: str,
input_tokens: int,
output_tokens: int,
cost_usd: float
):
"""记录一次 LLM 调用的用量"""
pipe = self.redis.pipeline()
now = datetime.utcnow()
# 按用户记录日/月用量
for period in ["daily", "monthly"]:
key = self._budget_key("user", user_id, period)
pipe.hincrbyfloat(key, "cost", cost_usd)
pipe.hincrby(key, "input_tokens", input_tokens)
pipe.hincrby(key, "output_tokens", output_tokens)
pipe.hincrby(key, "requests", 1)
pipe.expire(key, 86400 * 35) # 35 天过期
# 按功能记录
feature_key = self._budget_key("feature", feature, "daily")
pipe.hincrbyfloat(feature_key, "cost", cost_usd)
pipe.hincrby(feature_key, "requests", 1)
pipe.expire(feature_key, 86400 * 7)
# 按模型记录
model_key = self._budget_key("model", model, "daily")
pipe.hincrbyfloat(model_key, "cost", cost_usd)
pipe.hincrby(model_key, "input_tokens", input_tokens)
pipe.expire(model_key, 86400 * 7)
pipe.execute()
def check_budget(
self,
user_id: str,
monthly_limit_usd: float = 10.0
) -> dict:
"""检查用户是否超出预算"""
key = self._budget_key("user", user_id, "monthly")
data = self.redis.hgetall(key)
current_cost = float(data.get(b"cost", 0))
remaining = monthly_limit_usd - current_cost
return {
"user_id": user_id,
"current_cost": round(current_cost, 4),
"monthly_limit": monthly_limit_usd,
"remaining": round(max(0, remaining), 4),
"usage_percent": round(current_cost / monthly_limit_usd * 100, 1),
"is_over_budget": current_cost >= monthly_limit_usd
}
def get_cost_breakdown(self, entity_type: str, period: str = "daily") -> dict:
"""获取成本分布(按用户/功能/模型)"""
pattern = f"budget:{entity_type}:*:{datetime.utcnow().strftime('%Y-%m-%d' if period == 'daily' else '%Y-%m')}"
breakdown = {}
for key in self.redis.scan_iter(match=pattern):
entity_id = key.decode().split(":")[2]
data = self.redis.hgetall(key)
breakdown[entity_id] = {
"cost": round(float(data.get(b"cost", 0)), 4),
"requests": int(data.get(b"requests", 0))
}
return breakdown
# 使用示例
budget = TokenBudgetManager()
# 在每次 LLM 调用前检查预算
status = budget.check_budget("user_123", monthly_limit_usd=50.0)
if status["is_over_budget"]:
raise Exception(f"用户 {status['user_id']} 已超出月度预算 (${status['current_cost']}/${status['monthly_limit']})")
# 调用后记录用量
budget.record_usage(
user_id="user_123",
feature="customer-service",
model="gpt-4.1-mini",
input_tokens=500,
output_tokens=200,
cost_usd=0.00052
)步骤 3:设置成本告警
import smtplib
from email.mime.text import MIMEText
class CostAlertManager:
"""成本告警管理器"""
def __init__(self, budget_manager: TokenBudgetManager):
self.budget = budget_manager
self.alert_thresholds = [0.5, 0.8, 0.95, 1.0] # 50%, 80%, 95%, 100%
def check_and_alert(self, user_id: str, monthly_limit: float):
status = self.budget.check_budget(user_id, monthly_limit)
usage_ratio = status["current_cost"] / monthly_limit
for threshold in self.alert_thresholds:
if usage_ratio >= threshold:
alert_key = f"alert:{user_id}:{threshold}"
# 每个阈值只告警一次
if not self.budget.redis.exists(alert_key):
self._send_alert(user_id, status, threshold)
self.budget.redis.setex(alert_key, 86400 * 30, "sent")
break
def _send_alert(self, user_id: str, status: dict, threshold: float):
"""发送告警(可替换为 Slack/钉钉/企业微信)"""
level = "🔴 严重" if threshold >= 0.95 else "🟡 警告" if threshold >= 0.8 else "🟢 提醒"
message = (
f"{level} LLM 成本告警\n"
f"用户: {user_id}\n"
f"当前花费: ${status['current_cost']}\n"
f"月度预算: ${status['monthly_limit']}\n"
f"使用率: {status['usage_percent']}%\n"
f"剩余额度: ${status['remaining']}"
)
print(message) # 替换为实际的通知发送提示词模板
你是一个 LLM 成本分析师。请根据以下用量数据生成优化报告:
## 当前用量数据
- 月度总成本:$[金额]
- 日均请求量:[数量]
- 模型使用分布:[GPT-4.1: X%, GPT-4.1-mini: Y%, Claude Sonnet: Z%]
- 平均输入 Token:[N] / 平均输出 Token:[M]
- 成本最高的功能模块:[模块名称] ($[金额]/月)
- 缓存命中率:[百分比]
## 请输出
1. 成本异常点分析(哪些调用的单次成本异常高?)
2. 模型降级建议(哪些场景可以用更便宜的模型?)
3. Token 优化建议(如何减少输入/输出 Token?)
4. 缓存策略建议(哪些查询模式适合缓存?)
5. 预计优化后的月度成本和节省比例3. 语义缓存(Semantic Caching)
语义缓存是 LLM 成本优化中投入产出比最高的策略之一。传统缓存要求查询完全匹配,而语义缓存通过向量相似度匹配语义相近的查询,复用已有的 LLM 响应。研究表明,语义缓存可以减少 50-90% 的 API 调用,同时将响应延迟从秒级降至毫秒级。
3.1 语义缓存原理
传统缓存(精确匹配):
"北京天气怎么样?" → 缓存命中 ✅
"北京今天天气如何?" → 缓存未命中 ❌(不同的字符串)
语义缓存(向量相似度匹配):
"北京天气怎么样?" → 缓存命中 ✅
"北京今天天气如何?" → 缓存命中 ✅(语义相似度 > 阈值)
"上海天气怎么样?" → 缓存未命中 ❌(语义不同)┌──────────────────────────────────────────────────────┐
│ 语义缓存工作流程 │
│ │
│ 用户查询 ──→ Embedding ──→ 向量相似度搜索 │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ │ │
│ 相似度 ≥ 阈值 相似度 < 阈值 │
│ │ │ │
│ 返回缓存响应 调用 LLM API │
│ (延迟 ~5ms) (延迟 ~2000ms) │
│ │ │
│ 存入缓存(向量+响应) │
└──────────────────────────────────────────────────────┘工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Redis LangCache | Redis 官方语义缓存方案 | 免费(开源)/ Cloud 起步 $7/月 | 生产级语义缓存 |
| GPTCache | 开源 LLM 缓存库 | 免费(开源) | 快速原型、自定义缓存策略 |
| LangChain Cache | LangChain 内置缓存 | 免费 | LangChain 生态用户 |
| Momento | 无服务器缓存 | 免费(5 GB) | Serverless 架构 |
操作步骤
步骤 1:使用 Redis 构建语义缓存
# pip install redis langchain-openai numpy
import json
import hashlib
import numpy as np
from typing import Optional
import redis
import openai
class SemanticCache:
"""基于 Redis 的语义缓存实现"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
similarity_threshold: float = 0.92,
ttl_seconds: int = 3600,
embedding_model: str = "text-embedding-3-small"
):
self.redis = redis.from_url(redis_url)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
self.embedding_model = embedding_model
self.client = openai.OpenAI()
def _get_embedding(self, text: str) -> list[float]:
"""获取文本的向量嵌入"""
response = self.client.embeddings.create(
model=self.embedding_model,
input=text
)
return response.data[0].embedding
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
"""计算余弦相似度"""
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def get(self, query: str) -> Optional[str]:
"""查询语义缓存"""
query_embedding = self._get_embedding(query)
# 遍历缓存中的所有条目(生产环境应使用 Redis Vector Search)
for key in self.redis.scan_iter(match="sem_cache:*"):
cached = json.loads(self.redis.get(key))
similarity = self._cosine_similarity(
query_embedding, cached["embedding"]
)
if similarity >= self.threshold:
# 更新访问时间(LRU)
self.redis.expire(key, self.ttl)
return cached["response"]
return None
def set(self, query: str, response: str):
"""存入语义缓存"""
embedding = self._get_embedding(query)
cache_key = f"sem_cache:{hashlib.md5(query.encode()).hexdigest()}"
self.redis.setex(
cache_key,
self.ttl,
json.dumps({
"query": query,
"embedding": embedding,
"response": response
})
)
# 使用示例
cache = SemanticCache(similarity_threshold=0.92)
def cached_llm_call(query: str) -> str:
# 1. 先查缓存
cached = cache.get(query)
if cached:
print("✅ 缓存命中!节省一次 API 调用")
return cached
# 2. 缓存未命中,调用 LLM
print("❌ 缓存未命中,调用 API...")
client = openai.OpenAI()
response = client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": query}]
)
result = response.choices[0].message.content
# 3. 存入缓存
cache.set(query, result)
return result
# 第一次调用——缓存未命中
result1 = cached_llm_call("Python 的 GIL 是什么?")
# 第二次调用——语义相似,缓存命中
result2 = cached_llm_call("请解释一下 Python 中的全局解释器锁")步骤 2:使用 Redis LangCache(生产推荐)
Redis 官方在 2025 年推出了 LangCache,专为 LLM 语义缓存设计:
# pip install redis redisvl
from redisvl.extensions.llmcache import SemanticCache as RedisSemanticCache
# 初始化 Redis 语义缓存
cache = RedisSemanticCache(
name="llm_cache",
redis_url="redis://localhost:6379",
distance_threshold=0.08, # 越小越严格(L2 距离)
ttl=3600
)
# 检查缓存
results = cache.check(prompt="什么是机器学习?")
if results:
print(f"缓存命中: {results[0]['response']}")
else:
# 调用 LLM 并存入缓存
response = call_llm("什么是机器学习?")
cache.store(
prompt="什么是机器学习?",
response=response
)步骤 3:使用 GPTCache
# pip install gptcache
from gptcache import cache as gpt_cache
from gptcache.adapter import openai as cached_openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation
# 初始化 GPTCache
onnx = Onnx()
cache_base = CacheBase("sqlite")
vector_base = VectorBase("faiss", dimension=onnx.dimension)
data_manager = get_data_manager(cache_base, vector_base)
gpt_cache.init(
embedding_func=onnx.to_embeddings,
data_manager=data_manager,
similarity_evaluation=SearchDistanceEvaluation()
)
# 使用缓存的 OpenAI 调用
response = cached_openai.ChatCompletion.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "解释量子计算"}]
)缓存失效策略
class CacheInvalidationStrategy:
"""缓存失效策略管理"""
def __init__(self, cache: SemanticCache):
self.cache = cache
def invalidate_by_pattern(self, pattern: str):
"""按模式失效(如某个功能的所有缓存)"""
for key in self.cache.redis.scan_iter(match=f"sem_cache:{pattern}*"):
self.cache.redis.delete(key)
def invalidate_stale(self, max_age_hours: int = 24):
"""失效过期缓存"""
for key in self.cache.redis.scan_iter(match="sem_cache:*"):
ttl = self.cache.redis.ttl(key)
if ttl < 0: # 无过期时间
self.cache.redis.expire(key, max_age_hours * 3600)
def warm_cache(self, common_queries: list[str]):
"""预热缓存——用高频查询预先填充"""
for query in common_queries:
if not self.cache.get(query):
response = call_llm(query)
self.cache.set(query, response)
print(f"预热: {query[:50]}...")4. Prompt 缓存(Provider-Level Caching)
Prompt 缓存是由模型提供商在服务端实现的缓存机制,与语义缓存不同——它缓存的是 Prompt 前缀的内部计算结果(KV Cache),而非最终响应。对于包含大量系统提示、文档上下文的应用,Prompt 缓存可以降低 50-90% 的输入 Token 成本。
4.1 各提供商 Prompt 缓存对比
| 特性 | Anthropic | OpenAI | |
|---|---|---|---|
| 缓存折扣 | 90%(缓存 Token 仅收 10%) | 50%(缓存 Token 收 50%) | 75%(缓存 Token 收 25%) |
| 最小前缀长度 | 1,024 tokens | 1,024 tokens | 自动 |
| 缓存 TTL | 5 分钟(可延长) | 自动管理 | 自动管理 |
| 启用方式 | 手动标记 cache_control | 自动(默认开启) | 自动 |
| 延迟改善 | 最高 85% | 最高 50% | 最高 70% |
操作步骤
步骤 1:Anthropic Prompt 缓存
Anthropic 的 Prompt 缓存需要手动标记缓存断点,缓存命中时输入 Token 成本降低 90%:
import anthropic
client = anthropic.Anthropic()
# 大型系统提示 + 文档上下文(适合缓存)
system_prompt = """你是一个专业的法律顾问 AI 助手。
以下是公司的完整法律政策文档(约 50,000 字):
... [大量文档内容] ...
"""
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=[
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"} # 标记为可缓存
}
],
messages=[
{"role": "user", "content": "员工离职需要提前多久通知?"}
]
)
# 查看缓存状态
print(f"缓存创建 Token: {response.usage.cache_creation_input_tokens}")
print(f"缓存命中 Token: {response.usage.cache_read_input_tokens}")
print(f"普通输入 Token: {response.usage.input_tokens}")
# 第一次调用:cache_creation_input_tokens = 15000(创建缓存,收 25% 溢价)
# 后续调用:cache_read_input_tokens = 15000(命中缓存,仅收 10%)步骤 2:OpenAI 自动 Prompt 缓存
OpenAI 的 Prompt 缓存是自动的,无需额外代码。当连续请求共享相同的 Prompt 前缀(≥1,024 tokens)时,自动触发缓存:
import openai
client = openai.OpenAI()
# 大型系统提示(自动缓存)
large_system_prompt = """你是一个代码审查助手。
以下是项目的编码规范(约 10,000 字):
... [大量规范内容] ...
"""
# 多次调用共享相同的系统提示——自动触发缓存
for code_snippet in code_snippets_to_review:
response = client.chat.completions.create(
model="gpt-4.1",
messages=[
{"role": "system", "content": large_system_prompt},
{"role": "user", "content": f"请审查以下代码:\n{code_snippet}"}
]
)
# 查看缓存状态
usage = response.usage
if hasattr(usage, "prompt_tokens_details"):
cached = usage.prompt_tokens_details.cached_tokens
print(f"缓存命中 Token: {cached} / 总输入: {usage.prompt_tokens}")
# 缓存命中的 Token 按 50% 价格计费步骤 3:最大化 Prompt 缓存命中率
# ✅ 好的做法:将不变的内容放在前面,变化的内容放在后面
messages = [
{"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 不变 → 缓存
{"role": "system", "content": STATIC_KNOWLEDGE_BASE}, # 不变 → 缓存
{"role": "user", "content": dynamic_user_query} # 变化 → 不缓存
]
# ❌ 坏的做法:将动态内容插入到静态内容中间
messages = [
{"role": "system", "content": f"当前时间:{datetime.now()}"}, # 每次变化 → 破坏缓存
{"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 前缀已变 → 无法缓存
{"role": "user", "content": dynamic_user_query}
]
# ✅ 优化后:动态内容移到最后
messages = [
{"role": "system", "content": LARGE_STATIC_SYSTEM_PROMPT}, # 不变 → 缓存
{"role": "system", "content": f"当前时间:{datetime.now()}"}, # 变化但在后面
{"role": "user", "content": dynamic_user_query}
]5. 智能模型路由(Model Routing)
智能模型路由是成本优化中效果最显著的策略之一。核心思想:不是所有请求都需要旗舰模型。将 70% 的简单查询路由到廉价模型,只让 30% 的复杂查询使用旗舰模型,可以降低 60%+ 的成本。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| LiteLLM | 开源 LLM 网关 + 路由 | 免费(开源) | 自托管、完全控制 |
| OpenRouter | 统一 API 网关 | 按模型定价(无额外费用) | 快速接入 300+ 模型 |
| Martian | AI 驱动的智能路由 | 按用量计费 | 自动质量-成本优化 |
| Unify AI | 模型路由优化平台 | 免费(基础版) | 基准测试驱动的路由 |
操作步骤
步骤 1:基于规则的模型路由
import re
from enum import Enum
from typing import Optional
import litellm
class TaskComplexity(Enum):
SIMPLE = "simple" # 分类、提取、格式化
MEDIUM = "medium" # 摘要、翻译、一般问答
COMPLEX = "complex" # 推理、编码、创意写作
CRITICAL = "critical" # 高风险决策、法律/医疗
class ModelRouter:
"""基于规则的智能模型路由器"""
# 模型配置:按复杂度映射到不同模型
MODEL_MAP = {
TaskComplexity.SIMPLE: {
"model": "gpt-4.1-nano",
"cost_per_1m_input": 0.10,
"cost_per_1m_output": 0.40,
},
TaskComplexity.MEDIUM: {
"model": "gpt-4.1-mini",
"cost_per_1m_input": 0.40,
"cost_per_1m_output": 1.60,
},
TaskComplexity.COMPLEX: {
"model": "gpt-4.1",
"cost_per_1m_input": 2.00,
"cost_per_1m_output": 8.00,
},
TaskComplexity.CRITICAL: {
"model": "claude-sonnet-4-20250514",
"cost_per_1m_input": 3.00,
"cost_per_1m_output": 15.00,
},
}
# 任务类型到复杂度的映射规则
TASK_RULES = {
"classify": TaskComplexity.SIMPLE,
"extract": TaskComplexity.SIMPLE,
"format": TaskComplexity.SIMPLE,
"translate": TaskComplexity.MEDIUM,
"summarize": TaskComplexity.MEDIUM,
"qa": TaskComplexity.MEDIUM,
"code_generate": TaskComplexity.COMPLEX,
"reason": TaskComplexity.COMPLEX,
"creative": TaskComplexity.COMPLEX,
"legal": TaskComplexity.CRITICAL,
"medical": TaskComplexity.CRITICAL,
"financial": TaskComplexity.CRITICAL,
}
def classify_complexity(
self,
query: str,
task_type: Optional[str] = None,
token_count: int = 0
) -> TaskComplexity:
"""判断查询的复杂度"""
# 1. 如果明确指定了任务类型
if task_type and task_type in self.TASK_RULES:
return self.TASK_RULES[task_type]
# 2. 基于启发式规则判断
query_lower = query.lower()
# 简单任务特征
simple_patterns = [
r"分类|归类|标签",
r"提取|抽取|找出",
r"格式化|转换格式",
r"是否|是不是|对不对",
]
for pattern in simple_patterns:
if re.search(pattern, query_lower):
return TaskComplexity.SIMPLE
# 复杂任务特征
complex_patterns = [
r"写代码|编程|实现.*功能",
r"分析.*原因|为什么.*会",
r"设计.*架构|方案",
r"比较.*优缺点",
]
for pattern in complex_patterns:
if re.search(pattern, query_lower):
return TaskComplexity.COMPLEX
# 默认中等复杂度
return TaskComplexity.MEDIUM
def route(
self,
messages: list[dict],
task_type: Optional[str] = None,
force_model: Optional[str] = None
) -> dict:
"""路由请求到合适的模型"""
if force_model:
return {"model": force_model, "complexity": "forced"}
# 获取用户查询
user_query = ""
for msg in reversed(messages):
if msg["role"] == "user":
user_query = msg["content"]
break
complexity = self.classify_complexity(user_query, task_type)
config = self.MODEL_MAP[complexity]
return {
"model": config["model"],
"complexity": complexity.value,
"estimated_cost_per_1m_input": config["cost_per_1m_input"]
}
def call(
self,
messages: list[dict],
task_type: Optional[str] = None,
**kwargs
) -> dict:
"""路由并调用模型"""
route_info = self.route(messages, task_type)
response = litellm.completion(
model=route_info["model"],
messages=messages,
**kwargs
)
# 记录路由决策(用于后续优化)
cost = litellm.completion_cost(completion_response=response)
print(f"路由: {route_info['complexity']} → {route_info['model']} | 成本: ${cost:.6f}")
return {
"response": response,
"route_info": route_info,
"cost": cost
}
# 使用示例
router = ModelRouter()
# 简单分类任务 → 自动路由到 GPT-4.1 nano ($0.10/1M)
result = router.call(
messages=[{"role": "user", "content": "这条评论是正面还是负面的:'产品质量很好'"}],
task_type="classify"
)
# 复杂编码任务 → 自动路由到 GPT-4.1 ($2.00/1M)
result = router.call(
messages=[{"role": "user", "content": "用 Python 实现一个带重试机制的异步 HTTP 客户端"}],
task_type="code_generate"
)步骤 2:使用 LiteLLM Router(带负载均衡和降级)
from litellm import Router
# 配置 LiteLLM Router
router = Router(
model_list=[
{
"model_name": "fast-model", # 逻辑名称
"litellm_params": {
"model": "gpt-4.1-mini",
"api_key": "sk-xxx",
},
"model_info": {"id": "gpt-4.1-mini-1"}
},
{
"model_name": "fast-model", # 同名 = 负载均衡
"litellm_params": {
"model": "gemini/gemini-2.5-flash",
"api_key": "xxx",
},
"model_info": {"id": "gemini-flash-1"}
},
{
"model_name": "powerful-model",
"litellm_params": {
"model": "gpt-4.1",
"api_key": "sk-xxx",
},
},
],
# 路由策略
routing_strategy="least-busy", # 或 "simple-shuffle", "latency-based-routing"
# 降级配置
fallbacks=[
{"fast-model": ["powerful-model"]}, # fast-model 失败时降级到 powerful-model
],
# 重试配置
num_retries=2,
retry_after=5,
# 超时配置
timeout=30,
)
# 使用路由器调用
response = await router.acompletion(
model="fast-model",
messages=[{"role": "user", "content": "你好"}]
)步骤 3:级联路由(Cascading)——先用便宜模型,不满意再升级
import json
class CascadingRouter:
"""级联路由:先用便宜模型,质量不达标时自动升级"""
MODEL_CASCADE = [
{"model": "gpt-4.1-nano", "cost_factor": 1},
{"model": "gpt-4.1-mini", "cost_factor": 4},
{"model": "gpt-4.1", "cost_factor": 20},
]
def __init__(self, quality_threshold: float = 0.7):
self.quality_threshold = quality_threshold
def _assess_quality(self, response: str, query: str) -> float:
"""快速评估响应质量(使用启发式规则)"""
score = 0.5 # 基础分
# 长度合理性
if 50 < len(response) < 5000:
score += 0.1
# 包含结构化内容
if any(marker in response for marker in ["1.", "- ", "```", "**"]):
score += 0.1
# 不包含不确定表述
uncertain = ["我不确定", "我不知道", "可能是", "也许"]
if not any(u in response for u in uncertain):
score += 0.1
# 与查询相关性(简单关键词匹配)
query_words = set(query[:100].split())
response_words = set(response[:200].split())
overlap = len(query_words & response_words) / max(len(query_words), 1)
score += overlap * 0.2
return min(score, 1.0)
async def call(self, messages: list[dict]) -> dict:
"""级联调用:从最便宜的模型开始"""
query = messages[-1]["content"] if messages else ""
for i, config in enumerate(self.MODEL_CASCADE):
response = await litellm.acompletion(
model=config["model"],
messages=messages
)
result = response.choices[0].message.content
quality = self._assess_quality(result, query)
if quality >= self.quality_threshold or i == len(self.MODEL_CASCADE) - 1:
return {
"response": result,
"model_used": config["model"],
"cascade_level": i,
"quality_score": quality,
"cost": litellm.completion_cost(completion_response=response)
}
return {"response": result, "model_used": config["model"]}6. 批处理(Batch Processing)
对于非实时场景(数据分析、内容生成、批量分类),批处理可以显著降低成本。OpenAI Batch API 提供 50% 的价格折扣,且支持大规模并行处理。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| OpenAI Batch API | 官方批处理接口 | 50% 折扣 | 非实时大批量处理 |
| Anthropic Message Batches | Anthropic 批处理 | 50% 折扣 | Claude 模型批量调用 |
| Celery + Redis | 异步任务队列 | 免费(开源) | 自建批处理管线 |
| AWS SQS + Lambda | 云原生队列 | 按用量(极低) | Serverless 批处理 |
操作步骤
步骤 1:使用 OpenAI Batch API
OpenAI Batch API 接受 JSONL 文件作为输入,异步处理后返回结果,价格为标准 API 的 50%:
import json
import time
from openai import OpenAI
client = OpenAI()
# 1. 准备批处理输入文件(JSONL 格式)
batch_requests = []
products = [
"iPhone 16 Pro Max 256GB 深空黑",
"MacBook Air M4 15寸 16GB",
"AirPods Pro 3 带听力健康功能",
# ... 假设有 10,000 个产品
]
for i, product in enumerate(products):
batch_requests.append({
"custom_id": f"product-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-4.1-mini",
"messages": [
{"role": "system", "content": "你是一个电商文案专家。为产品生成 50 字以内的营销描述。"},
{"role": "user", "content": f"产品:{product}"}
],
"max_tokens": 200
}
})
# 写入 JSONL 文件
with open("batch_input.jsonl", "w") as f:
for req in batch_requests:
f.write(json.dumps(req, ensure_ascii=False) + "\n")
# 2. 上传文件
batch_file = client.files.create(
file=open("batch_input.jsonl", "rb"),
purpose="batch"
)
# 3. 创建批处理任务
batch_job = client.batches.create(
input_file_id=batch_file.id,
endpoint="/v1/chat/completions",
completion_window="24h" # 24 小时内完成
)
print(f"批处理任务已创建: {batch_job.id}")
# 4. 轮询检查状态
while True:
status = client.batches.retrieve(batch_job.id)
print(f"状态: {status.status} | 完成: {status.request_counts.completed}/{status.request_counts.total}")
if status.status in ["completed", "failed", "expired"]:
break
time.sleep(60)
# 5. 下载结果
if status.status == "completed":
result_file = client.files.content(status.output_file_id)
results = []
for line in result_file.text.strip().split("\n"):
result = json.loads(line)
results.append({
"id": result["custom_id"],
"description": result["response"]["body"]["choices"][0]["message"]["content"]
})
print(f"成功处理 {len(results)} 个产品描述")步骤 2:构建异步批处理队列
对于需要更灵活控制的场景,使用 Celery + Redis 构建自定义批处理管线:
# tasks.py
from celery import Celery
import litellm
app = Celery("llm_tasks", broker="redis://localhost:6379/0")
# 配置并发限制(避免触发 API 速率限制)
app.conf.worker_concurrency = 10
app.conf.task_rate_limit = "50/m" # 每分钟最多 50 个任务
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
rate_limit="50/m"
)
def process_llm_request(self, request_id: str, messages: list, model: str = "gpt-4.1-mini"):
"""异步处理单个 LLM 请求"""
try:
response = litellm.completion(
model=model,
messages=messages,
timeout=30
)
return {
"request_id": request_id,
"status": "success",
"result": response.choices[0].message.content,
"cost": litellm.completion_cost(completion_response=response)
}
except Exception as e:
# 自动重试(指数退避)
raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
# 批量提交任务
def submit_batch(items: list[dict]):
"""提交批量 LLM 任务"""
from celery import group
tasks = []
for item in items:
task = process_llm_request.s(
request_id=item["id"],
messages=item["messages"],
model=item.get("model", "gpt-4.1-mini")
)
tasks.append(task)
# 并行执行所有任务
job = group(tasks)()
return job步骤 3:批处理成本对比
场景:处理 10,000 个产品描述生成任务
模型:GPT-4.1 mini
平均输入:200 tokens / 平均输出:100 tokens
标准 API:
输入成本:10,000 × 200 / 1,000,000 × $0.40 = $0.80
输出成本:10,000 × 100 / 1,000,000 × $1.60 = $1.60
总成本:$2.40
Batch API(50% 折扣):
输入成本:$0.80 × 0.5 = $0.40
输出成本:$1.60 × 0.5 = $0.80
总成本:$1.20
节省:$1.20(50%)7. Prompt 优化与压缩
减少输入 Token 数量是最直接的成本优化手段。通过 Prompt 压缩、结构优化和输出约束,可以在不影响质量的前提下减少 30-50% 的 Token 消耗。
操作步骤
步骤 1:Prompt 压缩技术
# 方法 1:移除冗余指令
# ❌ 冗余 Prompt(约 150 tokens)
verbose_prompt = """
我希望你能够帮助我完成以下任务。这个任务非常重要,请你认真对待。
我需要你分析一段文本,然后给出你的分析结果。
请确保你的分析是准确的、全面的、有深度的。
在分析的时候,请注意以下几点:
1. 请仔细阅读文本
2. 请从多个角度进行分析
3. 请给出具体的例子来支持你的观点
4. 请用清晰的语言表达你的分析结果
以下是需要分析的文本:
"""
# ✅ 精简 Prompt(约 30 tokens)
concise_prompt = """分析以下文本,从多角度给出带具体例子的深度分析:
"""
# 方法 2:使用缩写和符号
# ❌ 冗长
long_format = """
请按照以下格式输出:
- 第一部分:总结(不超过100字)
- 第二部分:关键发现(列出3-5个要点)
- 第三部分:建议(列出2-3个可行建议)
"""
# ✅ 精简
short_format = """输出格式:
1) 总结(≤100字)
2) 关键发现(3-5点)
3) 建议(2-3条)"""
# 方法 3:使用 JSON Schema 约束输出(减少输出 Token)
response = client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "分析这段评论的情感"}],
response_format={
"type": "json_schema",
"json_schema": {
"name": "sentiment",
"schema": {
"type": "object",
"properties": {
"sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]},
"confidence": {"type": "number"},
"keywords": {"type": "array", "items": {"type": "string"}, "maxItems": 3}
},
"required": ["sentiment", "confidence"]
}
}
}
)步骤 2:上下文窗口优化
def optimize_context(
documents: list[str],
query: str,
max_context_tokens: int = 4000,
model: str = "gpt-4.1-mini"
) -> str:
"""优化上下文:只保留与查询最相关的内容"""
import tiktoken
encoder = tiktoken.encoding_for_model(model)
# 1. 按相关性排序文档(简化版,生产环境用向量检索)
scored_docs = []
query_words = set(query.lower().split())
for doc in documents:
doc_words = set(doc.lower().split())
relevance = len(query_words & doc_words) / max(len(query_words), 1)
scored_docs.append((relevance, doc))
scored_docs.sort(reverse=True)
# 2. 贪心选择:按相关性依次加入,直到达到 Token 限制
selected = []
current_tokens = 0
for _, doc in scored_docs:
doc_tokens = len(encoder.encode(doc))
if current_tokens + doc_tokens <= max_context_tokens:
selected.append(doc)
current_tokens += doc_tokens
else:
break
return "\n---\n".join(selected)步骤 3:输出 Token 控制
# 策略 1:明确限制输出长度
response = client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "用一句话总结这篇文章"}],
max_tokens=100 # 硬限制输出长度
)
# 策略 2:在 Prompt 中约束输出格式
constrained_prompt = """
分类以下文本的主题。只输出类别名称,不要解释。
可选类别:技术、商业、娱乐、体育、科学
文本:{text}
类别:"""
# 策略 3:使用 stop 序列提前终止
response = client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": "列出前3个要点:"}],
stop=["4.", "4、"] # 在第4点之前停止
)8. 速率限制与流量控制
速率限制不仅是防止超支的安全网,也是优化成本的重要手段。合理的限流可以平滑流量峰值、避免触发提供商的速率限制(导致重试成本),并为批处理留出配额。
操作步骤
步骤 1:实现令牌桶限流器
import time
import asyncio
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
requests_per_minute: int = 60
tokens_per_minute: int = 100_000
max_concurrent: int = 10
class TokenBucketRateLimiter:
"""令牌桶限流器——支持请求数和 Token 数双重限制"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.request_tokens = config.requests_per_minute
self.token_tokens = config.tokens_per_minute
self.last_refill = time.monotonic()
self.semaphore = asyncio.Semaphore(config.max_concurrent)
self._lock = asyncio.Lock()
async def _refill(self):
"""补充令牌"""
now = time.monotonic()
elapsed = now - self.last_refill
# 按时间比例补充
self.request_tokens = min(
self.config.requests_per_minute,
self.request_tokens + elapsed * self.config.requests_per_minute / 60
)
self.token_tokens = min(
self.config.tokens_per_minute,
self.token_tokens + elapsed * self.config.tokens_per_minute / 60
)
self.last_refill = now
async def acquire(self, estimated_tokens: int = 1000):
"""获取调用许可"""
while True:
async with self._lock:
await self._refill()
if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
self.request_tokens -= 1
self.token_tokens -= estimated_tokens
return True
# 等待令牌补充
await asyncio.sleep(0.5)
async def call_with_limit(self, func, *args, estimated_tokens=1000, **kwargs):
"""带限流的调用"""
await self.acquire(estimated_tokens)
async with self.semaphore:
return await func(*args, **kwargs)
# 使用示例
limiter = TokenBucketRateLimiter(RateLimitConfig(
requests_per_minute=50,
tokens_per_minute=80_000,
max_concurrent=5
))
async def rate_limited_call(messages):
await limiter.acquire(estimated_tokens=500)
return await litellm.acompletion(
model="gpt-4.1-mini",
messages=messages
)步骤 2:分层限流策略
class TieredRateLimiter:
"""分层限流:不同用户等级不同配额"""
TIER_LIMITS = {
"free": RateLimitConfig(
requests_per_minute=10,
tokens_per_minute=10_000,
max_concurrent=2
),
"pro": RateLimitConfig(
requests_per_minute=60,
tokens_per_minute=100_000,
max_concurrent=10
),
"enterprise": RateLimitConfig(
requests_per_minute=300,
tokens_per_minute=500_000,
max_concurrent=50
),
}
def __init__(self):
self.limiters: dict[str, TokenBucketRateLimiter] = {}
def get_limiter(self, user_id: str, tier: str = "free") -> TokenBucketRateLimiter:
key = f"{user_id}:{tier}"
if key not in self.limiters:
config = self.TIER_LIMITS.get(tier, self.TIER_LIMITS["free"])
self.limiters[key] = TokenBucketRateLimiter(config)
return self.limiters[key]9. 综合成本优化架构
将以上所有策略组合成一个完整的成本优化管线:
┌─────────────────────────────────────────────────────────────┐
│ LLM 成本优化管线 │
│ │
│ 用户请求 │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 1. 速率限制 │ ← 令牌桶 + 用户配额 │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 2. 语义缓存 │────→│ 缓存命中? │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ 是 │
│ │ 否 ▼ │
│ │ 返回缓存响应(~5ms) │
│ ▼ │
│ ┌──────────────┐ │
│ │ 3. Prompt 优化│ ← 压缩 + 上下文裁剪 │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 4. 模型路由 │ ← 复杂度分类 → 选择模型 │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 5. Prompt 缓存│ ← 提供商级别前缀缓存 │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 6. API 调用 │ → 记录成本 → 更新预算 │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 7. 存入缓存 │ → 更新语义缓存 │
│ └──────┬───────┘ │
│ ▼ │
│ 返回响应 │
└─────────────────────────────────────────────────────────────┘class CostOptimizedLLMPipeline:
"""综合成本优化管线"""
def __init__(self):
self.rate_limiter = TokenBucketRateLimiter(RateLimitConfig())
self.semantic_cache = SemanticCache(similarity_threshold=0.92)
self.model_router = ModelRouter()
self.budget_manager = TokenBudgetManager()
async def call(
self,
messages: list[dict],
user_id: str,
task_type: str = None,
use_cache: bool = True
) -> dict:
"""完整的成本优化调用流程"""
# 1. 预算检查
budget = self.budget_manager.check_budget(user_id)
if budget["is_over_budget"]:
return {"error": "月度预算已用尽", "budget": budget}
# 2. 速率限制
await self.rate_limiter.acquire()
# 3. 语义缓存查询
user_query = messages[-1]["content"] if messages else ""
if use_cache:
cached = self.semantic_cache.get(user_query)
if cached:
return {
"response": cached,
"source": "cache",
"cost": 0.0
}
# 4. 模型路由
route = self.model_router.route(messages, task_type)
# 5. 调用 LLM
import litellm
response = await litellm.acompletion(
model=route["model"],
messages=messages
)
result = response.choices[0].message.content
cost = litellm.completion_cost(completion_response=response)
# 6. 记录用量
self.budget_manager.record_usage(
user_id=user_id,
feature=task_type or "general",
model=route["model"],
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
cost_usd=cost
)
# 7. 存入缓存
if use_cache:
self.semantic_cache.set(user_query, result)
return {
"response": result,
"source": "api",
"model": route["model"],
"complexity": route["complexity"],
"cost": cost
}实战案例:AI 客服系统从 $47,000/月 降至 $8,200/月
场景描述
一家 SaaS 公司的 AI 客服系统,日均处理 50,000 次对话,全部使用 GPT-4o($2.50/$10.00 per 1M tokens)。月度 API 账单 $47,000,单次交互成本 $0.031。CEO 要求在不降低客户满意度的前提下,将成本降至 $15,000/月以内。
案例的具体操作流程
第 1 步:成本分析(使用 Langfuse)
# 接入 Langfuse 追踪,分析成本分布
# 发现:
# - 45% 的查询是重复性问题(退货政策、配送时间、账户问题)
# - 30% 的查询是简单分类/路由(判断问题类型后转人工)
# - 15% 的查询需要知识库检索 + 生成
# - 10% 的查询涉及复杂推理(投诉处理、特殊情况)第 2 步:实施语义缓存(节省 40%)
# 针对 45% 的重复性查询,部署语义缓存
cache = SemanticCache(
similarity_threshold=0.90, # 客服场景可以稍微宽松
ttl_seconds=86400 # 24 小时缓存
)
# 预热缓存:用 Top 500 高频问题预填充
common_questions = [
"怎么退货?", "退款多久到账?", "配送需要几天?",
"如何修改收货地址?", "会员有什么优惠?", ...
]
invalidation = CacheInvalidationStrategy(cache)
invalidation.warm_cache(common_questions)
# 效果:缓存命中率 42%,月度节省 ~$19,740第 3 步:实施模型路由(再节省 35%)
# 对未命中缓存的 58% 请求进行智能路由
router = ModelRouter()
# 路由规则:
# - 简单分类/路由(30%)→ GPT-4.1 nano ($0.10/$0.40)
# - 知识库问答(15%)→ GPT-4.1 mini ($0.40/$1.60)
# - 复杂推理(10%)→ GPT-4.1 ($2.00/$8.00)
# - 投诉/敏感(3%)→ Claude Sonnet 4 ($3.00/$15.00)
# 效果:平均模型成本从 $6.25/1M tokens 降至 $1.20/1M tokens第 4 步:Prompt 优化(再节省 25%)
# 优化前:系统提示 2,000 tokens + 对话历史平均 1,500 tokens
# 优化后:
# 1. 精简系统提示:2,000 → 800 tokens
# 2. 对话历史只保留最近 3 轮:1,500 → 600 tokens
# 3. 使用 JSON Schema 约束输出格式
# 4. 利用 Anthropic Prompt 缓存(系统提示部分)
# 效果:平均输入 Token 减少 45%第 5 步:批处理非实时任务(再节省 10%)
# 将以下任务改为批处理:
# - 每日对话质量评估(LLM-as-Judge)
# - 每周客户反馈摘要生成
# - 知识库内容更新和索引
# 使用 OpenAI Batch API,享受 50% 折扣案例分析
| 优化措施 | 月度节省 | 累计成本 | 节省比例 |
|---|---|---|---|
| 优化前基线 | — | $47,000 | — |
| + 语义缓存 | -$19,740 | $27,260 | 42% |
| + 模型路由 | -$12,800 | $14,460 | 69% |
| + Prompt 优化 | -$3,600 | $10,860 | 77% |
| + 批处理 | -$1,200 | $9,660 | 79% |
| + Prompt 缓存 | -$1,460 | $8,200 | 83% |
最终结果:月度成本从 $47,000 降至 $8,200,节省 83%。单次交互成本从 $0.031 降至 $0.0054。客户满意度评分从 4.2 微升至 4.3(因为缓存命中时响应更快)。
避坑指南
❌ 常见错误
-
只关注模型价格,忽略总体架构
- 问题:盲目选择最便宜的模型,导致质量下降、用户投诉增加、人工介入成本上升
- 正确做法:先分析流量分布和任务类型,用模型路由让合适的模型处理合适的任务。便宜模型处理简单任务,旗舰模型处理复杂任务
-
语义缓存阈值设置不当
- 问题:阈值太低(如 0.80)导致返回不相关的缓存结果,用户体验差;阈值太高(如 0.99)导致几乎没有缓存命中
- 正确做法:从 0.92 开始,用真实查询日志做 A/B 测试,逐步调整。不同场景用不同阈值——FAQ 可以宽松(0.88),专业问答要严格(0.95)
-
忽略 Prompt 缓存的前缀顺序
- 问题:将动态内容(时间戳、用户 ID)放在系统提示开头,导致每次请求的前缀都不同,Prompt 缓存完全失效
- 正确做法:静态内容(系统提示、知识库)放前面,动态内容(用户查询、时间)放后面。Anthropic 的
cache_control标记要放在静态内容的末尾
-
没有设置成本告警和预算上限
- 问题:一个 bug 导致无限循环调用 API,一夜之间产生数千美元账单
- 正确做法:在 OpenAI/Anthropic 后台设置月度硬限制。在应用层实现用户级和功能级预算管理,设置 50%/80%/95% 三级告警
-
批处理任务使用实时 API
- 问题:数据分析、内容批量生成等非实时任务使用标准 API,白白多花 50% 的钱
- 正确做法:所有不需要实时响应的任务(报告生成、数据标注、批量分类)都使用 Batch API。OpenAI 和 Anthropic 的 Batch API 都提供 50% 折扣
-
缓存没有失效策略
- 问题:知识库更新后,缓存仍然返回旧信息,导致用户获得过时的回答
- 正确做法:实现基于事件的缓存失效(知识库更新时清除相关缓存)、基于时间的 TTL(FAQ 缓存 24 小时、实时数据缓存 5 分钟)、定期预热高频查询
✅ 最佳实践
- 先监控,再优化:接入 Langfuse/Helicone 等工具,至少收集 1 周的成本数据,找到成本热点后再针对性优化
- 分层优化:按”缓存 → 路由 → Prompt 优化 → 批处理”的顺序实施,每步验证效果后再进入下一步
- 保持质量监控:每次优化后都要监控输出质量指标(用户满意度、任务完成率),确保成本优化不以质量为代价
- 定期审查模型定价:LLM 市场价格变化快(2024-2025 年间主流模型价格下降了 5-10 倍),每季度审查一次模型选择
- 为成本优化建立 Dashboard:在 Grafana 或 Langfuse 中建立成本仪表板,追踪日/周/月成本趋势、缓存命中率、模型分布
相关资源与延伸阅读
- LiteLLM — 开源 LLM API 网关 — 支持 100+ 模型的统一调用、成本追踪和路由
- Redis LangCache — 语义缓存方案 — Redis 官方 AI 语义缓存产品
- GPTCache — 开源 LLM 缓存库 — Zilliz 开源的 LLM 语义缓存框架
- OpenAI Batch API 文档 — 官方批处理 API 指南
- Anthropic Prompt Caching 文档 — Claude Prompt 缓存使用指南
- Helicone — LLM 可观测性网关 — 一行代码接入的 LLM 监控和成本追踪
- OpenRouter — 统一 AI API 网关 — 300+ 模型的统一接入和价格比较
- PricePerToken — AI 模型价格对比 — 实时更新的 300+ AI 模型价格对比工具
- Langfuse 成本追踪文档 — Langfuse 成本追踪和分析功能
- LLM 成本优化论文:GPT Semantic Cache — 语义缓存减少 68.8% API 调用的研究
参考来源
- 5 Ways to Optimize Costs and Latency in LLM-Powered Applications (2025-12)
- Reducing LLM Costs and Latency via Semantic Embedding Caching (2024-11)
- How to Save Costs and Improve Latency: Semantic Cache with LangChain and Redis (2025-06)
- Redis LangCache — Semantic Caching for AI (2025-06)
- Prompt Caching Infrastructure: Reducing LLM Costs and Latency (2025-12)
- An Evaluation of Prompt Caching for Long-Horizon Agentic Tasks (2026-01)
- LLM Batch Inference: Cut Costs 50% Production Guide 2026 (2026-01)
- Best AI Model Routers for Multi-Provider LLM Cost Optimization (2025-12)
- OpenRouter vs LiteLLM: Features, Pricing, and Use Cases (2025-10)
- LLM API Pricing Comparison 2025: OpenAI, Gemini, Claude (2025-12)
- How to Cut Your LLM Costs by 70% Without Losing Quality (2025-12)
- Cost Optimization for LLM Applications: Token Management (2025-10)
📖 返回 总览与导航 | 上一节:21d-Langfuse设置指南 | 下一节:21f-生产告警与质量指标