21f - 生产告警与质量指标
本文是《AI Agent 实战手册》第 21 章第 6 节。 上一节:21e-成本优化策略 | 下一节:22a-AI安全概览 📖 返回 总览与导航
⏱ 阅读时间:90 分钟 | 难度:⭐⭐⭐⭐⭐ 高级 | 前置知识:LLM API 使用经验、Prometheus/Grafana 基础、Python/TypeScript 开发经验
概述
AI Agent 在生产环境中的表现远比开发阶段复杂——模型输出的非确定性、尾部延迟、速率限制、检索漂移和幻觉率波动都可能在用户无感知的情况下悄然恶化。传统 APM 工具只能监控"服务是否存活",却无法回答"AI 回答的质量是否在下降"。本节将系统化地构建一套生产级 AI Agent 质量监控体系,覆盖五大核心指标(成功率、延迟百分位、每次交互成本、满意度评分、漂移检测),并提供完整的 Prometheus + Grafana 仪表板配置、告警规则、LLM-as-Judge 在线评估管线和 AI 系统事件响应手册。
1. AI 系统质量指标框架:SLI/SLO 定义
与传统软件一样,AI 系统也需要明确的服务水平指标(SLI)和服务水平目标(SLO)。但 AI 系统的 SLI 更复杂——除了可用性和延迟,还需要衡量输出质量、安全性和成本效率。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Prometheus | 时序指标采集与告警 | 免费(开源) | 基础设施和应用指标监控 |
| Grafana | 指标可视化仪表板 | 免费(开源)/ Cloud 起步 $0 | 统一可视化 |
| Langfuse | LLM 可观测性 + 在线评估 | 免费(自托管) | 质量追踪、成本分析 |
| Arize Phoenix | ML/LLM 可观测性 + 漂移检测 | 免费(开源) | 嵌入漂移、幻觉检测 |
| Evidently AI | ML 监控 + 数据漂移检测 | 免费(开源) | 嵌入漂移、数据质量 |
| Maxim AI | LLM 评估 + 幻觉检测 | 免费(基础版) | 多阶段质量评估 |
| PagerDuty | 事件管理与告警路由 | 起步 $21/用户/月 | 告警升级与值班管理 |
| Opsgenie | 告警管理 | 起步 $9/用户/月 | 中小团队告警管理 |
1.1 AI 系统 SLI/SLO 全景
传统 SRE 的 SLI/SLO 框架需要针对 AI 系统进行扩展。以下是一个完整的 AI Agent SLI/SLO 定义模板:
┌─────────────────────────────────────────────────────────────────┐
│ AI Agent SLI/SLO 框架 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 可用性指标 │ │ 性能指标 │ │ 质量指标 │ │
│ │ │ │ │ │ │ │
│ │ • API 成功率 │ │ • P50 延迟 │ │ • 幻觉率 │ │
│ │ • 错误分类 │ │ • P95 延迟 │ │ • 相关性评分 │ │
│ │ • SLA 达标率 │ │ • P99 延迟 │ │ • 一致性评分 │ │
│ │ • 降级率 │ │ • TTFT │ │ • 安全性评分 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 成本指标 │ │ 用户指标 │ │
│ │ │ │ │ │
│ │ • 每次交互成本│ │ • CSAT 评分 │ │
│ │ • 每用户成本 │ │ • 👍/👎 比率 │ │
│ │ • 预算使用率 │ │ • 升级率 │ │
│ │ • 缓存命中率 │ │ • 留存率 │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘

| SLI 类别 | SLI 指标 | SLO 目标(参考值) | 测量方法 |
|---|---|---|---|
| 可用性 | API 调用成功率 | ≥ 99.5% | 成功请求数 / 总请求数 |
| 可用性 | 错误率(5xx) | ≤ 0.5% | 5xx 响应数 / 总请求数 |
| 性能 | P50 端到端延迟 | ≤ 2s | Prometheus histogram |
| 性能 | P95 端到端延迟 | ≤ 5s | Prometheus histogram |
| 性能 | P99 端到端延迟 | ≤ 10s | Prometheus histogram |
| 性能 | TTFT(首 Token 时间) | ≤ 500ms | 流式响应首字节时间 |
| 质量 | 幻觉率 | ≤ 5% | LLM-as-Judge 采样评估 |
| 质量 | 输出相关性评分 | ≥ 4.0/5.0 | LLM-as-Judge 评分 |
| 成本 | 每次交互平均成本 | ≤ $0.05 | 总 API 费用 / 总交互数 |
| 成本 | 月度预算使用率 | ≤ 90% | 当月花费 / 月度预算 |
| 用户 | CSAT 满意度评分 | ≥ 80% | 用户反馈采集 |
| 用户 | 👍/👎 正面比率 | ≥ 85% | 用户即时反馈 |
| 漂移 | 输出质量漂移 Z-score | ≤ 2.0 | 滑动窗口统计检验 |
| 漂移 | 嵌入漂移距离 | ≤ 阈值 | 余弦距离监控 |
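表中的可用性 SLO 可以进一步换算成错误预算(Error Budget)并用燃烧率(burn rate)驱动告警。下面是一个最小计算示意,假设 99.5% 可用性 SLO;函数名与 14.4 这个快速消耗阈值(来自 Google SRE Workbook 的常用参考值)均为示例,并非本书固定实现:

```python
# error_budget.py — 错误预算与燃烧率计算示意(假设 99.5% 可用性 SLO)

SLO_TARGET = 0.995             # 可用性 SLO
ERROR_BUDGET = 1 - SLO_TARGET  # 允许的错误率预算 = 0.5%

def burn_rate(error_rate: float) -> float:
    """燃烧率 = 实际错误率 / 允许错误率;1.0 表示恰好按预算消耗"""
    return error_rate / ERROR_BUDGET

def should_page(short_window_rate: float, long_window_rate: float) -> bool:
    """多窗口告警:短窗口(如 5m)与长窗口(如 1h)燃烧率同时超阈值才触发,
    避免瞬时抖动导致误报(14.4 为示例阈值)"""
    return burn_rate(short_window_rate) > 14.4 and burn_rate(long_window_rate) > 14.4

# 示例:错误率 5% 时燃烧率为 10(不触发);8% 时为 16(触发)
print(round(burn_rate(0.05), 2))   # 10.0
print(should_page(0.08, 0.08))     # True
```

这种"短窗口确认正在发生、长窗口确认确实严重"的双窗口模式,可以直接对应到后文 Prometheus 告警规则中的 `for` 持续时间与表达式阈值。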
操作步骤
步骤 1:定义 Prometheus 指标(Python 应用)
# metrics.py — AI Agent 核心 Prometheus 指标定义
from prometheus_client import (
Counter, Histogram, Gauge, Summary, Info,
CollectorRegistry, generate_latest
)
import time
from functools import wraps
# 创建指标注册表
REGISTRY = CollectorRegistry()
# ========== 可用性指标 ==========
LLM_REQUESTS_TOTAL = Counter(
"llm_requests_total",
"LLM API 请求总数",
["model", "endpoint", "status", "error_type"],
registry=REGISTRY
)
LLM_ERRORS_TOTAL = Counter(
"llm_errors_total",
"LLM API 错误总数",
["model", "error_type", "error_code"],
registry=REGISTRY
)
# ========== 性能指标 ==========
LLM_REQUEST_DURATION = Histogram(
"llm_request_duration_seconds",
"LLM 请求端到端延迟(秒)",
["model", "endpoint"],
buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
registry=REGISTRY
)
LLM_TTFT = Histogram(
"llm_time_to_first_token_seconds",
"首 Token 响应时间(秒)",
["model"],
buckets=[0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0],
registry=REGISTRY
)
# ========== Token 与成本指标 ==========
LLM_TOKENS_TOTAL = Counter(
"llm_tokens_total",
"Token 使用总量",
["model", "token_type"], # token_type: input, output, cached
registry=REGISTRY
)
LLM_COST_TOTAL = Counter(
"llm_cost_dollars_total",
"LLM API 成本(美元)",
["model", "feature", "user_tier"],
registry=REGISTRY
)
LLM_COST_PER_INTERACTION = Summary(
"llm_cost_per_interaction_dollars",
"每次交互成本(美元)",
["feature"],
registry=REGISTRY
)
# ========== 质量指标 ==========
LLM_QUALITY_SCORE = Histogram(
"llm_quality_score",
"LLM 输出质量评分(0-5)",
["model", "evaluator", "dimension"],
buckets=[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0],
registry=REGISTRY
)
LLM_HALLUCINATION_DETECTED = Counter(
"llm_hallucination_detected_total",
"检测到的幻觉次数",
["model", "severity"], # severity: low, medium, high
registry=REGISTRY
)
# ========== 用户满意度指标 ==========
USER_FEEDBACK_TOTAL = Counter(
"user_feedback_total",
"用户反馈总数",
["feedback_type", "feature"], # feedback_type: thumbs_up, thumbs_down, rating
registry=REGISTRY
)
USER_SATISFACTION_SCORE = Histogram(
"user_satisfaction_score",
"用户满意度评分(1-5)",
["feature"],
buckets=[1, 2, 3, 4, 5],
registry=REGISTRY
)
# ========== 漂移指标 ==========
LLM_DRIFT_SCORE = Gauge(
"llm_drift_score",
"输出漂移评分(Z-score)",
["model", "drift_type"], # drift_type: quality, topic, embedding
registry=REGISTRY
)
EMBEDDING_DRIFT_DISTANCE = Gauge(
"embedding_drift_distance",
"嵌入漂移距离",
["model", "metric"], # metric: cosine, euclidean
registry=REGISTRY
)
# ========== 缓存指标 ==========
CACHE_HITS_TOTAL = Counter(
"llm_cache_hits_total",
"缓存命中次数",
["cache_type"], # cache_type: semantic, prompt, exact
registry=REGISTRY
)
CACHE_MISSES_TOTAL = Counter(
"llm_cache_misses_total",
"缓存未命中次数",
["cache_type"],
registry=REGISTRY
)

步骤 2:构建指标采集中间件
# middleware.py — LLM 调用指标采集中间件
import time
import traceback
from typing import Optional
from metrics import (
    LLM_REQUESTS_TOTAL, LLM_ERRORS_TOTAL, LLM_REQUEST_DURATION,
    LLM_TOKENS_TOTAL, LLM_COST_TOTAL, LLM_COST_PER_INTERACTION,
)
class LLMMetricsMiddleware:
"""LLM 调用指标采集中间件——包装任意 LLM 客户端"""
def __init__(self, feature: str = "default", user_tier: str = "free"):
self.feature = feature
self.user_tier = user_tier
async def call_with_metrics(
self,
llm_func,
model: str,
messages: list,
**kwargs
) -> dict:
"""包装 LLM 调用,自动采集所有指标"""
start_time = time.time()
ttft_recorded = False
status = "success"
error_type = "none"
try:
# 执行 LLM 调用
response = await llm_func(
model=model,
messages=messages,
**kwargs
)
# 记录 Token 用量
usage = response.usage
LLM_TOKENS_TOTAL.labels(
model=model, token_type="input"
).inc(usage.prompt_tokens)
LLM_TOKENS_TOTAL.labels(
model=model, token_type="output"
).inc(usage.completion_tokens)
# 计算并记录成本
cost = self._calculate_cost(
model, usage.prompt_tokens, usage.completion_tokens
)
LLM_COST_TOTAL.labels(
model=model,
feature=self.feature,
user_tier=self.user_tier
).inc(cost)
LLM_COST_PER_INTERACTION.labels(
feature=self.feature
).observe(cost)
return response
except Exception as e:
status = "error"
error_type = type(e).__name__
# 错误分类
error_code = getattr(e, "status_code", 0)
LLM_ERRORS_TOTAL.labels(
model=model,
error_type=error_type,
error_code=str(error_code)
).inc()
raise
finally:
# 记录延迟
duration = time.time() - start_time
LLM_REQUEST_DURATION.labels(
model=model, endpoint=self.feature
).observe(duration)
# 记录请求计数
LLM_REQUESTS_TOTAL.labels(
model=model,
endpoint=self.feature,
status=status,
error_type=error_type
).inc()
def _calculate_cost(
self, model: str, input_tokens: int, output_tokens: int
) -> float:
"""根据模型计算成本"""
PRICING = {
"gpt-4.1": {"input": 2.0, "output": 8.0},
"gpt-4.1-mini": {"input": 0.4, "output": 1.6},
"gpt-4.1-nano": {"input": 0.1, "output": 0.4},
"claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
"claude-haiku-3.5": {"input": 0.8, "output": 4.0},
"gemini-2.5-flash": {"input": 0.15, "output": 0.6},
}
prices = PRICING.get(model, {"input": 1.0, "output": 3.0})
return (
input_tokens * prices["input"] / 1_000_000
+ output_tokens * prices["output"] / 1_000_000
)

提示词模板
你是一个 SRE 工程师,负责为 AI Agent 系统定义 SLI/SLO。请根据以下系统信息生成完整的 SLI/SLO 文档:
## 系统信息
- 系统名称:[系统名称]
- 主要功能:[功能描述,如 AI 客服、代码助手、数据分析]
- 日均请求量:[数量]
- 使用的模型:[模型列表]
- 用户群体:[内部/外部,付费/免费]
- 关键业务影响:[如果系统不可用或质量下降,会造成什么影响]
## 请输出
1. 按类别(可用性/性能/质量/成本/用户)定义 SLI 指标
2. 为每个 SLI 设定合理的 SLO 目标值
3. 定义错误预算(Error Budget)和消耗速率告警
4. 建议的测量方法和数据源
5. SLO 违规时的升级流程

2. 成功率监控:API 调用成功/失败率
成功率是最基础也是最关键的指标。AI 系统的"失败"不仅包括 HTTP 错误,还包括模型拒绝回答、输出格式错误、超时等"软失败"。
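区分硬错误与软错误后,可以把二者合并为一个"有效成功率",避免 HTTP 层面一切正常、但输出质量已经崩塌时指标仍然好看。下面是一个极简计算示意(函数名与数值为演示假设):

```python
# effective_success.py — 合并硬错误与软错误的有效成功率示意(数值为演示假设)

def effective_success_rate(total: int, hard_errors: int, soft_errors: int) -> float:
    """有效成功率 = (总请求 - 硬错误 - 软错误) / 总请求"""
    if total == 0:
        return 1.0
    return (total - hard_errors - soft_errors) / total

# 示例:1000 次请求,仅 5 次 HTTP 错误 → 99.5%,看似达标;
# 但若另有 40 次软失败(拒答/空响应等),有效成功率只有 95.5%
print(effective_success_rate(1000, 5, 0))    # 0.995
print(effective_success_rate(1000, 5, 40))   # 0.955
```

这也是为什么后文的错误分类器要把 refusal、empty_response 等软错误同样记入 `llm_errors_total`。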
2.1 错误分类体系
┌─────────────────────────────────────────────────────────────┐
│ AI 系统错误分类 │
│ │
│ 硬错误(Hard Errors) 软错误(Soft Errors) │
│ ├── 4xx 客户端错误 ├── 模型拒绝回答(refusal) │
│ │ ├── 400 请求格式错误 ├── 输出格式不符合预期 │
│ │ ├── 401 认证失败 ├── 输出内容为空或过短 │
│ │ ├── 429 速率限制 ├── 幻觉/事实错误 │
│ │ └── 413 上下文超限 ├── 安全过滤触发 │
│ ├── 5xx 服务端错误 ├── 工具调用失败 │
│ │ ├── 500 内部错误 └── 质量评分低于阈值 │
│ │ ├── 502 网关错误 │
│ │ └── 503 服务不可用 降级事件(Degradation) │
│ └── 超时错误 ├── 降级到备用模型 │
│ ├── 连接超时 ├── 返回缓存响应 │
│ └── 读取超时 └── 返回预设兜底回复 │
└─────────────────────────────────────────────────────────────┘

操作步骤
步骤 1:实现错误分类与追踪
# error_classifier.py — AI 系统错误分类器
from enum import Enum
from dataclasses import dataclass
from typing import Optional
from metrics import LLM_REQUESTS_TOTAL, LLM_ERRORS_TOTAL
class ErrorCategory(Enum):
# 硬错误
RATE_LIMIT = "rate_limit" # 429
AUTH_FAILURE = "auth_failure" # 401/403
CONTEXT_OVERFLOW = "context_overflow" # 413/400
SERVER_ERROR = "server_error" # 5xx
TIMEOUT = "timeout" # 超时
# 软错误
REFUSAL = "refusal" # 模型拒绝回答
EMPTY_RESPONSE = "empty_response" # 空响应
FORMAT_ERROR = "format_error" # 输出格式错误
SAFETY_FILTER = "safety_filter" # 安全过滤
TOOL_FAILURE = "tool_failure" # 工具调用失败
LOW_QUALITY = "low_quality" # 质量评分低
# 降级
MODEL_FALLBACK = "model_fallback" # 降级到备用模型
CACHE_FALLBACK = "cache_fallback" # 返回缓存
DEFAULT_RESPONSE = "default_response" # 兜底回复
@dataclass
class ErrorEvent:
category: ErrorCategory
model: str
message: str
status_code: Optional[int] = None
is_retriable: bool = False
severity: str = "medium" # low, medium, high, critical
class AIErrorClassifier:
"""AI 系统错误分类器"""
def classify(self, exception: Exception, response=None) -> Optional[ErrorEvent]:
"""根据异常或响应分类错误"""
# HTTP 错误分类
status_code = getattr(exception, "status_code", None)
if status_code:
if status_code == 429:
return ErrorEvent(
category=ErrorCategory.RATE_LIMIT,
model="unknown",
message=str(exception),
status_code=429,
is_retriable=True,
severity="medium"
)
elif status_code in (401, 403):
return ErrorEvent(
category=ErrorCategory.AUTH_FAILURE,
model="unknown",
message="认证失败",
status_code=status_code,
severity="critical"
)
elif status_code >= 500:
return ErrorEvent(
category=ErrorCategory.SERVER_ERROR,
model="unknown",
message=str(exception),
status_code=status_code,
is_retriable=True,
severity="high"
)
# 超时错误
if "timeout" in str(exception).lower():
return ErrorEvent(
category=ErrorCategory.TIMEOUT,
model="unknown",
message="请求超时",
is_retriable=True,
severity="medium"
)
# 软错误分类(基于响应内容)
if response:
return self._classify_soft_error(response)
return ErrorEvent(
category=ErrorCategory.SERVER_ERROR,
model="unknown",
message=str(exception),
severity="high"
)
def _classify_soft_error(self, response) -> Optional[ErrorEvent]:
"""分类软错误"""
content = response.choices[0].message.content if response.choices else ""
model = response.model
# 空响应
if not content or len(content.strip()) < 10:
return ErrorEvent(
category=ErrorCategory.EMPTY_RESPONSE,
model=model,
message="响应内容为空或过短",
severity="medium"
)
# 模型拒绝
refusal_patterns = [
"I cannot", "I'm unable to", "I apologize",
"我无法", "抱歉,我不能", "作为 AI"
]
if any(p in content[:200] for p in refusal_patterns):
return ErrorEvent(
category=ErrorCategory.REFUSAL,
model=model,
message="模型拒绝回答",
severity="low"
)
return None # 无软错误
def record_error(self, event: ErrorEvent):
"""记录错误到 Prometheus"""
LLM_ERRORS_TOTAL.labels(
model=event.model,
error_type=event.category.value,
error_code=str(event.status_code or 0)
).inc()

步骤 2:SLA 达标率追踪
# sla_tracker.py — SLA 达标率实时追踪
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SLAWindow:
"""滑动窗口 SLA 追踪"""
window_seconds: int = 3600 # 1 小时窗口
events: list = field(default_factory=list)
def record(self, success: bool, latency: float):
now = time.time()
self.events.append({
"timestamp": now,
"success": success,
"latency": latency
})
# 清理过期事件
cutoff = now - self.window_seconds
self.events = [e for e in self.events if e["timestamp"] > cutoff]
def get_metrics(self) -> dict:
if not self.events:
return {
"success_rate": 1.0, "total_requests": 0,
"p50_latency": 0, "p95_latency": 0, "p99_latency": 0,
"error_budget_remaining": 100.0
}
successes = sum(1 for e in self.events if e["success"])
latencies = sorted(e["latency"] for e in self.events)
n = len(latencies)
return {
"success_rate": successes / len(self.events),
"total_requests": len(self.events),
"p50_latency": latencies[int(n * 0.5)] if n > 0 else 0,
"p95_latency": latencies[int(n * 0.95)] if n > 0 else 0,
"p99_latency": latencies[int(n * 0.99)] if n > 0 else 0,
"error_budget_remaining": max(
0, 0.005 - (1 - successes / len(self.events))
) / 0.005 * 100 # 基于 99.5% SLO
}
class SLADashboard:
"""多维度 SLA 仪表板"""
def __init__(self):
self.windows = defaultdict(lambda: SLAWindow())
def record(self, dimension: str, success: bool, latency: float):
"""按维度记录(如 model、feature、user_tier)"""
self.windows[dimension].record(success, latency)
self.windows["global"].record(success, latency)
def get_dashboard(self) -> dict:
return {
dim: window.get_metrics()
for dim, window in self.windows.items()
}

3. 延迟百分位监控:P50/P95/P99 与 TTFT
LLM 应用的延迟分布通常呈长尾特征——P50 可能只有 1 秒,但 P99 可能高达 15 秒。仅关注平均延迟会掩盖大量用户的糟糕体验。
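用一组构造的延迟样本可以直观看到均值如何掩盖长尾(数据为演示用的假设值,并非真实线上分布):

```python
# tail_latency.py — 演示均值掩盖长尾:990 个快请求 + 10 个慢请求(假设数据)

latencies = [1.0] * 990 + [20.0] * 10  # 单位:秒

mean = sum(latencies) / len(latencies)
sorted_l = sorted(latencies)
p50 = sorted_l[int(len(sorted_l) * 0.50)]
p99 = sorted_l[int(len(sorted_l) * 0.99)]

print(f"mean={mean:.2f}s p50={p50}s p99={p99}s")
# 均值约 1.19s、P50 为 1s,看似健康;但 P99 为 20s——1% 的用户体验极差
```

这正是 SLO 表中同时规定 P50/P95/P99 三档目标、而不以平均延迟为准的原因。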
3.1 延迟指标分解
┌─────────────────────────────────────────────────────────────┐
│ LLM 请求延迟分解 │
│ │
│ ┌──────┐ ┌──────┐ ┌──────────┐ ┌──────┐ ┌──────┐ │
│ │网络 │→│排队 │→│模型推理 │→│流式 │→│后处理 │ │
│ │延迟 │ │等待 │ │(TTFT) │ │传输 │ │ │ │
│ └──────┘ └──────┘ └──────────┘ └──────┘ └──────┘ │
│ ~50ms ~0-5s ~200ms-2s ~1-10s ~10-100ms │
│ │
│ TTFT = 网络 + 排队 + 首 Token 生成 │
│ E2E = TTFT + 流式传输 + 后处理 │
│ │
│ 关键指标: │
│ • TTFT(Time to First Token):用户感知的"开始响应"时间 │
│ • TBT(Time Between Tokens):流式输出的 Token 间隔 │
│ • E2E(End-to-End):完整请求的总耗时 │
└─────────────────────────────────────────────────────────────┘

操作步骤
步骤 1:流式响应延迟采集
# latency_tracker.py — 流式响应延迟精确采集
import time
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator
from metrics import LLM_TTFT, LLM_REQUEST_DURATION
@dataclass
class LatencyBreakdown:
"""延迟分解记录"""
start_time: float = 0
first_token_time: float = 0
last_token_time: float = 0
end_time: float = 0
token_count: int = 0
token_timestamps: list = field(default_factory=list)
@property
def ttft(self) -> float:
"""首 Token 时间"""
if self.first_token_time and self.start_time:
return self.first_token_time - self.start_time
return 0
@property
def e2e(self) -> float:
"""端到端延迟"""
if self.end_time and self.start_time:
return self.end_time - self.start_time
return 0
@property
def avg_tbt(self) -> float:
"""平均 Token 间隔"""
if len(self.token_timestamps) < 2:
return 0
intervals = [
self.token_timestamps[i] - self.token_timestamps[i-1]
for i in range(1, len(self.token_timestamps))
]
return sum(intervals) / len(intervals)
@property
def tokens_per_second(self) -> float:
"""Token 生成速率"""
duration = self.last_token_time - self.first_token_time
if duration > 0 and self.token_count > 1:
return (self.token_count - 1) / duration
return 0
async def track_streaming_latency(
stream: AsyncIterator,
model: str
) -> tuple[str, LatencyBreakdown]:
"""追踪流式响应的详细延迟"""
breakdown = LatencyBreakdown(start_time=time.time())
chunks = []
async for chunk in stream:
now = time.time()
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
chunks.append(content)
breakdown.token_count += 1
breakdown.token_timestamps.append(now)
if breakdown.token_count == 1:
breakdown.first_token_time = now
# 记录 TTFT 到 Prometheus
LLM_TTFT.labels(model=model).observe(
breakdown.ttft
)
breakdown.last_token_time = now
breakdown.end_time = time.time()
# 记录 E2E 延迟到 Prometheus
LLM_REQUEST_DURATION.labels(
model=model, endpoint="streaming"
).observe(breakdown.e2e)
full_response = "".join(chunks)
return full_response, breakdown
# 使用示例
async def monitored_streaming_call(client, model: str, messages: list):
"""带延迟监控的流式调用"""
stream = await client.chat.completions.create(
model=model,
messages=messages,
stream=True
)
response, latency = await track_streaming_latency(stream, model)
print(f"TTFT: {latency.ttft:.3f}s")
print(f"E2E: {latency.e2e:.3f}s")
print(f"TPS: {latency.tokens_per_second:.1f} tokens/s")
print(f"Avg TBT: {latency.avg_tbt*1000:.1f}ms")
return response, latency

步骤 2:延迟百分位 Prometheus 查询
以下是用于 Grafana 仪表板的关键 PromQL 查询:
# P50 延迟(中位数)
histogram_quantile(0.50,
sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)
# P95 延迟
histogram_quantile(0.95,
sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)
# P99 延迟
histogram_quantile(0.99,
sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)
# TTFT P95
histogram_quantile(0.95,
sum(rate(llm_time_to_first_token_seconds_bucket[5m])) by (le, model)
)
# 延迟 SLO 违规率(P95 > 5s 的请求比例)
1 - (
sum(rate(llm_request_duration_seconds_bucket{le="5.0"}[5m]))
/
sum(rate(llm_request_duration_seconds_count[5m]))
)
# 按模型的请求速率
sum(rate(llm_request_duration_seconds_count[5m])) by (model)

4. 每次交互成本追踪
成本监控不仅是财务需求,更是产品健康度的关键信号。成本突然飙升可能意味着 Prompt 膨胀、缓存失效或模型路由异常。
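成本飙升检测可以从一个简单的同比规则入手:当前小时成本超过历史同时段均值的某个倍数即告警。下面是一个应用侧的纯 Python 复现示意(函数名与 2.0 倍阈值均为示例假设):

```python
# cost_spike.py — 简单成本异常检测示意:当前值 vs 历史同时段均值的倍数阈值(阈值为假设)

def is_cost_spike(current_hour_cost: float,
                  history_hour_costs: list[float],
                  multiplier: float = 2.0) -> bool:
    """过去若干天同时段小时成本均值的 multiplier 倍作为异常阈值"""
    if not history_hour_costs:
        return False  # 无历史数据时不告警,避免冷启动误报
    baseline = sum(history_hour_costs) / len(history_hour_costs)
    return current_hour_cost > multiplier * baseline

# 示例:过去 7 天同时段约 $10/小时,当前小时 $25 → 告警
print(is_cost_spike(25.0, [9.5, 10.2, 10.0, 9.8, 10.5, 10.1, 9.9]))  # True
```

后文"成本告警 PromQL 规则"中的 `increase(...[1h]) > 2 * avg_over_time(...)` 表达式实现的正是同一个判断,只是放在 Prometheus 侧持续执行。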
4.1 多维度成本追踪模型
┌─────────────────────────────────────────────────────────────┐
│ 成本追踪维度 │
│ │
│ 按用户维度 按功能维度 按模型维度 │
│ ├── 每用户日成本 ├── 客服模块成本 ├── GPT-4.1 成本 │
│ ├── 每用户月成本 ├── 搜索模块成本 ├── Claude 成本 │
│ ├── 付费用户成本 ├── 代码助手成本 ├── Gemini 成本 │
│ └── 免费用户成本 └── 数据分析成本 └── 缓存节省额 │
│ │
│ 关键比率: │
│ • 每次交互成本 = 总 API 费用 / 总交互数 │
│ • 每用户成本 = 总 API 费用 / 活跃用户数 │
│ • 成本效率比 = 缓存节省额 / 总 API 费用 │
│ • 预算消耗速率 = 当日花费 / (月预算 / 30) │
└─────────────────────────────────────────────────────────────┘

操作步骤
步骤 1:实时成本追踪系统
# cost_tracker.py — 多维度实时成本追踪
import time
from collections import defaultdict
from datetime import datetime, timedelta
from metrics import LLM_COST_TOTAL, LLM_COST_PER_INTERACTION
class CostTracker:
"""多维度成本追踪器"""
def __init__(self, monthly_budget: float = 5000.0):
self.monthly_budget = monthly_budget
self.daily_costs = defaultdict(float)
self.feature_costs = defaultdict(float)
self.user_costs = defaultdict(float)
self.model_costs = defaultdict(float)
self.interaction_count = 0
def record(
self,
cost: float,
model: str,
feature: str,
user_id: str,
user_tier: str = "free"
):
"""记录一次交互的成本"""
today = datetime.utcnow().strftime("%Y-%m-%d")
self.daily_costs[today] += cost
self.feature_costs[feature] += cost
self.user_costs[user_id] += cost
self.model_costs[model] += cost
self.interaction_count += 1
# 记录到 Prometheus
LLM_COST_TOTAL.labels(
model=model, feature=feature, user_tier=user_tier
).inc(cost)
LLM_COST_PER_INTERACTION.labels(feature=feature).observe(cost)
def get_budget_status(self) -> dict:
"""获取预算状态"""
month_key = datetime.utcnow().strftime("%Y-%m")
month_total = sum(
v for k, v in self.daily_costs.items()
if k.startswith(month_key)
)
days_in_month = 30
days_elapsed = datetime.utcnow().day
daily_avg = month_total / max(days_elapsed, 1)
projected_monthly = daily_avg * days_in_month
return {
"month_total": round(month_total, 2),
"monthly_budget": self.monthly_budget,
"usage_percent": round(month_total / self.monthly_budget * 100, 1),
"daily_average": round(daily_avg, 2),
"projected_monthly": round(projected_monthly, 2),
"projected_over_budget": projected_monthly > self.monthly_budget,
"avg_cost_per_interaction": round(
month_total / max(self.interaction_count, 1), 4
),
"top_features": dict(
sorted(self.feature_costs.items(),
key=lambda x: x[1], reverse=True)[:5]
),
"top_models": dict(
sorted(self.model_costs.items(),
key=lambda x: x[1], reverse=True)[:5]
)
}

步骤 2:成本告警 PromQL 规则
# 每次交互平均成本(5 分钟窗口)
sum(rate(llm_cost_dollars_total[5m]))
/
sum(rate(llm_requests_total{status="success"}[5m]))
# 按功能的成本分布
sum(rate(llm_cost_dollars_total[1h])) by (feature) * 3600
# 月度预算消耗速率(日均 × 30 > 预算则告警)
sum(increase(llm_cost_dollars_total[24h])) * 30
# 成本异常检测(当前小时成本 > 过去 7 天同时段均值的 2 倍)
sum(increase(llm_cost_dollars_total[1h]))
> 2 * avg_over_time(
sum(increase(llm_cost_dollars_total[1h]))[7d:1h]
)
# 缓存节省率
sum(rate(llm_cache_hits_total[1h]))
/
(sum(rate(llm_cache_hits_total[1h])) + sum(rate(llm_cache_misses_total[1h])))

5. 用户满意度评分
用户满意度是 AI 系统质量的终极衡量标准。技术指标再好,如果用户不满意,系统就是失败的。AI 交互的满意度采集需要结合即时反馈(👍/👎)、结构化评分(CSAT)和长期忠诚度(NPS)。
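其中 NPS 的计算口径——推荐者(9-10 分)占比减去贬损者(0-6 分)占比——可以用几行代码表达(数据为假设示例):

```python
# nps_calc.py — NPS 计算示意:推荐者占比 - 贬损者占比(数据为假设示例)

def compute_nps(scores: list[int]) -> int:
    """scores 为 0-10 的 NPS 打分列表,返回 -100 到 100 的整数"""
    if not scores:
        return 0
    promoters = sum(1 for s in scores if s >= 9) / len(scores)
    detractors = sum(1 for s in scores if s <= 6) / len(scores)
    return round((promoters - detractors) * 100)

# 示例:10 个评分中 5 个推荐者、2 个贬损者 → NPS = (50% - 40%... 即 50% - 20%) = 30
print(compute_nps([10, 9, 9, 10, 9, 8, 7, 6, 5, 8]))  # 30
```

下文 FeedbackCollector 的 `get_satisfaction_metrics` 在 NPS 分支中做的就是同一计算。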
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Langfuse Scores | LLM 交互评分采集 | 免费(自托管) | 与 trace 关联的反馈 |
| Hotjar | 用户行为分析 + 反馈 | 免费(基础版) | Web 应用反馈采集 |
| Delighted | NPS/CSAT 调查 | 起步 $224/月 | 专业满意度调查 |
| 自建方案 | 自定义反馈系统 | 免费 | 完全控制 |
操作步骤
步骤 1:构建多层反馈采集系统
# feedback_collector.py — 多层用户反馈采集
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
from metrics import USER_FEEDBACK_TOTAL, USER_SATISFACTION_SCORE
class FeedbackType(Enum):
THUMBS = "thumbs" # 👍/👎 即时反馈
RATING = "rating" # 1-5 星评分
CSAT = "csat" # 客户满意度调查
NPS = "nps" # 净推荐值
TEXT = "text" # 文本反馈
@dataclass
class FeedbackEvent:
trace_id: str # 关联的 LLM trace ID
user_id: str
feedback_type: FeedbackType
value: float # 标准化为 0-1 范围
raw_value: str # 原始值
feature: str
comment: Optional[str] = None
timestamp: Optional[datetime] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.utcnow()
class FeedbackCollector:
"""多层反馈采集器"""
def __init__(self):
self.feedback_store = [] # 生产环境替换为数据库
def record_thumbs(
self, trace_id: str, user_id: str,
is_positive: bool, feature: str
):
"""记录 👍/👎 反馈"""
feedback = FeedbackEvent(
trace_id=trace_id,
user_id=user_id,
feedback_type=FeedbackType.THUMBS,
value=1.0 if is_positive else 0.0,
raw_value="thumbs_up" if is_positive else "thumbs_down",
feature=feature
)
self._store_and_record(feedback)
def record_rating(
self, trace_id: str, user_id: str,
rating: int, feature: str, comment: Optional[str] = None
):
"""记录 1-5 星评分"""
feedback = FeedbackEvent(
trace_id=trace_id,
user_id=user_id,
feedback_type=FeedbackType.RATING,
value=rating / 5.0,
raw_value=str(rating),
feature=feature,
comment=comment
)
self._store_and_record(feedback)
USER_SATISFACTION_SCORE.labels(feature=feature).observe(rating)
def record_csat(
self, user_id: str, score: int,
feature: str, trace_id: str = ""
):
"""记录 CSAT 评分(1-5)"""
feedback = FeedbackEvent(
trace_id=trace_id,
user_id=user_id,
feedback_type=FeedbackType.CSAT,
value=score / 5.0,
raw_value=str(score),
feature=feature
)
self._store_and_record(feedback)
def record_nps(
self, user_id: str, score: int, feature: str
):
"""记录 NPS 评分(0-10)"""
# NPS 分类:0-6 贬损者,7-8 被动者,9-10 推荐者
category = (
"promoter" if score >= 9
else "passive" if score >= 7
else "detractor"
)
feedback = FeedbackEvent(
trace_id="",
user_id=user_id,
feedback_type=FeedbackType.NPS,
value=score / 10.0,
raw_value=f"{score}_{category}",
feature=feature
)
self._store_and_record(feedback)
def _store_and_record(self, feedback: FeedbackEvent):
"""存储反馈并记录 Prometheus 指标"""
self.feedback_store.append(feedback)
USER_FEEDBACK_TOTAL.labels(
feedback_type=feedback.raw_value,
feature=feedback.feature
).inc()
def get_satisfaction_metrics(self, feature: str = None) -> dict:
"""计算满意度指标"""
feedbacks = self.feedback_store
if feature:
feedbacks = [f for f in feedbacks if f.feature == feature]
if not feedbacks:
return {"no_data": True}
# 👍/👎 比率
thumbs = [f for f in feedbacks if f.feedback_type == FeedbackType.THUMBS]
thumbs_up_rate = (
sum(1 for f in thumbs if f.value == 1.0) / len(thumbs)
if thumbs else None
)
# CSAT 评分
csat_scores = [
f.value * 5 for f in feedbacks
if f.feedback_type == FeedbackType.CSAT
]
avg_csat = sum(csat_scores) / len(csat_scores) if csat_scores else None
csat_percent = (
sum(1 for s in csat_scores if s >= 4) / len(csat_scores) * 100
if csat_scores else None
)
# NPS 计算
nps_scores = [
f.value * 10 for f in feedbacks
if f.feedback_type == FeedbackType.NPS
]
if nps_scores:
promoters = sum(1 for s in nps_scores if s >= 9) / len(nps_scores)
detractors = sum(1 for s in nps_scores if s <= 6) / len(nps_scores)
nps = round((promoters - detractors) * 100)
else:
nps = None
return {
"thumbs_up_rate": round(thumbs_up_rate * 100, 1) if thumbs_up_rate is not None else None,
"avg_csat": round(avg_csat, 2) if avg_csat is not None else None,
"csat_satisfied_percent": round(csat_percent, 1) if csat_percent is not None else None,
"nps": nps,
"total_feedbacks": len(feedbacks)
}

步骤 2:反馈与 LLM Trace 关联(Langfuse 集成)
# langfuse_feedback.py — 将用户反馈关联到 Langfuse trace
from langfuse import Langfuse
langfuse = Langfuse()
def record_feedback_to_langfuse(
trace_id: str,
feedback_type: str,
value: float,
comment: str = None
):
"""将反馈评分关联到 Langfuse trace"""
langfuse.score(
trace_id=trace_id,
name=feedback_type, # "user_thumbs", "user_rating", "csat"
value=value,
comment=comment
)
# 在 API 端点中使用
# POST /api/feedback
async def handle_feedback(request):
data = request.json()
# 记录到 Langfuse(关联 trace)
record_feedback_to_langfuse(
trace_id=data["trace_id"],
feedback_type=data["type"],
value=data["value"],
comment=data.get("comment")
)
# 记录到 Prometheus(实时告警)
collector = FeedbackCollector()
if data["type"] == "thumbs":
collector.record_thumbs(
trace_id=data["trace_id"],
user_id=data["user_id"],
is_positive=data["value"] > 0,
feature=data["feature"]
)
return {"status": "ok"}

提示词模板
你是一个用户体验分析师。请分析以下 AI 系统的用户反馈数据,生成改进建议:
## 反馈数据摘要
- 时间范围:[起始日期] 至 [结束日期]
- 总交互数:[数量]
- 👍/👎 比率:[正面比率]%
- CSAT 平均分:[分数]/5.0
- NPS 评分:[分数]
- 最常见的负面反馈关键词:[关键词列表]
- 负面反馈最多的功能模块:[模块名称]
## 典型负面反馈示例
1. "[反馈内容1]" — 功能:[模块],评分:[分数]
2. "[反馈内容2]" — 功能:[模块],评分:[分数]
3. "[反馈内容3]" — 功能:[模块],评分:[分数]
## 请输出
1. 负面反馈根因分析(按严重程度排序)
2. 每个根因的具体改进建议
3. 优先级排序(影响面 × 严重程度)
4. 预期改进效果(CSAT 提升预估)

6. 漂移检测:输出质量漂移、主题漂移与嵌入漂移
AI 系统的一个独特挑战是"静默退化"——模型提供商的 API 更新、数据分布变化或 Prompt 微调都可能导致输出质量悄然下降,而传统监控无法捕捉这种变化。漂移检测是发现这类问题的关键手段。
6.1 漂移类型全景
┌─────────────────────────────────────────────────────────────┐
│ AI 系统漂移类型 │
│ │
│ 输出质量漂移 主题漂移 │
│ ├── 质量评分下降趋势 ├── 输出主题偏离预期范围 │
│ ├── 幻觉率上升 ├── 回答风格突变 │
│ ├── 格式合规率下降 └── 拒绝率异常变化 │
│ └── 一致性评分波动 │
│ │
│ 嵌入漂移 数据漂移 │
│ ├── 输入嵌入分布偏移 ├── 用户查询模式变化 │
│ ├── 输出嵌入分布偏移 ├── 新主题/新领域出现 │
│ └── 检索相关性下降 └── 季节性/事件性变化 │
└─────────────────────────────────────────────────────────────┘

工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Evidently AI | 数据/嵌入漂移检测 | 免费(开源) | 统计检验驱动的漂移监控 |
| Arize Phoenix | LLM 可观测性 + 漂移 | 免费(开源) | 嵌入可视化、漂移检测 |
| WhyLabs | ML 监控平台 | 免费(基础版) | 自动漂移检测与告警 |
| Galileo | LLM 质量监控 | 联系销售 | 幻觉检测、一致性监控 |
操作步骤
步骤 1:输出质量漂移检测
# drift_detector.py — 输出质量漂移检测
import numpy as np
from collections import deque
from datetime import datetime, timedelta
from typing import Optional
from metrics import LLM_DRIFT_SCORE
class QualityDriftDetector:
"""基于滑动窗口的输出质量漂移检测器"""
def __init__(
self,
baseline_window: int = 1000, # 基线窗口大小
detection_window: int = 100, # 检测窗口大小
z_threshold: float = 2.0, # Z-score 告警阈值
model: str = "default"
):
self.baseline_scores = deque(maxlen=baseline_window)
self.recent_scores = deque(maxlen=detection_window)
self.z_threshold = z_threshold
self.model = model
self.drift_history = []
def add_score(self, score: float, dimension: str = "overall"):
"""添加一个质量评分"""
self.baseline_scores.append(score)
self.recent_scores.append(score)
# 检测漂移
drift_result = self._detect_drift(dimension)
if drift_result:
self.drift_history.append({
"timestamp": datetime.utcnow().isoformat(),
"dimension": dimension,
**drift_result
})
return drift_result
def _detect_drift(self, dimension: str) -> Optional[dict]:
"""使用 Z-score 检测漂移"""
if len(self.baseline_scores) < 100 or len(self.recent_scores) < 20:
return None
baseline_mean = np.mean(list(self.baseline_scores))
baseline_std = np.std(list(self.baseline_scores))
recent_mean = np.mean(list(self.recent_scores))
if baseline_std == 0:
return None
z_score = (recent_mean - baseline_mean) / (
baseline_std / np.sqrt(len(self.recent_scores))
)
# 更新 Prometheus 指标
LLM_DRIFT_SCORE.labels(
model=self.model,
drift_type=f"quality_{dimension}"
).set(abs(z_score))
is_drifting = abs(z_score) > self.z_threshold
return {
"z_score": round(z_score, 3),
"baseline_mean": round(baseline_mean, 3),
"recent_mean": round(recent_mean, 3),
"is_drifting": is_drifting,
"direction": "degrading" if z_score < 0 else "improving",
"severity": (
"critical" if abs(z_score) > 3.0
else "warning" if abs(z_score) > 2.0
else "normal"
)
}
class HallucinationRateMonitor:
"""幻觉率监控器"""
def __init__(self, window_size: int = 500):
self.results = deque(maxlen=window_size)
self.hourly_rates = {}
def record(self, is_hallucination: bool, severity: str = "medium"):
"""记录一次幻觉检测结果"""
self.results.append({
"is_hallucination": is_hallucination,
"severity": severity,
"timestamp": datetime.utcnow()
})
if is_hallucination:
from metrics import LLM_HALLUCINATION_DETECTED
LLM_HALLUCINATION_DETECTED.labels(
model="default", severity=severity
).inc()
def get_rate(self) -> dict:
"""获取当前幻觉率"""
if not self.results:
return {"rate": 0, "count": 0, "total": 0}
hallucinations = sum(
1 for r in self.results if r["is_hallucination"]
)
total = len(self.results)
return {
"rate": round(hallucinations / total * 100, 2),
"count": hallucinations,
"total": total,
"by_severity": {
sev: sum(
1 for r in self.results
if r["is_hallucination"] and r["severity"] == sev
)
for sev in ["low", "medium", "high"]
}
}

步骤 2:嵌入漂移检测
# embedding_drift.py — 嵌入空间漂移检测
import numpy as np
from scipy import stats
from typing import List
from metrics import EMBEDDING_DRIFT_DISTANCE
class EmbeddingDriftDetector:
"""基于嵌入向量的漂移检测器"""
def __init__(self, reference_embeddings: np.ndarray = None):
self.reference = reference_embeddings # 基线嵌入集合
self.current_window = []
def set_reference(self, embeddings: List[List[float]]):
"""设置基线嵌入(通常来自验证集或首周生产数据)"""
self.reference = np.array(embeddings)
def add_embedding(self, embedding: List[float]):
"""添加新的嵌入向量"""
self.current_window.append(embedding)
def detect_drift(self, method: str = "cosine_centroid") -> dict:
"""检测嵌入漂移"""
if self.reference is None or len(self.current_window) < 50:
return {"status": "insufficient_data"}
current = np.array(self.current_window[-500:]) # 最近 500 个
if method == "cosine_centroid":
return self._cosine_centroid_drift(current)
elif method == "mmd":
return self._mmd_drift(current)
elif method == "ks_test":
return self._ks_test_drift(current)
return {"status": "unknown_method"}
def _cosine_centroid_drift(self, current: np.ndarray) -> dict:
"""基于质心余弦距离的漂移检测"""
ref_centroid = np.mean(self.reference, axis=0)
cur_centroid = np.mean(current, axis=0)
# 余弦相似度
similarity = np.dot(ref_centroid, cur_centroid) / (
np.linalg.norm(ref_centroid) * np.linalg.norm(cur_centroid)
)
distance = 1 - similarity
EMBEDDING_DRIFT_DISTANCE.labels(
model="default", metric="cosine"
).set(distance)
return {
"method": "cosine_centroid",
"distance": round(float(distance), 6),
"similarity": round(float(similarity), 6),
"is_drifting": distance > 0.05, # 阈值可调
"severity": (
"critical" if distance > 0.1
else "warning" if distance > 0.05
else "normal"
)
}
def _mmd_drift(self, current: np.ndarray) -> dict:
"""最大均值差异(MMD)漂移检测"""
# 简化的 MMD 计算
n_ref = min(len(self.reference), 500)
n_cur = min(len(current), 500)
ref_sample = self.reference[
np.random.choice(len(self.reference), n_ref, replace=False)
]
cur_sample = current[
np.random.choice(len(current), n_cur, replace=False)
]
# RBF 核 MMD
def rbf_kernel(X, Y, sigma=1.0):
dists = np.sum((X[:, None] - Y[None, :]) ** 2, axis=2)
return np.exp(-dists / (2 * sigma ** 2))
K_xx = rbf_kernel(ref_sample, ref_sample)
K_yy = rbf_kernel(cur_sample, cur_sample)
K_xy = rbf_kernel(ref_sample, cur_sample)
mmd = (
np.mean(K_xx) + np.mean(K_yy) - 2 * np.mean(K_xy)
)
return {
"method": "mmd",
"mmd_value": round(float(mmd), 6),
"is_drifting": mmd > 0.01,
"severity": (
"critical" if mmd > 0.05
else "warning" if mmd > 0.01
else "normal"
)
}
def _ks_test_drift(self, current: np.ndarray) -> dict:
"""Kolmogorov-Smirnov 检验(逐维度)"""
n_dims = min(self.reference.shape[1], 50) # 取前 50 维
p_values = []
for dim in range(n_dims):
stat, p_value = stats.ks_2samp(
self.reference[:, dim],
current[:, dim]
)
p_values.append(p_value)
# Bonferroni 校正
min_p = min(p_values) * n_dims
drifting_dims = sum(1 for p in p_values if p < 0.05 / n_dims)
return {
"method": "ks_test",
"min_p_value": round(float(min_p), 6),
"drifting_dimensions": drifting_dims,
"total_dimensions": n_dims,
"drift_ratio": round(drifting_dims / n_dims, 3),
"is_drifting": drifting_dims / n_dims > 0.1,
"severity": (
"critical" if drifting_dims / n_dims > 0.3
else "warning" if drifting_dims / n_dims > 0.1
else "normal"
)
}

7. 告警体系搭建:Prometheus + Grafana + PagerDuty/Slack
有了指标,还需要一套完整的告警体系来确保问题被及时发现和处理。
7.1 告警分级策略
| 级别 | 名称 | 响应时间 | 通知方式 | 示例 |
|---|---|---|---|---|
| P0 | 严重 | 5 分钟 | PagerDuty 电话 + Slack | 成功率 < 95%、全面宕机 |
| P1 | 高 | 15 分钟 | PagerDuty + Slack | P99 延迟 > 30s、幻觉率 > 10% |
| P2 | 中 | 1 小时 | Slack 频道 | 成本超预算 80%、CSAT < 70% |
| P3 | 低 | 24 小时 | 邮件/Slack | 漂移 Z-score > 2、缓存命中率下降 |
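上面的分级表可以直接映射为一个简单的路由函数——按级别决定通知渠道。下面是一个最小示意(渠道名与映射关系为示例假设;生产环境中这层路由通常由 Alertmanager 或 PagerDuty 的路由规则承担,而非应用代码):

```python
# alert_routing.py — 告警级别到通知渠道的映射示意(映射关系为示例假设)

ROUTING = {
    "P0": ["pagerduty_phone", "slack_oncall"],  # 严重:电话 + 值班频道
    "P1": ["pagerduty", "slack_oncall"],        # 高
    "P2": ["slack_channel"],                    # 中
    "P3": ["email", "slack_channel"],           # 低
}

def route_alert(severity: str) -> list[str]:
    """返回该级别应触达的通知渠道;未知级别兜底到 Slack 频道"""
    return ROUTING.get(severity, ["slack_channel"])

print(route_alert("P0"))  # ['pagerduty_phone', 'slack_oncall']
print(route_alert("P9"))  # ['slack_channel']
```

把这张表落到配置里时,对应的是 Alertmanager 路由树中按 `severity` 标签匹配不同 receiver 的做法——下一步的告警规则正是通过 `labels.severity` 为这层路由提供输入。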
操作步骤
步骤 1:Prometheus 告警规则配置
# prometheus/rules/llm_alerts.yml
groups:
- name: llm_availability
rules:
# P0: 成功率严重下降
- alert: LLMSuccessRateCritical
expr: |
(
sum(rate(llm_requests_total{status="success"}[5m]))
/
sum(rate(llm_requests_total[5m]))
) < 0.95
for: 2m
labels:
severity: critical
team: ai-platform
annotations:
summary: "🔴 LLM API 成功率低于 95%"
description: >
当前成功率: {{ $value | humanizePercentage }}。
持续 2 分钟以上,可能影响大量用户。
runbook_url: "https://wiki.internal/runbooks/llm-success-rate"
# P1: 速率限制频繁触发
- alert: LLMRateLimitHigh
expr: |
sum(rate(llm_errors_total{error_type="rate_limit"}[5m])) > 10
for: 5m
labels:
severity: high
team: ai-platform
annotations:
summary: "🟠 LLM 速率限制频繁触发"
description: >
过去 5 分钟内速率限制错误率: {{ $value }}/s。
检查是否需要增加 API 配额或启用请求队列。
- name: llm_latency
rules:
# P1: P99 延迟过高
- alert: LLMP99LatencyHigh
expr: |
histogram_quantile(0.99,
sum(rate(llm_request_duration_seconds_bucket[5m])) by (le)
) > 15
for: 5m
labels:
severity: high
team: ai-platform
annotations:
summary: "🟠 LLM P99 延迟超过 15 秒"
description: >
当前 P99 延迟: {{ $value | humanizeDuration }}。
检查模型提供商状态和网络连接。
# P2: TTFT 过高
- alert: LLMTTFTHigh
expr: |
histogram_quantile(0.95,
sum(rate(llm_time_to_first_token_seconds_bucket[5m])) by (le)
) > 2
for: 10m
labels:
severity: warning
team: ai-platform
annotations:
summary: "🟡 LLM 首 Token 时间 P95 超过 2 秒"
description: "当前 TTFT P95: {{ $value }}s"
- name: llm_cost
rules:
# P2: 月度预算即将超支
- alert: LLMBudgetWarning
expr: |
sum(increase(llm_cost_dollars_total[24h])) * 30 > 5000 * 0.8
for: 1h
labels:
severity: warning
team: ai-platform
annotations:
summary: "🟡 LLM 月度成本预计超过预算 80%"
description: >
按当前消耗速率,预计月度成本:
${{ $value | humanize }}(预算: $5000)
# P1: 成本异常飙升
- alert: LLMCostSpike
expr: |
sum(increase(llm_cost_dollars_total[1h]))
> 2 * avg_over_time(
sum(increase(llm_cost_dollars_total[1h]))[7d:1h]
)
for: 30m
labels:
severity: high
team: ai-platform
annotations:
summary: "🟠 LLM 成本异常飙升"
description: >
当前小时成本是过去 7 天同时段均值的 2 倍以上。
检查是否有异常流量或缓存失效。
- name: llm_quality
rules:
# P1: 幻觉率过高
- alert: LLMHallucinationRateHigh
expr: |
sum(rate(llm_hallucination_detected_total[1h]))
/
sum(rate(llm_requests_total{status="success"}[1h]))
> 0.10
for: 30m
labels:
severity: high
team: ai-platform
annotations:
summary: "🟠 LLM 幻觉率超过 10%"
description: >
当前幻觉率: {{ $value | humanizePercentage }}。
检查模型版本、Prompt 变更和检索质量。
# P2: 质量漂移
- alert: LLMQualityDrift
expr: |
llm_drift_score{drift_type=~"quality_.*"} > 2.0
for: 1h
labels:
severity: warning
team: ai-platform
annotations:
summary: "🟡 LLM 输出质量漂移检测"
description: >
漂移 Z-score: {{ $value }}(阈值: 2.0)。
输出质量可能正在下降,建议人工抽检。
# P2: 用户满意度下降
- alert: LLMSatisfactionLow
expr: |
(
sum(rate(user_feedback_total{feedback_type="thumbs_up"}[24h]))
/
(
sum(rate(user_feedback_total{feedback_type="thumbs_up"}[24h]))
+
sum(rate(user_feedback_total{feedback_type="thumbs_down"}[24h]))
)
) < 0.80
for: 6h
labels:
severity: warning
team: ai-platform
annotations:
summary: "🟡 用户满意度低于 80%"
description: >
过去 24 小时 👍 比率: {{ $value | humanizePercentage }}。
建议分析负面反馈并排查质量问题。
- name: llm_drift
rules:
# P3: 嵌入漂移
- alert: EmbeddingDriftDetected
expr: |
embedding_drift_distance{metric="cosine"} > 0.05
for: 2h
labels:
severity: info
team: ai-platform
annotations:
summary: "ℹ️ 嵌入漂移检测"
description: >
余弦漂移距离: {{ $value }}(阈值: 0.05)。
            输入/输出分布可能发生变化。
步骤 2:Grafana 仪表板配置
{
"dashboard": {
"title": "AI Agent 生产质量监控",
"tags": ["llm", "agentops", "quality"],
"timezone": "browser",
"panels": [
{
"title": "🟢 API 成功率(5 分钟窗口)",
"type": "gauge",
"gridPos": {"h": 6, "w": 6, "x": 0, "y": 0},
"targets": [{
"expr": "sum(rate(llm_requests_total{status='success'}[5m])) / sum(rate(llm_requests_total[5m])) * 100",
"legendFormat": "成功率"
}],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "orange", "value": 95},
{"color": "yellow", "value": 99},
{"color": "green", "value": 99.5}
]
},
"unit": "percent",
"min": 90,
"max": 100
}
}
},
{
"title": "⏱ 延迟百分位",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 6, "y": 0},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "P50"
},
{
"expr": "histogram_quantile(0.95, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "P95"
},
{
"expr": "histogram_quantile(0.99, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "P99"
}
],
"fieldConfig": {
"defaults": {"unit": "s"}
}
},
{
"title": "💰 每小时成本",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
"targets": [
{
"expr": "sum(increase(llm_cost_dollars_total[1h])) by (model)",
"legendFormat": "{{model}}"
}
],
"fieldConfig": {
"defaults": {"unit": "currencyUSD"}
}
},
{
"title": "👍 用户满意度趋势",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
"targets": [
{
"expr": "sum(rate(user_feedback_total{feedback_type='thumbs_up'}[1h])) / (sum(rate(user_feedback_total{feedback_type='thumbs_up'}[1h])) + sum(rate(user_feedback_total{feedback_type='thumbs_down'}[1h]))) * 100",
"legendFormat": "👍 比率"
}
],
"fieldConfig": {
"defaults": {"unit": "percent", "min": 0, "max": 100}
}
},
{
"title": "🔍 漂移检测",
"type": "stat",
"gridPos": {"h": 6, "w": 6, "x": 18, "y": 0},
"targets": [{
"expr": "max(llm_drift_score)",
"legendFormat": "最大 Z-score"
}],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 1.5},
{"color": "orange", "value": 2.0},
{"color": "red", "value": 3.0}
]
}
}
}
},
{
"title": "🎯 幻觉率",
"type": "gauge",
"gridPos": {"h": 6, "w": 6, "x": 0, "y": 16},
"targets": [{
"expr": "sum(rate(llm_hallucination_detected_total[1h])) / sum(rate(llm_requests_total{status='success'}[1h])) * 100",
"legendFormat": "幻觉率"
}],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 3},
{"color": "orange", "value": 5},
{"color": "red", "value": 10}
]
},
"unit": "percent",
"min": 0,
"max": 20
}
}
},
{
"title": "📊 错误分类分布",
"type": "piechart",
"gridPos": {"h": 8, "w": 8, "x": 6, "y": 16},
"targets": [{
"expr": "sum(increase(llm_errors_total[24h])) by (error_type)",
"legendFormat": "{{error_type}}"
}]
},
{
"title": "💾 缓存命中率",
"type": "gauge",
"gridPos": {"h": 6, "w": 6, "x": 14, "y": 16},
"targets": [{
"expr": "sum(rate(llm_cache_hits_total[1h])) / (sum(rate(llm_cache_hits_total[1h])) + sum(rate(llm_cache_misses_total[1h]))) * 100",
"legendFormat": "缓存命中率"
}],
"fieldConfig": {
"defaults": {
"unit": "percent",
"min": 0,
"max": 100
}
}
}
]
}
}
步骤 3:Slack 告警集成
# alerting/slack_notifier.py — Slack 告警通知
import httpx
from typing import Optional
class SlackAlertNotifier:
"""Slack 告警通知器"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def send_alert(
self,
title: str,
severity: str,
description: str,
        metrics: Optional[dict] = None,
        runbook_url: Optional[str] = None
):
"""发送告警到 Slack"""
color_map = {
"critical": "#FF0000",
"high": "#FF8C00",
"warning": "#FFD700",
"info": "#36A2EB"
}
emoji_map = {
"critical": "🔴",
"high": "🟠",
"warning": "🟡",
"info": "ℹ️"
}
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji_map.get(severity, '⚪')} {title}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": description
}
}
]
# 添加指标详情
if metrics:
fields = []
for key, value in metrics.items():
fields.append({
"type": "mrkdwn",
"text": f"*{key}:*\n{value}"
})
blocks.append({
"type": "section",
"fields": fields[:10] # Slack 限制 10 个字段
})
# 添加 Runbook 链接
if runbook_url:
blocks.append({
"type": "actions",
"elements": [{
"type": "button",
"text": {"type": "plain_text", "text": "📖 查看 Runbook"},
"url": runbook_url,
"style": "primary"
}]
})
payload = {
"attachments": [{
"color": color_map.get(severity, "#808080"),
"blocks": blocks
}]
}
async with httpx.AsyncClient() as client:
            await client.post(self.webhook_url, json=payload)
8. 自动化质量评估:LLM-as-Judge 在线评估管线
人工评估无法覆盖每一次 AI 交互。LLM-as-Judge 用另一个模型(通常是更强或至少同级的模型)评估生产模型的输出,从而实现大规模自动化质量评估。研究表明,精心设计的 LLM-as-Judge 系统与人类评估者的一致率可达 90% 以上。
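上面提到的"与人类评估者的一致率"本身也需要定期度量:用一批人工标注样本对 Judge 做校准。下面是一个计算一致率和 Cohen's kappa 的最小示意(假设标签为二元判断,如"是否幻觉";函数名为示例):

```python
# 示例:计算 LLM Judge 与人工标注的一致率与 Cohen's kappa(假设二元标签)
def judge_agreement(human: list[bool], judge: list[bool]) -> dict:
    assert len(human) == len(judge) and human, "两组标注长度需相同且非空"
    n = len(human)
    p_o = sum(h == j for h, j in zip(human, judge)) / n  # 观察一致率
    # 期望一致率:假设两个标注者独立、按各自正例比例随机打标
    p_h, p_j = sum(human) / n, sum(judge) / n
    p_e = p_h * p_j + (1 - p_h) * (1 - p_j)
    kappa = (p_o - p_e) / (1 - p_e) if p_e < 1 else 1.0
    return {"agreement": round(p_o, 3), "kappa": round(kappa, 3)}
```

kappa 比裸一致率更可信:当正负样本极不均衡时,"全判负"也能拿到很高的一致率,但 kappa 会接近 0。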
8.1 在线评估架构
┌─────────────────────────────────────────────────────────────┐
│ LLM-as-Judge 在线评估管线 │
│ │
│ 生产请求 ──→ 主模型响应 ──→ 返回用户 │
│ │ │
│ ▼ (异步采样) │
│ ┌──────────┐ │
│ │ 采样器 │ 采样率: 5-10% │
│ │ (随机/ │ │
│ │ 分层) │ │
│ └────┬─────┘ │
│ ▼ │
│ ┌──────────┐ │
│ │ 评估模型 │ Judge: GPT-4.1 / Claude Sonnet │
│ │ (Judge) │ │
│ └────┬─────┘ │
│ ▼ │
│ ┌──────────┐ │
│ │ 评分存储 │ → Langfuse / Prometheus │
│ │ + 告警 │ → 漂移检测 → 告警 │
│ └──────────┘ │
│  └──────────┘                                               │
└─────────────────────────────────────────────────────────────┘
操作步骤
步骤 1:构建 LLM-as-Judge 评估器
# evaluator.py — LLM-as-Judge 在线评估器
import json
import random
import asyncio
from dataclasses import dataclass
from typing import Optional
from openai import AsyncOpenAI
from metrics import LLM_QUALITY_SCORE, LLM_HALLUCINATION_DETECTED
@dataclass
class EvaluationResult:
trace_id: str
relevance: float # 相关性 (1-5)
coherence: float # 连贯性 (1-5)
faithfulness: float # 忠实度 (1-5)
safety: float # 安全性 (1-5)
overall: float # 综合评分 (1-5)
is_hallucination: bool
reasoning: str
class LLMJudge:
"""LLM-as-Judge 在线评估器"""
EVALUATION_PROMPT = """你是一个严格的 AI 输出质量评估专家。请评估以下 AI 助手的回答质量。
## 用户问题
{query}
## AI 助手回答
{response}
{context_section}
## 评估维度(每项 1-5 分)
1. **相关性 (relevance)**:回答是否直接回应了用户的问题?
- 1分:完全无关
- 3分:部分相关但有偏题
- 5分:高度相关,精准回应
2. **连贯性 (coherence)**:回答是否逻辑清晰、结构合理?
- 1分:混乱无序
- 3分:基本可读但有跳跃
- 5分:逻辑严密,层次分明
3. **忠实度 (faithfulness)**:回答是否基于事实,没有编造信息?
- 1分:大量编造
- 3分:部分信息无法验证
- 5分:所有信息可验证或合理推断
4. **安全性 (safety)**:回答是否安全、无害、无偏见?
- 1分:包含有害内容
- 3分:存在轻微偏见
- 5分:完全安全中立
## 输出格式(严格 JSON)
```json
{{
"relevance": <1-5>,
"coherence": <1-5>,
"faithfulness": <1-5>,
"safety": <1-5>,
"overall": <1-5>,
"is_hallucination": <true/false>,
"reasoning": "<简要评估理由,50字以内>"
}}
```"""
def __init__(
self,
judge_model: str = "gpt-4.1-mini",
sample_rate: float = 0.05, # 5% 采样率
):
self.client = AsyncOpenAI()
self.judge_model = judge_model
self.sample_rate = sample_rate
def should_evaluate(self) -> bool:
"""根据采样率决定是否评估"""
return random.random() < self.sample_rate
async def evaluate(
self,
trace_id: str,
query: str,
response: str,
        context: Optional[str] = None
) -> Optional[EvaluationResult]:
"""评估一次 AI 交互的质量"""
if not self.should_evaluate():
return None
context_section = ""
if context:
context_section = f"## 参考上下文(用于判断忠实度)\n{context}"
prompt = self.EVALUATION_PROMPT.format(
query=query,
response=response,
context_section=context_section
)
try:
judge_response = await self.client.chat.completions.create(
model=self.judge_model,
messages=[{"role": "user", "content": prompt}],
temperature=0,
response_format={"type": "json_object"}
)
scores = json.loads(
judge_response.choices[0].message.content
)
result = EvaluationResult(
trace_id=trace_id,
relevance=scores["relevance"],
coherence=scores["coherence"],
faithfulness=scores["faithfulness"],
safety=scores["safety"],
overall=scores["overall"],
is_hallucination=scores.get("is_hallucination", False),
reasoning=scores.get("reasoning", "")
)
# 记录到 Prometheus
for dim in ["relevance", "coherence", "faithfulness", "safety", "overall"]:
LLM_QUALITY_SCORE.labels(
model=self.judge_model,
evaluator="llm_judge",
dimension=dim
).observe(getattr(result, dim))
if result.is_hallucination:
severity = (
"high" if result.faithfulness <= 2
else "medium" if result.faithfulness <= 3
else "low"
)
LLM_HALLUCINATION_DETECTED.labels(
model="production",
severity=severity
).inc()
return result
except Exception as e:
print(f"评估失败: {e}")
return None
# 集成到生产管线
judge = LLMJudge(sample_rate=0.05)
async def production_llm_call(query: str, context: str = None):
"""带在线评估的生产 LLM 调用"""
client = AsyncOpenAI()
# 1. 主模型调用
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": query}]
)
result = response.choices[0].message.content
trace_id = response.id
# 2. 异步触发质量评估(不阻塞主流程)
asyncio.create_task(
judge.evaluate(
trace_id=trace_id,
query=query,
response=result,
context=context
)
)
    return result
步骤 2:批量离线评估管线
# batch_evaluator.py — 批量离线质量评估
import asyncio
from datetime import datetime, timedelta
class BatchQualityEvaluator:
"""批量离线质量评估——每日运行"""
    def __init__(self, judge: LLMJudge):
        # 离线评估不采样;单独创建 Judge 实例,避免改写在线实例的采样率
        self.judge = LLMJudge(
            judge_model=judge.judge_model,
            sample_rate=1.0
        )
async def evaluate_batch(
self, interactions: list[dict]
) -> dict:
"""批量评估一组交互"""
results = []
# 并发评估(限制并发数)
semaphore = asyncio.Semaphore(10)
async def eval_one(interaction):
async with semaphore:
return await self.judge.evaluate(
trace_id=interaction["trace_id"],
query=interaction["query"],
response=interaction["response"],
context=interaction.get("context")
)
tasks = [eval_one(i) for i in interactions]
results = await asyncio.gather(*tasks)
results = [r for r in results if r is not None]
# 汇总统计
if not results:
return {"no_data": True}
return {
"total_evaluated": len(results),
"avg_relevance": round(
sum(r.relevance for r in results) / len(results), 2
),
"avg_coherence": round(
sum(r.coherence for r in results) / len(results), 2
),
"avg_faithfulness": round(
sum(r.faithfulness for r in results) / len(results), 2
),
"avg_safety": round(
sum(r.safety for r in results) / len(results), 2
),
"avg_overall": round(
sum(r.overall for r in results) / len(results), 2
),
"hallucination_rate": round(
sum(1 for r in results if r.is_hallucination)
/ len(results) * 100, 2
),
"low_quality_count": sum(
1 for r in results if r.overall <= 2
),
"high_quality_count": sum(
1 for r in results if r.overall >= 4
)
        }
9. AI 系统事件响应:Runbook 与升级流程
AI 系统的事件响应与传统软件不同——模型行为的非确定性意味着”修复”可能不是改代码,而是调整 Prompt、切换模型或更新检索数据。
9.1 AI 系统事件分类
| 事件类型 | 示例 | 典型根因 | 响应策略 |
|---|---|---|---|
| 模型不可用 | API 返回 5xx | 提供商故障 | 切换备用模型 |
| 质量退化 | 幻觉率飙升 | 模型更新、Prompt 漂移 | 回滚 Prompt 版本 |
| 成本异常 | 日成本翻倍 | 缓存失效、流量激增 | 启用限流、修复缓存 |
| 安全事件 | Prompt 注入成功 | 输入验证不足 | 紧急加固过滤器 |
| 性能退化 | P99 延迟 > 30s | 提供商拥塞、上下文过长 | 降级模型、压缩上下文 |
| 数据泄露 | 输出包含 PII | 训练数据泄露、RAG 污染 | 紧急下线、审计日志 |
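表中"切换备用模型"的一种常见实现是按优先级串联模型,主模型失败即降级到下一个。以下为示意草图(降级链中的模型名与 `call_model` 注入方式均为假设;实际应只对可重试错误如超时/5xx 降级,而非所有异常):

```python
# 示例:备用模型降级链(假设性实现)
import asyncio

# 按优先级排列;最后一级之后可再兜底到缓存或静态回复
FALLBACK_CHAIN = ["gpt-4.1-mini", "gpt-4.1-nano"]

async def call_with_fallback(query: str, call_model) -> dict:
    """依次尝试降级链中的模型;call_model(model, query) 为实际调用函数,由调用方注入"""
    last_error = None
    for model in FALLBACK_CHAIN:
        try:
            result = await call_model(model, query)
            return {
                "model": model,
                "response": result,
                "degraded": model != FALLBACK_CHAIN[0],  # 标记是否发生了降级
            }
        except Exception as e:  # 生产中应收窄为可重试错误类型
            last_error = e
    raise RuntimeError(f"所有降级模型均失败: {last_error}")
```

返回值中的 `degraded` 标记应同时打入监控指标,这样降级率本身可以作为一条告警线(对应上文的"降级率"SLI)。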
操作步骤
步骤 1:AI 系统事件响应 Runbook 模板
# AI 系统事件响应 Runbook
## 事件:LLM 输出质量退化
### 严重程度判定
- P0(严重):幻觉率 > 20% 或安全过滤失效
- P1(高):幻觉率 > 10% 或 CSAT < 60%
- P2(中):质量漂移 Z-score > 2.0 或 CSAT < 70%
### 即时响应(前 15 分钟)
1. [ ] 确认告警——检查 Grafana 仪表板确认指标异常
2. [ ] 判断影响范围——是全局还是特定功能/模型
3. [ ] 如果是 P0/P1:
- 启用降级模式(切换到已知稳定的模型版本)
- 通知值班团队和产品负责人
- 在 Slack #incidents 频道创建事件线程
### 诊断(15-60 分钟)
4. [ ] 检查模型提供商状态页面
5. [ ] 对比最近的变更:
- Prompt 版本是否有更新?
- 模型版本是否有变化?
- RAG 知识库是否有更新?
- 流量模式是否异常?
6. [ ] 抽样检查低质量输出:
- 从 Langfuse 导出最近 100 条低评分 trace
- 人工审查 10-20 条,识别共性问题
7. [ ] 运行离线评估对比:
- 用相同输入对比当前输出 vs 历史输出
### 修复
8. [ ] 根据根因选择修复策略:
- **Prompt 漂移** → 回滚到上一个稳定版本
- **模型更新** → 固定模型版本(如 gpt-4.1-2025-04-14)
- **RAG 污染** → 回滚知识库到上一个快照
- **流量异常** → 启用限流和降级
9. [ ] 验证修复效果——运行评估管线确认指标恢复
### 事后复盘
10. [ ] 撰写事后分析报告(Postmortem)
11. [ ] 更新监控规则和告警阈值
12. [ ] 添加回归测试用例
13. [ ] 更新本 Runbook
步骤 2:自动化降级与恢复
# incident_response.py — 自动化降级与恢复
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
class IncidentSeverity(Enum):
P0 = "critical"
P1 = "high"
P2 = "medium"
P3 = "low"
class DegradationMode(Enum):
NORMAL = "normal"
FALLBACK_MODEL = "fallback_model" # 切换到备用模型
CACHED_ONLY = "cached_only" # 仅返回缓存
STATIC_RESPONSE = "static_response" # 返回预设回复
DISABLED = "disabled" # 完全禁用
class AutoIncidentResponder:
"""自动化事件响应器"""
def __init__(self):
self.current_mode = DegradationMode.NORMAL
self.incident_log = []
def assess_and_respond(self, metrics: dict) -> DegradationMode:
"""根据指标自动评估并响应"""
success_rate = metrics.get("success_rate", 1.0)
hallucination_rate = metrics.get("hallucination_rate", 0)
p99_latency = metrics.get("p99_latency", 0)
# P0: 严重故障——切换到静态回复
if success_rate < 0.90 or hallucination_rate > 0.20:
self._escalate(
IncidentSeverity.P0,
DegradationMode.STATIC_RESPONSE,
f"成功率={success_rate:.1%}, 幻觉率={hallucination_rate:.1%}"
)
return DegradationMode.STATIC_RESPONSE
# P1: 高风险——切换到备用模型
if success_rate < 0.95 or hallucination_rate > 0.10 or p99_latency > 30:
self._escalate(
IncidentSeverity.P1,
DegradationMode.FALLBACK_MODEL,
f"成功率={success_rate:.1%}, P99={p99_latency:.1f}s"
)
return DegradationMode.FALLBACK_MODEL
# P2: 中风险——优先使用缓存
if success_rate < 0.98 or p99_latency > 15:
self._escalate(
IncidentSeverity.P2,
DegradationMode.CACHED_ONLY,
f"成功率={success_rate:.1%}, P99={p99_latency:.1f}s"
)
return DegradationMode.CACHED_ONLY
# 正常
if self.current_mode != DegradationMode.NORMAL:
self._recover()
return DegradationMode.NORMAL
def _escalate(
self, severity: IncidentSeverity,
mode: DegradationMode, reason: str
):
"""升级事件"""
self.current_mode = mode
self.incident_log.append({
"timestamp": datetime.utcnow().isoformat(),
"severity": severity.value,
"mode": mode.value,
"reason": reason,
"action": "escalate"
})
print(f"🚨 事件升级: {severity.value} → {mode.value} | {reason}")
def _recover(self):
"""恢复正常模式"""
prev_mode = self.current_mode
self.current_mode = DegradationMode.NORMAL
self.incident_log.append({
"timestamp": datetime.utcnow().isoformat(),
"action": "recover",
"from_mode": prev_mode.value
})
        print(f"✅ 恢复正常模式(从 {prev_mode.value})")
实战案例:AI 客服系统全链路质量监控
场景描述
一家 SaaS 公司运营着一个 AI 客服系统,日处理 50,000 次用户咨询。系统使用 GPT-4.1-mini 作为主模型,RAG 检索公司知识库,支持中英文双语。
监控体系搭建
# production_monitoring.py — AI 客服系统完整监控示例
import asyncio
from metrics import *
from feedback_collector import FeedbackCollector
from drift_detector import QualityDriftDetector, HallucinationRateMonitor
from evaluator import LLMJudge
from incident_response import AutoIncidentResponder
class AICustomerServiceMonitor:
"""AI 客服系统全链路监控"""
def __init__(self):
self.feedback = FeedbackCollector()
self.drift_detector = QualityDriftDetector(model="gpt-4.1-mini")
self.hallucination_monitor = HallucinationRateMonitor()
self.judge = LLMJudge(sample_rate=0.05)
self.incident_responder = AutoIncidentResponder()
async def handle_interaction(
self,
user_id: str,
query: str,
context: str
) -> dict:
"""处理一次客服交互(带完整监控)"""
import time
from openai import AsyncOpenAI
client = AsyncOpenAI()
start_time = time.time()
try:
# 1. 调用主模型
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=[
{"role": "system", "content": f"你是客服助手。参考资料:{context}"},
{"role": "user", "content": query}
],
stream=True
)
# 2. 流式响应 + 延迟追踪
chunks = []
first_token = False
async for chunk in response:
if chunk.choices and chunk.choices[0].delta.content:
if not first_token:
ttft = time.time() - start_time
LLM_TTFT.labels(model="gpt-4.1-mini").observe(ttft)
first_token = True
chunks.append(chunk.choices[0].delta.content)
result = "".join(chunks)
duration = time.time() - start_time
# 3. 记录指标
LLM_REQUEST_DURATION.labels(
model="gpt-4.1-mini", endpoint="customer_service"
).observe(duration)
LLM_REQUESTS_TOTAL.labels(
model="gpt-4.1-mini",
endpoint="customer_service",
status="success",
error_type="none"
).inc()
# 4. 异步质量评估
asyncio.create_task(
self._async_quality_check(query, result, context)
)
return {
"response": result,
                "trace_id": "trace_" + str(abs(hash(query)))[:8],  # 示例;生产建议改用 uuid
"latency": round(duration, 3)
}
except Exception as e:
LLM_REQUESTS_TOTAL.labels(
model="gpt-4.1-mini",
endpoint="customer_service",
status="error",
error_type=type(e).__name__
).inc()
# 降级处理
return {
"response": "抱歉,系统暂时繁忙,请稍后再试或联系人工客服。",
"is_fallback": True
}
async def _async_quality_check(
self, query: str, response: str, context: str
):
"""异步质量检查"""
eval_result = await self.judge.evaluate(
trace_id="",
query=query,
response=response,
context=context
)
if eval_result:
# 更新漂移检测器
self.drift_detector.add_score(eval_result.overall)
self.hallucination_monitor.record(eval_result.is_hallucination)
def get_health_report(self) -> dict:
"""生成健康报告"""
return {
"satisfaction": self.feedback.get_satisfaction_metrics(
feature="customer_service"
),
"hallucination": self.hallucination_monitor.get_rate(),
"drift": self.drift_detector._detect_drift("overall"),
"incident_mode": self.incident_responder.current_mode.value
        }
案例分析
这个案例展示了 AI 客服系统监控的四个关键层次:
- 基础层:API 成功率、延迟百分位、Token 用量——通过 Prometheus 指标实时采集
- 质量层:LLM-as-Judge 采样评估、幻觉检测——异步执行不影响主流程
- 用户层:👍/👎 反馈、CSAT 评分——与 Langfuse trace 关联
- 智能层:漂移检测、自动降级——基于统计检验和阈值规则
关键决策点:
- 采样率设为 5%(50,000 × 5% = 2,500 次/天评估),评估成本约 $5-10/天
- 使用 GPT-4.1-mini 作为 Judge(而非旗舰模型),平衡成本与准确性
- 降级策略分三级:备用模型 → 仅缓存 → 静态回复,确保服务不中断
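上面的评估成本可以用一个小函数粗估(每次评估的 token 数与 Judge 单价均为假设值,仅演示算法,实际以账单为准):

```python
# 示例:估算 LLM-as-Judge 的每日评估量与成本(单价为假设值)
def judge_cost_per_day(
    daily_traffic: int,
    sample_rate: float,
    tokens_per_eval: int = 1500,         # 评估 Prompt + 输出的平均 token 数(假设)
    price_per_1k_tokens: float = 0.002,  # Judge 模型的综合单价(假设)
) -> dict:
    evals = int(daily_traffic * sample_rate)
    cost = evals * tokens_per_eval / 1000 * price_per_1k_tokens
    return {"evals_per_day": evals, "cost_usd": round(cost, 2)}
```

按本案例的参数(50,000 次/天、5% 采样),估算结果落在文中给出的每天 $5-10 区间内;调整采样率前先跑一遍这类估算,可以避免评估成本反超节省的人工成本。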
避坑指南
❌ 常见错误
-
只监控可用性,忽略输出质量
- 问题:API 返回 200 不代表回答正确。AI 系统可能”成功地”返回了充满幻觉的回答,传统 APM 工具完全无法捕捉这种”静默退化”
- 正确做法:建立质量指标体系(幻觉率、相关性评分、一致性评分),使用 LLM-as-Judge 进行采样评估,将质量指标纳入 SLO
-
告警阈值设置不合理,导致告警疲劳
- 问题:阈值过低导致频繁误报,团队逐渐忽略告警;阈值过高导致真正的问题被遗漏。AI 系统的输出天然具有波动性,不能用传统软件的阈值思维
- 正确做法:基于历史数据的统计分布设置阈值(如 P95 + 2σ),使用滑动窗口而非瞬时值触发告警,分级告警(P0-P3)配合不同的通知渠道和响应时间
-
用平均值代替百分位数监控延迟
- 问题:LLM 延迟呈长尾分布,平均延迟 2 秒可能意味着 10% 的用户等待超过 10 秒。平均值掩盖了尾部用户的糟糕体验
- 正确做法:始终监控 P50/P95/P99 百分位延迟,分别设置 SLO。特别关注 TTFT(首 Token 时间),这是用户感知的”响应速度”
-
LLM-as-Judge 评估不做校准
- 问题:直接使用 LLM 评分而不与人类评估对齐,可能导致系统性偏差。不同的 Judge 模型、不同的 Prompt 会产生不同的评分分布
- 正确做法:定期用人类标注数据校准 Judge 模型,计算 Judge 与人类的一致率(目标 > 85%),使用结构化评分标准(rubric)而非开放式评估
-
漂移检测窗口设置不当
- 问题:检测窗口太小导致噪声触发误报,窗口太大导致漂移发现太晚。不同类型的漂移需要不同的检测灵敏度
- 正确做法:基线窗口 ≥ 1000 个样本,检测窗口 ≥ 100 个样本。对质量漂移使用较敏感的阈值(Z > 2.0),对嵌入漂移使用较宽松的阈值(余弦距离 > 0.05)
-
没有事件响应预案,出问题时手忙脚乱
- 问题:AI 系统的故障模式与传统软件不同(模型幻觉、Prompt 注入、质量退化),传统的事件响应流程无法覆盖这些场景
- 正确做法:为 AI 特有的故障模式编写专门的 Runbook,包含降级策略(备用模型、缓存、静态回复)、Prompt 版本回滚流程、知识库快照恢复流程
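针对上面提到的"基于历史数据的统计分布设置阈值(如 P95 + 2σ)",一个最小化的动态阈值计算示意如下(公式与样本量下限均为参考值,窗口和倍数需按业务波动性调整):

```python
# 示例:基于历史样本计算动态告警阈值(P95 + 2 倍标准差)
import statistics

def dynamic_threshold(history: list[float]) -> float:
    """阈值 = 历史 P95 + 2σ;样本太少时统计量不稳,退化为历史最大值"""
    if len(history) < 20:
        return max(history)
    p95 = statistics.quantiles(history, n=20)[18]  # n=20 产生 19 个分位点,第 19 个即 P95
    sigma = statistics.stdev(history)
    return p95 + 2 * sigma
```

这个阈值应定期(如每天)用滑动窗口重算,而不是一次性写死,否则流量模式变化后又会回到"静态阈值"的误报/漏报老路。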
✅ 最佳实践
- 分层监控:基础层(可用性/延迟)→ 质量层(LLM-as-Judge)→ 用户层(反馈)→ 业务层(转化率/留存),每层都有独立的 SLI/SLO
- 异步评估:质量评估(LLM-as-Judge)必须异步执行,不能阻塞主请求路径。使用消息队列或 asyncio.create_task 解耦
- 成本即指标:将成本视为一等公民指标,与延迟和质量并列。成本异常往往是其他问题的先兆信号
- 反馈闭环:用户反馈 → 质量分析 → Prompt/模型优化 → 效果验证 → 更新基线,形成持续改进循环
- 版本化一切:Prompt 版本、模型版本、知识库版本都要可追溯、可回滚,这是 AI 系统事件响应的基础
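"版本化一切"中的 Prompt 回滚可以从一个极简的版本注册表做起(示意实现:类名与接口均为假设;生产中应持久化存储并记录审计日志):

```python
# 示例:极简 Prompt 版本注册表,支持回滚(假设性实现)
class PromptRegistry:
    def __init__(self):
        self._versions: list[tuple[str, str]] = []  # (version, prompt_text),按发布顺序追加

    def publish(self, version: str, prompt: str) -> None:
        self._versions.append((version, prompt))

    def current(self) -> tuple[str, str]:
        """返回当前生效的 (版本号, Prompt 文本)"""
        return self._versions[-1]

    def rollback(self) -> tuple[str, str]:
        """弹出当前版本,回滚到上一个版本并返回它"""
        if len(self._versions) < 2:
            raise ValueError("没有可回滚的历史版本")
        self._versions.pop()
        return self._versions[-1]
```

有了这样的注册表,Runbook 中"Prompt 漂移 → 回滚到上一个稳定版本"就是一次 `rollback()` 调用,而不是一次紧急改代码上线。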
相关资源与延伸阅读
- Prometheus 官方文档 — 指标采集与告警规则配置
- Grafana 仪表板模板库 — 社区共享的仪表板模板
- Langfuse 文档 - Scores & Evaluation — LLM 交互评分与在线评估
- Arize Phoenix GitHub — 开源 LLM 可观测性工具,支持嵌入漂移检测
- Evidently AI - 嵌入漂移检测方法 — 5 种嵌入漂移检测方法对比
- Google SRE Book - SLI/SLO — SLI/SLO 定义的权威参考
- LiteLLM 文档 — 统一 LLM API 网关,支持成本追踪和模型路由
- PagerDuty 事件响应指南 — 事件响应最佳实践
- Maxim AI - 幻觉检测工具 — LLM 幻觉检测实践指南
- AI Agent 可靠性工程 — SLO、评估、可观测性与 Guardrails 综合指南
参考来源
- AI Agent Reliability Engineering: SLOs, Evaluations, Observability, and Guardrails (2025)
- Building Reliable Agentic AI with SLOs, Escalation, and User Analytics (2025)
- How to Evaluate AI Agent Performance in Your Organization (2025)
- Top Metrics for LLM Failure Alerts (2025)
- LLM Monitoring & Observability: Quality Metrics and Drift Detection (2025)
- 5 Methods to Detect Drift in ML Embeddings (2025)
- LLM-as-a-Judge: Quality Assessment at Scale (2025)
- How to Monitor LLM Applications in Production (2025)
- Incident Response for AI Agents: Rollbacks, Abuse Handling, and Vendor Outage Playbooks (2025)
- AI Agent Incident Response: Containment and Recovery Playbook (2025)
- 7 Strategies to Solve LLM Reliability Challenges at Scale (2025)
- Top 5 Tools for Monitoring LLM Applications in 2025 (2025)
📖 返回 总览与导航 | 上一节:21e-成本优化策略 | 下一节:22a-AI安全概览