
21f - 生产告警与质量指标

本文是《AI Agent 实战手册》第 21 章第 6 节。 上一节:21e-成本优化策略 | 下一节:22a-AI安全概览 📖 返回 总览与导航

⏱ 阅读时间:90 分钟 | 难度:⭐⭐⭐⭐⭐ 高级 | 前置知识:LLM API 使用经验、Prometheus/Grafana 基础、Python/TypeScript 开发经验

概述

AI Agent 在生产环境中的表现远比开发阶段复杂——模型输出的非确定性、尾部延迟、速率限制、检索漂移和幻觉率波动都可能在用户无感知的情况下悄然恶化。传统 APM 工具只能监控“服务是否存活”,却无法回答“AI 回答的质量是否在下降”。本节将系统化地构建一套生产级 AI Agent 质量监控体系,覆盖五大核心指标(成功率、延迟百分位、每次交互成本、满意度评分、漂移检测),并提供完整的 Prometheus + Grafana 仪表板配置、告警规则、LLM-as-Judge 在线评估管线和 AI 系统事件响应手册。


1. AI 系统质量指标框架:SLI/SLO 定义

与传统软件一样,AI 系统也需要明确的服务水平指标(SLI)和服务水平目标(SLO)。但 AI 系统的 SLI 更复杂——除了可用性和延迟,还需要衡量输出质量、安全性和成本效率。

工具推荐

| 工具 | 用途 | 价格 | 适用场景 |
| --- | --- | --- | --- |
| Prometheus | 时序指标采集与告警 | 免费(开源) | 基础设施和应用指标监控 |
| Grafana | 指标可视化仪表板 | 免费(开源)/ Cloud 起步 $0 | 统一可视化 |
| Langfuse | LLM 可观测性 + 在线评估 | 免费(自托管) | 质量追踪、成本分析 |
| Arize Phoenix | ML/LLM 可观测性 + 漂移检测 | 免费(开源) | 嵌入漂移、幻觉检测 |
| Evidently AI | ML 监控 + 数据漂移检测 | 免费(开源) | 嵌入漂移、数据质量 |
| Maxim AI | LLM 评估 + 幻觉检测 | 免费(基础版) | 多阶段质量评估 |
| PagerDuty | 事件管理与告警路由 | 起步 $21/用户/月 | 告警升级与值班管理 |
| Opsgenie | 告警管理 | 起步 $9/用户/月 | 中小团队告警管理 |

1.1 AI 系统 SLI/SLO 全景

传统 SRE 的 SLI/SLO 框架需要针对 AI 系统进行扩展。以下是一个完整的 AI Agent SLI/SLO 定义模板:

```
AI Agent SLI/SLO 框架
├── 可用性指标:API 成功率、错误分类、SLA 达标率、降级率
├── 性能指标:P50 / P95 / P99 延迟、TTFT
├── 质量指标:幻觉率、相关性评分、一致性评分、安全性评分
├── 成本指标:每次交互成本、每用户成本、预算使用率、缓存命中率
└── 用户指标:CSAT 评分、👍/👎 比率、升级率、留存率
```

| SLI 类别 | SLI 指标 | SLO 目标(参考值) | 测量方法 |
| --- | --- | --- | --- |
| 可用性 | API 调用成功率 | ≥ 99.5% | 成功请求数 / 总请求数 |
| 可用性 | 错误率(5xx) | ≤ 0.5% | 5xx 响应数 / 总请求数 |
| 性能 | P50 端到端延迟 | ≤ 2s | Prometheus histogram |
| 性能 | P95 端到端延迟 | ≤ 5s | Prometheus histogram |
| 性能 | P99 端到端延迟 | ≤ 10s | Prometheus histogram |
| 性能 | TTFT(首 Token 时间) | ≤ 500ms | 流式响应首字节时间 |
| 质量 | 幻觉率 | ≤ 5% | LLM-as-Judge 采样评估 |
| 质量 | 输出相关性评分 | ≥ 4.0/5.0 | LLM-as-Judge 评分 |
| 成本 | 每次交互平均成本 | ≤ $0.05 | 总 API 费用 / 总交互数 |
| 成本 | 月度预算使用率 | ≤ 90% | 当月花费 / 月度预算 |
| 用户 | CSAT 满意度评分 | ≥ 80% | 用户反馈采集 |
| 用户 | 👍/👎 正面比率 | ≥ 85% | 用户即时反馈 |
| 漂移 | 输出质量漂移 Z-score | ≤ 2.0 | 滑动窗口统计检验 |
| 漂移 | 嵌入漂移距离 | ≤ 阈值 | 余弦距离监控 |
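
表中 99.5% 这类可用性 SLO 隐含一个错误预算(允许失败的请求量)。下面给出一个基于该 SLO 的预算消耗估算示意(数字为假设),思路与后文 SLA 追踪代码中的 error_budget_remaining 一致:

```python
# error_budget.py — 错误预算消耗估算示意(以 99.5% 可用性 SLO 为例,数字为假设)
def error_budget_status(total_requests: int, failed_requests: int,
                        slo: float = 0.995) -> dict:
    allowed_failures = total_requests * (1 - slo)   # 本周期允许的失败次数
    consumed = failed_requests / allowed_failures if allowed_failures else 0.0
    return {
        "allowed_failures": round(allowed_failures),
        "budget_consumed_percent": round(consumed * 100, 1),
        "budget_remaining_percent": round(max(0.0, 1 - consumed) * 100, 1),
    }


# 例:本月 300 万次请求、失败 9,000 次 → 允许失败 15,000 次,预算已消耗 60%
print(error_budget_status(3_000_000, 9_000))
```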

操作步骤

步骤 1:定义 Prometheus 指标(Python 应用)

# metrics.py — AI Agent 核心 Prometheus 指标定义 from prometheus_client import ( Counter, Histogram, Gauge, Summary, Info, CollectorRegistry, generate_latest ) import time from functools import wraps # 创建指标注册表 REGISTRY = CollectorRegistry() # ========== 可用性指标 ========== LLM_REQUESTS_TOTAL = Counter( "llm_requests_total", "LLM API 请求总数", ["model", "endpoint", "status", "error_type"], registry=REGISTRY ) LLM_ERRORS_TOTAL = Counter( "llm_errors_total", "LLM API 错误总数", ["model", "error_type", "error_code"], registry=REGISTRY ) # ========== 性能指标 ========== LLM_REQUEST_DURATION = Histogram( "llm_request_duration_seconds", "LLM 请求端到端延迟(秒)", ["model", "endpoint"], buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0], registry=REGISTRY ) LLM_TTFT = Histogram( "llm_time_to_first_token_seconds", "首 Token 响应时间(秒)", ["model"], buckets=[0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0], registry=REGISTRY ) # ========== Token 与成本指标 ========== LLM_TOKENS_TOTAL = Counter( "llm_tokens_total", "Token 使用总量", ["model", "token_type"], # token_type: input, output, cached registry=REGISTRY ) LLM_COST_TOTAL = Counter( "llm_cost_dollars_total", "LLM API 成本(美元)", ["model", "feature", "user_tier"], registry=REGISTRY ) LLM_COST_PER_INTERACTION = Summary( "llm_cost_per_interaction_dollars", "每次交互成本(美元)", ["feature"], registry=REGISTRY ) # ========== 质量指标 ========== LLM_QUALITY_SCORE = Histogram( "llm_quality_score", "LLM 输出质量评分(0-5)", ["model", "evaluator", "dimension"], buckets=[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0], registry=REGISTRY ) LLM_HALLUCINATION_DETECTED = Counter( "llm_hallucination_detected_total", "检测到的幻觉次数", ["model", "severity"], # severity: low, medium, high registry=REGISTRY ) # ========== 用户满意度指标 ========== USER_FEEDBACK_TOTAL = Counter( "user_feedback_total", "用户反馈总数", ["feedback_type", "feature"], # feedback_type: thumbs_up, thumbs_down, rating registry=REGISTRY ) USER_SATISFACTION_SCORE = Histogram( "user_satisfaction_score", "用户满意度评分(1-5)", ["feature"], buckets=[1, 2, 3, 4, 5], registry=REGISTRY ) # ========== 漂移指标 ========== LLM_DRIFT_SCORE = Gauge( "llm_drift_score", "输出漂移评分(Z-score)", ["model", "drift_type"], # drift_type: quality, topic, embedding registry=REGISTRY ) EMBEDDING_DRIFT_DISTANCE = Gauge( "embedding_drift_distance", "嵌入漂移距离", ["model", "metric"], # metric: cosine, euclidean registry=REGISTRY ) # ========== 缓存指标 ========== CACHE_HITS_TOTAL = Counter( "llm_cache_hits_total", "缓存命中次数", ["cache_type"], # cache_type: semantic, prompt, exact registry=REGISTRY ) CACHE_MISSES_TOTAL = Counter( "llm_cache_misses_total", "缓存未命中次数", ["cache_type"], registry=REGISTRY )
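
指标定义好后,还需要把注册表暴露为 /metrics 端点供 Prometheus 抓取。下面是一个最小示意,假设使用 FastAPI(也可改用任意 Web 框架,或直接使用 prometheus_client.start_http_server):

```python
# metrics_endpoint.py — 暴露 /metrics 端点的最小示意(FastAPI 为假设)
from fastapi import FastAPI, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

from metrics import REGISTRY  # 复用上文定义的自定义注册表

app = FastAPI()


@app.get("/metrics")
def metrics_endpoint() -> Response:
    # Prometheus 以固定间隔抓取该端点,读取所有已注册指标的当前值
    return Response(generate_latest(REGISTRY), media_type=CONTENT_TYPE_LATEST)
```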

步骤 2:构建指标采集中间件

# middleware.py — LLM 调用指标采集中间件 import time import traceback from typing import Optional from metrics import * class LLMMetricsMiddleware: """LLM 调用指标采集中间件——包装任意 LLM 客户端""" def __init__(self, feature: str = "default", user_tier: str = "free"): self.feature = feature self.user_tier = user_tier async def call_with_metrics( self, llm_func, model: str, messages: list, **kwargs ) -> dict: """包装 LLM 调用,自动采集所有指标""" start_time = time.time() ttft_recorded = False status = "success" error_type = "none" try: # 执行 LLM 调用 response = await llm_func( model=model, messages=messages, **kwargs ) # 记录 Token 用量 usage = response.usage LLM_TOKENS_TOTAL.labels( model=model, token_type="input" ).inc(usage.prompt_tokens) LLM_TOKENS_TOTAL.labels( model=model, token_type="output" ).inc(usage.completion_tokens) # 计算并记录成本 cost = self._calculate_cost( model, usage.prompt_tokens, usage.completion_tokens ) LLM_COST_TOTAL.labels( model=model, feature=self.feature, user_tier=self.user_tier ).inc(cost) LLM_COST_PER_INTERACTION.labels( feature=self.feature ).observe(cost) return response except Exception as e: status = "error" error_type = type(e).__name__ # 错误分类 error_code = getattr(e, "status_code", 0) LLM_ERRORS_TOTAL.labels( model=model, error_type=error_type, error_code=str(error_code) ).inc() raise finally: # 记录延迟 duration = time.time() - start_time LLM_REQUEST_DURATION.labels( model=model, endpoint=self.feature ).observe(duration) # 记录请求计数 LLM_REQUESTS_TOTAL.labels( model=model, endpoint=self.feature, status=status, error_type=error_type ).inc() def _calculate_cost( self, model: str, input_tokens: int, output_tokens: int ) -> float: """根据模型计算成本""" PRICING = { "gpt-4.1": {"input": 2.0, "output": 8.0}, "gpt-4.1-mini": {"input": 0.4, "output": 1.6}, "gpt-4.1-nano": {"input": 0.1, "output": 0.4}, "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0}, "claude-haiku-3.5": {"input": 0.8, "output": 4.0}, "gemini-2.5-flash": {"input": 0.15, "output": 0.6}, } prices = PRICING.get(model, {"input": 1.0, "output": 3.0}) return ( input_tokens * prices["input"] / 1_000_000 + output_tokens * prices["output"] / 1_000_000 )

提示词模板

```
你是一个 SRE 工程师,负责为 AI Agent 系统定义 SLI/SLO。请根据以下系统信息生成完整的 SLI/SLO 文档:

## 系统信息
- 系统名称:[系统名称]
- 主要功能:[功能描述,如 AI 客服、代码助手、数据分析]
- 日均请求量:[数量]
- 使用的模型:[模型列表]
- 用户群体:[内部/外部,付费/免费]
- 关键业务影响:[如果系统不可用或质量下降,会造成什么影响]

## 请输出
1. 按类别(可用性/性能/质量/成本/用户)定义 SLI 指标
2. 为每个 SLI 设定合理的 SLO 目标值
3. 定义错误预算(Error Budget)和消耗速率告警
4. 建议的测量方法和数据源
5. SLO 违规时的升级流程
```

2. 成功率监控:API 调用成功/失败率

成功率是最基础也是最关键的指标。AI 系统的“失败”不仅包括 HTTP 错误,还包括模型拒绝回答、输出格式错误、超时等“软失败”。

2.1 错误分类体系

```
AI 系统错误分类
├── 硬错误(Hard Errors)
│   ├── 4xx 客户端错误:400 请求格式错误、401 认证失败、429 速率限制、413 上下文超限
│   ├── 5xx 服务端错误:500 内部错误、502 网关错误、503 服务不可用
│   └── 超时错误:连接超时、读取超时
├── 软错误(Soft Errors)
│   ├── 模型拒绝回答(refusal)
│   ├── 输出格式不符合预期
│   ├── 输出内容为空或过短
│   ├── 幻觉/事实错误
│   ├── 安全过滤触发
│   ├── 工具调用失败
│   └── 质量评分低于阈值
└── 降级事件(Degradation)
    ├── 降级到备用模型
    ├── 返回缓存响应
    └── 返回预设兜底回复
```

操作步骤

步骤 1:实现错误分类与追踪

# error_classifier.py — AI 系统错误分类器 from enum import Enum from dataclasses import dataclass from typing import Optional from metrics import LLM_REQUESTS_TOTAL, LLM_ERRORS_TOTAL class ErrorCategory(Enum): # 硬错误 RATE_LIMIT = "rate_limit" # 429 AUTH_FAILURE = "auth_failure" # 401/403 CONTEXT_OVERFLOW = "context_overflow" # 413/400 SERVER_ERROR = "server_error" # 5xx TIMEOUT = "timeout" # 超时 # 软错误 REFUSAL = "refusal" # 模型拒绝回答 EMPTY_RESPONSE = "empty_response" # 空响应 FORMAT_ERROR = "format_error" # 输出格式错误 SAFETY_FILTER = "safety_filter" # 安全过滤 TOOL_FAILURE = "tool_failure" # 工具调用失败 LOW_QUALITY = "low_quality" # 质量评分低 # 降级 MODEL_FALLBACK = "model_fallback" # 降级到备用模型 CACHE_FALLBACK = "cache_fallback" # 返回缓存 DEFAULT_RESPONSE = "default_response" # 兜底回复 @dataclass class ErrorEvent: category: ErrorCategory model: str message: str status_code: Optional[int] = None is_retriable: bool = False severity: str = "medium" # low, medium, high, critical class AIErrorClassifier: """AI 系统错误分类器""" def classify(self, exception: Exception, response=None) -> ErrorEvent: """根据异常或响应分类错误""" # HTTP 错误分类 status_code = getattr(exception, "status_code", None) if status_code: if status_code == 429: return ErrorEvent( category=ErrorCategory.RATE_LIMIT, model="unknown", message=str(exception), status_code=429, is_retriable=True, severity="medium" ) elif status_code in (401, 403): return ErrorEvent( category=ErrorCategory.AUTH_FAILURE, model="unknown", message="认证失败", status_code=status_code, severity="critical" ) elif status_code >= 500: return ErrorEvent( category=ErrorCategory.SERVER_ERROR, model="unknown", message=str(exception), status_code=status_code, is_retriable=True, severity="high" ) # 超时错误 if "timeout" in str(exception).lower(): return ErrorEvent( category=ErrorCategory.TIMEOUT, model="unknown", message="请求超时", is_retriable=True, severity="medium" ) # 软错误分类(基于响应内容) if response: return self._classify_soft_error(response) return ErrorEvent( category=ErrorCategory.SERVER_ERROR, model="unknown", message=str(exception), severity="high" ) def _classify_soft_error(self, response) -> Optional[ErrorEvent]: """分类软错误""" content = response.choices[0].message.content if response.choices else "" model = response.model # 空响应 if not content or len(content.strip()) < 10: return ErrorEvent( category=ErrorCategory.EMPTY_RESPONSE, model=model, message="响应内容为空或过短", severity="medium" ) # 模型拒绝 refusal_patterns = [ "I cannot", "I'm unable to", "I apologize", "我无法", "抱歉,我不能", "作为 AI" ] if any(p in content[:200] for p in refusal_patterns): return ErrorEvent( category=ErrorCategory.REFUSAL, model=model, message="模型拒绝回答", severity="low" ) return None # 无软错误 def record_error(self, event: ErrorEvent): """记录错误到 Prometheus""" LLM_ERRORS_TOTAL.labels( model=event.model, error_type=event.category.value, error_code=str(event.status_code or 0) ).inc()

步骤 2:SLA 达标率追踪

```python
# sla_tracker.py — SLA 达标率实时追踪
import time
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class SLAWindow:
    """滑动窗口 SLA 追踪"""
    window_seconds: int = 3600  # 1 小时窗口
    events: list = field(default_factory=list)

    def record(self, success: bool, latency: float):
        now = time.time()
        self.events.append({
            "timestamp": now,
            "success": success,
            "latency": latency
        })
        # 清理过期事件
        cutoff = now - self.window_seconds
        self.events = [e for e in self.events if e["timestamp"] > cutoff]

    def get_metrics(self) -> dict:
        if not self.events:
            return {"success_rate": 1.0, "p50": 0, "p95": 0, "p99": 0}

        successes = sum(1 for e in self.events if e["success"])
        latencies = sorted(e["latency"] for e in self.events)
        n = len(latencies)

        return {
            "success_rate": successes / len(self.events),
            "total_requests": len(self.events),
            "p50_latency": latencies[int(n * 0.5)] if n > 0 else 0,
            "p95_latency": latencies[int(n * 0.95)] if n > 0 else 0,
            "p99_latency": latencies[int(n * 0.99)] if n > 0 else 0,
            "error_budget_remaining": max(
                0, 0.005 - (1 - successes / len(self.events))
            ) / 0.005 * 100  # 基于 99.5% SLO
        }


class SLADashboard:
    """多维度 SLA 仪表板"""

    def __init__(self):
        self.windows = defaultdict(lambda: SLAWindow())

    def record(self, dimension: str, success: bool, latency: float):
        """按维度记录(如 model、feature、user_tier)"""
        self.windows[dimension].record(success, latency)
        self.windows["global"].record(success, latency)

    def get_dashboard(self) -> dict:
        return {
            dim: window.get_metrics()
            for dim, window in self.windows.items()
        }
```

3. 延迟百分位监控:P50/P95/P99 与 TTFT

LLM 应用的延迟分布通常呈长尾特征——P50 可能只有 1 秒,但 P99 可能高达 15 秒。仅关注平均延迟会掩盖大量用户的糟糕体验。

3.1 延迟指标分解

```
LLM 请求延迟分解

网络延迟(~50ms) → 排队等待(~0-5s) → 模型推理/TTFT(~200ms-2s) → 流式传输(~1-10s) → 后处理(~10-100ms)

TTFT = 网络 + 排队 + 首 Token 生成
E2E  = TTFT + 流式传输 + 后处理

关键指标:
• TTFT(Time to First Token):用户感知的"开始响应"时间
• TBT(Time Between Tokens):流式输出的 Token 间隔
• E2E(End-to-End):完整请求的总耗时
```

操作步骤

步骤 1:流式响应延迟采集

# latency_tracker.py — 流式响应延迟精确采集 import time import asyncio from dataclasses import dataclass, field from typing import AsyncIterator from metrics import LLM_TTFT, LLM_REQUEST_DURATION @dataclass class LatencyBreakdown: """延迟分解记录""" start_time: float = 0 first_token_time: float = 0 last_token_time: float = 0 end_time: float = 0 token_count: int = 0 token_timestamps: list = field(default_factory=list) @property def ttft(self) -> float: """首 Token 时间""" if self.first_token_time and self.start_time: return self.first_token_time - self.start_time return 0 @property def e2e(self) -> float: """端到端延迟""" if self.end_time and self.start_time: return self.end_time - self.start_time return 0 @property def avg_tbt(self) -> float: """平均 Token 间隔""" if len(self.token_timestamps) < 2: return 0 intervals = [ self.token_timestamps[i] - self.token_timestamps[i-1] for i in range(1, len(self.token_timestamps)) ] return sum(intervals) / len(intervals) @property def tokens_per_second(self) -> float: """Token 生成速率""" duration = self.last_token_time - self.first_token_time if duration > 0 and self.token_count > 1: return (self.token_count - 1) / duration return 0 async def track_streaming_latency( stream: AsyncIterator, model: str ) -> tuple[str, LatencyBreakdown]: """追踪流式响应的详细延迟""" breakdown = LatencyBreakdown(start_time=time.time()) chunks = [] async for chunk in stream: now = time.time() if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content chunks.append(content) breakdown.token_count += 1 breakdown.token_timestamps.append(now) if breakdown.token_count == 1: breakdown.first_token_time = now # 记录 TTFT 到 Prometheus LLM_TTFT.labels(model=model).observe( breakdown.ttft ) breakdown.last_token_time = now breakdown.end_time = time.time() # 记录 E2E 延迟到 Prometheus LLM_REQUEST_DURATION.labels( model=model, endpoint="streaming" ).observe(breakdown.e2e) full_response = "".join(chunks) return full_response, breakdown # 使用示例 async def monitored_streaming_call(client, model: str, messages: list): """带延迟监控的流式调用""" stream = await client.chat.completions.create( model=model, messages=messages, stream=True ) response, latency = await track_streaming_latency(stream, model) print(f"TTFT: {latency.ttft:.3f}s") print(f"E2E: {latency.e2e:.3f}s") print(f"TPS: {latency.tokens_per_second:.1f} tokens/s") print(f"Avg TBT: {latency.avg_tbt*1000:.1f}ms") return response, latency

步骤 2:延迟百分位 Prometheus 查询

以下是用于 Grafana 仪表板的关键 PromQL 查询:

```promql
# P50 延迟(中位数)
histogram_quantile(0.50,
  sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)

# P95 延迟
histogram_quantile(0.95,
  sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)

# P99 延迟
histogram_quantile(0.99,
  sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model)
)

# TTFT P95
histogram_quantile(0.95,
  sum(rate(llm_time_to_first_token_seconds_bucket[5m])) by (le, model)
)

# 延迟 SLO 违规率(P95 > 5s 的请求比例)
1 - (
  sum(rate(llm_request_duration_seconds_bucket{le="5.0"}[5m]))
  /
  sum(rate(llm_request_duration_seconds_count[5m]))
)

# 按模型的请求速率
sum(rate(llm_request_duration_seconds_count[5m])) by (model)
```

4. 每次交互成本追踪

成本监控不仅是财务需求,更是产品健康度的关键信号。成本突然飙升可能意味着 Prompt 膨胀、缓存失效或模型路由异常。

4.1 多维度成本追踪模型

```
成本追踪维度
├── 按用户维度:每用户日成本、每用户月成本、付费用户成本、免费用户成本
├── 按功能维度:客服模块成本、搜索模块成本、代码助手成本、数据分析成本
└── 按模型维度:GPT-4.1 成本、Claude 成本、Gemini 成本、缓存节省额

关键比率:
• 每次交互成本 = 总 API 费用 / 总交互数
• 每用户成本 = 总 API 费用 / 活跃用户数
• 成本效率比 = 缓存节省额 / 总 API 费用
• 预算消耗速率 = 当日花费 / (月预算 / 30)
```

操作步骤

步骤 1:实时成本追踪系统

```python
# cost_tracker.py — 多维度实时成本追踪
import time
from collections import defaultdict
from datetime import datetime, timedelta

from metrics import LLM_COST_TOTAL, LLM_COST_PER_INTERACTION


class CostTracker:
    """多维度成本追踪器"""

    def __init__(self, monthly_budget: float = 5000.0):
        self.monthly_budget = monthly_budget
        self.daily_costs = defaultdict(float)
        self.feature_costs = defaultdict(float)
        self.user_costs = defaultdict(float)
        self.model_costs = defaultdict(float)
        self.interaction_count = 0

    def record(
        self,
        cost: float,
        model: str,
        feature: str,
        user_id: str,
        user_tier: str = "free"
    ):
        """记录一次交互的成本"""
        today = datetime.utcnow().strftime("%Y-%m-%d")
        self.daily_costs[today] += cost
        self.feature_costs[feature] += cost
        self.user_costs[user_id] += cost
        self.model_costs[model] += cost
        self.interaction_count += 1

        # 记录到 Prometheus
        LLM_COST_TOTAL.labels(
            model=model,
            feature=feature,
            user_tier=user_tier
        ).inc(cost)
        LLM_COST_PER_INTERACTION.labels(feature=feature).observe(cost)

    def get_budget_status(self) -> dict:
        """获取预算状态"""
        month_key = datetime.utcnow().strftime("%Y-%m")
        month_total = sum(
            v for k, v in self.daily_costs.items()
            if k.startswith(month_key)
        )

        days_in_month = 30
        days_elapsed = datetime.utcnow().day
        daily_avg = month_total / max(days_elapsed, 1)
        projected_monthly = daily_avg * days_in_month

        return {
            "month_total": round(month_total, 2),
            "monthly_budget": self.monthly_budget,
            "usage_percent": round(month_total / self.monthly_budget * 100, 1),
            "daily_average": round(daily_avg, 2),
            "projected_monthly": round(projected_monthly, 2),
            "projected_over_budget": projected_monthly > self.monthly_budget,
            "avg_cost_per_interaction": round(
                month_total / max(self.interaction_count, 1), 4
            ),
            "top_features": dict(
                sorted(self.feature_costs.items(),
                       key=lambda x: x[1], reverse=True)[:5]
            ),
            "top_models": dict(
                sorted(self.model_costs.items(),
                       key=lambda x: x[1], reverse=True)[:5]
            )
        }
```

步骤 2:成本告警 PromQL 规则

```promql
# 每次交互平均成本(5 分钟窗口)
sum(rate(llm_cost_dollars_total[5m]))
/
sum(rate(llm_requests_total{status="success"}[5m]))

# 按功能的成本分布
sum(rate(llm_cost_dollars_total[1h])) by (feature) * 3600

# 月度预算消耗速率(日均 × 30 > 预算则告警)
sum(increase(llm_cost_dollars_total[24h])) * 30

# 成本异常检测(当前小时成本 > 过去 7 天同时段均值的 2 倍)
sum(increase(llm_cost_dollars_total[1h]))
  > 2 * avg_over_time(
        sum(increase(llm_cost_dollars_total[1h]))[7d:1h]
      )

# 缓存节省率
sum(rate(llm_cache_hits_total[1h]))
/
(sum(rate(llm_cache_hits_total[1h])) + sum(rate(llm_cache_misses_total[1h])))
```

5. 用户满意度评分

用户满意度是 AI 系统质量的终极衡量标准。技术指标再好,如果用户不满意,系统就是失败的。AI 交互的满意度采集需要结合即时反馈(👍/👎)、结构化评分(CSAT)和长期忠诚度(NPS)。

工具推荐

| 工具 | 用途 | 价格 | 适用场景 |
| --- | --- | --- | --- |
| Langfuse Scores | LLM 交互评分采集 | 免费(自托管) | 与 trace 关联的反馈 |
| Hotjar | 用户行为分析 + 反馈 | 免费(基础版) | Web 应用反馈采集 |
| Delighted | NPS/CSAT 调查 | 起步 $224/月 | 专业满意度调查 |
| 自建方案 | 自定义反馈系统 | 免费 | 完全控制 |

操作步骤

步骤 1:构建多层反馈采集系统

# feedback_collector.py — 多层用户反馈采集 from dataclasses import dataclass from datetime import datetime from enum import Enum from typing import Optional from metrics import USER_FEEDBACK_TOTAL, USER_SATISFACTION_SCORE class FeedbackType(Enum): THUMBS = "thumbs" # 👍/👎 即时反馈 RATING = "rating" # 1-5 星评分 CSAT = "csat" # 客户满意度调查 NPS = "nps" # 净推荐值 TEXT = "text" # 文本反馈 @dataclass class FeedbackEvent: trace_id: str # 关联的 LLM trace ID user_id: str feedback_type: FeedbackType value: float # 标准化为 0-1 范围 raw_value: str # 原始值 feature: str comment: Optional[str] = None timestamp: datetime = None def __post_init__(self): if self.timestamp is None: self.timestamp = datetime.utcnow() class FeedbackCollector: """多层反馈采集器""" def __init__(self): self.feedback_store = [] # 生产环境替换为数据库 def record_thumbs( self, trace_id: str, user_id: str, is_positive: bool, feature: str ): """记录 👍/👎 反馈""" feedback = FeedbackEvent( trace_id=trace_id, user_id=user_id, feedback_type=FeedbackType.THUMBS, value=1.0 if is_positive else 0.0, raw_value="thumbs_up" if is_positive else "thumbs_down", feature=feature ) self._store_and_record(feedback) def record_rating( self, trace_id: str, user_id: str, rating: int, feature: str, comment: str = None ): """记录 1-5 星评分""" feedback = FeedbackEvent( trace_id=trace_id, user_id=user_id, feedback_type=FeedbackType.RATING, value=rating / 5.0, raw_value=str(rating), feature=feature, comment=comment ) self._store_and_record(feedback) USER_SATISFACTION_SCORE.labels(feature=feature).observe(rating) def record_csat( self, user_id: str, score: int, feature: str, trace_id: str = "" ): """记录 CSAT 评分(1-5)""" feedback = FeedbackEvent( trace_id=trace_id, user_id=user_id, feedback_type=FeedbackType.CSAT, value=score / 5.0, raw_value=str(score), feature=feature ) self._store_and_record(feedback) def record_nps( self, user_id: str, score: int, feature: str ): """记录 NPS 评分(0-10)""" # NPS 分类:0-6 贬损者,7-8 被动者,9-10 推荐者 category = ( "promoter" if score >= 9 else "passive" if score >= 7 else "detractor" ) feedback = FeedbackEvent( trace_id="", user_id=user_id, feedback_type=FeedbackType.NPS, value=score / 10.0, raw_value=f"{score}_{category}", feature=feature ) self._store_and_record(feedback) def _store_and_record(self, feedback: FeedbackEvent): """存储反馈并记录 Prometheus 指标""" self.feedback_store.append(feedback) USER_FEEDBACK_TOTAL.labels( feedback_type=feedback.raw_value, feature=feedback.feature ).inc() def get_satisfaction_metrics(self, feature: str = None) -> dict: """计算满意度指标""" feedbacks = self.feedback_store if feature: feedbacks = [f for f in feedbacks if f.feature == feature] if not feedbacks: return {"no_data": True} # 👍/👎 比率 thumbs = [f for f in feedbacks if f.feedback_type == FeedbackType.THUMBS] thumbs_up_rate = ( sum(1 for f in thumbs if f.value == 1.0) / len(thumbs) if thumbs else None ) # CSAT 评分 csat_scores = [ f.value * 5 for f in feedbacks if f.feedback_type == FeedbackType.CSAT ] avg_csat = sum(csat_scores) / len(csat_scores) if csat_scores else None csat_percent = ( sum(1 for s in csat_scores if s >= 4) / len(csat_scores) * 100 if csat_scores else None ) # NPS 计算 nps_scores = [ f.value * 10 for f in feedbacks if f.feedback_type == FeedbackType.NPS ] if nps_scores: promoters = sum(1 for s in nps_scores if s >= 9) / len(nps_scores) detractors = sum(1 for s in nps_scores if s <= 6) / len(nps_scores) nps = round((promoters - detractors) * 100) else: nps = None return { "thumbs_up_rate": round(thumbs_up_rate * 100, 1) if thumbs_up_rate else None, "avg_csat": round(avg_csat, 2) if avg_csat else None, 
"csat_satisfied_percent": round(csat_percent, 1) if csat_percent else None, "nps": nps, "total_feedbacks": len(feedbacks) }

步骤 2:反馈与 LLM Trace 关联(Langfuse 集成)

```python
# langfuse_feedback.py — 将用户反馈关联到 Langfuse trace
from langfuse import Langfuse

from feedback_collector import FeedbackCollector

langfuse = Langfuse()


def record_feedback_to_langfuse(
    trace_id: str,
    feedback_type: str,
    value: float,
    comment: str = None
):
    """将反馈评分关联到 Langfuse trace"""
    langfuse.score(
        trace_id=trace_id,
        name=feedback_type,  # "user_thumbs", "user_rating", "csat"
        value=value,
        comment=comment
    )


# 在 API 端点中使用
# POST /api/feedback
async def handle_feedback(request):
    data = request.json()

    # 记录到 Langfuse(关联 trace)
    record_feedback_to_langfuse(
        trace_id=data["trace_id"],
        feedback_type=data["type"],
        value=data["value"],
        comment=data.get("comment")
    )

    # 记录到 Prometheus(实时告警)
    collector = FeedbackCollector()
    if data["type"] == "thumbs":
        collector.record_thumbs(
            trace_id=data["trace_id"],
            user_id=data["user_id"],
            is_positive=data["value"] > 0,
            feature=data["feature"]
        )

    return {"status": "ok"}
```

提示词模板

```
你是一个用户体验分析师。请分析以下 AI 系统的用户反馈数据,生成改进建议:

## 反馈数据摘要
- 时间范围:[起始日期] 至 [结束日期]
- 总交互数:[数量]
- 👍/👎 比率:[正面比率]%
- CSAT 平均分:[分数]/5.0
- NPS 评分:[分数]
- 最常见的负面反馈关键词:[关键词列表]
- 负面反馈最多的功能模块:[模块名称]

## 典型负面反馈示例
1. "[反馈内容1]" — 功能:[模块],评分:[分数]
2. "[反馈内容2]" — 功能:[模块],评分:[分数]
3. "[反馈内容3]" — 功能:[模块],评分:[分数]

## 请输出
1. 负面反馈根因分析(按严重程度排序)
2. 每个根因的具体改进建议
3. 优先级排序(影响面 × 严重程度)
4. 预期改进效果(CSAT 提升预估)
```

6. 漂移检测:输出质量漂移、主题漂移与嵌入漂移

AI 系统的一个独特挑战是“静默退化”——模型提供商的 API 更新、数据分布变化或 Prompt 微调都可能导致输出质量悄然下降,而传统监控无法捕捉这种变化。漂移检测是发现这类问题的关键手段。

6.1 漂移类型全景

```
AI 系统漂移类型
├── 输出质量漂移:质量评分下降趋势、幻觉率上升、格式合规率下降、一致性评分波动
├── 主题漂移:输出主题偏离预期范围、回答风格突变、拒绝率异常变化
├── 嵌入漂移:输入嵌入分布偏移、输出嵌入分布偏移、检索相关性下降
└── 数据漂移:用户查询模式变化、新主题/新领域出现、季节性/事件性变化
```

工具推荐

| 工具 | 用途 | 价格 | 适用场景 |
| --- | --- | --- | --- |
| Evidently AI | 数据/嵌入漂移检测 | 免费(开源) | 统计检验驱动的漂移监控 |
| Arize Phoenix | LLM 可观测性 + 漂移 | 免费(开源) | 嵌入可视化、漂移检测 |
| WhyLabs | ML 监控平台 | 免费(基础版) | 自动漂移检测与告警 |
| Galileo | LLM 质量监控 | 联系销售 | 幻觉检测、一致性监控 |

操作步骤

步骤 1:输出质量漂移检测

# drift_detector.py — 输出质量漂移检测 import numpy as np from collections import deque from datetime import datetime, timedelta from typing import Optional from metrics import LLM_DRIFT_SCORE class QualityDriftDetector: """基于滑动窗口的输出质量漂移检测器""" def __init__( self, baseline_window: int = 1000, # 基线窗口大小 detection_window: int = 100, # 检测窗口大小 z_threshold: float = 2.0, # Z-score 告警阈值 model: str = "default" ): self.baseline_scores = deque(maxlen=baseline_window) self.recent_scores = deque(maxlen=detection_window) self.z_threshold = z_threshold self.model = model self.drift_history = [] def add_score(self, score: float, dimension: str = "overall"): """添加一个质量评分""" self.baseline_scores.append(score) self.recent_scores.append(score) # 检测漂移 drift_result = self._detect_drift(dimension) if drift_result: self.drift_history.append({ "timestamp": datetime.utcnow().isoformat(), "dimension": dimension, **drift_result }) return drift_result def _detect_drift(self, dimension: str) -> Optional[dict]: """使用 Z-score 检测漂移""" if len(self.baseline_scores) < 100 or len(self.recent_scores) < 20: return None baseline_mean = np.mean(list(self.baseline_scores)) baseline_std = np.std(list(self.baseline_scores)) recent_mean = np.mean(list(self.recent_scores)) if baseline_std == 0: return None z_score = (recent_mean - baseline_mean) / ( baseline_std / np.sqrt(len(self.recent_scores)) ) # 更新 Prometheus 指标 LLM_DRIFT_SCORE.labels( model=self.model, drift_type=f"quality_{dimension}" ).set(abs(z_score)) is_drifting = abs(z_score) > self.z_threshold return { "z_score": round(z_score, 3), "baseline_mean": round(baseline_mean, 3), "recent_mean": round(recent_mean, 3), "is_drifting": is_drifting, "direction": "degrading" if z_score < 0 else "improving", "severity": ( "critical" if abs(z_score) > 3.0 else "warning" if abs(z_score) > 2.0 else "normal" ) } class HallucinationRateMonitor: """幻觉率监控器""" def __init__(self, window_size: int = 500): self.results = deque(maxlen=window_size) self.hourly_rates = {} def record(self, is_hallucination: bool, severity: str = "medium"): """记录一次幻觉检测结果""" self.results.append({ "is_hallucination": is_hallucination, "severity": severity, "timestamp": datetime.utcnow() }) if is_hallucination: from metrics import LLM_HALLUCINATION_DETECTED LLM_HALLUCINATION_DETECTED.labels( model="default", severity=severity ).inc() def get_rate(self) -> dict: """获取当前幻觉率""" if not self.results: return {"rate": 0, "count": 0, "total": 0} hallucinations = sum( 1 for r in self.results if r["is_hallucination"] ) total = len(self.results) return { "rate": round(hallucinations / total * 100, 2), "count": hallucinations, "total": total, "by_severity": { sev: sum( 1 for r in self.results if r["is_hallucination"] and r["severity"] == sev ) for sev in ["low", "medium", "high"] } }

步骤 2:嵌入漂移检测

# embedding_drift.py — 嵌入空间漂移检测 import numpy as np from scipy import stats from typing import List from metrics import EMBEDDING_DRIFT_DISTANCE class EmbeddingDriftDetector: """基于嵌入向量的漂移检测器""" def __init__(self, reference_embeddings: np.ndarray = None): self.reference = reference_embeddings # 基线嵌入集合 self.current_window = [] def set_reference(self, embeddings: List[List[float]]): """设置基线嵌入(通常来自验证集或首周生产数据)""" self.reference = np.array(embeddings) def add_embedding(self, embedding: List[float]): """添加新的嵌入向量""" self.current_window.append(embedding) def detect_drift(self, method: str = "cosine_centroid") -> dict: """检测嵌入漂移""" if self.reference is None or len(self.current_window) < 50: return {"status": "insufficient_data"} current = np.array(self.current_window[-500:]) # 最近 500 个 if method == "cosine_centroid": return self._cosine_centroid_drift(current) elif method == "mmd": return self._mmd_drift(current) elif method == "ks_test": return self._ks_test_drift(current) return {"status": "unknown_method"} def _cosine_centroid_drift(self, current: np.ndarray) -> dict: """基于质心余弦距离的漂移检测""" ref_centroid = np.mean(self.reference, axis=0) cur_centroid = np.mean(current, axis=0) # 余弦相似度 similarity = np.dot(ref_centroid, cur_centroid) / ( np.linalg.norm(ref_centroid) * np.linalg.norm(cur_centroid) ) distance = 1 - similarity EMBEDDING_DRIFT_DISTANCE.labels( model="default", metric="cosine" ).set(distance) return { "method": "cosine_centroid", "distance": round(float(distance), 6), "similarity": round(float(similarity), 6), "is_drifting": distance > 0.05, # 阈值可调 "severity": ( "critical" if distance > 0.1 else "warning" if distance > 0.05 else "normal" ) } def _mmd_drift(self, current: np.ndarray) -> dict: """最大均值差异(MMD)漂移检测""" # 简化的 MMD 计算 n_ref = min(len(self.reference), 500) n_cur = min(len(current), 500) ref_sample = self.reference[ np.random.choice(len(self.reference), n_ref, replace=False) ] cur_sample = current[ np.random.choice(len(current), n_cur, replace=False) ] # RBF 核 MMD def rbf_kernel(X, Y, sigma=1.0): dists = np.sum((X[:, None] - Y[None, :]) ** 2, axis=2) return np.exp(-dists / (2 * sigma ** 2)) K_xx = rbf_kernel(ref_sample, ref_sample) K_yy = rbf_kernel(cur_sample, cur_sample) K_xy = rbf_kernel(ref_sample, cur_sample) mmd = ( np.mean(K_xx) + np.mean(K_yy) - 2 * np.mean(K_xy) ) return { "method": "mmd", "mmd_value": round(float(mmd), 6), "is_drifting": mmd > 0.01, "severity": ( "critical" if mmd > 0.05 else "warning" if mmd > 0.01 else "normal" ) } def _ks_test_drift(self, current: np.ndarray) -> dict: """Kolmogorov-Smirnov 检验(逐维度)""" n_dims = min(self.reference.shape[1], 50) # 取前 50 维 p_values = [] for dim in range(n_dims): stat, p_value = stats.ks_2samp( self.reference[:, dim], current[:, dim] ) p_values.append(p_value) # Bonferroni 校正 min_p = min(p_values) * n_dims drifting_dims = sum(1 for p in p_values if p < 0.05 / n_dims) return { "method": "ks_test", "min_p_value": round(float(min_p), 6), "drifting_dimensions": drifting_dims, "total_dimensions": n_dims, "drift_ratio": round(drifting_dims / n_dims, 3), "is_drifting": drifting_dims / n_dims > 0.1, "severity": ( "critical" if drifting_dims / n_dims > 0.3 else "warning" if drifting_dims / n_dims > 0.1 else "normal" ) }

7. 告警体系搭建:Prometheus + Grafana + PagerDuty/Slack

有了指标,还需要一套完整的告警体系来确保问题被及时发现和处理。

7.1 告警分级策略

| 级别 | 名称 | 响应时间 | 通知方式 | 示例 |
| --- | --- | --- | --- | --- |
| P0 | 严重 | 5 分钟 | PagerDuty 电话 + Slack | 成功率 < 95%、全面宕机 |
| P1 | 高 | 15 分钟 | PagerDuty + Slack | P99 延迟 > 30s、幻觉率 > 10% |
| P2 | 中 | 1 小时 | Slack 频道 | 成本超预算 80%、CSAT < 70% |
| P3 | 低 | 24 小时 | 邮件/Slack | 漂移 Z-score > 2、缓存命中率下降 |
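
分级策略落地时,通常由 Alertmanager 或告警网关按 severity 把告警路由到不同渠道。下面是一个按级别路由通知渠道的简化示意(渠道名称为假设,实际发送可复用后文的 SlackAlertNotifier 与 PagerDuty API):

```python
# alert_router.py — 按告警级别路由通知渠道的最小示意(渠道名称为假设)
SEVERITY_ROUTES: dict[str, list[str]] = {
    "P0": ["pagerduty_phone", "slack_incidents"],   # 电话叫醒 + 事件频道
    "P1": ["pagerduty", "slack_incidents"],
    "P2": ["slack_ai_platform"],
    "P3": ["email", "slack_ai_platform"],
}


def route_alert(severity: str, title: str, description: str) -> list[str]:
    """返回该级别应通知的渠道;真实系统中在此调用各渠道客户端发送"""
    channels = SEVERITY_ROUTES.get(severity, ["slack_ai_platform"])
    for channel in channels:
        print(f"[{severity}] -> {channel}: {title} | {description}")
    return channels
```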

操作步骤

步骤 1:Prometheus 告警规则配置

# prometheus/rules/llm_alerts.yml groups: - name: llm_availability rules: # P0: 成功率严重下降 - alert: LLMSuccessRateCritical expr: | ( sum(rate(llm_requests_total{status="success"}[5m])) / sum(rate(llm_requests_total[5m])) ) < 0.95 for: 2m labels: severity: critical team: ai-platform annotations: summary: "🔴 LLM API 成功率低于 95%" description: > 当前成功率: {{ $value | humanizePercentage }}。 持续 2 分钟以上,可能影响大量用户。 runbook_url: "https://wiki.internal/runbooks/llm-success-rate" # P1: 速率限制频繁触发 - alert: LLMRateLimitHigh expr: | sum(rate(llm_errors_total{error_type="rate_limit"}[5m])) > 10 for: 5m labels: severity: high team: ai-platform annotations: summary: "🟠 LLM 速率限制频繁触发" description: > 过去 5 分钟内速率限制错误率: {{ $value }}/s。 检查是否需要增加 API 配额或启用请求队列。 - name: llm_latency rules: # P1: P99 延迟过高 - alert: LLMP99LatencyHigh expr: | histogram_quantile(0.99, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le) ) > 15 for: 5m labels: severity: high team: ai-platform annotations: summary: "🟠 LLM P99 延迟超过 15 秒" description: > 当前 P99 延迟: {{ $value | humanizeDuration }}。 检查模型提供商状态和网络连接。 # P2: TTFT 过高 - alert: LLMTTFTHigh expr: | histogram_quantile(0.95, sum(rate(llm_time_to_first_token_seconds_bucket[5m])) by (le) ) > 2 for: 10m labels: severity: warning team: ai-platform annotations: summary: "🟡 LLM 首 Token 时间 P95 超过 2 秒" description: "当前 TTFT P95: {{ $value }}s" - name: llm_cost rules: # P2: 月度预算即将超支 - alert: LLMBudgetWarning expr: | sum(increase(llm_cost_dollars_total[24h])) * 30 > 5000 * 0.8 for: 1h labels: severity: warning team: ai-platform annotations: summary: "🟡 LLM 月度成本预计超过预算 80%" description: > 按当前消耗速率,预计月度成本: ${{ $value | humanize }}(预算: $5000) # P1: 成本异常飙升 - alert: LLMCostSpike expr: | sum(increase(llm_cost_dollars_total[1h])) > 2 * avg_over_time( sum(increase(llm_cost_dollars_total[1h]))[7d:1h] ) for: 30m labels: severity: high team: ai-platform annotations: summary: "🟠 LLM 成本异常飙升" description: > 当前小时成本是过去 7 天同时段均值的 2 倍以上。 检查是否有异常流量或缓存失效。 - name: llm_quality rules: # P1: 幻觉率过高 - alert: LLMHallucinationRateHigh expr: | sum(rate(llm_hallucination_detected_total[1h])) / sum(rate(llm_requests_total{status="success"}[1h])) > 0.10 for: 30m labels: severity: high team: ai-platform annotations: summary: "🟠 LLM 幻觉率超过 10%" description: > 当前幻觉率: {{ $value | humanizePercentage }}。 检查模型版本、Prompt 变更和检索质量。 # P2: 质量漂移 - alert: LLMQualityDrift expr: | llm_drift_score{drift_type=~"quality_.*"} > 2.0 for: 1h labels: severity: warning team: ai-platform annotations: summary: "🟡 LLM 输出质量漂移检测" description: > 漂移 Z-score: {{ $value }}(阈值: 2.0)。 输出质量可能正在下降,建议人工抽检。 # P2: 用户满意度下降 - alert: LLMSatisfactionLow expr: | ( sum(rate(user_feedback_total{feedback_type="thumbs_up"}[24h])) / ( sum(rate(user_feedback_total{feedback_type="thumbs_up"}[24h])) + sum(rate(user_feedback_total{feedback_type="thumbs_down"}[24h])) ) ) < 0.80 for: 6h labels: severity: warning team: ai-platform annotations: summary: "🟡 用户满意度低于 80%" description: > 过去 24 小时 👍 比率: {{ $value | humanizePercentage }}。 建议分析负面反馈并排查质量问题。 - name: llm_drift rules: # P3: 嵌入漂移 - alert: EmbeddingDriftDetected expr: | embedding_drift_distance{metric="cosine"} > 0.05 for: 2h labels: severity: info team: ai-platform annotations: summary: "ℹ️ 嵌入漂移检测" description: > 余弦漂移距离: {{ $value }}(阈值: 0.05)。 输入/输出分布可能发生变化。

步骤 2:Grafana 仪表板配置

{ "dashboard": { "title": "AI Agent 生产质量监控", "tags": ["llm", "agentops", "quality"], "timezone": "browser", "panels": [ { "title": "🟢 API 成功率(5 分钟窗口)", "type": "gauge", "gridPos": {"h": 6, "w": 6, "x": 0, "y": 0}, "targets": [{ "expr": "sum(rate(llm_requests_total{status='success'}[5m])) / sum(rate(llm_requests_total[5m])) * 100", "legendFormat": "成功率" }], "fieldConfig": { "defaults": { "thresholds": { "steps": [ {"color": "red", "value": 0}, {"color": "orange", "value": 95}, {"color": "yellow", "value": 99}, {"color": "green", "value": 99.5} ] }, "unit": "percent", "min": 90, "max": 100 } } }, { "title": "⏱ 延迟百分位", "type": "timeseries", "gridPos": {"h": 8, "w": 12, "x": 6, "y": 0}, "targets": [ { "expr": "histogram_quantile(0.50, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))", "legendFormat": "P50" }, { "expr": "histogram_quantile(0.95, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))", "legendFormat": "P95" }, { "expr": "histogram_quantile(0.99, sum(rate(llm_request_duration_seconds_bucket[5m])) by (le))", "legendFormat": "P99" } ], "fieldConfig": { "defaults": {"unit": "s"} } }, { "title": "💰 每小时成本", "type": "timeseries", "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}, "targets": [ { "expr": "sum(increase(llm_cost_dollars_total[1h])) by (model)", "legendFormat": "{{model}}" } ], "fieldConfig": { "defaults": {"unit": "currencyUSD"} } }, { "title": "👍 用户满意度趋势", "type": "timeseries", "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}, "targets": [ { "expr": "sum(rate(user_feedback_total{feedback_type='thumbs_up'}[1h])) / (sum(rate(user_feedback_total{feedback_type='thumbs_up'}[1h])) + sum(rate(user_feedback_total{feedback_type='thumbs_down'}[1h]))) * 100", "legendFormat": "👍 比率" } ], "fieldConfig": { "defaults": {"unit": "percent", "min": 0, "max": 100} } }, { "title": "🔍 漂移检测", "type": "stat", "gridPos": {"h": 6, "w": 6, "x": 18, "y": 0}, "targets": [{ "expr": "max(llm_drift_score)", "legendFormat": "最大 Z-score" }], "fieldConfig": { "defaults": { "thresholds": { "steps": [ {"color": "green", "value": 0}, {"color": "yellow", "value": 1.5}, {"color": "orange", "value": 2.0}, {"color": "red", "value": 3.0} ] } } } }, { "title": "🎯 幻觉率", "type": "gauge", "gridPos": {"h": 6, "w": 6, "x": 0, "y": 16}, "targets": [{ "expr": "sum(rate(llm_hallucination_detected_total[1h])) / sum(rate(llm_requests_total{status='success'}[1h])) * 100", "legendFormat": "幻觉率" }], "fieldConfig": { "defaults": { "thresholds": { "steps": [ {"color": "green", "value": 0}, {"color": "yellow", "value": 3}, {"color": "orange", "value": 5}, {"color": "red", "value": 10} ] }, "unit": "percent", "min": 0, "max": 20 } } }, { "title": "📊 错误分类分布", "type": "piechart", "gridPos": {"h": 8, "w": 8, "x": 6, "y": 16}, "targets": [{ "expr": "sum(increase(llm_errors_total[24h])) by (error_type)", "legendFormat": "{{error_type}}" }] }, { "title": "💾 缓存命中率", "type": "gauge", "gridPos": {"h": 6, "w": 6, "x": 14, "y": 16}, "targets": [{ "expr": "sum(rate(llm_cache_hits_total[1h])) / (sum(rate(llm_cache_hits_total[1h])) + sum(rate(llm_cache_misses_total[1h]))) * 100", "legendFormat": "缓存命中率" }], "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 } } } ] } }

步骤 3:Slack 告警集成

# alerting/slack_notifier.py — Slack 告警通知 import httpx from typing import Optional class SlackAlertNotifier: """Slack 告警通知器""" def __init__(self, webhook_url: str): self.webhook_url = webhook_url async def send_alert( self, title: str, severity: str, description: str, metrics: dict = None, runbook_url: str = None ): """发送告警到 Slack""" color_map = { "critical": "#FF0000", "high": "#FF8C00", "warning": "#FFD700", "info": "#36A2EB" } emoji_map = { "critical": "🔴", "high": "🟠", "warning": "🟡", "info": "ℹ️" } blocks = [ { "type": "header", "text": { "type": "plain_text", "text": f"{emoji_map.get(severity, '⚪')} {title}" } }, { "type": "section", "text": { "type": "mrkdwn", "text": description } } ] # 添加指标详情 if metrics: fields = [] for key, value in metrics.items(): fields.append({ "type": "mrkdwn", "text": f"*{key}:*\n{value}" }) blocks.append({ "type": "section", "fields": fields[:10] # Slack 限制 10 个字段 }) # 添加 Runbook 链接 if runbook_url: blocks.append({ "type": "actions", "elements": [{ "type": "button", "text": {"type": "plain_text", "text": "📖 查看 Runbook"}, "url": runbook_url, "style": "primary" }] }) payload = { "attachments": [{ "color": color_map.get(severity, "#808080"), "blocks": blocks }] } async with httpx.AsyncClient() as client: await client.post(self.webhook_url, json=payload)

8. 自动化质量评估:LLM-as-Judge 在线评估管线

人工评估无法覆盖每一次 AI 交互。LLM-as-Judge 是一种使用强模型评估弱模型输出的方法,可以实现大规模自动化质量评估。研究表明,精心设计的 LLM-as-Judge 系统与人类评估者的一致率可达 90% 以上。

8.1 在线评估架构

```
LLM-as-Judge 在线评估管线

生产请求 ──→ 主模型响应 ──→ 返回用户
                 │
                 ▼ (异步,采样率 5-10%,随机/分层)
              采样器
                 │
                 ▼
           评估模型(Judge:GPT-4.1 / Claude Sonnet)
                 │
                 ▼
           评分存储 + 告警 → Langfuse / Prometheus → 漂移检测 → 告警
```

操作步骤

步骤 1:构建 LLM-as-Judge 评估器

# evaluator.py — LLM-as-Judge 在线评估器 import json import random import asyncio from dataclasses import dataclass from typing import Optional from openai import AsyncOpenAI from metrics import LLM_QUALITY_SCORE, LLM_HALLUCINATION_DETECTED @dataclass class EvaluationResult: trace_id: str relevance: float # 相关性 (1-5) coherence: float # 连贯性 (1-5) faithfulness: float # 忠实度 (1-5) safety: float # 安全性 (1-5) overall: float # 综合评分 (1-5) is_hallucination: bool reasoning: str class LLMJudge: """LLM-as-Judge 在线评估器""" EVALUATION_PROMPT = """你是一个严格的 AI 输出质量评估专家。请评估以下 AI 助手的回答质量。 ## 用户问题 {query} ## AI 助手回答 {response} {context_section} ## 评估维度(每项 1-5 分) 1. **相关性 (relevance)**:回答是否直接回应了用户的问题? - 1分:完全无关 - 3分:部分相关但有偏题 - 5分:高度相关,精准回应 2. **连贯性 (coherence)**:回答是否逻辑清晰、结构合理? - 1分:混乱无序 - 3分:基本可读但有跳跃 - 5分:逻辑严密,层次分明 3. **忠实度 (faithfulness)**:回答是否基于事实,没有编造信息? - 1分:大量编造 - 3分:部分信息无法验证 - 5分:所有信息可验证或合理推断 4. **安全性 (safety)**:回答是否安全、无害、无偏见? - 1分:包含有害内容 - 3分:存在轻微偏见 - 5分:完全安全中立 ## 输出格式(严格 JSON) ```json {{ "relevance": <1-5>, "coherence": <1-5>, "faithfulness": <1-5>, "safety": <1-5>, "overall": <1-5>, "is_hallucination": <true/false>, "reasoning": "<简要评估理由,50字以内>" }} ```""" def __init__( self, judge_model: str = "gpt-4.1-mini", sample_rate: float = 0.05, # 5% 采样率 ): self.client = AsyncOpenAI() self.judge_model = judge_model self.sample_rate = sample_rate def should_evaluate(self) -> bool: """根据采样率决定是否评估""" return random.random() < self.sample_rate async def evaluate( self, trace_id: str, query: str, response: str, context: str = None ) -> Optional[EvaluationResult]: """评估一次 AI 交互的质量""" if not self.should_evaluate(): return None context_section = "" if context: context_section = f"## 参考上下文(用于判断忠实度)\n{context}" prompt = self.EVALUATION_PROMPT.format( query=query, response=response, context_section=context_section ) try: judge_response = await self.client.chat.completions.create( model=self.judge_model, messages=[{"role": "user", "content": prompt}], temperature=0, response_format={"type": "json_object"} ) scores = json.loads( judge_response.choices[0].message.content ) result = EvaluationResult( trace_id=trace_id, relevance=scores["relevance"], coherence=scores["coherence"], faithfulness=scores["faithfulness"], safety=scores["safety"], overall=scores["overall"], is_hallucination=scores.get("is_hallucination", False), reasoning=scores.get("reasoning", "") ) # 记录到 Prometheus for dim in ["relevance", "coherence", "faithfulness", "safety", "overall"]: LLM_QUALITY_SCORE.labels( model=self.judge_model, evaluator="llm_judge", dimension=dim ).observe(getattr(result, dim)) if result.is_hallucination: severity = ( "high" if result.faithfulness <= 2 else "medium" if result.faithfulness <= 3 else "low" ) LLM_HALLUCINATION_DETECTED.labels( model="production", severity=severity ).inc() return result except Exception as e: print(f"评估失败: {e}") return None # 集成到生产管线 judge = LLMJudge(sample_rate=0.05) async def production_llm_call(query: str, context: str = None): """带在线评估的生产 LLM 调用""" client = AsyncOpenAI() # 1. 主模型调用 response = await client.chat.completions.create( model="gpt-4.1-mini", messages=[{"role": "user", "content": query}] ) result = response.choices[0].message.content trace_id = response.id # 2. 异步触发质量评估(不阻塞主流程) asyncio.create_task( judge.evaluate( trace_id=trace_id, query=query, response=result, context=context ) ) return result

步骤 2:批量离线评估管线

```python
# batch_evaluator.py — 批量离线质量评估
import asyncio
from datetime import datetime, timedelta

from evaluator import LLMJudge


class BatchQualityEvaluator:
    """批量离线质量评估——每日运行"""

    def __init__(self, judge: LLMJudge):
        self.judge = judge
        self.judge.sample_rate = 1.0  # 离线评估不采样

    async def evaluate_batch(
        self,
        interactions: list[dict]
    ) -> dict:
        """批量评估一组交互"""
        results = []

        # 并发评估(限制并发数)
        semaphore = asyncio.Semaphore(10)

        async def eval_one(interaction):
            async with semaphore:
                return await self.judge.evaluate(
                    trace_id=interaction["trace_id"],
                    query=interaction["query"],
                    response=interaction["response"],
                    context=interaction.get("context")
                )

        tasks = [eval_one(i) for i in interactions]
        results = await asyncio.gather(*tasks)
        results = [r for r in results if r is not None]

        # 汇总统计
        if not results:
            return {"no_data": True}

        return {
            "total_evaluated": len(results),
            "avg_relevance": round(
                sum(r.relevance for r in results) / len(results), 2
            ),
            "avg_coherence": round(
                sum(r.coherence for r in results) / len(results), 2
            ),
            "avg_faithfulness": round(
                sum(r.faithfulness for r in results) / len(results), 2
            ),
            "avg_safety": round(
                sum(r.safety for r in results) / len(results), 2
            ),
            "avg_overall": round(
                sum(r.overall for r in results) / len(results), 2
            ),
            "hallucination_rate": round(
                sum(1 for r in results if r.is_hallucination)
                / len(results) * 100, 2
            ),
            "low_quality_count": sum(
                1 for r in results if r.overall <= 2
            ),
            "high_quality_count": sum(
                1 for r in results if r.overall >= 4
            )
        }
```
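
无论在线还是离线评估,Judge 本身都需要定期与人工标注对齐(后文避坑指南会再次强调这一点)。下面是一个最简单的一致率计算示意,更严格的做法可以改用 Cohen's kappa:

```python
# judge_calibration.py — 用人工标注校准 LLM-as-Judge 的最小示意
def agreement_rate(human_labels: list[bool], judge_labels: list[bool]) -> dict:
    """计算 Judge 与人工在同一批样本上(如"是否幻觉")的简单一致率"""
    assert len(human_labels) == len(judge_labels) and human_labels
    agree = sum(1 for h, j in zip(human_labels, judge_labels) if h == j)
    rate = agree / len(human_labels)
    # 经验目标:一致率 > 85%,否则需要调整 Judge 模型、Prompt 或评分 rubric
    return {"agreement_percent": round(rate * 100, 1), "n": len(human_labels)}
```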

9. AI 系统事件响应:Runbook 与升级流程

AI 系统的事件响应与传统软件不同——模型行为的非确定性意味着“修复”可能不是改代码,而是调整 Prompt、切换模型或更新检索数据。

9.1 AI 系统事件分类

| 事件类型 | 示例 | 典型根因 | 响应策略 |
| --- | --- | --- | --- |
| 模型不可用 | API 返回 5xx | 提供商故障 | 切换备用模型 |
| 质量退化 | 幻觉率飙升 | 模型更新、Prompt 漂移 | 回滚 Prompt 版本 |
| 成本异常 | 日成本翻倍 | 缓存失效、流量激增 | 启用限流、修复缓存 |
| 安全事件 | Prompt 注入成功 | 输入验证不足 | 紧急加固过滤器 |
| 性能退化 | P99 延迟 > 30s | 提供商拥塞、上下文过长 | 降级模型、压缩上下文 |
| 数据泄露 | 输出包含 PII | 训练数据泄露、RAG 污染 | 紧急下线、审计日志 |
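
上表中多类事件的响应策略都依赖"可回滚"这一前提(Prompt 版本、知识库快照)。下面是一个 Prompt 版本登记与回滚的最小示意(接口为假设,生产中更常见的做法是放进 Git 或配置中心):

```python
# prompt_registry.py — Prompt 版本登记与回滚的最小示意(接口为假设)
class PromptRegistry:
    def __init__(self):
        self._versions: dict[str, list[str]] = {}   # name -> 按发布顺序保存的版本
        self._active: dict[str, int] = {}           # name -> 当前生效版本下标

    def publish(self, name: str, template: str) -> int:
        """发布新版本并使其生效,返回版本号"""
        self._versions.setdefault(name, []).append(template)
        self._active[name] = len(self._versions[name]) - 1
        return self._active[name]

    def rollback(self, name: str) -> int:
        """质量退化事件中回滚到上一个版本"""
        if self._active.get(name, 0) > 0:
            self._active[name] -= 1
        return self._active[name]

    def get(self, name: str) -> str:
        """获取当前生效的 Prompt 模板"""
        return self._versions[name][self._active[name]]
```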

操作步骤

步骤 1:AI 系统事件响应 Runbook 模板

```markdown
# AI 系统事件响应 Runbook

## 事件:LLM 输出质量退化

### 严重程度判定
- P0(严重):幻觉率 > 20% 或安全过滤失效
- P1(高):幻觉率 > 10% 或 CSAT < 60%
- P2(中):质量漂移 Z-score > 2.0 或 CSAT < 70%

### 即时响应(前 15 分钟)
1. [ ] 确认告警——检查 Grafana 仪表板确认指标异常
2. [ ] 判断影响范围——是全局还是特定功能/模型
3. [ ] 如果是 P0/P1:
   - 启用降级模式(切换到已知稳定的模型版本)
   - 通知值班团队和产品负责人
   - 在 Slack #incidents 频道创建事件线程

### 诊断(15-60 分钟)
4. [ ] 检查模型提供商状态页面
5. [ ] 对比最近的变更:
   - Prompt 版本是否有更新?
   - 模型版本是否有变化?
   - RAG 知识库是否有更新?
   - 流量模式是否异常?
6. [ ] 抽样检查低质量输出:
   - 从 Langfuse 导出最近 100 条低评分 trace
   - 人工审查 10-20 条,识别共性问题
7. [ ] 运行离线评估对比:
   - 用相同输入对比当前输出 vs 历史输出

### 修复
8. [ ] 根据根因选择修复策略:
   - **Prompt 漂移** → 回滚到上一个稳定版本
   - **模型更新** → 固定模型版本(如 gpt-4.1-2025-04-14)
   - **RAG 污染** → 回滚知识库到上一个快照
   - **流量异常** → 启用限流和降级
9. [ ] 验证修复效果——运行评估管线确认指标恢复

### 事后复盘
10. [ ] 撰写事后分析报告(Postmortem)
11. [ ] 更新监控规则和告警阈值
12. [ ] 添加回归测试用例
13. [ ] 更新本 Runbook
```

步骤 2:自动化降级与恢复

# incident_response.py — 自动化降级与恢复 from enum import Enum from dataclasses import dataclass from datetime import datetime class IncidentSeverity(Enum): P0 = "critical" P1 = "high" P2 = "medium" P3 = "low" class DegradationMode(Enum): NORMAL = "normal" FALLBACK_MODEL = "fallback_model" # 切换到备用模型 CACHED_ONLY = "cached_only" # 仅返回缓存 STATIC_RESPONSE = "static_response" # 返回预设回复 DISABLED = "disabled" # 完全禁用 class AutoIncidentResponder: """自动化事件响应器""" def __init__(self): self.current_mode = DegradationMode.NORMAL self.incident_log = [] def assess_and_respond(self, metrics: dict) -> DegradationMode: """根据指标自动评估并响应""" success_rate = metrics.get("success_rate", 1.0) hallucination_rate = metrics.get("hallucination_rate", 0) p99_latency = metrics.get("p99_latency", 0) # P0: 严重故障——切换到静态回复 if success_rate < 0.90 or hallucination_rate > 0.20: self._escalate( IncidentSeverity.P0, DegradationMode.STATIC_RESPONSE, f"成功率={success_rate:.1%}, 幻觉率={hallucination_rate:.1%}" ) return DegradationMode.STATIC_RESPONSE # P1: 高风险——切换到备用模型 if success_rate < 0.95 or hallucination_rate > 0.10 or p99_latency > 30: self._escalate( IncidentSeverity.P1, DegradationMode.FALLBACK_MODEL, f"成功率={success_rate:.1%}, P99={p99_latency:.1f}s" ) return DegradationMode.FALLBACK_MODEL # P2: 中风险——优先使用缓存 if success_rate < 0.98 or p99_latency > 15: self._escalate( IncidentSeverity.P2, DegradationMode.CACHED_ONLY, f"成功率={success_rate:.1%}, P99={p99_latency:.1f}s" ) return DegradationMode.CACHED_ONLY # 正常 if self.current_mode != DegradationMode.NORMAL: self._recover() return DegradationMode.NORMAL def _escalate( self, severity: IncidentSeverity, mode: DegradationMode, reason: str ): """升级事件""" self.current_mode = mode self.incident_log.append({ "timestamp": datetime.utcnow().isoformat(), "severity": severity.value, "mode": mode.value, "reason": reason, "action": "escalate" }) print(f"🚨 事件升级: {severity.value}{mode.value} | {reason}") def _recover(self): """恢复正常模式""" prev_mode = self.current_mode self.current_mode = DegradationMode.NORMAL self.incident_log.append({ "timestamp": datetime.utcnow().isoformat(), "action": "recover", "from_mode": prev_mode.value }) print(f"✅ 恢复正常模式(从 {prev_mode.value})")

实战案例:AI 客服系统全链路质量监控

场景描述

一家 SaaS 公司运营着一个 AI 客服系统,日处理 50,000 次用户咨询。系统使用 GPT-4.1-mini 作为主模型,RAG 检索公司知识库,支持中英文双语。

监控体系搭建

# production_monitoring.py — AI 客服系统完整监控示例 import asyncio from metrics import * from feedback_collector import FeedbackCollector from drift_detector import QualityDriftDetector, HallucinationRateMonitor from evaluator import LLMJudge from incident_response import AutoIncidentResponder class AICustomerServiceMonitor: """AI 客服系统全链路监控""" def __init__(self): self.feedback = FeedbackCollector() self.drift_detector = QualityDriftDetector(model="gpt-4.1-mini") self.hallucination_monitor = HallucinationRateMonitor() self.judge = LLMJudge(sample_rate=0.05) self.incident_responder = AutoIncidentResponder() async def handle_interaction( self, user_id: str, query: str, context: str ) -> dict: """处理一次客服交互(带完整监控)""" import time from openai import AsyncOpenAI client = AsyncOpenAI() start_time = time.time() try: # 1. 调用主模型 response = await client.chat.completions.create( model="gpt-4.1-mini", messages=[ {"role": "system", "content": f"你是客服助手。参考资料:{context}"}, {"role": "user", "content": query} ], stream=True ) # 2. 流式响应 + 延迟追踪 chunks = [] first_token = False async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: if not first_token: ttft = time.time() - start_time LLM_TTFT.labels(model="gpt-4.1-mini").observe(ttft) first_token = True chunks.append(chunk.choices[0].delta.content) result = "".join(chunks) duration = time.time() - start_time # 3. 记录指标 LLM_REQUEST_DURATION.labels( model="gpt-4.1-mini", endpoint="customer_service" ).observe(duration) LLM_REQUESTS_TOTAL.labels( model="gpt-4.1-mini", endpoint="customer_service", status="success", error_type="none" ).inc() # 4. 异步质量评估 asyncio.create_task( self._async_quality_check(query, result, context) ) return { "response": result, "trace_id": "trace_" + str(hash(query))[:8], "latency": round(duration, 3) } except Exception as e: LLM_REQUESTS_TOTAL.labels( model="gpt-4.1-mini", endpoint="customer_service", status="error", error_type=type(e).__name__ ).inc() # 降级处理 return { "response": "抱歉,系统暂时繁忙,请稍后再试或联系人工客服。", "is_fallback": True } async def _async_quality_check( self, query: str, response: str, context: str ): """异步质量检查""" eval_result = await self.judge.evaluate( trace_id="", query=query, response=response, context=context ) if eval_result: # 更新漂移检测器 self.drift_detector.add_score(eval_result.overall) self.hallucination_monitor.record(eval_result.is_hallucination) def get_health_report(self) -> dict: """生成健康报告""" return { "satisfaction": self.feedback.get_satisfaction_metrics( feature="customer_service" ), "hallucination": self.hallucination_monitor.get_rate(), "drift": self.drift_detector._detect_drift("overall"), "incident_mode": self.incident_responder.current_mode.value }

案例分析

这个案例展示了 AI 客服系统监控的四个关键层次:

  1. 基础层:API 成功率、延迟百分位、Token 用量——通过 Prometheus 指标实时采集
  2. 质量层:LLM-as-Judge 采样评估、幻觉检测——异步执行不影响主流程
  3. 用户层:👍/👎 反馈、CSAT 评分——与 Langfuse trace 关联
  4. 智能层:漂移检测、自动降级——基于统计检验和阈值规则

关键决策点:

  • 采样率设为 5%(50,000 × 5% = 2,500 次/天评估),评估成本约 $5-10/天(估算方法见列表后的示意)
  • 使用 GPT-4.1-mini 作为 Judge(而非旗舰模型),平衡成本与准确性
  • 降级策略分三级:备用模型 → 仅缓存 → 静态回复,确保服务不中断
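
上面提到的采样评估成本可以按下面的方式粗略估算(token 数与单价均为假设值,实际取决于评估 Prompt 与上下文长度):

```python
# eval_cost_estimate.py — 在线评估成本的粗略估算(token 数与单价均为假设)
def daily_judge_cost(daily_requests: int, sample_rate: float,
                     input_tokens: int = 3000, output_tokens: int = 150,
                     price_in: float = 0.4, price_out: float = 1.6) -> float:
    """price_* 为每百万 token 的美元单价,此处取 gpt-4.1-mini 的量级"""
    evals = daily_requests * sample_rate
    per_eval = (input_tokens * price_in + output_tokens * price_out) / 1_000_000
    return round(evals * per_eval, 2)


# 例:50,000 次/天 × 5% = 2,500 次评估;上下文越长,越接近正文估算的上限
print(daily_judge_cost(50_000, 0.05))
```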

避坑指南

❌ 常见错误

  1. 只监控可用性,忽略输出质量

    • 问题:API 返回 200 不代表回答正确。AI 系统可能“成功地”返回了充满幻觉的回答,传统 APM 工具完全无法捕捉这种“静默退化”
    • 正确做法:建立质量指标体系(幻觉率、相关性评分、一致性评分),使用 LLM-as-Judge 进行采样评估,将质量指标纳入 SLO
  2. 告警阈值设置不合理,导致告警疲劳

    • 问题:阈值过低导致频繁误报,团队逐渐忽略告警;阈值过高导致真正的问题被遗漏。AI 系统的输出天然具有波动性,不能用传统软件的阈值思维
    • 正确做法:基于历史数据的统计分布设置阈值(如 P95 + 2σ),使用滑动窗口而非瞬时值触发告警,分级告警(P0-P3)配合不同的通知渠道和响应时间
  3. 用平均值代替百分位数监控延迟

    • 问题:LLM 延迟呈长尾分布,平均延迟 2 秒可能意味着 10% 的用户等待超过 10 秒。平均值掩盖了尾部用户的糟糕体验
    • 正确做法:始终监控 P50/P95/P99 百分位延迟,分别设置 SLO。特别关注 TTFT(首 Token 时间),这是用户感知的“响应速度”
  4. LLM-as-Judge 评估不做校准

    • 问题:直接使用 LLM 评分而不与人类评估对齐,可能导致系统性偏差。不同的 Judge 模型、不同的 Prompt 会产生不同的评分分布
    • 正确做法:定期用人类标注数据校准 Judge 模型,计算 Judge 与人类的一致率(目标 > 85%),使用结构化评分标准(rubric)而非开放式评估
  5. 漂移检测窗口设置不当

    • 问题:检测窗口太小导致噪声触发误报,窗口太大导致漂移发现太晚。不同类型的漂移需要不同的检测灵敏度
    • 正确做法:基线窗口 ≥ 1000 个样本,检测窗口 ≥ 100 个样本。对质量漂移使用较敏感的阈值(Z > 2.0),对嵌入漂移使用较宽松的阈值(余弦距离 > 0.05)
  6. 没有事件响应预案,出问题时手忙脚乱

    • 问题:AI 系统的故障模式与传统软件不同(模型幻觉、Prompt 注入、质量退化),传统的事件响应流程无法覆盖这些场景
    • 正确做法:为 AI 特有的故障模式编写专门的 Runbook,包含降级策略(备用模型、缓存、静态回复)、Prompt 版本回滚流程、知识库快照恢复流程

✅ 最佳实践

  1. 分层监控:基础层(可用性/延迟)→ 质量层(LLM-as-Judge)→ 用户层(反馈)→ 业务层(转化率/留存),每层都有独立的 SLI/SLO
  2. 异步评估:质量评估(LLM-as-Judge)必须异步执行,不能阻塞主请求路径。使用消息队列或 asyncio.create_task 解耦
  3. 成本即指标:将成本视为一等公民指标,与延迟和质量并列。成本异常往往是其他问题的先兆信号
  4. 反馈闭环:用户反馈 → 质量分析 → Prompt/模型优化 → 效果验证 → 更新基线,形成持续改进循环
  5. 版本化一切:Prompt 版本、模型版本、知识库版本都要可追溯、可回滚,这是 AI 系统事件响应的基础

📖 返回 总览与导航 | 上一节:21e-成本优化策略 | 下一节:22a-AI安全概览
