22b - Agent 权限控制
本文是《AI Agent 实战手册》第 22 章第 2 节。 上一节:22a-AI安全概览 | 下一节:22c-数据隐私与合规 📖 返回 总览与导航
⏱ 阅读时间:80 分钟 | 难度:⭐⭐⭐⭐⭐ 高级 | 前置知识:AI Agent 基础概念、容器/虚拟化基础、安全基础
概述
当 AI Agent 从”回答问题”进化到”自主执行任务”,权限控制就成了安全的核心命题。一个拥有文件读写、终端执行、API 调用能力的 Agent,本质上就是一个自动化的特权用户——如果不加约束,它可能在一次推理循环中完成数据外泄、权限提升甚至系统入侵。本节系统讲解 Agent 权限控制的四大支柱:最小权限原则、沙箱隔离技术、人工审批工作流、工具级访问控制,并提供 Python 和 TypeScript 的完整实现示例。
1. 最小权限原则(Least Privilege / Least Agency)
核心理念
最小权限原则在 Agent 时代被 OWASP 扩展为”最小代理权限”(Least Agency):Agent 只应被授予完成其定义任务所需的最低自主权级别。这不仅包括传统的数据访问权限,还包括工具调用权限、网络访问范围、执行时间限制和自主决策深度。
最小权限的四个维度
┌─────────────────────────────────────────────────────────┐
│ 最小代理权限模型 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 数据权限 │ │ 工具权限 │ │ 网络权限 │ │
│ │ │ │ │ │ │ │
│ │ • 只读 vs │ │ • 白名单 │ │ • 域名白名单 │ │
│ │ 读写 │ │ • 参数约束 │ │ • 端口限制 │ │
│ │ • 目录范围 │ │ • 调用频率 │ │ • 出站控制 │ │
│ │ • 数据分类 │ │ • 审批要求 │ │ • DNS 过滤 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 执行权限 │ │ 时间权限 │ │ 自主权限 │ │
│ │ │ │ │ │ │ │
│ │ • 沙箱级别 │ │ • 超时限制 │ │ • 决策深度 │ │
│ │ • 资源配额 │ │ • 会话时长 │ │ • 循环次数 │ │
│ │ • 进程隔离 │ │ • 冷却期 │ │ • 升级阈值 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| E2B | AI Agent 沙箱运行时 | 免费(开源)/ Pro $29/月起 | Agent 代码执行隔离 |
| Firecracker | MicroVM 虚拟化引擎 | 免费(开源) | 高性能轻量级 VM 隔离 |
| gVisor | 用户态内核沙箱 | 免费(开源) | Kubernetes Pod 沙箱化 |
| Wassette | WebAssembly Agent 工具运行时 | 免费(开源) | MCP 工具沙箱执行 |
| Traefik Hub MCP Gateway | MCP 网关 + TBAC 访问控制 | 免费(社区版)/ 企业版联系销售 | Agent 工具调用治理 |
| Hoop.dev | 操作级审批平台 | 免费(基础版)/ 企业版联系销售 | Agent 敏感操作审批 |
| Agent Consent Protocol (ACP) | Agent 操作审批代理 | 免费(开源) | 工具调用拦截与审批 |
| Open Policy Agent (OPA) | 通用策略引擎 | 免费(开源) | RBAC/ABAC 策略执行 |
| Docker | 容器化隔离 | 免费(开源)/ Business $24/月 | 基础 Agent 隔离 |
| Daytona | 开发环境沙箱 | 免费(开源)/ 企业版联系销售 | Agent 开发环境隔离 |
操作步骤:实施最小权限
步骤 1:权限审计——盘点 Agent 当前权限
首先列出 Agent 可访问的所有资源和工具,评估每项权限的必要性:
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class PermissionLevel(Enum):
NONE = "none"
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
class Necessity(Enum):
REQUIRED = "required" # 核心功能必需
OPTIONAL = "optional" # 增强功能,可降级
UNNECESSARY = "unnecessary" # 不需要,应移除
@dataclass
class PermissionAuditEntry:
resource: str
current_level: PermissionLevel
required_level: PermissionLevel
necessity: Necessity
justification: str
risk_if_compromised: str
@dataclass
class AgentPermissionAudit:
agent_name: str
entries: list[PermissionAuditEntry] = field(default_factory=list)
def add(self, entry: PermissionAuditEntry):
self.entries.append(entry)
def get_over_privileged(self) -> list[PermissionAuditEntry]:
"""找出权限过高的条目"""
return [
e for e in self.entries
if e.current_level.value > e.required_level.value
or e.necessity == Necessity.UNNECESSARY
]
def generate_report(self) -> str:
over = self.get_over_privileged()
lines = [f"=== {self.agent_name} 权限审计报告 ==="]
lines.append(f"总权限条目: {len(self.entries)}")
lines.append(f"过度授权条目: {len(over)}")
for e in over:
lines.append(
f" ⚠️ {e.resource}: "
f"当前={e.current_level.value} → 建议={e.required_level.value} "
f"({e.necessity.value})"
)
return "\n".join(lines)
# 使用示例
audit = AgentPermissionAudit("CodingAssistant")
audit.add(PermissionAuditEntry(
resource="文件系统 /workspace/",
current_level=PermissionLevel.WRITE,
required_level=PermissionLevel.WRITE,
necessity=Necessity.REQUIRED,
justification="需要创建和修改项目文件",
risk_if_compromised="项目代码被篡改",
))
audit.add(PermissionAuditEntry(
resource="文件系统 ~/.ssh/",
current_level=PermissionLevel.READ,
required_level=PermissionLevel.NONE,
necessity=Necessity.UNNECESSARY,
justification="Agent 不需要访问 SSH 密钥",
risk_if_compromised="SSH 密钥泄露,服务器被入侵",
))
audit.add(PermissionAuditEntry(
resource="终端命令执行",
current_level=PermissionLevel.ADMIN,
required_level=PermissionLevel.EXECUTE,
necessity=Necessity.REQUIRED,
justification="需要运行构建和测试命令",
risk_if_compromised="任意命令执行,系统被完全控制",
))
print(audit.generate_report())步骤 2:定义权限策略文件
将 Agent 权限以声明式配置管理,便于审计和版本控制:
# agent-permissions.yaml — Agent 权限策略声明
agent:
name: "CodingAssistant"
version: "1.0"
description: "项目编码辅助 Agent"
permissions:
file_system:
allowed_paths:
- path: "/workspace/project/"
access: "read_write"
max_file_size: "10MB"
- path: "/tmp/agent-workspace/"
access: "read_write"
max_file_size: "50MB"
denied_paths:
- "~/.ssh/"
- "~/.aws/"
- "~/.config/"
- ".env"
- ".env.*"
- "**/*.pem"
- "**/*.key"
terminal:
sandbox: "docker" # docker | gvisor | firecracker | wasm
allowed_commands:
- "npm"
- "node"
- "python"
- "pip"
- "git status"
- "git diff"
- "git log"
- "cargo"
- "make"
- "cat"
- "ls"
denied_patterns:
- "curl *"
- "wget *"
- "ssh *"
- "scp *"
- "nc *"
- "rm -rf /"
- "| base64"
- "> /dev/tcp"
- "$(cat .env)"
timeout_seconds: 30
max_output_size: "1MB"
network:
outbound:
allowed_domains:
- "registry.npmjs.org"
- "pypi.org"
- "docs.python.org"
- "developer.mozilla.org"
denied_ports: [22, 23, 25, 3389]
inbound: "deny_all"
resources:
max_memory: "512MB"
max_cpu_percent: 50
max_disk: "1GB"
max_processes: 10
approval_required:
- action: "git push"
approvers: ["developer"]
timeout: "5m"
- action: "npm install *"
approvers: ["developer"]
timeout: "2m"
- action: "rm -rf *"
approvers: ["developer", "lead"]
timeout: "5m"
- action: "database_write"
approvers: ["developer", "dba"]
timeout: "10m"步骤 3:实施权限降级策略
从最高权限逐步收紧,而非从零开始添加:
class PermissionEnforcer:
"""Agent 权限执行器 — 强制执行最小权限"""
def __init__(self, policy_path: str):
self.policy = self._load_policy(policy_path)
def check_file_access(self, path: str, access_type: str) -> bool:
"""检查文件访问权限"""
import os
abs_path = os.path.abspath(path)
# 检查拒绝列表(优先级最高)
for denied in self.policy.get("denied_paths", []):
if self._match_pattern(abs_path, denied):
self._log_denied("file_access", path, "在拒绝列表中")
return False
# 检查允许列表
for allowed in self.policy.get("allowed_paths", []):
if abs_path.startswith(allowed["path"]):
if access_type == "write" and allowed["access"] == "read":
self._log_denied("file_access", path, "只读路径")
return False
return True
# 默认拒绝(deny-by-default)
self._log_denied("file_access", path, "不在允许列表中")
return False
def check_command(self, command: str) -> bool:
"""检查终端命令权限"""
cmd_parts = command.strip().split()
if not cmd_parts:
return False
base_cmd = cmd_parts[0]
# 检查拒绝模式
for pattern in self.policy.get("denied_patterns", []):
if self._match_command_pattern(command, pattern):
self._log_denied("command", command, f"匹配拒绝模式: {pattern}")
return False
# 检查允许列表
allowed = self.policy.get("allowed_commands", [])
if base_cmd in allowed or command in allowed:
return True
self._log_denied("command", command, "不在允许列表中")
return False
def _match_pattern(self, path: str, pattern: str) -> bool:
import fnmatch
expanded = os.path.expanduser(pattern)
return fnmatch.fnmatch(path, expanded)
def _match_command_pattern(self, cmd: str, pattern: str) -> bool:
import fnmatch
return fnmatch.fnmatch(cmd, pattern)
def _load_policy(self, path: str) -> dict:
import yaml
with open(path) as f:
return yaml.safe_load(f)
def _log_denied(self, category: str, target: str, reason: str):
print(f"🚫 权限拒绝 [{category}] {target} — {reason}")2. 沙箱隔离技术
为什么标准容器不够?
标准 Docker 容器共享宿主机内核,容器逃逸漏洞(如 CVE-2024-21626 Leaky Vessels)可以让攻击者突破隔离。对于执行 AI 生成代码的 Agent,需要更强的隔离边界。
沙箱隔离技术对比
隔离强度 ▲
│
最强 │ ┌──────────────┐
│ │ Firecracker │ 独立内核 + 极简设备模型
│ │ MicroVM │ 启动 ~125ms,内存开销 ~5MB
│ └──────────────┘
│
强 │ ┌──────────────┐
│ │ gVisor │ 用户态内核拦截系统调用
│ │ (runsc) │ 兼容 OCI,K8s 原生支持
│ └──────────────┘
│
中强 │ ┌──────────────┐
│ │ Wasm/WASI │ 能力模型,默认零权限
│ │ (Wasmtime) │ 极快启动,跨平台
│ └──────────────┘
│
中 │ ┌──────────────┐
│ │ Docker + │ Seccomp + AppArmor + 只读根
│ │ 加固配置 │ 共享内核,需额外加固
│ └──────────────┘
│
弱 │ ┌──────────────┐
│ │ 标准 Docker │ 默认配置,共享内核
│ │ (默认) │ 容器逃逸风险
│ └──────────────┘
│
└──────────────────────────────────────────▶ 性能开销
低 高沙箱技术详细对比
| 技术 | 隔离级别 | 启动时间 | 内存开销 | K8s 支持 | 适用场景 | 局限性 |
|---|---|---|---|---|---|---|
| Firecracker MicroVM | VM 级(独立内核) | ~125ms | ~5MB/实例 | 需适配 | 高安全要求的代码执行 | 需要 KVM 支持,不支持 GPU |
| gVisor (runsc) | 用户态内核 | ~毫秒级 | ~15-50MB | 原生 RuntimeClass | K8s 环境下的 Agent 隔离 | 部分系统调用不兼容 |
| Wasm/WASI (Wasmtime) | 能力模型沙箱 | <1ms | ~KB 级 | 通过 Spin/Fermyon | MCP 工具执行、轻量计算 | 生态尚不成熟,I/O 受限 |
| Docker + 加固 | 命名空间 + cgroup | ~秒级 | ~数十 MB | 原生 | 通用 Agent 隔离 | 共享内核,需额外加固 |
| E2B | Firecracker MicroVM | ~400ms | 托管 | N/A(云服务) | AI Agent 代码执行 | 依赖云服务,有成本 |
| Modal | 容器 + 隔离 | ~秒级 | 托管 | N/A(云服务) | ML 工作负载、GPU 任务 | Python 优先 |
操作步骤:部署各类沙箱
方案 A:Docker 加固沙箱
最容易上手的方案,适合大多数场景。关键是在默认配置基础上添加安全加固:
# Dockerfile.agent-sandbox — Agent 安全沙箱镜像
FROM python:3.12-slim AS agent-sandbox
# 创建非 root 用户
RUN groupadd -r agent && useradd -r -g agent -d /workspace -s /bin/bash agent
# 安装必要工具(最小化)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /workspace
RUN chown agent:agent /workspace
# 切换到非 root 用户
USER agent
# 默认入口
CMD ["bash"]# docker_sandbox.py — Docker 加固沙箱管理器
import subprocess
import json
import uuid
from dataclasses import dataclass
from typing import Optional
@dataclass
class SandboxConfig:
image: str = "agent-sandbox:latest"
memory_limit: str = "512m"
cpu_quota: int = 50000 # 50% CPU
network_mode: str = "none" # 默认禁止网络
read_only_root: bool = True
no_new_privileges: bool = True
timeout_seconds: int = 30
allowed_paths: list[str] = None
def __post_init__(self):
if self.allowed_paths is None:
self.allowed_paths = []
class DockerSandbox:
"""Docker 加固沙箱 — 用于 Agent 代码执行"""
def __init__(self, config: SandboxConfig):
self.config = config
self.container_id: Optional[str] = None
def execute(self, command: str, workspace_path: str = "") -> dict:
"""在沙箱中执行命令"""
container_name = f"agent-sandbox-{uuid.uuid4().hex[:8]}"
docker_cmd = [
"docker", "run",
"--name", container_name,
"--rm", # 执行后自动删除
"--memory", self.config.memory_limit,
"--cpu-quota", str(self.config.cpu_quota),
"--network", self.config.network_mode,
"--security-opt", "no-new-privileges:true",
"--cap-drop", "ALL", # 移除所有 Linux capabilities
"--pids-limit", "64", # 限制进程数
"--tmpfs", "/tmp:rw,noexec,nosuid,size=64m",
]
# 只读根文件系统
if self.config.read_only_root:
docker_cmd.append("--read-only")
# 挂载工作目录(如果提供)
if workspace_path:
docker_cmd.extend([
"-v", f"{workspace_path}:/workspace:rw"
])
# Seccomp 配置(限制系统调用)
docker_cmd.extend([
"--security-opt", "seccomp=default.json",
])
docker_cmd.extend([
self.config.image,
"bash", "-c", command,
])
try:
result = subprocess.run(
docker_cmd,
capture_output=True,
text=True,
timeout=self.config.timeout_seconds,
)
return {
"success": result.returncode == 0,
"stdout": result.stdout[:10000], # 限制输出大小
"stderr": result.stderr[:5000],
"exit_code": result.returncode,
}
except subprocess.TimeoutExpired:
# 超时强制终止
subprocess.run(["docker", "kill", container_name],
capture_output=True)
return {
"success": False,
"stdout": "",
"stderr": "执行超时,已强制终止",
"exit_code": -1,
}
# 使用示例
sandbox = DockerSandbox(SandboxConfig(
memory_limit="256m",
timeout_seconds=15,
network_mode="none",
))
result = sandbox.execute("python -c 'print(2 + 2)'")
print(result) # {"success": True, "stdout": "4\n", ...}方案 B:Firecracker MicroVM 沙箱
Firecracker 是 AWS Lambda 和 Fargate 背后的虚拟化引擎,提供 VM 级隔离但启动时间仅约 125ms。每个 MicroVM 拥有独立内核,即使发生内核漏洞也不会影响宿主机。
# firecracker_sandbox.py — Firecracker MicroVM 沙箱(概念实现)
import aiohttp
import asyncio
import json
from dataclasses import dataclass
@dataclass
class MicroVMConfig:
vcpu_count: int = 1
mem_size_mib: int = 128
kernel_image: str = "/opt/firecracker/vmlinux"
rootfs_image: str = "/opt/firecracker/rootfs.ext4"
network_enabled: bool = False
timeout_seconds: int = 30
class FirecrackerSandbox:
"""Firecracker MicroVM 沙箱 — VM 级隔离"""
def __init__(self, config: MicroVMConfig):
self.config = config
self.socket_path = "/tmp/firecracker.socket"
async def create_vm(self):
"""创建并启动 MicroVM"""
# 配置内核
await self._api_call("PUT", "/boot-source", {
"kernel_image_path": self.config.kernel_image,
"boot_args": "console=ttyS0 reboot=k panic=1 pci=off"
})
# 配置根文件系统
await self._api_call("PUT", "/drives/rootfs", {
"drive_id": "rootfs",
"path_on_host": self.config.rootfs_image,
"is_root_device": True,
"is_read_only": False,
})
# 配置机器资源
await self._api_call("PUT", "/machine-config", {
"vcpu_count": self.config.vcpu_count,
"mem_size_mib": self.config.mem_size_mib,
})
# 网络配置(可选)
if not self.config.network_enabled:
pass # 不配置网络接口 = 无网络访问
# 启动 VM
await self._api_call("PUT", "/actions", {
"action_type": "InstanceStart"
})
async def execute_in_vm(self, command: str) -> dict:
"""在 MicroVM 中执行命令(通过 vsock 或串口)"""
# 实际实现中通过 vsock 通信
# 这里展示概念流程
try:
result = await asyncio.wait_for(
self._send_command(command),
timeout=self.config.timeout_seconds,
)
return {"success": True, "output": result}
except asyncio.TimeoutError:
await self.destroy_vm()
return {"success": False, "output": "MicroVM 执行超时"}
async def destroy_vm(self):
"""销毁 MicroVM"""
await self._api_call("PUT", "/actions", {
"action_type": "SendCtrlAltDel"
})
async def _api_call(self, method: str, path: str, data: dict):
"""调用 Firecracker API"""
connector = aiohttp.UnixConnector(path=self.socket_path)
async with aiohttp.ClientSession(connector=connector) as session:
url = f"http://localhost{path}"
async with session.request(method, url, json=data) as resp:
return await resp.json() if resp.content_length else None
async def _send_command(self, command: str) -> str:
"""通过 vsock 发送命令到 VM(简化示意)"""
# 实际实现需要 vsock 客户端
return f"[MicroVM output for: {command}]"方案 C:gVisor 沙箱(Kubernetes 环境)
gVisor 在用户态实现了一个”Sentry”内核,拦截容器的系统调用,大幅缩小内核攻击面。在 GKE 上可以通过 RuntimeClass 直接启用。
# gvisor-runtime-class.yaml — 定义 gVisor 运行时类
apiVersion: node.k8s.io/v1
kind: RuntimeClass
metadata:
name: gvisor
handler: runsc # gVisor 的 OCI 运行时
---
# agent-pod-gvisor.yaml — 使用 gVisor 运行的 Agent Pod
apiVersion: v1
kind: Pod
metadata:
name: agent-sandbox
labels:
app: agent-sandbox
security-level: high
spec:
runtimeClassName: gvisor # 使用 gVisor 运行时
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: agent
image: agent-sandbox:latest
resources:
limits:
memory: "512Mi"
cpu: "500m"
requests:
memory: "256Mi"
cpu: "250m"
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
volumeMounts:
- name: workspace
mountPath: /workspace
- name: tmp
mountPath: /tmp
volumes:
- name: workspace
emptyDir:
sizeLimit: "1Gi"
- name: tmp
emptyDir:
medium: Memory
sizeLimit: "64Mi"
# 网络策略:默认拒绝所有出站
# 需要配合 NetworkPolicy 使用# network-policy-agent.yaml — Agent Pod 网络策略
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: agent-sandbox-netpol
spec:
podSelector:
matchLabels:
app: agent-sandbox
policyTypes:
- Ingress
- Egress
ingress: [] # 拒绝所有入站
egress:
# 只允许访问特定域名(通过 DNS)
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
ports:
- protocol: UDP
port: 53方案 D:WebAssembly (Wasm/WASI) 沙箱
Wasm 采用能力模型(Capability-based Security),默认零权限,所有系统资源访问必须显式授予。微软开源的 Wassette 项目专门为 AI Agent 工具执行设计了基于 Wasm 的安全运行时。
// wasm_sandbox.ts — Wasm 沙箱概念实现(TypeScript)
interface WasmPermissions {
fileSystem?: {
readPaths?: string[];
writePaths?: string[];
};
network?: {
allowedHosts?: string[];
allowedPorts?: number[];
};
environment?: {
allowedVars?: string[];
};
maxMemoryMB?: number;
maxExecutionMs?: number;
}
interface WasmExecutionResult {
success: boolean;
output: string;
error?: string;
resourceUsage: {
memoryUsedBytes: number;
executionTimeMs: number;
};
}
class WasmSandbox {
private permissions: WasmPermissions;
constructor(permissions: WasmPermissions) {
this.permissions = permissions;
}
/**
* 在 Wasm 沙箱中执行工具
* 所有权限必须显式声明,默认全部拒绝
*/
async executeModule(
wasmPath: string,
args: string[],
): Promise<WasmExecutionResult> {
const startTime = Date.now();
// 构建 WASI 预打开目录(只授予声明的路径)
const preopens: Record<string, string> = {};
if (this.permissions.fileSystem?.readPaths) {
for (const p of this.permissions.fileSystem.readPaths) {
preopens[p] = p; // 映射宿主路径到沙箱路径
}
}
// 构建环境变量(只传递允许的变量)
const env: Record<string, string> = {};
if (this.permissions.environment?.allowedVars) {
for (const varName of this.permissions.environment.allowedVars) {
const value = process.env[varName];
if (value) env[varName] = value;
}
}
try {
// 使用 Wasmtime/Wasmer 运行模块
// 实际实现中调用 Wasmtime CLI 或 SDK
const result = await this.runWasmModule(wasmPath, args, {
preopens,
env,
maxMemory: (this.permissions.maxMemoryMB ?? 64) * 1024 * 1024,
});
const executionTimeMs = Date.now() - startTime;
// 检查执行时间限制
if (
this.permissions.maxExecutionMs &&
executionTimeMs > this.permissions.maxExecutionMs
) {
return {
success: false,
output: '',
error: `执行超时: ${executionTimeMs}ms > ${this.permissions.maxExecutionMs}ms`,
resourceUsage: { memoryUsedBytes: 0, executionTimeMs },
};
}
return {
success: true,
output: result.stdout,
resourceUsage: {
memoryUsedBytes: result.memoryUsed,
executionTimeMs,
},
};
} catch (error) {
return {
success: false,
output: '',
error: String(error),
resourceUsage: {
memoryUsedBytes: 0,
executionTimeMs: Date.now() - startTime,
},
};
}
}
private async runWasmModule(
path: string,
args: string[],
opts: { preopens: Record<string, string>; env: Record<string, string>; maxMemory: number },
) {
// 实际实现中调用 Wasmtime SDK
// 这里为概念示意
return { stdout: '', memoryUsed: 0 };
}
}
// 使用示例:为 MCP 工具创建最小权限沙箱
const toolSandbox = new WasmSandbox({
fileSystem: {
readPaths: ['/workspace/project/src'],
// 注意:没有 writePaths = 只读
},
network: undefined, // 无网络权限
maxMemoryMB: 32,
maxExecutionMs: 5000,
});提示词模板:沙箱方案选择
你是一位基础设施安全架构师。请根据以下 Agent 系统需求,推荐最合适的沙箱隔离方案。
## Agent 信息
- Agent 类型:[编码Agent / 数据分析Agent / 自动化工作流Agent / 客服Agent]
- 需要执行的操作:[代码执行 / 文件读写 / API调用 / 数据库查询 / 浏览器操作]
- 部署环境:[Kubernetes / 裸机 / 云函数 / 本地开发]
- 并发需求:[每秒执行次数]
- 安全要求:[标准 / 高 / 极高(金融/医疗)]
- 预算约束:[有限 / 中等 / 充足]
- GPU 需求:[是/否]
## 请输出
### 1. 推荐方案
从以下选项中推荐最合适的方案,并说明理由:
- Docker 加固
- Firecracker MicroVM
- gVisor
- Wasm/WASI
- 托管平台(E2B/Modal)
- 组合方案
### 2. 配置模板
提供推荐方案的完整配置文件。
### 3. 安全加固清单
列出该方案需要额外加固的安全措施。
### 4. 性能基准
预估启动时间、内存开销、吞吐量。3. 人工审批工作流(Human-in-the-Loop)
为什么需要审批工作流?
Agent 的自主性是一把双刃剑。对于高风险操作(数据删除、代码部署、资金转账、权限变更),必须引入人工审批环节。审批工作流的核心思想是:Agent 可以自主完成低风险任务,但高风险操作必须经过人类确认。
审批工作流决策模型
Agent 发起操作
│
▼
┌──────────────┐
│ 风险评估引擎 │
└──────┬───────┘
│
├── 低风险(读取、查询、摘要)──────▶ 自动执行 ✅
│
├── 中风险(文件修改、API 调用)───▶ 异步通知 + 自动执行 📋
│
├── 高风险(部署、删除、安装依赖)─▶ 同步审批 ⏸️
│ │
│ ┌─────▼─────┐
│ │ 审批者审查 │
│ │ (Slack/ │
│ │ Teams/API)│
│ └─────┬─────┘
│ │
│ ┌─────┴─────┐
│ │ │
│ 批准 ✅ 拒绝 ❌
│ │ │
│ 执行操作 记录并通知
│
└── 极高风险(权限变更、资金操作)─▶ 多人审批 + MFA ⏸️⏸️
│
需要 ≥2 人批准
+ 多因素认证操作步骤:构建审批工作流
步骤 1:定义风险分级策略
# approval_workflow.py — Agent 操作审批工作流
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional
import time
import uuid
import re
class RiskLevel(Enum):
LOW = "low" # 自动执行
MEDIUM = "medium" # 通知 + 自动执行
HIGH = "high" # 需要审批
CRITICAL = "critical" # 多人审批 + MFA
@dataclass
class ApprovalRequest:
id: str
agent_id: str
action: str
parameters: dict
risk_level: RiskLevel
justification: str
timestamp: float
status: str = "pending" # pending | approved | denied | expired
approver: Optional[str] = None
approved_at: Optional[float] = None
class RiskAssessor:
"""操作风险评估引擎"""
def __init__(self):
self.rules: list[tuple[Callable, RiskLevel]] = []
self._register_default_rules()
def _register_default_rules(self):
"""注册默认风险评估规则"""
# 极高风险操作
self.rules.append((
lambda action, params: action in [
"permission_change", "fund_transfer",
"delete_database", "modify_credentials"
],
RiskLevel.CRITICAL,
))
# 高风险操作
self.rules.append((
lambda action, params: action in [
"deploy_production", "git_push", "npm_install",
"delete_files", "execute_sql_write",
],
RiskLevel.HIGH,
))
# 中风险操作
self.rules.append((
lambda action, params: action in [
"file_write", "api_call", "send_email",
"create_branch",
],
RiskLevel.MEDIUM,
))
# 基于参数的风险升级
self.rules.append((
lambda action, params: (
action == "execute_command"
and any(
p in str(params.get("command", ""))
for p in ["rm ", "drop ", "delete ", "truncate "]
)
),
RiskLevel.HIGH,
))
def assess(self, action: str, parameters: dict) -> RiskLevel:
"""评估操作风险级别"""
max_risk = RiskLevel.LOW
for rule_fn, risk_level in self.rules:
if rule_fn(action, parameters):
if list(RiskLevel).index(risk_level) > list(RiskLevel).index(max_risk):
max_risk = risk_level
return max_risk
class ApprovalWorkflow:
"""Agent 操作审批工作流管理器"""
def __init__(self, risk_assessor: RiskAssessor):
self.risk_assessor = risk_assessor
self.pending_requests: dict[str, ApprovalRequest] = {}
self.approval_timeout = 300 # 5 分钟超时
async def request_action(
self,
agent_id: str,
action: str,
parameters: dict,
justification: str = "",
) -> dict:
"""Agent 请求执行操作"""
risk = self.risk_assessor.assess(action, parameters)
if risk == RiskLevel.LOW:
return {"approved": True, "method": "auto", "risk": risk.value}
if risk == RiskLevel.MEDIUM:
# 异步通知,但自动执行
await self._notify_observers(action, parameters, risk)
return {"approved": True, "method": "auto_with_notification", "risk": risk.value}
# HIGH 和 CRITICAL 需要人工审批
request = ApprovalRequest(
id=str(uuid.uuid4()),
agent_id=agent_id,
action=action,
parameters=self._redact_sensitive(parameters),
risk_level=risk,
justification=justification,
timestamp=time.time(),
)
self.pending_requests[request.id] = request
# 发送审批请求到通知渠道
await self._send_approval_request(request)
# 等待审批结果
result = await self._wait_for_approval(request)
return result
async def approve(self, request_id: str, approver: str) -> bool:
"""审批者批准操作"""
request = self.pending_requests.get(request_id)
if not request or request.status != "pending":
return False
request.status = "approved"
request.approver = approver
request.approved_at = time.time()
return True
async def deny(self, request_id: str, approver: str, reason: str = "") -> bool:
"""审批者拒绝操作"""
request = self.pending_requests.get(request_id)
if not request or request.status != "pending":
return False
request.status = "denied"
request.approver = approver
return True
async def _wait_for_approval(self, request: ApprovalRequest) -> dict:
"""等待审批结果(带超时)"""
start = time.time()
while time.time() - start < self.approval_timeout:
if request.status == "approved":
return {
"approved": True,
"method": "human_approval",
"approver": request.approver,
"risk": request.risk_level.value,
}
if request.status == "denied":
return {
"approved": False,
"method": "human_denial",
"approver": request.approver,
"risk": request.risk_level.value,
}
await asyncio.sleep(1)
# 超时 = 拒绝(安全默认值)
request.status = "expired"
return {
"approved": False,
"method": "timeout",
"risk": request.risk_level.value,
}
async def _send_approval_request(self, request: ApprovalRequest):
"""发送审批请求到 Slack/Teams/API"""
message = (
f"🔐 Agent 操作审批请求\n"
f"Agent: {request.agent_id}\n"
f"操作: {request.action}\n"
f"风险级别: {request.risk_level.value}\n"
f"理由: {request.justification}\n"
f"请求 ID: {request.id}\n"
f"超时: {self.approval_timeout}s"
)
print(message) # 实际中发送到 Slack/Teams
async def _notify_observers(self, action, params, risk):
print(f"📋 通知: {action} (风险: {risk.value})")
def _redact_sensitive(self, params: dict) -> dict:
sensitive = {"password", "token", "secret", "key", "credential"}
return {
k: "***" if k.lower() in sensitive else v
for k, v in params.items()
}
import asyncio # 放在文件顶部步骤 2:集成 Agent Consent Protocol (ACP)
ACP 是一个开源的 Agent 操作审批代理,它作为代理层拦截 Agent 的工具调用,要求人类显式批准敏感操作。
// acp_integration.ts — Agent Consent Protocol 集成示例
interface ACPConfig {
/** ACP 代理服务地址 */
proxyUrl: string;
/** 需要审批的操作模式 */
sensitivePatterns: RegExp[];
/** 审批超时(毫秒) */
approvalTimeoutMs: number;
/** 通知渠道 */
notificationChannel: 'slack' | 'teams' | 'webhook';
}
interface ToolCallRequest {
toolName: string;
parameters: Record<string, unknown>;
agentId: string;
sessionId: string;
}
interface ACPDecision {
allowed: boolean;
requiresApproval: boolean;
approvalId?: string;
reason?: string;
}
class AgentConsentProxy {
private config: ACPConfig;
constructor(config: ACPConfig) {
this.config = config;
}
/**
* 拦截工具调用,检查是否需要审批
*/
async interceptToolCall(request: ToolCallRequest): Promise<ACPDecision> {
// 1. 检查是否匹配敏感操作模式
const isSensitive = this.config.sensitivePatterns.some(
(pattern) =>
pattern.test(request.toolName) ||
pattern.test(JSON.stringify(request.parameters)),
);
if (!isSensitive) {
// 非敏感操作,直接放行
return { allowed: true, requiresApproval: false };
}
// 2. 发送审批请求
const approvalId = crypto.randomUUID();
await this.sendApprovalNotification({
approvalId,
toolName: request.toolName,
parameters: this.redactParams(request.parameters),
agentId: request.agentId,
});
// 3. 等待审批结果
const decision = await this.waitForDecision(approvalId);
return decision;
}
private async sendApprovalNotification(details: {
approvalId: string;
toolName: string;
parameters: Record<string, unknown>;
agentId: string;
}): Promise<void> {
// 发送到 Slack/Teams/Webhook
const payload = {
text: `🔐 *Agent 操作审批*\n` +
`> Agent: \`${details.agentId}\`\n` +
`> 工具: \`${details.toolName}\`\n` +
`> 参数: \`${JSON.stringify(details.parameters)}\`\n` +
`> 审批 ID: \`${details.approvalId}\`\n` +
`> 回复 \`/approve ${details.approvalId}\` 批准\n` +
`> 回复 \`/deny ${details.approvalId}\` 拒绝`,
};
await fetch(this.config.proxyUrl + '/notify', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
}
private async waitForDecision(approvalId: string): Promise<ACPDecision> {
const startTime = Date.now();
while (Date.now() - startTime < this.config.approvalTimeoutMs) {
const response = await fetch(
`${this.config.proxyUrl}/decisions/${approvalId}`,
);
const data = await response.json();
if (data.status === 'approved') {
return { allowed: true, requiresApproval: true, approvalId };
}
if (data.status === 'denied') {
return {
allowed: false,
requiresApproval: true,
approvalId,
reason: data.reason,
};
}
// 等待 1 秒后重试
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// 超时默认拒绝
return {
allowed: false,
requiresApproval: true,
approvalId,
reason: '审批超时',
};
}
private redactParams(
params: Record<string, unknown>,
): Record<string, unknown> {
const sensitive = new Set([
'password', 'token', 'secret', 'key', 'credential',
]);
return Object.fromEntries(
Object.entries(params).map(([k, v]) => [
k,
sensitive.has(k.toLowerCase()) ? '***REDACTED***' : v,
]),
);
}
}
// 使用示例
const acp = new AgentConsentProxy({
proxyUrl: 'http://localhost:8080',
sensitivePatterns: [
/git\s+push/i,
/npm\s+install/i,
/rm\s+-rf/i,
/deploy/i,
/database.*write/i,
/send.*email/i,
],
approvalTimeoutMs: 300_000, // 5 分钟
notificationChannel: 'slack',
});4. 工具级访问控制(RBAC / ABAC / TBAC)
访问控制模型对比
Agent 时代的访问控制不仅要管”谁能访问什么”,还要管”Agent 在什么任务上下文中能调用什么工具”。传统的 RBAC 和 ABAC 正在被 TBAC(Task-Based Access Control)补充。
三种访问控制模型对比
┌─────────────────────────────────────────────────────────────┐
│ RBAC(基于角色的访问控制) │
│ │
│ 用户/Agent ──▶ 角色 ──▶ 权限 │
│ │
│ 示例: │
│ CodingAgent ──▶ developer 角色 ──▶ [file_read, file_write, │
│ git_commit, run_tests] │
│ │
│ 优点:简单直观,易于管理 │
│ 局限:粒度不够细,无法根据上下文动态调整 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ABAC(基于属性的访问控制) │
│ │
│ 决策 = f(主体属性, 资源属性, 环境属性, 操作属性) │
│ │
│ 示例: │
│ IF agent.trust_level >= 3 │
│ AND resource.sensitivity <= "medium" │
│ AND time.is_business_hours == true │
│ AND action.type == "read" │
│ THEN allow │
│ │
│ 优点:细粒度,上下文感知 │
│ 局限:策略复杂,难以调试 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ TBAC(基于任务的访问控制)— Agent 时代新模型 │
│ │
│ 决策 = f(任务目标, 工具需求, 参数约束) │
│ │
│ 三个维度: │
│ ① 任务(Task):业务目标,如"修复 bug #123" │
│ ② 工具(Tool):允许使用的工具集 │
│ ③ 事务(Transaction):参数级约束 │
│ │
│ 示例: │
│ 任务 "修复 bug #123" 允许: │
│ - file_read(仅 src/ 目录) │
│ - file_write(仅 src/ 目录,排除 config/) │
│ - run_tests(无限制) │
│ - git_commit(仅 fix/ 分支) │
│ │
│ 优点:与 Agent 工作模式天然匹配 │
│ 代表:Traefik Hub MCP Gateway │
└─────────────────────────────────────────────────────────────┘操作步骤:实现工具级访问控制
步骤 1:基于 RBAC 的工具权限管理
# tool_access_control.py — Agent 工具级访问控制
from dataclasses import dataclass, field
from typing import Optional
import re
import time
@dataclass
class ToolPermission:
tool_name: str
allowed_actions: list[str] # 允许的操作类型
parameter_constraints: dict # 参数约束
rate_limit: Optional[int] = None # 每分钟最大调用次数
requires_approval: bool = False
time_window: Optional[tuple[int, int]] = None # 允许的时间窗口 (start_hour, end_hour)
@dataclass
class AgentRole:
name: str
description: str
tool_permissions: list[ToolPermission] = field(default_factory=list)
max_concurrent_tools: int = 5
session_timeout_minutes: int = 60
class ToolAccessController:
"""Agent 工具级访问控制器(RBAC 模型)"""
def __init__(self):
self.roles: dict[str, AgentRole] = {}
self.agent_roles: dict[str, str] = {} # agent_id -> role_name
self.call_history: dict[str, list[float]] = {} # tool_name -> timestamps
def define_role(self, role: AgentRole):
"""定义 Agent 角色"""
self.roles[role.name] = role
def assign_role(self, agent_id: str, role_name: str):
"""为 Agent 分配角色"""
if role_name not in self.roles:
raise ValueError(f"角色 {role_name} 不存在")
self.agent_roles[agent_id] = role_name
def check_access(
self,
agent_id: str,
tool_name: str,
action: str,
parameters: dict,
) -> dict:
"""检查 Agent 是否有权调用指定工具"""
# 1. 获取 Agent 角色
role_name = self.agent_roles.get(agent_id)
if not role_name:
return self._deny("Agent 未分配角色")
role = self.roles.get(role_name)
if not role:
return self._deny("角色不存在")
# 2. 查找工具权限
tool_perm = next(
(tp for tp in role.tool_permissions if tp.tool_name == tool_name),
None,
)
if not tool_perm:
return self._deny(f"角色 {role_name} 无权使用工具 {tool_name}")
# 3. 检查操作类型
if action not in tool_perm.allowed_actions:
return self._deny(f"操作 {action} 不在允许列表中")
# 4. 检查参数约束
constraint_result = self._check_constraints(
parameters, tool_perm.parameter_constraints
)
if not constraint_result["passed"]:
return self._deny(f"参数约束违反: {constraint_result['reason']}")
# 5. 检查速率限制
if tool_perm.rate_limit:
if not self._check_rate_limit(tool_name, tool_perm.rate_limit):
return self._deny(f"超过速率限制: {tool_perm.rate_limit}/分钟")
# 6. 检查时间窗口
if tool_perm.time_window:
current_hour = time.localtime().tm_hour
start, end = tool_perm.time_window
if not (start <= current_hour < end):
return self._deny(f"不在允许时间窗口内: {start}:00-{end}:00")
# 7. 检查是否需要审批
if tool_perm.requires_approval:
return {
"allowed": True,
"requires_approval": True,
"reason": "此操作需要人工审批",
}
# 记录调用
self._record_call(tool_name)
return {"allowed": True, "requires_approval": False}
def _check_constraints(self, params: dict, constraints: dict) -> dict:
"""检查参数约束"""
for key, constraint in constraints.items():
value = params.get(key)
if value is None:
continue
# 正则匹配约束
if "pattern" in constraint:
if not re.match(constraint["pattern"], str(value)):
return {
"passed": False,
"reason": f"{key} 不匹配模式 {constraint['pattern']}",
}
# 白名单约束
if "allowed_values" in constraint:
if value not in constraint["allowed_values"]:
return {
"passed": False,
"reason": f"{key}={value} 不在允许值列表中",
}
# 黑名单约束
if "denied_values" in constraint:
if value in constraint["denied_values"]:
return {
"passed": False,
"reason": f"{key}={value} 在拒绝值列表中",
}
# 范围约束
if "max_length" in constraint:
if len(str(value)) > constraint["max_length"]:
return {
"passed": False,
"reason": f"{key} 超过最大长度 {constraint['max_length']}",
}
return {"passed": True}
def _check_rate_limit(self, tool_name: str, limit: int) -> bool:
history = self.call_history.get(tool_name, [])
cutoff = time.time() - 60
recent = [t for t in history if t > cutoff]
self.call_history[tool_name] = recent
return len(recent) < limit
def _record_call(self, tool_name: str):
if tool_name not in self.call_history:
self.call_history[tool_name] = []
self.call_history[tool_name].append(time.time())
def _deny(self, reason: str) -> dict:
return {"allowed": False, "requires_approval": False, "reason": reason}
# ===== 使用示例 =====
controller = ToolAccessController()
# 定义角色
developer_role = AgentRole(
name="developer",
description="开发者 Agent 角色",
tool_permissions=[
ToolPermission(
tool_name="file_read",
allowed_actions=["read"],
parameter_constraints={
"path": {
"pattern": r"^/workspace/project/.*", # 只允许项目目录
"denied_values": ["/workspace/project/.env"],
},
},
rate_limit=100,
),
ToolPermission(
tool_name="file_write",
allowed_actions=["write", "create"],
parameter_constraints={
"path": {
"pattern": r"^/workspace/project/src/.*", # 只允许 src 目录
"max_length": 200,
},
},
rate_limit=50,
),
ToolPermission(
tool_name="terminal",
allowed_actions=["execute"],
parameter_constraints={
"command": {
"pattern": r"^(npm|node|python|cargo|git\s+(status|diff|log)).*",
},
},
rate_limit=30,
),
ToolPermission(
tool_name="git_push",
allowed_actions=["execute"],
parameter_constraints={},
requires_approval=True, # 推送代码需要审批
),
],
)
controller.define_role(developer_role)
controller.assign_role("coding-agent-001", "developer")
# 检查权限
result = controller.check_access(
agent_id="coding-agent-001",
tool_name="file_read",
action="read",
parameters={"path": "/workspace/project/src/main.py"},
)
print(result) # {"allowed": True, ...}
result = controller.check_access(
agent_id="coding-agent-001",
tool_name="file_read",
action="read",
parameters={"path": "/workspace/project/.env"},
)
print(result) # {"allowed": False, "reason": "参数约束违反: ..."}步骤 2:使用 Open Policy Agent (OPA) 实现 ABAC
OPA 是 CNCF 毕业项目,提供通用的策略引擎,可以用 Rego 语言编写细粒度的访问控制策略。
# agent_policy.rego — OPA 策略:Agent 工具访问控制
package agent.toolaccess
import rego.v1
# 默认拒绝
default allow := false
# 规则 1:开发者角色可以读取项目文件
allow if {
input.agent.role == "developer"
input.tool == "file_read"
startswith(input.parameters.path, "/workspace/project/")
not is_sensitive_path(input.parameters.path)
}
# 规则 2:开发者角色可以写入 src 目录
allow if {
input.agent.role == "developer"
input.tool == "file_write"
startswith(input.parameters.path, "/workspace/project/src/")
input.parameters.file_size_bytes <= 1048576 # 最大 1MB
}
# 规则 3:只在工作时间允许部署操作
allow if {
input.agent.role == "deployer"
input.tool == "deploy"
is_business_hours
input.environment != "production" # 非生产环境可自动部署
}
# 规则 4:生产部署需要审批标记
allow if {
input.agent.role == "deployer"
input.tool == "deploy"
input.environment == "production"
input.approval.status == "approved"
input.approval.approver_count >= 2 # 至少 2 人审批
}
# 规则 5:数据库查询只允许只读操作
allow if {
input.agent.role == "analyst"
input.tool == "database_query"
is_read_only_query(input.parameters.query)
not contains_sensitive_table(input.parameters.query)
}
# 辅助函数
is_sensitive_path(path) if {
sensitive_patterns := [".env", ".ssh", ".aws", ".key", ".pem"]
some pattern in sensitive_patterns
contains(path, pattern)
}
is_business_hours if {
hour := time.clock(time.now_ns())[0]
hour >= 9
hour < 18
}
is_read_only_query(query) if {
upper_query := upper(query)
startswith(upper_query, "SELECT")
not contains(upper_query, "INSERT")
not contains(upper_query, "UPDATE")
not contains(upper_query, "DELETE")
not contains(upper_query, "DROP")
not contains(upper_query, "ALTER")
not contains(upper_query, "TRUNCATE")
}
sensitive_tables := ["users_pii", "payment_info", "credentials", "audit_log"]
contains_sensitive_table(query) if {
some table in sensitive_tables
contains(lower(query), table)
}# opa_integration.py — OPA 集成示例
import aiohttp
from typing import Any
class OPAClient:
"""Open Policy Agent 客户端"""
def __init__(self, opa_url: str = "http://localhost:8181"):
self.opa_url = opa_url
async def check_tool_access(
self,
agent_id: str,
agent_role: str,
tool: str,
parameters: dict,
context: dict | None = None,
) -> dict:
"""向 OPA 查询工具访问权限"""
input_data = {
"input": {
"agent": {
"id": agent_id,
"role": agent_role,
},
"tool": tool,
"parameters": parameters,
"context": context or {},
"timestamp": __import__("time").time(),
}
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.opa_url}/v1/data/agent/toolaccess/allow",
json=input_data,
) as resp:
result = await resp.json()
allowed = result.get("result", False)
return {
"allowed": allowed,
"policy": "agent.toolaccess",
"input": input_data["input"],
}
# 使用示例
async def main():
opa = OPAClient()
result = await opa.check_tool_access(
agent_id="coding-agent-001",
agent_role="developer",
tool="file_read",
parameters={"path": "/workspace/project/src/main.py"},
)
print(result) # {"allowed": True, ...}步骤 3:使用 Traefik Hub MCP Gateway 实现 TBAC
Traefik Hub 的 MCP Gateway 引入了 TBAC(Task-Based Access Control),从三个维度控制 Agent 的工具访问:任务(业务目标)、工具(系统访问)、事务(参数级约束)。
# traefik-mcp-gateway.yaml — Traefik Hub MCP Gateway TBAC 配置示例
apiVersion: hub.traefik.io/v1alpha1
kind: MCPGateway
metadata:
name: agent-gateway
spec:
# 任务定义:Agent 可以执行的业务任务
tasks:
- name: "bug-fix"
description: "修复代码缺陷"
allowedTools:
- toolRef: "file-reader"
constraints:
paths: ["/workspace/project/src/**"]
- toolRef: "file-writer"
constraints:
paths: ["/workspace/project/src/**"]
excludePaths: ["**/config/**", "**/.env"]
- toolRef: "test-runner"
constraints:
commands: ["npm test", "cargo test"]
- toolRef: "git-operations"
constraints:
operations: ["commit", "branch"]
branchPattern: "fix/*"
- name: "code-review"
description: "代码审查(只读)"
allowedTools:
- toolRef: "file-reader"
constraints:
paths: ["/workspace/project/**"]
- toolRef: "git-operations"
constraints:
operations: ["log", "diff", "blame"]
# 注意:没有 commit/push 权限
- name: "deployment"
description: "部署到生产环境"
requiresApproval: true
minApprovers: 2
allowedTools:
- toolRef: "deploy-tool"
constraints:
environments: ["staging", "production"]
requiresHealthCheck: true
# 工具定义
tools:
- name: "file-reader"
mcpServer: "filesystem-mcp"
toolName: "read_file"
- name: "file-writer"
mcpServer: "filesystem-mcp"
toolName: "write_file"
- name: "test-runner"
mcpServer: "terminal-mcp"
toolName: "execute_command"
- name: "git-operations"
mcpServer: "git-mcp"
toolName: "git_*"
# 身份验证:On-Behalf-Of 模式
authentication:
mode: "on-behalf-of" # Agent 使用调用者的身份和权限
tokenPropagation: true
# 审计日志
audit:
enabled: true
logLevel: "detailed"
destination: "elasticsearch"5. 权限提升防御
常见权限提升攻击模式
Agent 系统中的权限提升攻击比传统系统更隐蔽,因为攻击者可以通过语义层(自然语言)而非技术层发起攻击。
Agent 权限提升攻击路径
┌──────────────────────────────────────────────────────────┐
│ 攻击模式 1:混淆代理人(Confused Deputy) │
│ │
│ 低权限用户 ──▶ 低权限 Agent ──▶ 高权限 Agent │
│ │ │ │
│ "请帮我查询 "Agent A 说需要 │
│ 所有用户数据" 查询用户数据" │
│ │ │
│ 高权限 Agent 未验证 │
│ 原始用户意图,直接执行 │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 攻击模式 2:渐进式权限提升 │
│ │
│ 步骤 1: "请读取 config.yaml" ✅ 允许(只读) │
│ 步骤 2: "请修改 config.yaml 的端口" ✅ 允许(写入) │
│ 步骤 3: "请修改 config.yaml 的权限设置" ⚠️ 应该拒绝 │
│ 步骤 4: "请重启服务使配置生效" ⚠️ 应该拒绝 │
│ │
│ 每一步看起来都合理,但组合起来实现了权限提升 │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ 攻击模式 3:工具链利用 │
│ │
│ Agent 有权执行 "npm install" │
│ ↓ │
│ 安装包含 postinstall 脚本的恶意包 │
│ ↓ │
│ postinstall 脚本以 Agent 权限执行任意代码 │
│ ↓ │
│ 读取 .env、SSH 密钥、AWS 凭证 │
└──────────────────────────────────────────────────────────┘防御措施实现
// privilege_escalation_defense.ts — 权限提升防御
interface ActionRecord {
timestamp: number;
tool: string;
action: string;
parameters: Record<string, unknown>;
riskScore: number;
}
interface EscalationAlert {
type: string;
description: string;
actions: ActionRecord[];
recommendedAction: 'block' | 'require_approval' | 'alert';
}
class PrivilegeEscalationDetector {
private actionHistory: ActionRecord[] = [];
private readonly windowMs = 10 * 60 * 1000; // 10 分钟窗口
/**
* 记录操作并检测权限提升模式
*/
detectEscalation(action: ActionRecord): EscalationAlert | null {
this.actionHistory.push(action);
this.pruneOldActions();
// 检测模式 1:累积风险分数
const cumulativeRisk = this.actionHistory.reduce(
(sum, a) => sum + a.riskScore, 0,
);
if (cumulativeRisk > 10) {
return {
type: 'cumulative_risk',
description: `会话累积风险分数 ${cumulativeRisk} 超过阈值 10`,
actions: this.actionHistory.slice(-5),
recommendedAction: 'require_approval',
};
}
// 检测模式 2:权限升级序列
const escalationPattern = this.detectEscalationSequence();
if (escalationPattern) {
return escalationPattern;
}
// 检测模式 3:异常工具组合
const anomaly = this.detectAnomalousToolCombination();
if (anomaly) {
return anomaly;
}
return null;
}
private detectEscalationSequence(): EscalationAlert | null {
const recent = this.actionHistory.slice(-5);
if (recent.length < 3) return null;
// 检测:读取配置 → 修改配置 → 重启服务
const hasConfigRead = recent.some(
(a) => a.tool === 'file_read' && /config/i.test(String(a.parameters.path)),
);
const hasConfigWrite = recent.some(
(a) => a.tool === 'file_write' && /config/i.test(String(a.parameters.path)),
);
const hasRestart = recent.some(
(a) => a.tool === 'terminal' && /restart|reload/i.test(String(a.parameters.command)),
);
if (hasConfigRead && hasConfigWrite && hasRestart) {
return {
type: 'config_escalation',
description: '检测到配置修改 + 服务重启序列,可能是权限提升尝试',
actions: recent,
recommendedAction: 'block',
};
}
return null;
}
private detectAnomalousToolCombination(): EscalationAlert | null {
const recent = this.actionHistory.slice(-5);
const toolSet = new Set(recent.map((a) => a.tool));
// 危险组合:文件读取 + 网络请求(可能是数据外泄)
if (toolSet.has('file_read') && toolSet.has('http_request')) {
const fileReads = recent.filter((a) => a.tool === 'file_read');
const httpRequests = recent.filter((a) => a.tool === 'http_request');
// 检查是否读取了敏感文件后发起外部请求
const readsSensitive = fileReads.some((a) =>
/\.(env|key|pem|credentials)/i.test(String(a.parameters.path)),
);
if (readsSensitive) {
return {
type: 'data_exfiltration',
description: '检测到敏感文件读取后发起网络请求,可能是数据外泄',
actions: [...fileReads, ...httpRequests],
recommendedAction: 'block',
};
}
}
return null;
}
private pruneOldActions(): void {
const cutoff = Date.now() - this.windowMs;
this.actionHistory = this.actionHistory.filter(
(a) => a.timestamp > cutoff,
);
}
}实战案例:为多 Agent 系统构建完整权限控制
场景描述
一个电商平台部署了三个协作 Agent:
- ProductAgent:管理商品信息(读写商品数据库)
- OrderAgent:处理订单(读写订单数据库、调用支付 API)
- SupportAgent:客户支持(读取用户信息、发送邮件)
需要设计一套权限控制方案,确保每个 Agent 只能访问其职责范围内的资源。
完整实现
# multi_agent_permission.py — 多 Agent 权限控制完整示例
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time
import json
class AgentType(Enum):
PRODUCT = "product_agent"
ORDER = "order_agent"
SUPPORT = "support_agent"
@dataclass
class PermissionPolicy:
"""Agent 权限策略"""
agent_type: AgentType
allowed_databases: dict[str, list[str]] # db_name -> [allowed_tables]
allowed_apis: list[str]
allowed_actions: dict[str, list[str]] # tool -> [actions]
denied_fields: list[str] # 禁止访问的字段
requires_approval: list[str] # 需要审批的操作
rate_limits: dict[str, int] # tool -> max_calls_per_minute
# 定义各 Agent 的权限策略
POLICIES: dict[AgentType, PermissionPolicy] = {
AgentType.PRODUCT: PermissionPolicy(
agent_type=AgentType.PRODUCT,
allowed_databases={
"ecommerce": ["products", "categories", "inventory"],
},
allowed_apis=["image_upload", "search_index"],
allowed_actions={
"database": ["select", "insert", "update"],
"file": ["read", "write"],
"api": ["call"],
},
denied_fields=["user_email", "user_phone", "payment_info"],
requires_approval=["delete_product", "bulk_update"],
rate_limits={"database": 60, "api": 30},
),
AgentType.ORDER: PermissionPolicy(
agent_type=AgentType.ORDER,
allowed_databases={
"ecommerce": ["orders", "order_items", "products"], # products 只读
},
allowed_apis=["payment_gateway", "shipping_api"],
allowed_actions={
"database": ["select", "insert", "update"],
"payment": ["charge", "refund"],
"api": ["call"],
},
denied_fields=["user_password", "user_ssn"],
requires_approval=["refund", "cancel_order", "bulk_update"],
rate_limits={"database": 100, "payment": 10},
),
AgentType.SUPPORT: PermissionPolicy(
agent_type=AgentType.SUPPORT,
allowed_databases={
"ecommerce": ["users", "orders", "tickets"], # 全部只读
},
allowed_apis=["email_service", "ticket_system"],
allowed_actions={
"database": ["select"], # 只读!
"email": ["send"],
"ticket": ["create", "update", "close"],
},
denied_fields=["user_password", "payment_info", "user_ssn"],
requires_approval=["send_bulk_email", "close_ticket"],
rate_limits={"database": 50, "email": 20},
),
}
class MultiAgentPermissionGateway:
"""多 Agent 权限网关"""
def __init__(self):
self.policies = POLICIES
self.audit_log: list[dict] = []
self.call_counts: dict[str, list[float]] = {}
def authorize(
self,
agent_type: AgentType,
tool: str,
action: str,
resource: str,
parameters: dict,
) -> dict:
"""授权检查入口"""
policy = self.policies.get(agent_type)
if not policy:
return self._deny(agent_type, tool, action, "未知的 Agent 类型")
# 1. 检查工具权限
if tool not in policy.allowed_actions:
return self._deny(agent_type, tool, action, f"无权使用工具: {tool}")
# 2. 检查操作权限
if action not in policy.allowed_actions[tool]:
return self._deny(
agent_type, tool, action,
f"无权执行操作: {tool}.{action}"
)
# 3. 检查数据库表权限
if tool == "database":
db_name = parameters.get("database", "")
table = parameters.get("table", "")
allowed_tables = policy.allowed_databases.get(db_name, [])
if table not in allowed_tables:
return self._deny(
agent_type, tool, action,
f"无权访问表: {db_name}.{table}"
)
# 4. 检查字段过滤
if tool == "database" and "fields" in parameters:
for field_name in parameters["fields"]:
if field_name in policy.denied_fields:
return self._deny(
agent_type, tool, action,
f"禁止访问字段: {field_name}"
)
# 5. 检查 API 权限
if tool == "api":
api_name = parameters.get("api_name", "")
if api_name not in policy.allowed_apis:
return self._deny(
agent_type, tool, action,
f"无权调用 API: {api_name}"
)
# 6. 检查速率限制
rate_key = f"{agent_type.value}:{tool}"
limit = policy.rate_limits.get(tool, 100)
if not self._check_rate(rate_key, limit):
return self._deny(
agent_type, tool, action,
f"超过速率限制: {limit}/分钟"
)
# 7. 检查是否需要审批
action_key = f"{action}_{resource}" if resource else action
if action_key in policy.requires_approval:
self._audit(agent_type, tool, action, "requires_approval")
return {
"allowed": True,
"requires_approval": True,
"message": f"操作 {action_key} 需要人工审批",
}
self._audit(agent_type, tool, action, "allowed")
return {"allowed": True, "requires_approval": False}
def _check_rate(self, key: str, limit: int) -> bool:
now = time.time()
if key not in self.call_counts:
self.call_counts[key] = []
self.call_counts[key] = [
t for t in self.call_counts[key] if t > now - 60
]
if len(self.call_counts[key]) >= limit:
return False
self.call_counts[key].append(now)
return True
def _deny(self, agent_type, tool, action, reason) -> dict:
self._audit(agent_type, tool, action, f"denied: {reason}")
return {"allowed": False, "requires_approval": False, "reason": reason}
def _audit(self, agent_type, tool, action, result):
self.audit_log.append({
"timestamp": time.time(),
"agent": agent_type.value,
"tool": tool,
"action": action,
"result": result,
})
# ===== 使用示例 =====
gateway = MultiAgentPermissionGateway()
# ProductAgent 读取商品 — 允许
r1 = gateway.authorize(
AgentType.PRODUCT, "database", "select", "products",
{"database": "ecommerce", "table": "products", "fields": ["name", "price"]},
)
print(f"ProductAgent 读取商品: {r1}")
# {"allowed": True, ...}
# SupportAgent 尝试写入订单 — 拒绝
r2 = gateway.authorize(
AgentType.SUPPORT, "database", "update", "orders",
{"database": "ecommerce", "table": "orders"},
)
print(f"SupportAgent 写入订单: {r2}")
# {"allowed": False, "reason": "无权执行操作: database.update"}
# OrderAgent 尝试退款 — 需要审批
r3 = gateway.authorize(
AgentType.ORDER, "payment", "refund", "refund",
{"order_id": "ORD-12345", "amount": 99.99},
)
print(f"OrderAgent 退款: {r3}")
# {"allowed": True, "requires_approval": True, ...}
# SupportAgent 尝试访问密码字段 — 拒绝
r4 = gateway.authorize(
AgentType.SUPPORT, "database", "select", "users",
{"database": "ecommerce", "table": "users", "fields": ["name", "user_password"]},
)
print(f"SupportAgent 访问密码: {r4}")
# {"allowed": False, "reason": "禁止访问字段: user_password"}案例分析
这个多 Agent 权限控制方案体现了几个关键设计决策:
-
职责分离:每个 Agent 只能访问其职责范围内的数据库表和 API。SupportAgent 只有数据库只读权限,无法修改任何数据。
-
字段级过滤:即使 Agent 有权访问某张表,也不能访问敏感字段(密码、支付信息、SSN)。这防止了 Agent 被诱导泄露敏感数据。
-
分级审批:常规操作自动执行,高风险操作(退款、批量更新、批量邮件)需要人工审批。
-
速率限制:防止 Agent 在被入侵后大量读取数据或发起请求。
-
完整审计:每次授权检查都记录日志,便于事后分析和异常检测。
避坑指南
❌ 常见错误
-
使用黑名单而非白名单
- 问题:试图列出所有”不允许”的操作(如禁止
rm -rf /、禁止curl),但攻击者总能找到绕过方式(如用wget替代curl,用perl -e替代python -c)。 - 正确做法:采用白名单策略,只允许已知安全的操作。未在白名单中的操作默认拒绝(deny-by-default)。
- 问题:试图列出所有”不允许”的操作(如禁止
-
沙箱配置使用默认参数
- 问题:使用默认 Docker 配置运行 Agent,没有移除 capabilities、没有启用只读根文件系统、没有限制网络。默认 Docker 容器拥有大量不必要的权限。
- 正确做法:使用
--cap-drop ALL移除所有 Linux capabilities,启用--read-only只读根文件系统,设置--network none禁止网络(除非必要),限制--pids-limit和--memory。
-
Agent 间通信不验证身份
- 问题:在多 Agent 系统中,Agent A 向 Agent B 发送请求时,B 直接信任并执行,不验证 A 的身份和权限。被入侵的低权限 Agent 可以冒充高权限 Agent。
- 正确做法:Agent 间通信必须有签名验证。高权限 Agent 收到请求时,必须重新验证原始用户意图(On-Behalf-Of 模式),而非信任中间 Agent 的转述。
-
审批工作流超时后自动批准
- 问题:为了”不影响用户体验”,将审批超时的默认行为设为”自动批准”。攻击者只需等待超时即可绕过审批。
- 正确做法:审批超时的默认行为必须是”拒绝”(fail-closed)。如果审批者不可用,操作应该被阻止而非自动放行。
-
权限策略硬编码在代码中
- 问题:将 Agent 权限规则直接写在应用代码中,修改权限需要重新部署。无法快速响应安全事件。
- 正确做法:使用外部策略引擎(如 OPA)或声明式配置文件管理权限。权限变更应该能在不重启服务的情况下生效。支持紧急权限撤销。
-
忽视工具链的间接权限
- 问题:Agent 有权执行
npm install,看起来只是安装依赖。但 npm 包的postinstall脚本可以执行任意代码,等于间接授予了 RCE 权限。 - 正确做法:审计工具链的间接权限。
npm install应该在沙箱中执行,且使用--ignore-scripts禁止安装脚本。对所有包管理器操作进行安全审查。
- 问题:Agent 有权执行
✅ 最佳实践
- Deny-by-Default:所有权限默认拒绝,只显式授予必要的权限。这是最小权限原则的基础。
- 分层防御:不要依赖单一权限控制机制。结合 RBAC(角色)+ ABAC(属性)+ 沙箱(隔离)+ 审批(人工)构建多层防御。
- 权限定期审计:每月审查 Agent 权限配置,移除不再需要的权限。使用自动化工具检测权限漂移。
- 紧急撤销机制:建立一键撤销 Agent 所有权限的紧急机制,在检测到安全事件时立即生效。
- 权限即代码:将权限策略作为代码管理(Policy-as-Code),纳入版本控制和 CI/CD 流程,每次变更都有审查记录。
相关资源与延伸阅读
- OWASP Agentic Security Initiative — ASI Top 10 — Agent 安全风险分类框架
- Traefik Hub MCP Gateway — TBAC 文档 — 基于任务的访问控制详解
- Microsoft Wassette — WebAssembly Agent 工具运行时 — Wasm 沙箱方案
- E2B — AI Agent 云沙箱平台 — 开源 Firecracker 沙箱运行时
- Open Policy Agent (OPA) 官方文档 — 通用策略引擎
- gVisor 架构指南 — 用户态内核沙箱技术详解
- Firecracker 官方文档 — MicroVM 虚拟化引擎
- Agent Consent Protocol (ACP) — Agent 操作审批开源协议
参考来源
- Securing AI Agents: Principles of Least Privilege (2026-01)
- Sandboxing, Least Privilege & Data Exfiltration Guards (2025-10)
- How to Sandbox AI Agents in 2026: MicroVMs, gVisor & Isolation Strategies (2026)
- A Practical Guide to Running AI-Generated Code Safely (2026-02)
- Understanding Task-Based Access Control (TBAC) (2025-10)
- Attribute-based Access Control Policy for Enterprise AI (2026-01)
- Introducing Wassette: WebAssembly-based Tools for AI Agents (2025-08)
- Sandbox Management for AI Coding Agents (2026-01)
- Action-Level Approvals for AI Oversight (2025-10)
- Managing Excessive Agency: Least Privilege for AI Agents (2026-01)
📖 返回 总览与导航 | 上一节:22a-AI安全概览 | 下一节:22c-数据隐私与合规