08e - MCP 安全最佳实践
本文是《AI Agent 实战手册》第 8 章第 5 节。 上一节:08d-MCP工具模式目录 | 下一节:08f-MCP集成与推荐Server
概述
MCP 让 AI Agent 获得了连接外部工具和数据源的能力,但”能力越大,风险越大”。一个未经安全加固的 MCP Server 就像一扇没有锁的门——任何能连接的 Agent 都可以调用你的数据库、读取文件、触发 API。本节系统梳理 MCP 安全的六大核心领域:OAuth 2.1 认证集成、基于角色的访问控制(RBAC)、数据加密、输入验证与防注入、MCP Gateway 架构、以及审计日志,帮助你构建生产级安全的 MCP 部署。
1. MCP 安全威胁模型
在深入具体防御措施之前,先理解 MCP 环境面临的主要威胁:
1.1 MCP 特有攻击面
| 攻击类型 | 描述 | 风险等级 |
|---|---|---|
| 工具投毒(Tool Poisoning) | 恶意指令嵌入 MCP 工具描述中,对用户不可见但 AI 模型可读 | 🔴 高 |
| 工具响应注入 | 工具返回的响应中包含隐藏指令,操纵模型后续行为 | 🔴 高 |
| 间接 Prompt 注入(XPIA) | 恶意指令嵌入外部内容(文档、网页),通过 MCP 工具被 AI 处理 | 🔴 高 |
| 权限提升 | Agent 通过工具链组合绕过单个工具的权限限制 | 🟡 中 |
| 上下文泄露 | 不同用户会话间的上下文隔离不足,导致敏感信息泄露 | 🟡 中 |
| 凭证滥用 | 长期有效的 OAuth token 或 API Key 被截获后横向移动 | 🔴 高 |
| 会话劫持 | Agent 通信中的会话被劫持,冒充合法 Agent 调用工具 | 🟡 中 |
1.2 真实案例警示
2025 年 4 月 WhatsApp MCP 工具投毒事件:安全研究机构 Invariant Labs 演示了一种针对 WhatsApp MCP Server 的工具投毒攻击。攻击者创建了一个看似无害的”每日趣闻”工具,在初始安装后修改工具描述,将用户消息历史重定向到攻击者控制的号码。这一案例表明,MCP 工具的动态特性使得传统的一次性安全审查远远不够。
2. OAuth 2.1 认证集成
2.1 MCP 规范中的 OAuth 演进
MCP 协议的认证规范经历了快速演进:
| 规范版本 | 认证状态 | 关键变化 |
|---|---|---|
| 2024-11-05(初始版) | 无认证规范 | 协议发布,安全留给实现者 |
| 2025-03-26 | OAuth 2.1 作为可选特性 | 引入授权规范,PKCE 成为必需 |
| 2025-06-18 | OAuth 2.1 深度集成 | MCP Server 被定义为 OAuth 资源服务器,要求 Resource Indicators(RFC 8707) |
2025-06-18 规范的核心变化:将 MCP Server 明确定义为 OAuth 资源服务器(Resource Server),客户端必须实现 Resource Indicators(RFC 8707)来防止 token 滥用。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Auth0 | OAuth 2.1 身份提供商 | 免费(7,500 MAU)/ $35+/月 | 快速集成,丰富的 SDK |
| WorkOS | 企业级 SSO + MCP 认证 | 免费(1M MAU)/ 按用量计费 | 企业客户 SSO 需求 |
| Keycloak | 自托管 OAuth/OIDC 服务器 | 免费(开源) | 完全控制,合规要求 |
| Stytch | MCP 认证专用方案 | 免费(10K MAU)/ $0.05/MAU | MCP 原生支持 |
| Ory Hydra | 轻量级 OAuth 2.1 服务器 | 免费(开源)/ 云版 $29+/月 | 微服务架构 |
操作步骤
步骤 1:配置 OAuth 2.1 + PKCE 认证流程
MCP 规范强制要求所有客户端使用 PKCE(Proof Key for Code Exchange),这是 OAuth 2.1 的核心安全增强:
// MCP Client 端:OAuth 2.1 + PKCE 认证流程
import crypto from 'crypto';
class MCPOAuthClient {
private authServerUrl: string;
private clientId: string;
private redirectUri: string;
constructor(config: {
authServerUrl: string;
clientId: string;
redirectUri: string;
}) {
this.authServerUrl = config.authServerUrl;
this.clientId = config.clientId;
this.redirectUri = config.redirectUri;
}
// 步骤 1:生成 PKCE code_verifier 和 code_challenge
generatePKCE(): { verifier: string; challenge: string } {
const verifier = crypto.randomBytes(32)
.toString('base64url');
const challenge = crypto
.createHash('sha256')
.update(verifier)
.digest('base64url');
return { verifier, challenge };
}
// 步骤 2:构建授权 URL(含 Resource Indicator)
buildAuthUrl(mcpServerUrl: string): {
url: string;
pkce: { verifier: string; challenge: string };
state: string;
} {
const pkce = this.generatePKCE();
const state = crypto.randomBytes(16).toString('hex');
const params = new URLSearchParams({
response_type: 'code',
client_id: this.clientId,
redirect_uri: this.redirectUri,
code_challenge: pkce.challenge,
code_challenge_method: 'S256',
state,
// RFC 8707: Resource Indicator — 防止 token 被用于非目标服务器
resource: mcpServerUrl,
scope: 'mcp:tools:read mcp:tools:execute mcp:resources:read',
});
return {
url: `${this.authServerUrl}/authorize?${params}`,
pkce,
state,
};
}
// 步骤 3:用授权码换取 access_token
async exchangeCode(
code: string,
pkceVerifier: string,
mcpServerUrl: string
): Promise<{ accessToken: string; refreshToken: string; expiresIn: number }> {
const response = await fetch(`${this.authServerUrl}/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code,
client_id: this.clientId,
redirect_uri: this.redirectUri,
code_verifier: pkceVerifier, // PKCE 验证
resource: mcpServerUrl, // Resource Indicator
}),
});
if (!response.ok) {
throw new Error(`Token exchange failed: ${response.status}`);
}
return response.json();
}
}步骤 2:MCP Server 端验证 Token
// MCP Server 端:验证 Bearer Token
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import express from 'express';
import jwt from 'jsonwebtoken';
const app = express();
// Token 验证中间件
function validateMCPToken(req: express.Request, res: express.Response, next: express.NextFunction) {
const authHeader = req.headers['authorization'];
if (!authHeader?.startsWith('Bearer ')) {
return res.status(401).json({
error: 'missing_token',
message: 'Authorization header with Bearer token required',
});
}
const token = authHeader.slice(7);
try {
const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY!, {
algorithms: ['RS256'],
// 验证 audience 匹配当前 MCP Server
audience: process.env.MCP_SERVER_URL,
issuer: process.env.OAUTH_ISSUER,
});
// 将用户身份附加到请求上下文
(req as any).mcpIdentity = {
sub: (decoded as any).sub,
roles: (decoded as any).roles || [],
scopes: (decoded as any).scope?.split(' ') || [],
exp: (decoded as any).exp,
};
next();
} catch (err) {
return res.status(401).json({
error: 'invalid_token',
message: err instanceof Error ? err.message : 'Token validation failed',
});
}
}
// 应用到 MCP 端点
app.use('/sse', validateMCPToken);
app.use('/messages', validateMCPToken);提示词模板
你是一个 MCP 安全审计助手。请审查以下 MCP Server 的认证配置:
[粘贴 MCP Server 代码或配置]
请检查以下安全要点:
1. 是否强制使用 PKCE(code_challenge_method: S256)?
2. Token 是否验证了 audience(防止 token 跨服务器滥用)?
3. 是否实现了 token 过期和刷新机制?
4. 是否使用了 Resource Indicators(RFC 8707)?
5. 敏感凭证是否通过环境变量或密钥管理器存储?
对每个要点给出 ✅ 通过 / ❌ 未通过 / ⚠️ 需改进 的评估,并提供修复建议。3. 基于角色的访问控制(RBAC)
3.1 为什么 MCP 需要细粒度 RBAC
MCP Server 默认暴露所有工具给连接的客户端——这在生产环境中是不可接受的。Kong 的工程团队将这一问题称为”Context Rot”:过多的工具暴露不仅是安全风险,还会降低 LLM 选择正确工具的能力。
3.2 RBAC 设计模式
┌─────────────────────────────────────────────────┐
│ MCP RBAC 层次 │
├─────────────────────────────────────────────────┤
│ Level 1: 服务器级别 │
│ ├── 哪些 Agent/用户可以连接此 MCP Server? │
│ │
│ Level 2: 工具级别 │
│ ├── 连接后可以看到/调用哪些工具? │
│ │
│ Level 3: 参数级别 │
│ ├── 调用工具时可以传递哪些参数值? │
│ │
│ Level 4: 数据级别 │
│ └── 工具返回的数据中哪些字段可见? │
└─────────────────────────────────────────────────┘操作步骤
步骤 1:定义角色和权限矩阵
// 角色-工具权限矩阵定义
interface MCPRole {
name: string;
description: string;
tools: {
[toolName: string]: {
allowed: boolean;
paramConstraints?: Record<string, any>; // 参数级约束
dataFilter?: string[]; // 可见字段白名单
};
};
}
const ROLES: Record<string, MCPRole> = {
viewer: {
name: 'viewer',
description: '只读访问,仅可查询数据',
tools: {
'db_query': {
allowed: true,
paramConstraints: {
query_type: ['SELECT'], // 只允许 SELECT
tables: ['public.*'], // 只允许公开表
},
},
'file_read': { allowed: true },
'file_write': { allowed: false },
'db_execute': { allowed: false },
},
},
developer: {
name: 'developer',
description: '开发环境完整访问',
tools: {
'db_query': { allowed: true },
'file_read': { allowed: true },
'file_write': {
allowed: true,
paramConstraints: {
path_pattern: ['^/workspace/.*'], // 只允许工作区
},
},
'db_execute': {
allowed: true,
paramConstraints: {
environment: ['development', 'staging'], // 禁止生产环境
},
},
},
},
admin: {
name: 'admin',
description: '完整管理权限',
tools: {
'db_query': { allowed: true },
'file_read': { allowed: true },
'file_write': { allowed: true },
'db_execute': { allowed: true },
'server_config': { allowed: true },
},
},
};步骤 2:实现工具级访问控制中间件
// 工具调用前的 RBAC 检查
function createRBACMiddleware(roles: Record<string, MCPRole>) {
return function checkToolAccess(
identity: { roles: string[] },
toolName: string,
params: Record<string, any>
): { allowed: boolean; reason?: string } {
// 合并用户所有角色的权限(取并集)
for (const roleName of identity.roles) {
const role = roles[roleName];
if (!role) continue;
const toolPerm = role.tools[toolName];
if (!toolPerm?.allowed) continue;
// 检查参数级约束
if (toolPerm.paramConstraints) {
for (const [param, allowedValues] of Object.entries(toolPerm.paramConstraints)) {
const actualValue = params[param];
if (actualValue && !matchesConstraint(actualValue, allowedValues as string[])) {
return {
allowed: false,
reason: `Parameter '${param}' value '${actualValue}' not allowed for role '${roleName}'`,
};
}
}
}
return { allowed: true };
}
return {
allowed: false,
reason: `No role grants access to tool '${toolName}'`,
};
};
}
function matchesConstraint(value: string, patterns: string[]): boolean {
return patterns.some(pattern => {
if (pattern.includes('*')) {
const regex = new RegExp('^' + pattern.replace(/\*/g, '.*') + '$');
return regex.test(value);
}
return value === pattern;
});
}工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Permit.io | MCP 动态授权引擎 | 免费(1K MAU)/ $99+/月 | 细粒度 RBAC/ABAC |
| Oso | 嵌入式授权框架 | 免费(开源)/ 云版按用量 | 代码内策略定义 |
| Open Policy Agent (OPA) | 通用策略引擎 | 免费(开源) | 复杂策略逻辑 |
| Cerbos | 可扩展访问控制 | 免费(开源)/ 云版 $499+/月 | 微服务策略管理 |
4. 数据加密
4.1 传输加密(In Transit)
MCP 支持三种传输层,安全要求各不相同:
| 传输方式 | 加密需求 | 推荐配置 |
|---|---|---|
| stdio | 不需要(本地进程间通信) | 确保进程权限隔离 |
| HTTP + SSE | 必须 TLS 1.3 | 强制 HTTPS,HSTS 头 |
| Streamable HTTP | 必须 TLS 1.3 | 强制 HTTPS,证书固定 |
# Nginx 反向代理配置:强制 TLS 1.3 for MCP Server
server {
listen 443 ssl http2;
server_name mcp.example.com;
# TLS 1.3 only
ssl_protocols TLSv1.3;
ssl_ciphers TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256;
ssl_prefer_server_ciphers off;
# 证书配置
ssl_certificate /etc/ssl/certs/mcp-server.crt;
ssl_certificate_key /etc/ssl/private/mcp-server.key;
# HSTS(强制浏览器使用 HTTPS)
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains" always;
# 安全头
add_header X-Content-Type-Options nosniff;
add_header X-Frame-Options DENY;
location / {
proxy_pass http://localhost:3001;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
# SSE 特殊配置
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 86400s;
}
}4.2 静态加密(At Rest)
MCP Server 处理的敏感数据在存储时必须加密:
// 敏感配置加密存储示例
import crypto from 'crypto';
class MCPSecretStore {
private encryptionKey: Buffer;
constructor(masterKey: string) {
// 从主密钥派生加密密钥
this.encryptionKey = crypto.scryptSync(masterKey, 'mcp-salt', 32);
}
encrypt(plaintext: string): string {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-gcm', this.encryptionKey, iv);
const encrypted = Buffer.concat([
cipher.update(plaintext, 'utf8'),
cipher.final(),
]);
const authTag = cipher.getAuthTag();
// 格式:iv:authTag:ciphertext(全部 base64)
return `${iv.toString('base64')}:${authTag.toString('base64')}:${encrypted.toString('base64')}`;
}
decrypt(ciphertext: string): string {
const [ivB64, tagB64, dataB64] = ciphertext.split(':');
const iv = Buffer.from(ivB64, 'base64');
const authTag = Buffer.from(tagB64, 'base64');
const data = Buffer.from(dataB64, 'base64');
const decipher = crypto.createDecipheriv('aes-256-gcm', this.encryptionKey, iv);
decipher.setAuthTag(authTag);
return decipher.update(data) + decipher.final('utf8');
}
}
// 使用示例:加密存储 MCP Server 的数据库凭证
const store = new MCPSecretStore(process.env.MASTER_KEY!);
const encryptedDbUrl = store.encrypt('postgresql://user:pass@host:5432/db');
// 存储 encryptedDbUrl 到配置文件,运行时解密提示词模板
请审查以下 MCP Server 的数据加密配置:
[粘贴配置或代码]
检查要点:
1. 传输层是否强制 TLS 1.3?是否禁用了 TLS 1.0/1.1?
2. 敏感数据(API Key、数据库凭证)是否加密存储?
3. 加密算法是否使用 AES-256-GCM 或 ChaCha20-Poly1305?
4. 密钥是否通过环境变量或密钥管理器(如 AWS KMS、HashiCorp Vault)管理?
5. 日志中是否存在敏感信息明文输出?
请给出安全评级(A/B/C/D)和改进建议。5. 输入验证与防注入
5.1 MCP 输入验证的特殊挑战
与传统 API 不同,MCP 的输入来自 AI 模型的工具调用——这意味着输入可能被 Prompt 注入攻击操纵。防御必须在多个层面进行:
用户输入 → [AI 模型] → 工具调用参数 → [输入验证层] → MCP Server 执行
↑ ↑ ↑
直接注入 间接注入/XPIA 最后防线操作步骤
步骤 1:实现多层输入验证
import { z } from 'zod';
// 层 1:Schema 验证(结构正确性)
const FileReadSchema = z.object({
path: z.string()
.min(1)
.max(500)
.regex(/^[a-zA-Z0-9_\-\/\.]+$/, '路径只允许字母数字和 /_-. 字符'),
encoding: z.enum(['utf-8', 'ascii', 'base64']).default('utf-8'),
});
const DbQuerySchema = z.object({
query: z.string().min(1).max(2000),
params: z.array(z.unknown()).max(20).optional(),
});
// 层 2:语义验证(业务逻辑安全)
function validateFilePath(path: string): { safe: boolean; reason?: string } {
// 防止路径遍历
if (path.includes('..') || path.includes('~')) {
return { safe: false, reason: '路径遍历攻击:包含 .. 或 ~' };
}
// 限制访问范围
const allowedPrefixes = ['/workspace/', '/tmp/mcp/'];
if (!allowedPrefixes.some(prefix => path.startsWith(prefix))) {
return { safe: false, reason: `路径不在允许范围内:${allowedPrefixes.join(', ')}` };
}
// 禁止敏感文件
const blockedPatterns = [/\.env$/, /\.ssh\//, /\.aws\//, /password/i, /secret/i];
if (blockedPatterns.some(p => p.test(path))) {
return { safe: false, reason: '尝试访问敏感文件' };
}
return { safe: true };
}
// 层 3:SQL 注入防御
function validateSQLQuery(query: string): { safe: boolean; reason?: string } {
const upperQuery = query.toUpperCase().trim();
// 只允许 SELECT 语句
if (!upperQuery.startsWith('SELECT')) {
return { safe: false, reason: '只允许 SELECT 查询' };
}
// 检测危险关键字
const dangerous = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER',
'EXEC', 'EXECUTE', 'UNION', '--', '/*', 'xp_'];
for (const keyword of dangerous) {
if (upperQuery.includes(keyword)) {
return { safe: false, reason: `检测到危险关键字:${keyword}` };
}
}
return { safe: true };
}步骤 2:实现工具描述完整性验证
工具投毒攻击的核心是修改工具描述。通过签名验证确保工具描述未被篡改:
import crypto from 'crypto';
interface ToolDefinition {
name: string;
description: string;
inputSchema: Record<string, any>;
}
class ToolIntegrityVerifier {
private signingKey: string;
constructor(signingKey: string) {
this.signingKey = signingKey;
}
// 为工具定义生成签名
signTool(tool: ToolDefinition): string {
const canonical = JSON.stringify({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
});
return crypto
.createHmac('sha256', this.signingKey)
.update(canonical)
.digest('hex');
}
// 验证工具定义未被篡改
verifyTool(tool: ToolDefinition, expectedSignature: string): boolean {
const actualSignature = this.signTool(tool);
return crypto.timingSafeEqual(
Buffer.from(actualSignature, 'hex'),
Buffer.from(expectedSignature, 'hex')
);
}
}
// 在 MCP Server 启动时签名所有工具,运行时验证
const verifier = new ToolIntegrityVerifier(process.env.TOOL_SIGNING_KEY!);5.2 Prompt 注入防御清单
| 防御层 | 措施 | 实现方式 |
|---|---|---|
| 输入净化 | 移除/转义控制字符和特殊标记 | 正则过滤 \x00-\x1f、<|system|> 等 |
| Schema 强制 | 严格类型和格式验证 | Zod / JSON Schema 验证 |
| 路径限制 | 白名单目录 + 路径遍历检测 | realpath() 解析 + 前缀匹配 |
| SQL 参数化 | 永远使用参数化查询 | 禁止字符串拼接 SQL |
| 输出过滤 | 工具响应中移除敏感信息 | PII 检测 + 数据脱敏 |
| 速率限制 | 限制单个 Agent 的调用频率 | Token bucket / 滑动窗口 |
| 工具签名 | 验证工具描述完整性 | HMAC-SHA256 签名 |
6. MCP Gateway 架构
6.1 为什么需要 MCP Gateway
直接连接模式(Agent → MCP Server)在生产环境中面临三大问题:
- 凭证散布:每个 Agent 存储每个 MCP Server 的凭证,攻击面巨大
- 观测黑洞:无法统一监控 Agent-工具交互,调试靠猜
- 策略碎片化:安全策略分散在各个 MCP Server 中,无法统一管理
MCP Gateway 作为 Agent 和 MCP Server 之间的集中代理层,解决了这些问题:
┌──────────┐ ┌─────────────────────────────────┐ ┌──────────────┐
│ Agent A │────▶│ │────▶│ MCP Server 1 │
├──────────┤ │ MCP Gateway │ ├──────────────┤
│ Agent B │────▶│ │────▶│ MCP Server 2 │
├──────────┤ │ ┌─────────┐ ┌──────────────┐ │ ├──────────────┤
│ Agent C │────▶│ │ 认证/授权│ │ 工具 ACL 过滤 │ │────▶│ MCP Server 3 │
└──────────┘ │ └─────────┘ └──────────────┘ │ └──────────────┘
│ ┌─────────┐ ┌──────────────┐ │
│ │ 审计日志 │ │ 速率限制 │ │
│ └─────────┘ └──────────────┘ │
└─────────────────────────────────┘工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Kong AI Gateway | 企业级 MCP Gateway + 工具 ACL | 免费(OSS)/ 企业版联系销售 | 大规模生产部署 |
| Traefik Hub | MCP Gateway + OBO 认证 | 免费试用 / $299+/月 | Kubernetes 原生环境 |
| Lasso MCP Gateway | 安全优先的开源 MCP Gateway | 免费(开源) | 安全合规要求高的场景 |
| Pomerium | 零信任 MCP 访问代理 | 免费(开源)/ 企业版联系销售 | 零信任架构 |
| Composio | 托管 MCP Gateway 平台 | 免费层 / $49+/月 | 快速上手,托管服务 |
| MintMCP | 企业数据源 MCP Gateway | 联系销售 | 企业数据治理 |
操作步骤
步骤 1:使用 On-Behalf-Of(OBO)模式
MCP Gateway 的关键安全模式是 OBO(代表用户)认证——Gateway 不使用自己的高权限服务账号访问后端,而是以原始用户的身份和权限转发请求:
// MCP Gateway OBO 认证流程
class MCPGateway {
private tokenExchangeUrl: string;
constructor(config: { tokenExchangeUrl: string }) {
this.tokenExchangeUrl = config.tokenExchangeUrl;
}
// 将用户的 access_token 交换为目标 MCP Server 的 OBO token
async exchangeForOBOToken(
userToken: string,
targetMCPServer: string
): Promise<string> {
const response = await fetch(this.tokenExchangeUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange',
subject_token: userToken,
subject_token_type: 'urn:ietf:params:oauth:token-type:access_token',
requested_token_type: 'urn:ietf:params:oauth:token-type:access_token',
resource: targetMCPServer, // 目标 MCP Server
scope: 'mcp:tools:execute',
}),
});
if (!response.ok) {
throw new Error(`OBO token exchange failed: ${response.status}`);
}
const data = await response.json();
return data.access_token;
}
// 转发工具调用请求,使用 OBO token
async forwardToolCall(
userToken: string,
targetServer: string,
toolName: string,
params: Record<string, any>
) {
const oboToken = await this.exchangeForOBOToken(userToken, targetServer);
return fetch(`${targetServer}/messages`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${oboToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
jsonrpc: '2.0',
method: 'tools/call',
params: { name: toolName, arguments: params },
id: crypto.randomUUID(),
}),
});
}
}步骤 2:配置工具级 ACL(访问控制列表)
# mcp-gateway-config.yaml — 工具级 ACL 配置
gateway:
listen: 0.0.0.0:8443
tls:
cert: /etc/ssl/mcp-gateway.crt
key: /etc/ssl/mcp-gateway.key
# 后端 MCP Server 注册
servers:
- name: database-server
url: http://mcp-db:3001
health_check: /health
- name: filesystem-server
url: http://mcp-fs:3002
health_check: /health
# 工具级 ACL 规则
acl:
default_policy: deny # 默认拒绝所有工具访问
rules:
- role: data-analyst
servers:
- name: database-server
tools:
- name: query_database
allow: true
constraints:
query_type: [SELECT]
- name: modify_database
allow: false
- name: filesystem-server
tools:
- name: read_file
allow: true
constraints:
path_prefix: /reports/
- name: write_file
allow: false
- role: developer
servers:
- name: database-server
tools: [allow_all]
- name: filesystem-server
tools: [allow_all]
# 速率限制
rate_limits:
per_user:
requests_per_minute: 60
requests_per_hour: 1000
per_tool:
query_database:
requests_per_minute: 30
modify_database:
requests_per_minute: 57. 审计日志
7.1 MCP 审计日志的必要性
AI Agent 的操作具有不可预测性——你无法提前知道 Agent 会调用哪些工具、传递什么参数。完整的审计日志是事后追溯和合规审查的唯一依据。
操作步骤
步骤 1:实现结构化审计日志
interface MCPAuditEvent {
timestamp: string;
eventType: 'tool_call' | 'tool_result' | 'auth_success' | 'auth_failure'
| 'access_denied' | 'rate_limited' | 'session_start' | 'session_end';
sessionId: string;
identity: {
userId: string;
roles: string[];
clientId: string;
};
tool?: {
name: string;
server: string;
params: Record<string, any>; // 脱敏后的参数
};
result?: {
status: 'success' | 'error' | 'denied';
durationMs: number;
errorCode?: string;
};
context: {
sourceIp: string;
userAgent: string;
traceId: string;
};
}
class MCPAuditLogger {
private logStream: NodeJS.WritableStream;
constructor(logPath: string) {
const fs = require('fs');
this.logStream = fs.createWriteStream(logPath, { flags: 'a' });
}
// 记录工具调用事件
logToolCall(event: Omit<MCPAuditEvent, 'timestamp'>) {
const auditEvent: MCPAuditEvent = {
...event,
timestamp: new Date().toISOString(),
};
// 脱敏处理:移除敏感参数值
if (auditEvent.tool?.params) {
auditEvent.tool.params = this.sanitizeParams(auditEvent.tool.params);
}
this.logStream.write(JSON.stringify(auditEvent) + '\n');
}
private sanitizeParams(params: Record<string, any>): Record<string, any> {
const sensitiveKeys = ['password', 'secret', 'token', 'key', 'credential'];
const sanitized = { ...params };
for (const key of Object.keys(sanitized)) {
if (sensitiveKeys.some(s => key.toLowerCase().includes(s))) {
sanitized[key] = '[REDACTED]';
}
}
return sanitized;
}
}步骤 2:配置日志聚合与告警
# Grafana Loki + Promtail 配置:MCP 审计日志聚合
# promtail-config.yaml
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: mcp-audit
static_configs:
- targets: [localhost]
labels:
job: mcp-audit
__path__: /var/log/mcp/audit/*.jsonl
pipeline_stages:
- json:
expressions:
eventType: eventType
toolName: tool.name
resultStatus: result.status
userId: identity.userId
- labels:
eventType:
toolName:
resultStatus:
userId:# Grafana 告警规则:MCP 安全异常检测
# mcp-alert-rules.yaml
groups:
- name: mcp-security-alerts
rules:
# 告警:认证失败率过高
- alert: MCPAuthFailureSpike
expr: |
rate(mcp_auth_failures_total[5m]) > 10
for: 2m
labels:
severity: critical
annotations:
summary: "MCP 认证失败率异常升高"
description: "过去 5 分钟内认证失败超过 10 次/分钟"
# 告警:工具调用被拒绝率过高
- alert: MCPAccessDeniedSpike
expr: |
rate(mcp_access_denied_total[5m]) > 5
for: 3m
labels:
severity: warning
annotations:
summary: "MCP 工具访问拒绝率异常"
# 告警:单用户调用量异常
- alert: MCPUserAnomalyDetected
expr: |
rate(mcp_tool_calls_total[10m]) by (userId) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "用户 {{ $labels.userId }} 工具调用量异常"7.2 合规审计要点
| 合规框架 | MCP 审计要求 | 日志保留期 |
|---|---|---|
| SOC 2 | 所有工具调用的身份、时间、操作记录 | ≥ 1 年 |
| GDPR | 涉及个人数据的工具调用需记录法律依据 | ≥ 3 年 |
| HIPAA | 医疗数据访问的完整审计追踪 | ≥ 6 年 |
| SOX | 财务数据相关操作的不可篡改日志 | ≥ 7 年 |
实战案例:为生产 MCP Server 实施安全加固
场景描述
一家 SaaS 公司部署了 3 个 MCP Server(数据库查询、文件系统、Slack 通知),供内部 AI 助手使用。初始部署没有认证,所有工具对所有用户开放。
加固步骤
第 1 天:部署 MCP Gateway(Lasso 开源版)
├── 配置 TLS 终止
├── 接入 Auth0 作为 OAuth 2.1 提供商
└── 启用审计日志
第 2 天:实施 RBAC
├── 定义 3 个角色:analyst(只读)、developer(读写)、admin(全部)
├── 配置工具级 ACL
└── 数据库工具限制 analyst 只能 SELECT
第 3 天:输入验证加固
├── 为每个工具添加 Zod Schema 验证
├── 实现路径遍历防护
└── SQL 查询参数化强制
第 4 天:监控与告警
├── 部署 Grafana + Loki 日志聚合
├── 配置认证失败告警
└── 配置异常调用量告警
第 5 天:安全测试
├── 工具投毒模拟测试
├── 路径遍历渗透测试
└── Token 过期和刷新测试案例分析
- 关键决策:选择 MCP Gateway 而非在每个 Server 中单独实现安全逻辑,将安全策略集中管理
- 成本影响:Auth0 免费层(7,500 MAU)+ Lasso 开源 + Grafana 开源 = 初期零成本
- 效果:从”任何人可调用任何工具”到”经过认证的用户只能调用授权的工具”,攻击面缩小约 90%
避坑指南
❌ 常见错误
-
使用长期有效的 API Key 代替 OAuth Token
- 问题:API Key 一旦泄露,攻击者可以无限期使用;无法实现细粒度权限控制
- 正确做法:使用 OAuth 2.1 短期 access_token(建议 15-60 分钟过期)+ refresh_token 轮换
-
MCP Server 暴露所有工具给所有连接者
- 问题:违反最小权限原则,增加攻击面,且过多工具会导致 LLM “Context Rot”
- 正确做法:通过 MCP Gateway 或 Server 内置 RBAC 实现工具级访问控制,默认策略设为 deny
-
信任 AI 模型传递的工具参数
- 问题:AI 模型可能被 Prompt 注入攻击操纵,传递恶意参数(路径遍历、SQL 注入)
- 正确做法:对所有工具参数进行 Schema 验证 + 语义验证 + 白名单过滤,永远不信任输入
-
stdio 传输模式下忽略安全
- 问题:虽然 stdio 是本地通信,但恶意 MCP Server 仍可读取本地文件、执行命令
- 正确做法:即使 stdio 模式也要实现进程沙箱(Docker/gVisor)和文件系统权限隔离
-
审计日志中记录敏感信息明文
- 问题:日志中包含密码、Token、个人数据,日志泄露等于数据泄露
- 正确做法:日志写入前对敏感字段进行脱敏处理(
[REDACTED]),使用结构化日志格式
-
忽略工具描述的完整性验证
- 问题:工具投毒攻击通过修改工具描述注入恶意指令,传统安全审查无法检测
- 正确做法:对工具定义进行签名验证,运行时检测描述变更并告警
✅ 最佳实践
- 纵深防御(Defense in Depth):在 Gateway、Server、工具三个层面都实施安全控制,不依赖单一防线
- 默认拒绝(Default Deny):ACL 默认策略设为拒绝,显式授权每个角色可访问的工具
- 最小权限(Least Privilege):使用 OBO 模式,Gateway 以用户身份而非服务账号访问后端
- 短期凭证:access_token 有效期 15-60 分钟,refresh_token 单次使用后轮换
- 不可变审计日志:审计日志写入后不可修改,使用追加写入模式或 WORM 存储
- 定期安全审计:每季度进行 MCP 安全审计,包括工具描述审查、权限矩阵验证、渗透测试
相关资源与延伸阅读
- MCP 官方规范 2025-06-18 — MCP 协议最新规范,包含授权和安全章节
- OWASP GenAI Security Project: MCP Server 安全使用指南 v1.0 — OWASP 发布的第三方 MCP Server 安全使用实践指南
- Elastic Security Labs: MCP 工具攻击向量与防御 — MCP 工具的攻击面分析和防御建议
- The Vulnerable MCP Project — MCP 安全漏洞演示和学习平台
- Kong: MCP 工具治理与安全 — 通过 Gateway 实现工具级 ACL 的实践
- Traefik Hub: MCP Gateway 最佳实践 — OBO 认证和 Gateway 安全配置指南
- WorkOS: MCP 认证入门 — OAuth 2.1 在 MCP 中的实现指南
- Permit.io: MCP 授权策略 — RBAC/ABAC 在 MCP 中的应用策略
参考来源
- MCP Specification 2025-06-18 (2025-06-18)
- MCP Specification 2025-03-26 (2025-03-26)
- Securing MCP: OAuth, mTLS, Zero Trust — dasroot.net (2026-02)
- OAuth 2.1 and MCP — Scalekit (2026-02)
- MCP Server Authentication and Authorization — Grizzly Peak Software (2026-01)
- MCP Security: Dynamic Authorization — Prefactor (2025-12)
- MCP Gateways: A Developer’s Guide — Composio (2026-01)
- Lasso Open Source MCP Security Gateway (2025-04)
- MCP Security Risks & Mitigations — SOCPrime (2026-02)
- Microsoft MCP Governance Framework — WindowsNews (2026-02)
- Securing MCP: Defense-First Architecture — Christian Schneider (2026-02)
- MCP Security Best Practices — The Vulnerable MCP Project (2025-05)
📖 返回 总览与导航 | 上一节:08d-MCP工具模式目录 | 下一节:08f-MCP集成与推荐Server