22f - 生产部署安全清单
本文是《AI Agent 实战手册》第 22 章第 6 节。 上一节:22e-AI-Agent红队测试 | 下一节:23a-OpenClaw核心概念 📖 返回 总览与导航
⏱ 阅读时间:120 分钟 | 难度:⭐⭐⭐⭐⭐ 高级 | 前置知识:AI Agent 架构基础、网络安全基础、22a-22e 章节内容
概述
将 AI Agent 从原型推向生产环境,安全是最关键的门槛。2025-2026 年的实践表明,超过 60% 的大型企业已在生产环境中部署自主 AI Agent,但许多团队在安全防护上仍存在显著缺口——从硬编码的 API 密钥到缺失的网络隔离,从无速率限制到没有事件响应预案。本节提供一份全面的生产部署安全清单,覆盖网络安全、密钥管理、输出验证、速率限制和事件响应五大核心领域,帮助团队在上线前系统性地消除安全盲区。
1. 网络安全与基础设施加固
AI Agent 部署的网络安全不同于传统 Web 应用——Agent 需要访问外部 LLM API、内部数据库、MCP Server 等多种资源,攻击面更广。核心原则是:最小化暴露面、加密一切通信、隔离关键组件。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| AWS VPC + Security Groups | 网络隔离与访问控制 | 免费(VPC 本身免费,流量费另计) | AWS 上的 Agent 部署网络隔离 |
| Google VPC Service Controls | 服务边界与数据防泄露 | 免费(GCP 项目内) | GCP 上的 Agent 部署,防止数据渗透 |
| Cloudflare Zero Trust | 零信任网络访问 | 免费(50 用户以下)/ $7/用户/月起 | 远程访问 Agent 管理面板 |
| Tailscale | WireGuard 组网 | 免费(个人)/ $6/用户/月起 | 内部 Agent 服务间安全通信 |
| Nginx / Caddy | 反向代理 + TLS 终止 | 开源免费 | Agent API 端点的 TLS 加密和访问控制 |
| Falco | 运行时安全监控 | 开源免费 / 企业版按需报价 | 容器化 Agent 的运行时威胁检测 |
| Cilium | eBPF 网络策略 | 开源免费 | K8s 环境下 Agent Pod 的网络微分段 |
操作步骤
步骤 1:网络分段设计
AI Agent 生产网络架构(推荐)
┌─────────────────────────────────────────────────────────────┐
│ 公网(Internet) │
└──────────────────────┬──────────────────────────────────────┘
│ HTTPS (443)
┌────────▼────────┐
│ WAF / CDN │ ← Cloudflare / AWS WAF
│ (DDoS 防护层) │
└────────┬────────┘
│
┌────────▼────────┐
│ API Gateway / │ ← 认证、速率限制、日志
│ Load Balancer │
└────────┬────────┘
│
┌─────────────┼─────────────┐
│ DMZ 子网(公共) │
│ ┌──────────────────┐ │
│ │ Agent API 服务 │ │
│ │ (无状态、可扩展) │ │
│ └────────┬─────────┘ │
└───────────┼───────────────┘
│ 内部通信(mTLS)
┌───────────┼───────────────┐
│ 应用子网(私有) │
│ ┌─────────┐ ┌─────────┐ │
│ │ LLM 代理 │ │MCP Server│ │
│ │ (出站白名单)│ │(工具层) │ │
│ └─────────┘ └─────────┘ │
└───────────┼───────────────┘
│ 内部通信(加密)
┌───────────┼───────────────┐
│ 数据子网(隔离) │
│ ┌─────────┐ ┌─────────┐ │
│ │ 向量数据库 │ │ 业务数据库│ │
│ └─────────┘ └─────────┘ │
│ (仅允许应用子网访问) │
└───────────────────────────┘步骤 2:出站流量白名单
AI Agent 的出站流量必须严格控制——Agent 只应访问已知的 LLM API 端点和必要的外部服务,防止数据渗透。
# Kubernetes NetworkPolicy 示例:限制 Agent Pod 出站流量
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: agent-egress-whitelist
namespace: ai-agents
spec:
podSelector:
matchLabels:
app: ai-agent
policyTypes:
- Egress
egress:
# 允许 DNS 解析
- to:
- namespaceSelector: {}
ports:
- protocol: UDP
port: 53
# 允许访问 LLM API(通过 FQDN 或 IP 段)
- to:
- ipBlock:
cidr: 0.0.0.0/0
ports:
- protocol: TCP
port: 443
# 允许访问内部数据库
- to:
- podSelector:
matchLabels:
app: vector-db
ports:
- protocol: TCP
port: 6333
# 允许访问内部 MCP Server
- to:
- podSelector:
matchLabels:
app: mcp-server
ports:
- protocol: TCP
port: 8080配合 Cilium 的 FQDN 策略实现更精细的域名级控制:
# Cilium CiliumNetworkPolicy:域名级出站白名单
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: agent-fqdn-egress
spec:
endpointSelector:
matchLabels:
app: ai-agent
egress:
- toFQDNs:
- matchName: "api.openai.com"
- matchName: "api.anthropic.com"
- matchName: "generativelanguage.googleapis.com"
toPorts:
- ports:
- port: "443"
protocol: TCP步骤 3:TLS 加密与 mTLS
# Nginx 反向代理配置:Agent API 端点 TLS 加密
server {
listen 443 ssl http2;
server_name agent-api.example.com;
# TLS 1.3 强制
ssl_protocols TLSv1.3;
ssl_certificate /etc/ssl/certs/agent-api.crt;
ssl_certificate_key /etc/ssl/private/agent-api.key;
# HSTS 头
add_header Strict-Transport-Security
"max-age=63072000; includeSubDomains; preload" always;
# 安全头
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "DENY" always;
add_header Content-Security-Policy
"default-src 'none'; connect-src 'self'" always;
location /api/agent/ {
proxy_pass http://agent-service:8000;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Request-ID $request_id;
# 请求体大小限制(防止超大 prompt 攻击)
client_max_body_size 1m;
# 超时设置(防止长时间挂起)
proxy_read_timeout 120s;
proxy_connect_timeout 10s;
}
}内部服务间通信使用 mTLS(双向 TLS):
# Istio mTLS 策略:Agent 服务网格内强制 mTLS
apiVersion: security.istio.io/v1
kind: PeerAuthentication
metadata:
name: agent-mtls-strict
namespace: ai-agents
spec:
mtls:
mode: STRICT提示词模板:网络安全架构审查
你是一名云安全架构师。请审查以下 AI Agent 部署的网络架构,
识别安全风险并提供改进建议。
## 当前架构
- 部署环境:[AWS/GCP/Azure/自建]
- Agent 服务位置:[公网/私有子网/容器]
- LLM API 访问方式:[直连/代理/网关]
- 数据库位置:[同子网/独立子网/外部托管]
- MCP Server 位置:[同 Pod/独立服务/外部]
## 当前安全措施
- 防火墙规则:[描述当前规则]
- TLS 配置:[版本和证书管理方式]
- 网络分段:[当前分段策略]
## 请评估
1. 网络分段是否充分隔离了 Agent、数据和工具层?
2. 出站流量是否有白名单控制?
3. 内部通信是否加密?
4. 是否存在数据渗透路径?
5. 提供具体的改进建议和优先级排序。2. 密钥管理与凭证安全
AI Agent 系统涉及大量敏感凭证——LLM API 密钥、数据库连接串、MCP Server 认证令牌、第三方服务 OAuth 令牌等。2025 年的研究发现,超过 12,000 个活跃 API 密钥和密码出现在用于训练 LLM 的公开数据集中,凸显了密钥管理的重要性。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| HashiCorp Vault | 动态密钥管理、自动轮换 | 开源免费(社区版)/ HCP Vault $0.03/小时起 | 企业级密钥管理,支持 OpenAI API 密钥动态生成 |
| AWS Secrets Manager | 云原生密钥存储与轮换 | $0.40/密钥/月 + $0.05/万次 API 调用 | AWS 环境下的密钥管理 |
| Doppler | 开发者友好的密钥管理平台 | 免费(5 人以下)/ $18/人/月起 | 跨环境密钥同步,AI 管线密钥管理 |
| Infisical | 开源密钥管理平台 | 开源免费(自托管)/ $8/人/月起(云版) | 自托管密钥管理,CI/CD 集成 |
| 1Password Service Accounts | 服务账户密钥管理 | $4/月起(个人)/ 企业版按需报价 | 小团队的密钥管理 |
| Scalekit Token Vault | AI Agent 专用令牌保险库 | 按需报价 | AI Agent 的 OAuth 令牌集中管理 |
| GitGuardian | 代码仓库密钥泄露检测 | 免费(公开仓库)/ $40/开发者/月起 | CI/CD 中检测意外提交的密钥 |
| TruffleHog | 密钥扫描工具 | 开源免费 | 扫描代码库和 Git 历史中的泄露密钥 |
操作步骤
步骤 1:密钥分类与清点
AI Agent 系统密钥清单模板
┌─────────────────────────────────────────────────────────────┐
│ 密钥类别 │ 示例 │ 风险等级 │ 轮换周期 │
│──────────────────│────────────────────│─────────│─────────│
│ LLM API 密钥 │ OpenAI, Anthropic │ 高 │ 30 天 │
│ 数据库凭证 │ PostgreSQL, Redis │ 严重 │ 90 天 │
│ MCP Server 令牌 │ 自定义 MCP 认证 │ 高 │ 30 天 │
│ OAuth 令牌 │ GitHub, Slack │ 高 │ 按过期 │
│ 向量数据库密钥 │ Pinecone, Qdrant │ 中 │ 90 天 │
│ 监控平台密钥 │ LangSmith, Langfuse│ 中 │ 90 天 │
│ 加密密钥 │ AES/RSA 密钥 │ 严重 │ 365 天 │
│ Webhook 签名密钥 │ HMAC 签名 │ 中 │ 90 天 │
└─────────────────────────────────────────────────────────────┘步骤 2:使用 HashiCorp Vault 管理 LLM API 密钥
HashiCorp 于 2025 年发布了 Vault 的 OpenAI 动态密钥插件,支持按需生成临时 API 密钥并自动过期,避免长期密钥泄露风险。
# 1. 启用 OpenAI 密钥引擎
vault secrets enable -path=openai vault-plugin-secrets-openai
# 2. 配置 OpenAI 管理员密钥(仅 Vault 持有)
vault write openai/config \
admin_api_key="sk-admin-xxxx" \
organization_id="org-xxxx"
# 3. 创建角色(定义临时密钥的权限和 TTL)
vault write openai/roles/agent-prod \
name="agent-prod-key" \
ttl="1h" \
max_ttl="24h"
# 4. Agent 服务在运行时动态获取临时密钥
vault read openai/creds/agent-prod
# 返回:临时 API 密钥,1 小时后自动过期# agent_vault_integration.py — Agent 服务集成 Vault
import hvac
import os
from functools import lru_cache
from datetime import datetime, timedelta
class VaultSecretManager:
"""通过 Vault 管理 Agent 运行时密钥"""
def __init__(self):
self.client = hvac.Client(
url=os.environ["VAULT_ADDR"],
token=os.environ["VAULT_TOKEN"], # 使用 AppRole 更安全
)
self._cache = {}
self._expiry = {}
def get_llm_api_key(self, provider: str = "openai") -> str:
"""获取 LLM API 密钥(带缓存和自动续期)"""
cache_key = f"llm_{provider}"
# 检查缓存是否有效(提前 5 分钟续期)
if (cache_key in self._cache
and self._expiry.get(cache_key, datetime.min)
> datetime.now() + timedelta(minutes=5)):
return self._cache[cache_key]
# 从 Vault 获取新密钥
secret = self.client.secrets.kv.v2.read_secret_version(
path=f"ai-agents/{provider}",
mount_point="secret",
)
api_key = secret["data"]["data"]["api_key"]
ttl = secret["data"]["metadata"].get("ttl", 3600)
# 更新缓存
self._cache[cache_key] = api_key
self._expiry[cache_key] = (
datetime.now() + timedelta(seconds=ttl)
)
return api_key
def get_db_credentials(self, db_name: str) -> dict:
"""获取数据库动态凭证"""
secret = self.client.secrets.database.generate_credentials(
name=f"agent-{db_name}-role",
)
return {
"username": secret["data"]["username"],
"password": secret["data"]["password"],
"ttl": secret["lease_duration"],
}步骤 3:密钥轮换自动化
# GitHub Actions:自动检测密钥泄露 + 触发轮换
name: Secret Leak Detection & Rotation
on:
push:
branches: [main, develop]
pull_request:
jobs:
scan-secrets:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # 扫描完整 Git 历史
- name: Run TruffleHog
uses: trufflesecurity/trufflehog@main
with:
extra_args: --only-verified
- name: Run GitGuardian
uses: GitGuardian/ggshield-action@v1
env:
GITGUARDIAN_API_KEY: ${{ secrets.GITGUARDIAN_API_KEY }}
rotate-on-leak:
needs: scan-secrets
if: failure()
runs-on: ubuntu-latest
steps:
- name: Alert Security Team
uses: slackapi/slack-github-action@v2
with:
webhook: ${{ secrets.SECURITY_SLACK_WEBHOOK }}
payload: |
{
"text": "🚨 密钥泄露检测:${{ github.repository }} 发现疑似泄露的密钥,请立即检查并轮换。"
}
- name: Trigger Key Rotation
run: |
curl -X POST "$VAULT_ADDR/v1/sys/leases/revoke-prefix/openai/creds/" \
-H "X-Vault-Token: $VAULT_TOKEN"
echo "已撤销所有 OpenAI 临时密钥,服务将自动获取新密钥"步骤 4:防止密钥泄露到 LLM 上下文
# prevent_secret_leakage.py — 防止密钥进入 LLM 上下文
import re
from typing import Optional
# 密钥模式正则表达式
SECRET_PATTERNS = [
(r"sk-[a-zA-Z0-9]{20,}", "OpenAI API Key"),
(r"sk-ant-[a-zA-Z0-9\-]{20,}", "Anthropic API Key"),
(r"AIza[a-zA-Z0-9\-_]{35}", "Google API Key"),
(r"ghp_[a-zA-Z0-9]{36}", "GitHub PAT"),
(r"xoxb-[0-9]{10,}-[a-zA-Z0-9]{20,}", "Slack Bot Token"),
(r"(?i)(password|passwd|pwd)\s*[=:]\s*\S+", "Password"),
(r"(?i)(secret|token|key)\s*[=:]\s*['\"]?\S{16,}", "Generic Secret"),
(r"-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----", "Private Key"),
]
def sanitize_for_llm(text: str) -> str:
"""清洗文本中的密钥,防止泄露到 LLM 上下文"""
sanitized = text
for pattern, label in SECRET_PATTERNS:
sanitized = re.sub(
pattern,
f"[REDACTED:{label}]",
sanitized,
)
return sanitized
def check_llm_output_for_secrets(output: str) -> Optional[str]:
"""检查 LLM 输出是否包含密钥(防止模型泄露记忆中的密钥)"""
for pattern, label in SECRET_PATTERNS:
if re.search(pattern, output):
return f"检测到 LLM 输出中包含疑似 {label}"
return None提示词模板:密钥管理审计
你是一名安全审计师。请审查以下 AI Agent 系统的密钥管理实践,
识别风险并提供改进建议。
## 当前密钥管理方式
- 密钥存储位置:[环境变量/配置文件/密钥管理服务/硬编码]
- 密钥轮换策略:[手动/自动/无]
- 密钥访问控制:[描述谁可以访问哪些密钥]
- 密钥泄露检测:[有/无,使用什么工具]
## Agent 使用的密钥清单
[列出所有密钥类型和用途]
## 请评估
1. 是否存在硬编码密钥或明文存储的风险?
2. 密钥轮换周期是否合理?
3. 是否有密钥泄露到 LLM 上下文的风险?
4. 密钥的最小权限原则是否得到遵守?
5. 提供具体的改进建议和实施优先级。3. 输出验证与内容安全
AI Agent 的输出直接面向用户或触发下游操作,必须经过严格验证。输出验证覆盖三个维度:事实性验证(幻觉检测)、格式验证(结构化输出校验)、内容安全(有害内容过滤)。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Guardrails AI | 输出验证框架(结构化 + 语义) | 开源免费 | Python Agent 的输出格式和内容验证 |
| NeMo Guardrails (NVIDIA) | 对话安全护栏 | 开源免费 | 对话式 Agent 的输入/输出安全控制 |
| Maxim AI | 幻觉检测与 Agent 评估 | 免费试用 / 企业版按需报价 | 生产环境幻觉监控 |
| Langfuse | LLM 可观测性 + 输出质量追踪 | 开源免费(自托管)/ 云版按用量计费 | 输出质量持续监控 |
| HaluGate (vLLM) | Token 级实时幻觉检测 | 开源免费 | 高性能实时幻觉拦截 |
| OpenAI Moderation API | 内容安全分类 | 免费 | 检测有害、暴力、色情等内容 |
| Perspective API (Google) | 毒性评分 | 免费(配额内) | 评估输出文本的毒性和攻击性 |
| Rebuff | Prompt 注入检测 | 开源免费 | 检测输入中的 Prompt 注入尝试 |
操作步骤
步骤 1:构建多层输出验证管线
AI Agent 输出验证管线
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ LLM 原始 │───▶│ 第 1 层:格式 │───▶│ 第 2 层:安全 │
│ 输出 │ │ 验证 │ │ 过滤 │
└──────────┘ └──────┬───────┘ └──────┬───────┘
│ 格式不合规 │ 包含有害内容
▼ → 重试/降级 ▼ → 拦截/替换
┌──────────────┐
───▶│ 第 3 层:事实 │
│ 性验证 │
└──────┬───────┘
│ 检测到幻觉
▼ → 标记/修正
┌──────────────┐
───▶│ 第 4 层:密钥 │
│ 泄露检测 │
└──────┬───────┘
│ 检测到密钥
▼ → 脱敏/拦截
┌──────────────┐
───▶│ 最终输出 │
│ → 用户/下游 │
└──────────────┘步骤 2:实现输出验证框架
# output_validator.py — AI Agent 输出验证框架
import re
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class ValidationResult(Enum):
PASS = "pass"
FAIL_FORMAT = "fail_format"
FAIL_SAFETY = "fail_safety"
FAIL_HALLUCINATION = "fail_hallucination"
FAIL_SECRET_LEAK = "fail_secret_leak"
@dataclass
class ValidatedOutput:
original: str
sanitized: str
result: ValidationResult
issues: list[str] = field(default_factory=list)
confidence: float = 1.0
class AgentOutputValidator:
"""AI Agent 输出多层验证器"""
def __init__(self, config: dict):
self.max_length = config.get("max_output_length", 10000)
self.blocked_patterns = config.get("blocked_patterns", [])
self.required_format = config.get("required_format", None)
def validate(self, output: str, context: dict = None) -> ValidatedOutput:
"""执行完整的输出验证管线"""
issues = []
sanitized = output
# 第 1 层:格式验证
format_ok, format_issues = self._validate_format(output)
issues.extend(format_issues)
# 第 2 层:安全过滤
safety_ok, safety_issues, sanitized = self._validate_safety(
sanitized
)
issues.extend(safety_issues)
# 第 3 层:密钥泄露检测
secret_ok, secret_issues, sanitized = self._check_secret_leak(
sanitized
)
issues.extend(secret_issues)
# 第 4 层:URL / Markdown 图片渗透检测
exfil_ok, exfil_issues, sanitized = self._check_exfiltration(
sanitized
)
issues.extend(exfil_issues)
# 确定最终结果
if not secret_ok:
result = ValidationResult.FAIL_SECRET_LEAK
elif not safety_ok:
result = ValidationResult.FAIL_SAFETY
elif not format_ok:
result = ValidationResult.FAIL_FORMAT
else:
result = ValidationResult.PASS
return ValidatedOutput(
original=output,
sanitized=sanitized,
result=result,
issues=issues,
)
def _validate_format(self, output: str) -> tuple[bool, list[str]]:
"""格式验证:长度、结构、编码"""
issues = []
if len(output) > self.max_length:
issues.append(
f"输出超过最大长度限制 ({len(output)}/{self.max_length})"
)
if len(output.strip()) == 0:
issues.append("输出为空")
# 检测异常编码(可能是编码混淆攻击的输出)
try:
output.encode("utf-8")
except UnicodeEncodeError:
issues.append("输出包含无效 Unicode 字符")
return len(issues) == 0, issues
def _validate_safety(
self, output: str
) -> tuple[bool, list[str], str]:
"""安全过滤:有害内容、被禁止的模式"""
issues = []
sanitized = output
for pattern in self.blocked_patterns:
if re.search(pattern, sanitized, re.IGNORECASE):
issues.append(f"输出匹配被禁止的模式: {pattern[:50]}")
sanitized = re.sub(
pattern, "[BLOCKED]", sanitized, flags=re.IGNORECASE
)
return len(issues) == 0, issues, sanitized
def _check_secret_leak(
self, output: str
) -> tuple[bool, list[str], str]:
"""密钥泄露检测"""
issues = []
sanitized = output
secret_patterns = [
(r"sk-[a-zA-Z0-9]{20,}", "OpenAI API Key"),
(r"sk-ant-[a-zA-Z0-9\-]{20,}", "Anthropic API Key"),
(r"-----BEGIN\s+PRIVATE\s+KEY-----", "Private Key"),
(r"(?i)password\s*[=:]\s*\S+", "Password"),
]
for pattern, label in secret_patterns:
if re.search(pattern, sanitized):
issues.append(f"检测到疑似 {label} 泄露")
sanitized = re.sub(
pattern, f"[REDACTED:{label}]", sanitized
)
return len(issues) == 0, issues, sanitized
def _check_exfiltration(
self, output: str
) -> tuple[bool, list[str], str]:
"""数据渗透检测:Markdown 图片、外部 URL"""
issues = []
sanitized = output
# 检测 Markdown 图片渗透
# (攻击者可能让 Agent 输出包含用户数据的图片 URL)
md_img_pattern = r"!\[.*?\]\(https?://(?!trusted\.domain).*?\)"
if re.search(md_img_pattern, sanitized):
issues.append("检测到可疑的外部 Markdown 图片链接")
sanitized = re.sub(
md_img_pattern, "[IMAGE_BLOCKED]", sanitized
)
# 检测隐藏的 HTML 标签
html_pattern = r"<(script|iframe|img|link|object|embed)[^>]*>"
if re.search(html_pattern, sanitized, re.IGNORECASE):
issues.append("检测到可疑的 HTML 标签")
sanitized = re.sub(
html_pattern, "[HTML_BLOCKED]", sanitized,
flags=re.IGNORECASE,
)
return len(issues) == 0, issues, sanitized步骤 3:幻觉检测集成
# hallucination_detector.py — 生产环境幻觉检测
from dataclasses import dataclass
@dataclass
class HallucinationCheck:
is_hallucinated: bool
confidence: float
evidence: str
category: str # "factual" | "faithfulness" | "fabrication"
class ProductionHallucinationDetector:
"""生产环境幻觉检测器(多策略组合)"""
def __init__(self, config: dict):
self.threshold = config.get("threshold", 0.7)
self.strategies = config.get(
"strategies",
["self_consistency", "source_attribution"],
)
async def check(
self,
output: str,
context: str = "",
sources: list[str] = None,
) -> HallucinationCheck:
"""检查输出是否包含幻觉"""
scores = []
# 策略 1:自一致性检查(多次生成比较)
if "self_consistency" in self.strategies:
consistency = await self._self_consistency_check(
output, context
)
scores.append(consistency)
# 策略 2:来源归因检查(输出是否有来源支撑)
if "source_attribution" in self.strategies and sources:
attribution = await self._source_attribution_check(
output, sources
)
scores.append(attribution)
# 策略 3:事实性关键词检查
factual_score = self._factual_keyword_check(output)
scores.append(factual_score)
avg_score = sum(scores) / len(scores) if scores else 0
is_hallucinated = avg_score < self.threshold
return HallucinationCheck(
is_hallucinated=is_hallucinated,
confidence=avg_score,
evidence=(
"多项检测策略表明输出可能包含不准确信息"
if is_hallucinated
else "输出通过幻觉检测"
),
category="faithfulness" if sources else "factual",
)
def _factual_keyword_check(self, output: str) -> float:
"""检测高风险幻觉信号词"""
risk_phrases = [
"据我所知", "我记得", "大约在", "可能是",
"如果我没记错", "一般来说",
]
high_confidence_phrases = [
"根据文档", "数据显示", "来源:", "参考:",
]
risk_count = sum(
1 for p in risk_phrases if p in output
)
confidence_count = sum(
1 for p in high_confidence_phrases if p in output
)
if risk_count > 2 and confidence_count == 0:
return 0.4 # 高幻觉风险
elif confidence_count > 0:
return 0.9 # 有来源支撑
return 0.7 # 中等置信度
async def _self_consistency_check(
self, output: str, context: str
) -> float:
"""自一致性检查(简化实现)"""
# 生产环境中:对同一问题多次生成,比较一致性
# 此处为简化示例
return 0.8
async def _source_attribution_check(
self, output: str, sources: list[str]
) -> float:
"""来源归因检查(简化实现)"""
# 生产环境中:检查输出中的声明是否有来源支撑
# 此处为简化示例
return 0.85提示词模板:输出验证规则设计
你是一名 AI 安全工程师。请为以下 AI Agent 设计输出验证规则。
## Agent 描述
- 功能:[Agent 的主要功能]
- 输出类型:[文本/JSON/代码/混合]
- 目标用户:[内部员工/外部客户/API 消费者]
- 敏感数据范围:[Agent 可能接触到的敏感数据类型]
## 请设计
1. 格式验证规则(输出结构、长度限制、编码要求)
2. 内容安全规则(需要过滤的内容类别)
3. 幻觉检测策略(适合该场景的检测方法)
4. 数据渗透防护规则(URL 白名单、Markdown 限制等)
5. 每条规则的处理方式(拦截/替换/标记/降级)4. 速率限制与滥用防护
AI Agent 的运行成本远高于传统 API——每次 LLM 调用消耗 token,每次工具调用可能触发外部 API 费用。缺乏速率限制不仅导致成本失控,还可能被恶意用户利用进行资源耗尽攻击。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Redis + 令牌桶算法 | 分布式速率限制 | 开源免费(自托管)/ Redis Cloud $5/月起 | 自建 Agent API 的速率限制 |
| Cloudflare Rate Limiting | 边缘速率限制 + DDoS 防护 | 免费(基础)/ $0.05/万次请求起 | Agent API 端点的边缘防护 |
| AWS API Gateway | API 管理 + 速率限制 | $3.50/百万次 API 调用 | AWS 上的 Agent API 管理 |
| Kong Gateway | API 网关 + 速率限制插件 | 开源免费 / 企业版按需报价 | 自建 API 网关,支持多种限流策略 |
| ngrok AI Gateway | AI 专用 API 网关 | 免费(开发)/ $8/月起 | AI API 密钥管理 + 速率限制 + 故障转移 |
| Helicone | LLM 代理 + 成本追踪 + 速率限制 | 免费(10K 请求/月)/ $20/月起 | LLM API 调用的成本控制和速率限制 |
| SlowAPI | Python FastAPI 速率限制 | 开源免费 | Python Agent 服务的请求级限流 |
操作步骤
步骤 1:设计多层速率限制策略
AI Agent 多层速率限制架构
┌─────────────────────────────────────────────────────────────┐
│ │
│ 第 1 层:边缘限流(Cloudflare / WAF) │
│ ├── IP 级限流:100 请求/分钟/IP │
│ ├── DDoS 防护:自动检测和缓解 │
│ └── 地理封锁:仅允许目标市场的流量 │
│ │
│ 第 2 层:API 网关限流(Kong / API Gateway) │
│ ├── 用户级限流:基于 API Key 或 JWT │
│ │ ├── 免费用户:10 请求/分钟,1000 token/请求 │
│ │ ├── 付费用户:60 请求/分钟,4000 token/请求 │
│ │ └── 企业用户:300 请求/分钟,8000 token/请求 │
│ └── 全局限流:总请求量上限(保护后端) │
│ │
│ 第 3 层:应用级限流(Agent 服务内部) │
│ ├── Token 预算:每用户每日 token 上限 │
│ ├── 工具调用限制:每次对话最多 N 次工具调用 │
│ ├── 并发限制:每用户最多 M 个并发 Agent 会话 │
│ └── 成本熔断:单次请求成本超过阈值时中断 │
│ │
│ 第 4 层:LLM API 限流(提供商侧) │
│ ├── OpenAI:RPM/TPM 限制(按 Tier) │
│ ├── Anthropic:RPM/TPM 限制(按 Tier) │
│ └── 自建模型:GPU 资源限制 │
│ │
└─────────────────────────────────────────────────────────────┘步骤 2:实现 Token 预算与成本熔断
# token_budget.py — AI Agent Token 预算与成本熔断
import time
from dataclasses import dataclass, field
from typing import Optional
import redis
@dataclass
class TokenBudget:
"""用户 Token 预算配置"""
daily_input_tokens: int = 100_000
daily_output_tokens: int = 50_000
max_tokens_per_request: int = 4_000
max_tool_calls_per_session: int = 20
max_concurrent_sessions: int = 3
cost_limit_per_request_usd: float = 0.50
cost_limit_daily_usd: float = 10.00
TIER_BUDGETS = {
"free": TokenBudget(
daily_input_tokens=50_000,
daily_output_tokens=20_000,
max_tokens_per_request=2_000,
max_tool_calls_per_session=10,
max_concurrent_sessions=1,
cost_limit_per_request_usd=0.10,
cost_limit_daily_usd=2.00,
),
"pro": TokenBudget(
daily_input_tokens=500_000,
daily_output_tokens=200_000,
max_tokens_per_request=8_000,
max_tool_calls_per_session=50,
max_concurrent_sessions=5,
cost_limit_per_request_usd=1.00,
cost_limit_daily_usd=50.00,
),
"enterprise": TokenBudget(
daily_input_tokens=5_000_000,
daily_output_tokens=2_000_000,
max_tokens_per_request=16_000,
max_tool_calls_per_session=100,
max_concurrent_sessions=20,
cost_limit_per_request_usd=5.00,
cost_limit_daily_usd=500.00,
),
}
class TokenBudgetEnforcer:
"""Token 预算执行器(基于 Redis)"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_budget(
self, user_id: str, tier: str, estimated_tokens: int
) -> tuple[bool, Optional[str]]:
"""检查用户是否有足够的 Token 预算"""
budget = TIER_BUDGETS.get(tier, TIER_BUDGETS["free"])
today = time.strftime("%Y-%m-%d")
key = f"token_budget:{user_id}:{today}"
# 获取今日已用量
used = self.redis.hgetall(key)
used_input = int(used.get(b"input_tokens", 0))
used_output = int(used.get(b"output_tokens", 0))
used_cost = float(used.get(b"cost_usd", 0))
# 检查 Token 限制
if used_input + estimated_tokens > budget.daily_input_tokens:
return False, (
f"已达今日输入 Token 上限 "
f"({used_input}/{budget.daily_input_tokens})"
)
# 检查单次请求 Token 限制
if estimated_tokens > budget.max_tokens_per_request:
return False, (
f"单次请求 Token 超限 "
f"({estimated_tokens}/{budget.max_tokens_per_request})"
)
# 检查每日成本限制
if used_cost >= budget.cost_limit_daily_usd:
return False, (
f"已达今日成本上限 "
f"(${used_cost:.2f}/${budget.cost_limit_daily_usd})"
)
return True, None
def record_usage(
self,
user_id: str,
input_tokens: int,
output_tokens: int,
cost_usd: float,
):
"""记录 Token 使用量"""
today = time.strftime("%Y-%m-%d")
key = f"token_budget:{user_id}:{today}"
pipe = self.redis.pipeline()
pipe.hincrby(key, "input_tokens", input_tokens)
pipe.hincrby(key, "output_tokens", output_tokens)
pipe.hincrbyfloat(key, "cost_usd", cost_usd)
pipe.hincrby(key, "request_count", 1)
pipe.expire(key, 86400 * 2) # 保留 2 天
pipe.execute()步骤 3:API 级速率限制(FastAPI + SlowAPI 示例)
# rate_limiter.py — FastAPI Agent API 速率限制
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 基于 IP + API Key 的复合限流
def get_rate_limit_key(request: Request) -> str:
api_key = request.headers.get("X-API-Key", "anonymous")
ip = get_remote_address(request)
return f"{api_key}:{ip}"
limiter = Limiter(
key_func=get_rate_limit_key,
storage_uri="redis://localhost:6379/1",
default_limits=["10/minute"], # 默认限制
)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(
RateLimitExceeded, _rate_limit_exceeded_handler
)
@app.post("/api/agent/chat")
@limiter.limit(
"30/minute", # 付费用户限制
key_func=get_rate_limit_key,
error_message="请求过于频繁,请稍后再试。"
)
async def agent_chat(request: Request):
"""Agent 聊天端点(带速率限制)"""
# ... Agent 逻辑
pass
@app.post("/api/agent/tool-call")
@limiter.limit(
"60/minute",
key_func=get_rate_limit_key,
)
async def agent_tool_call(request: Request):
"""Agent 工具调用端点(更严格的限制)"""
# ... 工具调用逻辑
pass步骤 4:工具调用限制与循环检测
# tool_call_limiter.py — Agent 工具调用限制与循环检测
from dataclasses import dataclass, field
from collections import Counter
@dataclass
class ToolCallTracker:
"""追踪单次 Agent 会话的工具调用"""
max_calls: int = 20
max_calls_per_tool: int = 5
max_loop_iterations: int = 3
call_history: list[str] = field(default_factory=list)
def can_call(self, tool_name: str) -> tuple[bool, str]:
"""检查是否允许调用该工具"""
# 检查总调用次数
if len(self.call_history) >= self.max_calls:
return False, (
f"已达工具调用上限 ({self.max_calls} 次/会话)"
)
# 检查单工具调用次数
tool_count = self.call_history.count(tool_name)
if tool_count >= self.max_calls_per_tool:
return False, (
f"工具 {tool_name} 已达调用上限 "
f"({self.max_calls_per_tool} 次/会话)"
)
# 检测循环模式(如 A→B→A→B→A→B)
if self._detect_loop(tool_name):
return False, (
f"检测到工具调用循环模式,已中断"
)
return True, ""
def record_call(self, tool_name: str):
"""记录工具调用"""
self.call_history.append(tool_name)
def _detect_loop(self, next_tool: str) -> bool:
"""检测工具调用循环"""
if len(self.call_history) < 4:
return False
# 检查最近的调用是否形成重复模式
recent = self.call_history[-6:] + [next_tool]
for pattern_len in range(2, 4):
pattern = recent[-pattern_len:]
repetitions = 0
for i in range(
len(recent) - pattern_len, -1, -pattern_len
):
segment = recent[i:i + pattern_len]
if segment == pattern:
repetitions += 1
else:
break
if repetitions >= self.max_loop_iterations:
return True
return False提示词模板:速率限制策略设计
你是一名 API 安全架构师。请为以下 AI Agent 服务设计速率限制策略。
## 服务描述
- Agent 类型:[客服/编码助手/数据分析/通用]
- 用户规模:[预估日活用户数]
- 用户层级:[免费/付费/企业,各层级比例]
- LLM 提供商:[OpenAI/Anthropic/自建,各自的 RPM/TPM 限制]
- 月度 LLM 预算:[$X]
## 请设计
1. 各用户层级的速率限制(请求/分钟、Token/天、成本/天)
2. 工具调用限制(每会话最大调用次数、循环检测策略)
3. 成本熔断阈值(单次请求、每日、每月)
4. DDoS 防护策略(边缘限流、IP 封禁规则)
5. 超限时的用户体验(错误消息、降级策略、排队机制)5. 事件响应与应急预案
AI Agent 的安全事件不同于传统应用——模型被劫持、Prompt 注入导致数据泄露、Agent 执行未授权操作等场景需要专门的响应流程。OpenAI 在 2026 年公开承认浏览器 Agent 中的 Prompt 注入是”结构性漏洞”,这意味着事件响应能力是生产部署的必备条件。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| PagerDuty | 事件管理与告警升级 | $21/用户/月起 | 7×24 安全事件告警和值班管理 |
| Opsgenie (Atlassian) | 告警管理与事件响应 | 免费(5 用户)/ $9/用户/月起 | 中小团队的事件响应管理 |
| Grafana + Loki | 日志聚合与安全事件检索 | 开源免费(自托管)/ Grafana Cloud 免费起 | Agent 安全日志的集中管理和检索 |
| Falco | 运行时安全事件检测 | 开源免费 / Falco Cloud 按需报价 | 容器化 Agent 的运行时威胁检测 |
| Wazuh | 开源 SIEM + 入侵检测 | 开源免费 | 全面的安全事件监控和合规审计 |
| Langfuse | LLM 调用追踪与审计 | 开源免费(自托管)/ 云版按用量计费 | Agent 行为审计和异常检测 |
| Sentry | 应用错误追踪 | 免费(5K 事件/月)/ $26/月起 | Agent 服务的错误监控和告警 |
操作步骤
步骤 1:定义 AI Agent 安全事件分级
AI Agent 安全事件分级标准
┌─────────────────────────────────────────────────────────────┐
│ │
│ P0 — 严重(15 分钟内响应,1 小时内缓解) │
│ ├── Agent 被劫持执行未授权操作(如发送数据到外部) │
│ ├── 批量用户数据通过 Agent 泄露 │
│ ├── LLM API 密钥泄露并被滥用 │
│ ├── Agent 执行了破坏性操作(删除数据、修改配置) │
│ └── 生产环境 Agent 被用于攻击其他系统 │
│ │
│ P1 — 高危(1 小时内响应,4 小时内缓解) │
│ ├── 系统提示词被完整提取 │
│ ├── 单用户数据通过 Agent 泄露 │
│ ├── Agent 绕过审批流执行敏感操作 │
│ ├── Prompt 注入导致 Agent 行为异常 │
│ └── 异常的 Token 消耗(疑似滥用) │
│ │
│ P2 — 中危(4 小时内响应,24 小时内缓解) │
│ ├── Agent 输出包含不当内容(但未泄露敏感数据) │
│ ├── 速率限制被绕过 │
│ ├── 工具调用异常(非安全性问题) │
│ └── 幻觉导致用户收到错误信息 │
│ │
│ P3 — 低危(24 小时内响应,1 周内修复) │
│ ├── 安全日志中发现可疑但未成功的攻击尝试 │
│ ├── 非敏感信息泄露(如模型版本) │
│ └── 安全配置偏离最佳实践 │
│ │
└─────────────────────────────────────────────────────────────┘步骤 2:构建 AI Agent 安全监控告警
# security_monitor.py — AI Agent 安全事件监控
import logging
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logger = logging.getLogger("agent_security")
class ThreatLevel(Enum):
P0_CRITICAL = "P0"
P1_HIGH = "P1"
P2_MEDIUM = "P2"
P3_LOW = "P3"
@dataclass
class SecurityEvent:
timestamp: str
event_type: str
threat_level: ThreatLevel
user_id: str
session_id: str
description: str
evidence: dict
auto_action: Optional[str] = None
class AgentSecurityMonitor:
"""AI Agent 安全事件实时监控"""
def __init__(self, alert_service, redis_client):
self.alert_service = alert_service
self.redis = redis_client
async def check_tool_call(
self,
user_id: str,
session_id: str,
tool_name: str,
tool_args: dict,
tool_result: str,
):
"""监控工具调用的安全性"""
# 检测 1:敏感工具未授权调用
sensitive_tools = [
"send_email", "delete_file", "execute_code",
"database_write", "api_call_external",
]
if tool_name in sensitive_tools:
await self._log_event(SecurityEvent(
timestamp=datetime.utcnow().isoformat(),
event_type="sensitive_tool_call",
threat_level=ThreatLevel.P2_MEDIUM,
user_id=user_id,
session_id=session_id,
description=f"敏感工具调用: {tool_name}",
evidence={
"tool": tool_name,
"args_preview": str(tool_args)[:500],
},
))
# 检测 2:工具调用频率异常
await self._check_call_frequency(
user_id, session_id, tool_name
)
# 检测 3:工具输出中的数据渗透信号
await self._check_data_exfiltration(
user_id, session_id, tool_name, tool_result
)
async def check_agent_output(
self,
user_id: str,
session_id: str,
output: str,
validation_result: dict,
):
"""监控 Agent 输出的安全性"""
if validation_result.get("secret_leak"):
await self._log_event(SecurityEvent(
timestamp=datetime.utcnow().isoformat(),
event_type="secret_leak_in_output",
threat_level=ThreatLevel.P0_CRITICAL,
user_id=user_id,
session_id=session_id,
description="Agent 输出中检测到密钥泄露",
evidence={
"leak_type": validation_result["secret_type"],
"output_preview": output[:200],
},
auto_action="terminate_session",
))
if validation_result.get("prompt_injection_echo"):
await self._log_event(SecurityEvent(
timestamp=datetime.utcnow().isoformat(),
event_type="prompt_injection_success",
threat_level=ThreatLevel.P1_HIGH,
user_id=user_id,
session_id=session_id,
description="疑似 Prompt 注入成功",
evidence={
"output_preview": output[:500],
},
auto_action="flag_session",
))
async def _check_call_frequency(
self, user_id: str, session_id: str, tool_name: str
):
"""检测工具调用频率异常"""
key = f"tool_freq:{session_id}"
count = self.redis.incr(key)
self.redis.expire(key, 60) # 1 分钟窗口
if count > 30: # 1 分钟内超过 30 次工具调用
await self._log_event(SecurityEvent(
timestamp=datetime.utcnow().isoformat(),
event_type="abnormal_tool_frequency",
threat_level=ThreatLevel.P1_HIGH,
user_id=user_id,
session_id=session_id,
description=(
f"工具调用频率异常: {count} 次/分钟"
),
evidence={"count_per_minute": count},
auto_action="throttle_session",
))
async def _check_data_exfiltration(
self,
user_id: str,
session_id: str,
tool_name: str,
tool_result: str,
):
"""检测数据渗透尝试"""
import re
# 检测外部 URL 调用
external_urls = re.findall(
r"https?://(?!internal\.company\.com)[^\s]+",
tool_result,
)
if external_urls and tool_name in [
"send_email", "http_request", "webhook"
]:
await self._log_event(SecurityEvent(
timestamp=datetime.utcnow().isoformat(),
event_type="potential_data_exfiltration",
threat_level=ThreatLevel.P0_CRITICAL,
user_id=user_id,
session_id=session_id,
description="检测到可能的数据渗透尝试",
evidence={
"tool": tool_name,
"external_urls": external_urls[:5],
},
auto_action="block_and_alert",
))
async def _log_event(self, event: SecurityEvent):
"""记录安全事件并触发告警"""
# 结构化日志
logger.warning(json.dumps({
"type": "security_event",
"event": event.event_type,
"level": event.threat_level.value,
"user": event.user_id,
"session": event.session_id,
"description": event.description,
"evidence": event.evidence,
}))
# 自动响应
if event.auto_action == "terminate_session":
await self._terminate_session(event.session_id)
elif event.auto_action == "block_and_alert":
await self._block_user(event.user_id)
# P0/P1 事件触发即时告警
if event.threat_level in [
ThreatLevel.P0_CRITICAL, ThreatLevel.P1_HIGH
]:
await self.alert_service.send_alert(
severity=event.threat_level.value,
title=f"AI Agent 安全事件: {event.event_type}",
description=event.description,
details=event.evidence,
)
async def _terminate_session(self, session_id: str):
"""紧急终止 Agent 会话"""
self.redis.set(
f"session_blocked:{session_id}", "1", ex=3600
)
logger.critical(f"会话已紧急终止: {session_id}")
async def _block_user(self, user_id: str):
"""临时封禁用户"""
self.redis.set(
f"user_blocked:{user_id}", "1", ex=3600
)
logger.critical(f"用户已临时封禁: {user_id}")步骤 3:事件响应运行手册
# AI Agent 安全事件响应运行手册
## 场景 1:Agent 被 Prompt 注入劫持
### 症状
- Agent 执行了用户未请求的操作
- Agent 输出包含异常内容(如系统提示词)
- 安全监控检测到异常工具调用模式
### 即时响应(15 分钟内)
1. [ ] 终止受影响的 Agent 会话
2. [ ] 检查 Agent 是否执行了敏感操作(邮件发送、数据导出等)
3. [ ] 如有数据泄露,立即撤销相关 API 密钥
4. [ ] 通知安全团队和值班工程师
### 调查(1-4 小时)
1. [ ] 从 Langfuse/LangSmith 提取完整的对话追踪
2. [ ] 识别注入向量(直接输入/外部数据/工具输出)
3. [ ] 评估影响范围(受影响用户数、泄露数据量)
4. [ ] 检查是否有其他会话受到类似攻击
### 修复(4-24 小时)
1. [ ] 针对发现的注入向量添加防御措施
2. [ ] 更新输入/输出验证规则
3. [ ] 如涉及外部数据源,清洗受污染的数据
4. [ ] 运行红队测试验证修复有效性
### 事后(1-7 天)
1. [ ] 编写事后分析报告(RCA)
2. [ ] 更新攻击用例库
3. [ ] 将新的检测规则集成到 CI/CD
4. [ ] 如涉及用户数据泄露,按合规要求通知用户
---
## 场景 2:LLM API 密钥泄露
### 即时响应(15 分钟内)
1. [ ] 立即在 LLM 提供商控制台撤销泄露的密钥
2. [ ] 通过 Vault 生成新的临时密钥
3. [ ] 检查泄露密钥的使用日志(异常调用量/来源 IP)
4. [ ] 如密钥被滥用,联系 LLM 提供商报告
### 调查
1. [ ] 确定泄露途径(代码提交/日志/LLM 输出/配置文件)
2. [ ] 检查 GitGuardian/TruffleHog 扫描结果
3. [ ] 评估泄露密钥的权限范围和潜在影响
### 修复
1. [ ] 修复泄露途径(从代码/日志中移除密钥)
2. [ ] 迁移到动态密钥管理(Vault)
3. [ ] 添加密钥泄露检测到 CI/CD 管线
---
## 场景 3:Agent 成本异常飙升
### 即时响应
1. [ ] 检查是否有用户滥用(大量请求/超长 prompt)
2. [ ] 启用紧急成本熔断(降低所有用户的 Token 限额)
3. [ ] 检查是否有 DDoS 攻击或自动化滥用
### 调查
1. [ ] 分析 Token 消耗的用户分布和时间分布
2. [ ] 检查是否有 Agent 陷入无限循环
3. [ ] 检查是否有工具调用导致的级联 API 费用
### 修复
1. [ ] 调整速率限制和 Token 预算
2. [ ] 添加工具调用循环检测
3. [ ] 实施更严格的成本熔断策略提示词模板:事件响应分析
你是一名 AI 安全事件响应专家。请分析以下 AI Agent 安全事件
并提供响应建议。
## 事件信息
- 事件类型:[Prompt 注入/数据泄露/密钥泄露/成本异常/其他]
- 发现时间:[时间]
- 影响范围:[受影响用户数/数据量/成本]
- 当前状态:[进行中/已缓解/已修复]
## 已知信息
[粘贴安全日志、对话追踪、监控告警等]
## 请提供
1. 事件严重性评估(P0-P3)
2. 即时响应步骤(按优先级排序)
3. 根因分析方向
4. 短期和长期修复建议
5. 需要通知的利益相关方6. 生产部署安全清单(完整版)
以下是 AI Agent 上线前必须完成的安全检查清单。建议打印或复制到项目管理工具中,逐项确认。
🔒 网络安全
- Agent 服务部署在私有子网,不直接暴露公网
- 数据库和向量数据库部署在隔离子网,仅允许应用层访问
- 出站流量白名单已配置(仅允许访问已知的 LLM API 端点)
- 所有外部通信使用 TLS 1.3 加密
- 内部服务间通信使用 mTLS
- WAF / DDoS 防护已启用
- 安全头已配置(HSTS、X-Content-Type-Options、CSP)
- 请求体大小限制已设置(防止超大 prompt 攻击)
- 连接超时已配置(防止长时间挂起)
🔑 密钥管理
- 所有密钥存储在密钥管理服务中(Vault / Secrets Manager),无硬编码
- LLM API 密钥使用动态生成 + 自动过期机制
- 数据库凭证使用动态凭证(Vault Database Secrets Engine)
- 密钥轮换策略已制定并自动化(30-90 天周期)
- CI/CD 管线中集成了密钥泄露检测(GitGuardian / TruffleHog)
- 密钥不会出现在日志、错误消息或 LLM 上下文中
- 密钥访问有审计日志
- 紧急密钥撤销流程已测试
🛡️ 输出验证
- 输出格式验证已实现(长度限制、结构校验、编码检查)
- 有害内容过滤已启用(暴力、色情、仇恨言论等)
- 密钥泄露检测已集成到输出管线
- Markdown 图片 / HTML 标签渗透检测已启用
- 外部 URL 白名单已配置(防止数据渗透)
- 幻觉检测策略已实施(至少一种检测方法)
- PII 检测和脱敏已启用
- 输出验证失败时的降级策略已定义
⏱️ 速率限制
- 边缘层速率限制已配置(IP 级)
- API 层速率限制已配置(用户级,按层级差异化)
- Token 预算已设置(每用户每日上限)
- 单次请求 Token 上限已设置
- 工具调用次数限制已设置(每会话上限)
- 工具调用循环检测已实现
- 成本熔断已配置(单次请求 + 每日 + 每月)
- 并发会话限制已设置
- 超限时的用户提示和降级策略已定义
🚨 事件响应
- 安全事件分级标准已定义(P0-P3)
- 事件响应运行手册已编写(至少覆盖 3 个核心场景)
- 安全告警已配置(P0/P1 事件即时通知)
- 值班制度已建立(7×24 覆盖)
- Agent 会话紧急终止机制已实现
- 用户临时封禁机制已实现
- 安全日志集中存储(至少保留 90 天)
- 事后分析(RCA)模板已准备
- 数据泄露通知流程已定义(合规要求)
🔍 监控与审计
- Agent 所有对话已记录到可观测性平台(Langfuse / LangSmith)
- 工具调用日志已记录(工具名、参数、结果、耗时)
- 安全事件检测规则已配置(异常频率、敏感操作、数据渗透)
- 成本监控仪表板已搭建(Token 消耗、API 费用、趋势)
- 安全指标已定义并追踪(攻击拦截率、误报率、响应时间)
- 定期安全审计计划已制定(月度/季度)
🏗️ 基础设施加固
- Agent 容器使用非 root 用户运行
- 容器镜像已扫描漏洞(Trivy / Snyk)
- 文件系统只读挂载(除必要的临时目录)
- 资源限制已设置(CPU、内存、磁盘)
- 健康检查已配置(存活探针 + 就绪探针)
- 自动扩缩容策略已配置
- 备份和灾难恢复计划已测试
✅ 合规验证
- 数据处理协议(DPA)已与 LLM 提供商签署
- 隐私影响评估(PIA)已完成
- 适用的合规框架已确认(GDPR / SOC 2 / HIPAA / EU AI Act)
- 用户知情同意机制已实现(告知 AI 参与)
- 数据驻留要求已满足
- 红队测试已完成并修复所有 P0/P1 漏洞
实战案例:SaaS 产品 AI 客服 Agent 安全部署全流程
背景
某 B2B SaaS 公司计划部署 AI 客服 Agent,替代部分人工客服。Agent 需要:
- 回答产品问题(连接 RAG 知识库)
- 查询客户订单状态(连接业务数据库)
- 创建工单(调用工单系统 API)
- 发送确认邮件(调用邮件服务)
用户规模:5,000 日活,3 个层级(免费/专业/企业)。
步骤 1:网络架构部署
# docker-compose.prod.yml — 安全部署配置
version: "3.9"
services:
# 反向代理(TLS 终止 + 速率限制)
nginx:
image: nginx:1.27-alpine
ports:
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/ssl/certs:ro
networks:
- dmz
deploy:
resources:
limits:
memory: 256M
# Agent API 服务
agent-api:
image: company/agent-api:v2.1
environment:
- VAULT_ADDR=http://vault:8200
- REDIS_URL=redis://redis:6379
- LOG_LEVEL=INFO
networks:
- dmz
- internal
deploy:
replicas: 3
resources:
limits:
memory: 1G
cpus: "1.0"
read_only: true
tmpfs:
- /tmp:size=100M
user: "1000:1000" # 非 root 用户
security_opt:
- no-new-privileges:true
# 向量数据库(隔离网络)
qdrant:
image: qdrant/qdrant:v1.12
networks:
- data
volumes:
- qdrant_data:/qdrant/storage
deploy:
resources:
limits:
memory: 2G
# Redis(速率限制 + 会话管理)
redis:
image: redis:7-alpine
command: >
redis-server
--requirepass ${REDIS_PASSWORD}
--maxmemory 256mb
--maxmemory-policy allkeys-lru
networks:
- internal
deploy:
resources:
limits:
memory: 512M
# Vault(密钥管理)
vault:
image: hashicorp/vault:1.18
cap_add:
- IPC_LOCK
networks:
- internal
volumes:
- vault_data:/vault/data
networks:
dmz: # Agent API + Nginx(可访问外部)
driver: bridge
internal: # Agent API + Redis + Vault(内部通信)
driver: bridge
internal: true # 禁止外部访问
data: # 数据库层(最严格隔离)
driver: bridge
internal: true
volumes:
qdrant_data:
vault_data:步骤 2:安全配置清单执行
执行结果:
✅ 网络安全(9/9)
✅ Agent 服务在 internal 网络,通过 Nginx 反向代理暴露
✅ Qdrant 在 data 网络,仅 Agent API 可访问
✅ 出站白名单:api.openai.com, api.anthropic.com
✅ TLS 1.3(Nginx 终止)
✅ 内部通信加密(Docker 网络隔离)
✅ Cloudflare WAF + DDoS 防护
✅ 安全头已配置
✅ 请求体限制 1MB
✅ 超时 120s
✅ 密钥管理(8/8)
✅ 所有密钥存储在 Vault
✅ OpenAI 密钥动态生成(TTL 1h)
✅ PostgreSQL 动态凭证(TTL 24h)
✅ 自动轮换已配置
✅ GitGuardian 集成到 CI/CD
✅ 日志中密钥已脱敏
✅ Vault 审计日志已启用
✅ 紧急撤销流程已测试
✅ 输出验证(8/8)
✅ 输出长度限制 10,000 字符
✅ OpenAI Moderation API 集成
✅ 密钥泄露检测已启用
✅ Markdown 图片渗透检测已启用
✅ URL 白名单已配置
✅ 自一致性幻觉检测已启用
✅ PII 检测(邮箱、电话、身份证号)
✅ 降级策略:返回"请联系人工客服"
✅ 速率限制(9/9)
✅ Cloudflare:100 请求/分钟/IP
✅ 免费用户:10 请求/分钟,50K token/天
✅ 专业用户:30 请求/分钟,500K token/天
✅ 企业用户:100 请求/分钟,5M token/天
✅ 单次请求最大 4K token
✅ 每会话最多 20 次工具调用
✅ 循环检测已启用
✅ 成本熔断:$0.50/请求,$10/天(免费用户)
✅ 并发限制:1/3/10(按层级)
✅ 事件响应(9/9)
✅ P0-P3 分级标准已定义
✅ 3 个核心场景运行手册已编写
✅ PagerDuty 告警已配置
✅ 7×24 值班已建立
✅ 会话终止机制已实现
✅ 用户封禁机制已实现
✅ 日志保留 180 天
✅ RCA 模板已准备
✅ GDPR 数据泄露通知流程已定义案例分析
本案例的关键决策点:
- 网络分层是基础:通过 Docker 网络隔离实现了 DMZ → 应用 → 数据的三层架构,即使 Agent 被劫持,也无法直接访问数据库
- 动态密钥消除了长期泄露风险:Vault 的动态密钥机制确保即使密钥被截获,也会在 1 小时内自动失效
- 多层速率限制防止成本失控:从边缘到应用的四层限流,确保单个恶意用户无法耗尽整个系统的资源
- 自动化响应减少人工干预:P0 事件自动终止会话和封禁用户,将响应时间从”人工发现”缩短到”秒级自动化”
避坑指南
❌ 常见错误
-
在环境变量中明文存储 LLM API 密钥就认为安全了
- 问题:环境变量可能通过错误日志、进程列表(
/proc/*/environ)、容器检查(docker inspect)泄露。更危险的是,密钥可能被 LLM 记忆并在输出中泄露 - 正确做法:使用专业的密钥管理服务(Vault / Secrets Manager),实施动态密钥生成和自动轮换,并在输出管线中添加密钥泄露检测
- 问题:环境变量可能通过错误日志、进程列表(
-
只在 API 层做速率限制,忽略 Token 预算
- 问题:一个恶意用户可以在速率限制内发送少量但超长的 prompt,消耗大量 Token 和费用。10 个精心构造的请求可能比 1000 个普通请求更昂贵
- 正确做法:实施多维度限制——请求频率 + Token 预算 + 成本熔断 + 工具调用次数,四个维度缺一不可
-
Agent 服务直接暴露公网,没有反向代理
- 问题:直接暴露意味着没有 TLS 终止、没有请求过滤、没有 DDoS 防护,攻击者可以直接探测 Agent 服务的漏洞
- 正确做法:Agent 服务部署在私有网络,通过反向代理(Nginx/Caddy)+ WAF + CDN 暴露,实施纵深防御
-
没有 Agent 输出验证,直接将 LLM 响应返回给用户
- 问题:LLM 可能输出包含密钥、PII、有害内容、Markdown 渗透链接的响应。在 Prompt 注入成功的情况下,输出可能包含攻击者指定的任意内容
- 正确做法:构建多层输出验证管线(格式→安全→密钥→渗透→幻觉),每层都有明确的处理策略(拦截/替换/标记)
-
没有事件响应预案,出事后手忙脚乱
- 问题:AI Agent 的安全事件具有独特性(非确定性、难以复现、影响范围难以评估),传统的事件响应流程不完全适用
- 正确做法:编写 AI Agent 专用的事件响应运行手册,覆盖 Prompt 注入、密钥泄露、成本异常等核心场景,定期演练
✅ 最佳实践
- 采用”默认拒绝”原则:Agent 的网络出站、工具调用、数据访问都应该默认拒绝,仅白名单放行。宁可误拦截,不可漏放行
- 将安全检查集成到 CI/CD:每次 prompt 模板、Agent 配置、工具定义变更时,自动运行红队测试和安全扫描
- 实施纵深防御:不依赖单一安全措施,而是在网络层、应用层、模型层、输出层都部署防御,任何一层被突破时其他层仍能提供保护
- 监控先于防御:在无法完全防御的情况下(如 Prompt 注入),优先建立全面的监控和快速响应能力
- 定期进行安全审计:每月运行自动化红队测试,每季度进行手动安全审计,持续更新攻击用例库
相关资源与延伸阅读
-
HashiCorp Vault — OpenAI 动态密钥插件
- 地址:https://github.com/hashicorp/vault-plugin-secrets-openai
- 说明:Vault 官方插件,支持为 OpenAI API 动态生成临时密钥并自动过期,消除长期密钥泄露风险
-
OWASP Top 10 for Agentic Applications(2026 版)
- 地址:https://owasp.org/www-project-top-10-for-large-language-model-applications/
- 说明:OWASP 发布的 Agentic AI 应用十大安全风险,是生产部署安全清单的权威参考框架
-
Guardrails AI — 开源 LLM 输出验证框架
- 地址:https://github.com/guardrails-ai/guardrails
- 说明:Python 框架,支持结构化输出验证、内容安全过滤、自定义验证规则
-
NeMo Guardrails — NVIDIA 对话安全护栏
- 地址:https://github.com/NVIDIA/NeMo-Guardrails
- 说明:NVIDIA 开源的对话安全框架,支持输入/输出安全控制、话题限制、事实性检查
-
Doppler — AI 管线密钥管理
- 地址:https://www.doppler.com/
- 说明:开发者友好的密钥管理平台,专门针对 AI 管线的密钥同步和泄露防护
-
Falco — 云原生运行时安全
- 地址:https://github.com/falcosecurity/falco
- 说明:CNCF 项目,容器和 Kubernetes 环境的运行时威胁检测,适合监控 Agent 容器的异常行为
-
HaluGate — Token 级实时幻觉检测
- 地址:https://blog.vllm.ai/2025/12/14/halugate.html
- 说明:vLLM 团队开发的条件式 Token 级幻觉检测管线,无需 LLM-as-judge,高性能实时拦截
-
Infisical — 开源密钥管理平台
- 地址:https://github.com/Infisical/infisical
- 说明:开源的端到端加密密钥管理平台,支持自托管,提供 SDK 和 CI/CD 集成
参考来源
- HashiCorp Blog: Managing OpenAI API Keys with Vault’s Dynamic Secrets Plugin (2025 年 7 月)
- Doppler Blog: Advanced LLM Security — Preventing Secret Leakage Across Agents and Prompts (2025 年 12 月)
- Doppler Blog: How to Manage Keys, Tokens, and Config Across AI Pipelines (2026 年 2 月)
- Scalekit Blog: What is a Token Vault? Secure Credential Management for AI Agent Workflows (2026 年 2 月)
- Composio Blog: From Auth to Action — The Complete Guide to Secure & Scalable AI Agent Infrastructure (2026 年 2 月)
- Skywork AI: AI Agent Safety FAQ — Risks, Controls & Best Practices (2026 年 2 月)
- Skywork AI: Agentic AI Safety Best Practices 2025 for Enterprise (2025 年 9 月)
- Fast.io: AI Agent Security Best Practices — Complete Checklist 2026 (2026 年)
- Adappt: The 90-Day Agentic AI Production Playbook for 2026 (2025 年 12 月)
- Sweet Security: How to Secure AI Agents in Production Environments (2026 年 2 月)
- Red Hat Developers: Manage AI Resource Use with TokenRateLimitPolicy (2026 年 2 月)
- vLLM Blog: HaluGate — Token-Level Truth: Real-Time Hallucination Detection for Production LLMs (2025 年 12 月)
- Getmaxim: Top 5 Tools to Monitor and Detect Hallucinations in AI Agents (2026 年 2 月)
- AWS Security Blog: Build Secure Network Architectures for Generative AI Applications (2025 年 9 月)
- SecureOps: Agentic AI Security Recommendations for the Next Phase of AI (2026 年 2 月)
📖 返回 总览与导航 | 上一节:22e-AI-Agent红队测试 | 下一节:23a-OpenClaw核心概念