Skip to Content

22b - Agent 权限控制

本文是《AI Agent 实战手册》第 22 章第 2 节。 上一节:22a-AI安全概览 | 下一节:22c-数据隐私与合规 📖 返回 总览与导航

⏱ 阅读时间:80 分钟 | 难度:⭐⭐⭐⭐⭐ 高级 | 前置知识:AI Agent 基础概念、容器/虚拟化基础、安全基础

概述

当 AI Agent 从”回答问题”进化到”自主执行任务”,权限控制就成了安全的核心命题。一个拥有文件读写、终端执行、API 调用能力的 Agent,本质上就是一个自动化的特权用户——如果不加约束,它可能在一次推理循环中完成数据外泄、权限提升甚至系统入侵。本节系统讲解 Agent 权限控制的四大支柱:最小权限原则、沙箱隔离技术、人工审批工作流、工具级访问控制,并提供 Python 和 TypeScript 的完整实现示例。


1. 最小权限原则(Least Privilege / Least Agency)

核心理念

最小权限原则在 Agent 时代被 OWASP 扩展为”最小代理权限”(Least Agency):Agent 只应被授予完成其定义任务所需的最低自主权级别。这不仅包括传统的数据访问权限,还包括工具调用权限、网络访问范围、执行时间限制和自主决策深度。

最小权限的四个维度 ┌─────────────────────────────────────────────────────────┐ │ 最小代理权限模型 │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 数据权限 │ │ 工具权限 │ │ 网络权限 │ │ │ │ │ │ │ │ │ │ │ │ • 只读 vs │ │ • 白名单 │ │ • 域名白名单 │ │ │ │ 读写 │ │ • 参数约束 │ │ • 端口限制 │ │ │ │ • 目录范围 │ │ • 调用频率 │ │ • 出站控制 │ │ │ │ • 数据分类 │ │ • 审批要求 │ │ • DNS 过滤 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 执行权限 │ │ 时间权限 │ │ 自主权限 │ │ │ │ │ │ │ │ │ │ │ │ • 沙箱级别 │ │ • 超时限制 │ │ • 决策深度 │ │ │ │ • 资源配额 │ │ • 会话时长 │ │ • 循环次数 │ │ │ │ • 进程隔离 │ │ • 冷却期 │ │ • 升级阈值 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘

工具推荐

工具用途价格适用场景
E2BAI Agent 沙箱运行时免费(开源)/ Pro $29/月起Agent 代码执行隔离
FirecrackerMicroVM 虚拟化引擎免费(开源)高性能轻量级 VM 隔离
gVisor用户态内核沙箱免费(开源)Kubernetes Pod 沙箱化
WassetteWebAssembly Agent 工具运行时免费(开源)MCP 工具沙箱执行
Traefik Hub MCP GatewayMCP 网关 + TBAC 访问控制免费(社区版)/ 企业版联系销售Agent 工具调用治理
Hoop.dev操作级审批平台免费(基础版)/ 企业版联系销售Agent 敏感操作审批
Agent Consent Protocol (ACP)Agent 操作审批代理免费(开源)工具调用拦截与审批
Open Policy Agent (OPA)通用策略引擎免费(开源)RBAC/ABAC 策略执行
Docker容器化隔离免费(开源)/ Business $24/月基础 Agent 隔离
Daytona开发环境沙箱免费(开源)/ 企业版联系销售Agent 开发环境隔离

操作步骤:实施最小权限

步骤 1:权限审计——盘点 Agent 当前权限

首先列出 Agent 可访问的所有资源和工具,评估每项权限的必要性:

from dataclasses import dataclass, field from enum import Enum from typing import Optional class PermissionLevel(Enum): NONE = "none" READ = "read" WRITE = "write" EXECUTE = "execute" ADMIN = "admin" class Necessity(Enum): REQUIRED = "required" # 核心功能必需 OPTIONAL = "optional" # 增强功能,可降级 UNNECESSARY = "unnecessary" # 不需要,应移除 @dataclass class PermissionAuditEntry: resource: str current_level: PermissionLevel required_level: PermissionLevel necessity: Necessity justification: str risk_if_compromised: str @dataclass class AgentPermissionAudit: agent_name: str entries: list[PermissionAuditEntry] = field(default_factory=list) def add(self, entry: PermissionAuditEntry): self.entries.append(entry) def get_over_privileged(self) -> list[PermissionAuditEntry]: """找出权限过高的条目""" return [ e for e in self.entries if e.current_level.value > e.required_level.value or e.necessity == Necessity.UNNECESSARY ] def generate_report(self) -> str: over = self.get_over_privileged() lines = [f"=== {self.agent_name} 权限审计报告 ==="] lines.append(f"总权限条目: {len(self.entries)}") lines.append(f"过度授权条目: {len(over)}") for e in over: lines.append( f" ⚠️ {e.resource}: " f"当前={e.current_level.value} → 建议={e.required_level.value} " f"({e.necessity.value})" ) return "\n".join(lines) # 使用示例 audit = AgentPermissionAudit("CodingAssistant") audit.add(PermissionAuditEntry( resource="文件系统 /workspace/", current_level=PermissionLevel.WRITE, required_level=PermissionLevel.WRITE, necessity=Necessity.REQUIRED, justification="需要创建和修改项目文件", risk_if_compromised="项目代码被篡改", )) audit.add(PermissionAuditEntry( resource="文件系统 ~/.ssh/", current_level=PermissionLevel.READ, required_level=PermissionLevel.NONE, necessity=Necessity.UNNECESSARY, justification="Agent 不需要访问 SSH 密钥", risk_if_compromised="SSH 密钥泄露,服务器被入侵", )) audit.add(PermissionAuditEntry( resource="终端命令执行", current_level=PermissionLevel.ADMIN, required_level=PermissionLevel.EXECUTE, necessity=Necessity.REQUIRED, justification="需要运行构建和测试命令", risk_if_compromised="任意命令执行,系统被完全控制", )) print(audit.generate_report())

步骤 2:定义权限策略文件

将 Agent 权限以声明式配置管理,便于审计和版本控制:

# agent-permissions.yaml — Agent 权限策略声明 agent: name: "CodingAssistant" version: "1.0" description: "项目编码辅助 Agent" permissions: file_system: allowed_paths: - path: "/workspace/project/" access: "read_write" max_file_size: "10MB" - path: "/tmp/agent-workspace/" access: "read_write" max_file_size: "50MB" denied_paths: - "~/.ssh/" - "~/.aws/" - "~/.config/" - ".env" - ".env.*" - "**/*.pem" - "**/*.key" terminal: sandbox: "docker" # docker | gvisor | firecracker | wasm allowed_commands: - "npm" - "node" - "python" - "pip" - "git status" - "git diff" - "git log" - "cargo" - "make" - "cat" - "ls" denied_patterns: - "curl *" - "wget *" - "ssh *" - "scp *" - "nc *" - "rm -rf /" - "| base64" - "> /dev/tcp" - "$(cat .env)" timeout_seconds: 30 max_output_size: "1MB" network: outbound: allowed_domains: - "registry.npmjs.org" - "pypi.org" - "docs.python.org" - "developer.mozilla.org" denied_ports: [22, 23, 25, 3389] inbound: "deny_all" resources: max_memory: "512MB" max_cpu_percent: 50 max_disk: "1GB" max_processes: 10 approval_required: - action: "git push" approvers: ["developer"] timeout: "5m" - action: "npm install *" approvers: ["developer"] timeout: "2m" - action: "rm -rf *" approvers: ["developer", "lead"] timeout: "5m" - action: "database_write" approvers: ["developer", "dba"] timeout: "10m"

步骤 3:实施权限降级策略

从最高权限逐步收紧,而非从零开始添加:

class PermissionEnforcer: """Agent 权限执行器 — 强制执行最小权限""" def __init__(self, policy_path: str): self.policy = self._load_policy(policy_path) def check_file_access(self, path: str, access_type: str) -> bool: """检查文件访问权限""" import os abs_path = os.path.abspath(path) # 检查拒绝列表(优先级最高) for denied in self.policy.get("denied_paths", []): if self._match_pattern(abs_path, denied): self._log_denied("file_access", path, "在拒绝列表中") return False # 检查允许列表 for allowed in self.policy.get("allowed_paths", []): if abs_path.startswith(allowed["path"]): if access_type == "write" and allowed["access"] == "read": self._log_denied("file_access", path, "只读路径") return False return True # 默认拒绝(deny-by-default) self._log_denied("file_access", path, "不在允许列表中") return False def check_command(self, command: str) -> bool: """检查终端命令权限""" cmd_parts = command.strip().split() if not cmd_parts: return False base_cmd = cmd_parts[0] # 检查拒绝模式 for pattern in self.policy.get("denied_patterns", []): if self._match_command_pattern(command, pattern): self._log_denied("command", command, f"匹配拒绝模式: {pattern}") return False # 检查允许列表 allowed = self.policy.get("allowed_commands", []) if base_cmd in allowed or command in allowed: return True self._log_denied("command", command, "不在允许列表中") return False def _match_pattern(self, path: str, pattern: str) -> bool: import fnmatch expanded = os.path.expanduser(pattern) return fnmatch.fnmatch(path, expanded) def _match_command_pattern(self, cmd: str, pattern: str) -> bool: import fnmatch return fnmatch.fnmatch(cmd, pattern) def _load_policy(self, path: str) -> dict: import yaml with open(path) as f: return yaml.safe_load(f) def _log_denied(self, category: str, target: str, reason: str): print(f"🚫 权限拒绝 [{category}] {target}{reason}")

2. 沙箱隔离技术

为什么标准容器不够?

标准 Docker 容器共享宿主机内核,容器逃逸漏洞(如 CVE-2024-21626 Leaky Vessels)可以让攻击者突破隔离。对于执行 AI 生成代码的 Agent,需要更强的隔离边界。

沙箱隔离技术对比 隔离强度 ▲ 最强 │ ┌──────────────┐ │ │ Firecracker │ 独立内核 + 极简设备模型 │ │ MicroVM │ 启动 ~125ms,内存开销 ~5MB │ └──────────────┘ 强 │ ┌──────────────┐ │ │ gVisor │ 用户态内核拦截系统调用 │ │ (runsc) │ 兼容 OCI,K8s 原生支持 │ └──────────────┘ 中强 │ ┌──────────────┐ │ │ Wasm/WASI │ 能力模型,默认零权限 │ │ (Wasmtime) │ 极快启动,跨平台 │ └──────────────┘ 中 │ ┌──────────────┐ │ │ Docker + │ Seccomp + AppArmor + 只读根 │ │ 加固配置 │ 共享内核,需额外加固 │ └──────────────┘ 弱 │ ┌──────────────┐ │ │ 标准 Docker │ 默认配置,共享内核 │ │ (默认) │ 容器逃逸风险 │ └──────────────┘ └──────────────────────────────────────────▶ 性能开销 低 高

沙箱技术详细对比

技术隔离级别启动时间内存开销K8s 支持适用场景局限性
Firecracker MicroVMVM 级(独立内核)~125ms~5MB/实例需适配高安全要求的代码执行需要 KVM 支持,不支持 GPU
gVisor (runsc)用户态内核~毫秒级~15-50MB原生 RuntimeClassK8s 环境下的 Agent 隔离部分系统调用不兼容
Wasm/WASI (Wasmtime)能力模型沙箱<1ms~KB 级通过 Spin/FermyonMCP 工具执行、轻量计算生态尚不成熟,I/O 受限
Docker + 加固命名空间 + cgroup~秒级~数十 MB原生通用 Agent 隔离共享内核,需额外加固
E2BFirecracker MicroVM~400ms托管N/A(云服务)AI Agent 代码执行依赖云服务,有成本
Modal容器 + 隔离~秒级托管N/A(云服务)ML 工作负载、GPU 任务Python 优先

操作步骤:部署各类沙箱

方案 A:Docker 加固沙箱

最容易上手的方案,适合大多数场景。关键是在默认配置基础上添加安全加固:

# Dockerfile.agent-sandbox — Agent 安全沙箱镜像 FROM python:3.12-slim AS agent-sandbox # 创建非 root 用户 RUN groupadd -r agent && useradd -r -g agent -d /workspace -s /bin/bash agent # 安装必要工具(最小化) RUN apt-get update && apt-get install -y --no-install-recommends \ git \ && rm -rf /var/lib/apt/lists/* # 设置工作目录 WORKDIR /workspace RUN chown agent:agent /workspace # 切换到非 root 用户 USER agent # 默认入口 CMD ["bash"]
# docker_sandbox.py — Docker 加固沙箱管理器 import subprocess import json import uuid from dataclasses import dataclass from typing import Optional @dataclass class SandboxConfig: image: str = "agent-sandbox:latest" memory_limit: str = "512m" cpu_quota: int = 50000 # 50% CPU network_mode: str = "none" # 默认禁止网络 read_only_root: bool = True no_new_privileges: bool = True timeout_seconds: int = 30 allowed_paths: list[str] = None def __post_init__(self): if self.allowed_paths is None: self.allowed_paths = [] class DockerSandbox: """Docker 加固沙箱 — 用于 Agent 代码执行""" def __init__(self, config: SandboxConfig): self.config = config self.container_id: Optional[str] = None def execute(self, command: str, workspace_path: str = "") -> dict: """在沙箱中执行命令""" container_name = f"agent-sandbox-{uuid.uuid4().hex[:8]}" docker_cmd = [ "docker", "run", "--name", container_name, "--rm", # 执行后自动删除 "--memory", self.config.memory_limit, "--cpu-quota", str(self.config.cpu_quota), "--network", self.config.network_mode, "--security-opt", "no-new-privileges:true", "--cap-drop", "ALL", # 移除所有 Linux capabilities "--pids-limit", "64", # 限制进程数 "--tmpfs", "/tmp:rw,noexec,nosuid,size=64m", ] # 只读根文件系统 if self.config.read_only_root: docker_cmd.append("--read-only") # 挂载工作目录(如果提供) if workspace_path: docker_cmd.extend([ "-v", f"{workspace_path}:/workspace:rw" ]) # Seccomp 配置(限制系统调用) docker_cmd.extend([ "--security-opt", "seccomp=default.json", ]) docker_cmd.extend([ self.config.image, "bash", "-c", command, ]) try: result = subprocess.run( docker_cmd, capture_output=True, text=True, timeout=self.config.timeout_seconds, ) return { "success": result.returncode == 0, "stdout": result.stdout[:10000], # 限制输出大小 "stderr": result.stderr[:5000], "exit_code": result.returncode, } except subprocess.TimeoutExpired: # 超时强制终止 subprocess.run(["docker", "kill", container_name], capture_output=True) return { "success": False, "stdout": "", "stderr": "执行超时,已强制终止", "exit_code": -1, } # 使用示例 sandbox = DockerSandbox(SandboxConfig( memory_limit="256m", timeout_seconds=15, network_mode="none", )) result = sandbox.execute("python -c 'print(2 + 2)'") print(result) # {"success": True, "stdout": "4\n", ...}

方案 B:Firecracker MicroVM 沙箱

Firecracker 是 AWS Lambda 和 Fargate 背后的虚拟化引擎,提供 VM 级隔离但启动时间仅约 125ms。每个 MicroVM 拥有独立内核,即使发生内核漏洞也不会影响宿主机。

# firecracker_sandbox.py — Firecracker MicroVM 沙箱(概念实现) import aiohttp import asyncio import json from dataclasses import dataclass @dataclass class MicroVMConfig: vcpu_count: int = 1 mem_size_mib: int = 128 kernel_image: str = "/opt/firecracker/vmlinux" rootfs_image: str = "/opt/firecracker/rootfs.ext4" network_enabled: bool = False timeout_seconds: int = 30 class FirecrackerSandbox: """Firecracker MicroVM 沙箱 — VM 级隔离""" def __init__(self, config: MicroVMConfig): self.config = config self.socket_path = "/tmp/firecracker.socket" async def create_vm(self): """创建并启动 MicroVM""" # 配置内核 await self._api_call("PUT", "/boot-source", { "kernel_image_path": self.config.kernel_image, "boot_args": "console=ttyS0 reboot=k panic=1 pci=off" }) # 配置根文件系统 await self._api_call("PUT", "/drives/rootfs", { "drive_id": "rootfs", "path_on_host": self.config.rootfs_image, "is_root_device": True, "is_read_only": False, }) # 配置机器资源 await self._api_call("PUT", "/machine-config", { "vcpu_count": self.config.vcpu_count, "mem_size_mib": self.config.mem_size_mib, }) # 网络配置(可选) if not self.config.network_enabled: pass # 不配置网络接口 = 无网络访问 # 启动 VM await self._api_call("PUT", "/actions", { "action_type": "InstanceStart" }) async def execute_in_vm(self, command: str) -> dict: """在 MicroVM 中执行命令(通过 vsock 或串口)""" # 实际实现中通过 vsock 通信 # 这里展示概念流程 try: result = await asyncio.wait_for( self._send_command(command), timeout=self.config.timeout_seconds, ) return {"success": True, "output": result} except asyncio.TimeoutError: await self.destroy_vm() return {"success": False, "output": "MicroVM 执行超时"} async def destroy_vm(self): """销毁 MicroVM""" await self._api_call("PUT", "/actions", { "action_type": "SendCtrlAltDel" }) async def _api_call(self, method: str, path: str, data: dict): """调用 Firecracker API""" connector = aiohttp.UnixConnector(path=self.socket_path) async with aiohttp.ClientSession(connector=connector) as session: url = f"http://localhost{path}" async with session.request(method, url, json=data) as resp: return await resp.json() if resp.content_length else None async def _send_command(self, command: str) -> str: """通过 vsock 发送命令到 VM(简化示意)""" # 实际实现需要 vsock 客户端 return f"[MicroVM output for: {command}]"

方案 C:gVisor 沙箱(Kubernetes 环境)

gVisor 在用户态实现了一个”Sentry”内核,拦截容器的系统调用,大幅缩小内核攻击面。在 GKE 上可以通过 RuntimeClass 直接启用。

# gvisor-runtime-class.yaml — 定义 gVisor 运行时类 apiVersion: node.k8s.io/v1 kind: RuntimeClass metadata: name: gvisor handler: runsc # gVisor 的 OCI 运行时 --- # agent-pod-gvisor.yaml — 使用 gVisor 运行的 Agent Pod apiVersion: v1 kind: Pod metadata: name: agent-sandbox labels: app: agent-sandbox security-level: high spec: runtimeClassName: gvisor # 使用 gVisor 运行时 securityContext: runAsNonRoot: true runAsUser: 1000 fsGroup: 1000 seccompProfile: type: RuntimeDefault containers: - name: agent image: agent-sandbox:latest resources: limits: memory: "512Mi" cpu: "500m" requests: memory: "256Mi" cpu: "250m" securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true capabilities: drop: ["ALL"] volumeMounts: - name: workspace mountPath: /workspace - name: tmp mountPath: /tmp volumes: - name: workspace emptyDir: sizeLimit: "1Gi" - name: tmp emptyDir: medium: Memory sizeLimit: "64Mi" # 网络策略:默认拒绝所有出站 # 需要配合 NetworkPolicy 使用
# network-policy-agent.yaml — Agent Pod 网络策略 apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: agent-sandbox-netpol spec: podSelector: matchLabels: app: agent-sandbox policyTypes: - Ingress - Egress ingress: [] # 拒绝所有入站 egress: # 只允许访问特定域名(通过 DNS) - to: - namespaceSelector: matchLabels: kubernetes.io/metadata.name: kube-system ports: - protocol: UDP port: 53

方案 D:WebAssembly (Wasm/WASI) 沙箱

Wasm 采用能力模型(Capability-based Security),默认零权限,所有系统资源访问必须显式授予。微软开源的 Wassette 项目专门为 AI Agent 工具执行设计了基于 Wasm 的安全运行时。

// wasm_sandbox.ts — Wasm 沙箱概念实现(TypeScript) interface WasmPermissions { fileSystem?: { readPaths?: string[]; writePaths?: string[]; }; network?: { allowedHosts?: string[]; allowedPorts?: number[]; }; environment?: { allowedVars?: string[]; }; maxMemoryMB?: number; maxExecutionMs?: number; } interface WasmExecutionResult { success: boolean; output: string; error?: string; resourceUsage: { memoryUsedBytes: number; executionTimeMs: number; }; } class WasmSandbox { private permissions: WasmPermissions; constructor(permissions: WasmPermissions) { this.permissions = permissions; } /** * 在 Wasm 沙箱中执行工具 * 所有权限必须显式声明,默认全部拒绝 */ async executeModule( wasmPath: string, args: string[], ): Promise<WasmExecutionResult> { const startTime = Date.now(); // 构建 WASI 预打开目录(只授予声明的路径) const preopens: Record<string, string> = {}; if (this.permissions.fileSystem?.readPaths) { for (const p of this.permissions.fileSystem.readPaths) { preopens[p] = p; // 映射宿主路径到沙箱路径 } } // 构建环境变量(只传递允许的变量) const env: Record<string, string> = {}; if (this.permissions.environment?.allowedVars) { for (const varName of this.permissions.environment.allowedVars) { const value = process.env[varName]; if (value) env[varName] = value; } } try { // 使用 Wasmtime/Wasmer 运行模块 // 实际实现中调用 Wasmtime CLI 或 SDK const result = await this.runWasmModule(wasmPath, args, { preopens, env, maxMemory: (this.permissions.maxMemoryMB ?? 64) * 1024 * 1024, }); const executionTimeMs = Date.now() - startTime; // 检查执行时间限制 if ( this.permissions.maxExecutionMs && executionTimeMs > this.permissions.maxExecutionMs ) { return { success: false, output: '', error: `执行超时: ${executionTimeMs}ms > ${this.permissions.maxExecutionMs}ms`, resourceUsage: { memoryUsedBytes: 0, executionTimeMs }, }; } return { success: true, output: result.stdout, resourceUsage: { memoryUsedBytes: result.memoryUsed, executionTimeMs, }, }; } catch (error) { return { success: false, output: '', error: String(error), resourceUsage: { memoryUsedBytes: 0, executionTimeMs: Date.now() - startTime, }, }; } } private async runWasmModule( path: string, args: string[], opts: { preopens: Record<string, string>; env: Record<string, string>; maxMemory: number }, ) { // 实际实现中调用 Wasmtime SDK // 这里为概念示意 return { stdout: '', memoryUsed: 0 }; } } // 使用示例:为 MCP 工具创建最小权限沙箱 const toolSandbox = new WasmSandbox({ fileSystem: { readPaths: ['/workspace/project/src'], // 注意:没有 writePaths = 只读 }, network: undefined, // 无网络权限 maxMemoryMB: 32, maxExecutionMs: 5000, });

提示词模板:沙箱方案选择

你是一位基础设施安全架构师。请根据以下 Agent 系统需求,推荐最合适的沙箱隔离方案。 ## Agent 信息 - Agent 类型:[编码Agent / 数据分析Agent / 自动化工作流Agent / 客服Agent] - 需要执行的操作:[代码执行 / 文件读写 / API调用 / 数据库查询 / 浏览器操作] - 部署环境:[Kubernetes / 裸机 / 云函数 / 本地开发] - 并发需求:[每秒执行次数] - 安全要求:[标准 / 高 / 极高(金融/医疗)] - 预算约束:[有限 / 中等 / 充足] - GPU 需求:[是/否] ## 请输出 ### 1. 推荐方案 从以下选项中推荐最合适的方案,并说明理由: - Docker 加固 - Firecracker MicroVM - gVisor - Wasm/WASI - 托管平台(E2B/Modal) - 组合方案 ### 2. 配置模板 提供推荐方案的完整配置文件。 ### 3. 安全加固清单 列出该方案需要额外加固的安全措施。 ### 4. 性能基准 预估启动时间、内存开销、吞吐量。

3. 人工审批工作流(Human-in-the-Loop)

为什么需要审批工作流?

Agent 的自主性是一把双刃剑。对于高风险操作(数据删除、代码部署、资金转账、权限变更),必须引入人工审批环节。审批工作流的核心思想是:Agent 可以自主完成低风险任务,但高风险操作必须经过人类确认。

审批工作流决策模型 Agent 发起操作 ┌──────────────┐ │ 风险评估引擎 │ └──────┬───────┘ ├── 低风险(读取、查询、摘要)──────▶ 自动执行 ✅ ├── 中风险(文件修改、API 调用)───▶ 异步通知 + 自动执行 📋 ├── 高风险(部署、删除、安装依赖)─▶ 同步审批 ⏸️ │ │ │ ┌─────▼─────┐ │ │ 审批者审查 │ │ │ (Slack/ │ │ │ Teams/API)│ │ └─────┬─────┘ │ │ │ ┌─────┴─────┐ │ │ │ │ 批准 ✅ 拒绝 ❌ │ │ │ │ 执行操作 记录并通知 └── 极高风险(权限变更、资金操作)─▶ 多人审批 + MFA ⏸️⏸️ 需要 ≥2 人批准 + 多因素认证

操作步骤:构建审批工作流

步骤 1:定义风险分级策略

# approval_workflow.py — Agent 操作审批工作流 from dataclasses import dataclass, field from enum import Enum from typing import Callable, Optional import time import uuid import re class RiskLevel(Enum): LOW = "low" # 自动执行 MEDIUM = "medium" # 通知 + 自动执行 HIGH = "high" # 需要审批 CRITICAL = "critical" # 多人审批 + MFA @dataclass class ApprovalRequest: id: str agent_id: str action: str parameters: dict risk_level: RiskLevel justification: str timestamp: float status: str = "pending" # pending | approved | denied | expired approver: Optional[str] = None approved_at: Optional[float] = None class RiskAssessor: """操作风险评估引擎""" def __init__(self): self.rules: list[tuple[Callable, RiskLevel]] = [] self._register_default_rules() def _register_default_rules(self): """注册默认风险评估规则""" # 极高风险操作 self.rules.append(( lambda action, params: action in [ "permission_change", "fund_transfer", "delete_database", "modify_credentials" ], RiskLevel.CRITICAL, )) # 高风险操作 self.rules.append(( lambda action, params: action in [ "deploy_production", "git_push", "npm_install", "delete_files", "execute_sql_write", ], RiskLevel.HIGH, )) # 中风险操作 self.rules.append(( lambda action, params: action in [ "file_write", "api_call", "send_email", "create_branch", ], RiskLevel.MEDIUM, )) # 基于参数的风险升级 self.rules.append(( lambda action, params: ( action == "execute_command" and any( p in str(params.get("command", "")) for p in ["rm ", "drop ", "delete ", "truncate "] ) ), RiskLevel.HIGH, )) def assess(self, action: str, parameters: dict) -> RiskLevel: """评估操作风险级别""" max_risk = RiskLevel.LOW for rule_fn, risk_level in self.rules: if rule_fn(action, parameters): if list(RiskLevel).index(risk_level) > list(RiskLevel).index(max_risk): max_risk = risk_level return max_risk class ApprovalWorkflow: """Agent 操作审批工作流管理器""" def __init__(self, risk_assessor: RiskAssessor): self.risk_assessor = risk_assessor self.pending_requests: dict[str, ApprovalRequest] = {} self.approval_timeout = 300 # 5 分钟超时 async def request_action( self, agent_id: str, action: str, parameters: dict, justification: str = "", ) -> dict: """Agent 请求执行操作""" risk = self.risk_assessor.assess(action, parameters) if risk == RiskLevel.LOW: return {"approved": True, "method": "auto", "risk": risk.value} if risk == RiskLevel.MEDIUM: # 异步通知,但自动执行 await self._notify_observers(action, parameters, risk) return {"approved": True, "method": "auto_with_notification", "risk": risk.value} # HIGH 和 CRITICAL 需要人工审批 request = ApprovalRequest( id=str(uuid.uuid4()), agent_id=agent_id, action=action, parameters=self._redact_sensitive(parameters), risk_level=risk, justification=justification, timestamp=time.time(), ) self.pending_requests[request.id] = request # 发送审批请求到通知渠道 await self._send_approval_request(request) # 等待审批结果 result = await self._wait_for_approval(request) return result async def approve(self, request_id: str, approver: str) -> bool: """审批者批准操作""" request = self.pending_requests.get(request_id) if not request or request.status != "pending": return False request.status = "approved" request.approver = approver request.approved_at = time.time() return True async def deny(self, request_id: str, approver: str, reason: str = "") -> bool: """审批者拒绝操作""" request = self.pending_requests.get(request_id) if not request or request.status != "pending": return False request.status = "denied" request.approver = approver return True async def _wait_for_approval(self, request: ApprovalRequest) -> dict: """等待审批结果(带超时)""" start = time.time() while time.time() - start < self.approval_timeout: if request.status == "approved": return { "approved": True, "method": "human_approval", "approver": request.approver, "risk": request.risk_level.value, } if request.status == "denied": return { "approved": False, "method": "human_denial", "approver": request.approver, "risk": request.risk_level.value, } await asyncio.sleep(1) # 超时 = 拒绝(安全默认值) request.status = "expired" return { "approved": False, "method": "timeout", "risk": request.risk_level.value, } async def _send_approval_request(self, request: ApprovalRequest): """发送审批请求到 Slack/Teams/API""" message = ( f"🔐 Agent 操作审批请求\n" f"Agent: {request.agent_id}\n" f"操作: {request.action}\n" f"风险级别: {request.risk_level.value}\n" f"理由: {request.justification}\n" f"请求 ID: {request.id}\n" f"超时: {self.approval_timeout}s" ) print(message) # 实际中发送到 Slack/Teams async def _notify_observers(self, action, params, risk): print(f"📋 通知: {action} (风险: {risk.value})") def _redact_sensitive(self, params: dict) -> dict: sensitive = {"password", "token", "secret", "key", "credential"} return { k: "***" if k.lower() in sensitive else v for k, v in params.items() } import asyncio # 放在文件顶部

ACP 是一个开源的 Agent 操作审批代理,它作为代理层拦截 Agent 的工具调用,要求人类显式批准敏感操作。

// acp_integration.ts — Agent Consent Protocol 集成示例 interface ACPConfig { /** ACP 代理服务地址 */ proxyUrl: string; /** 需要审批的操作模式 */ sensitivePatterns: RegExp[]; /** 审批超时(毫秒) */ approvalTimeoutMs: number; /** 通知渠道 */ notificationChannel: 'slack' | 'teams' | 'webhook'; } interface ToolCallRequest { toolName: string; parameters: Record<string, unknown>; agentId: string; sessionId: string; } interface ACPDecision { allowed: boolean; requiresApproval: boolean; approvalId?: string; reason?: string; } class AgentConsentProxy { private config: ACPConfig; constructor(config: ACPConfig) { this.config = config; } /** * 拦截工具调用,检查是否需要审批 */ async interceptToolCall(request: ToolCallRequest): Promise<ACPDecision> { // 1. 检查是否匹配敏感操作模式 const isSensitive = this.config.sensitivePatterns.some( (pattern) => pattern.test(request.toolName) || pattern.test(JSON.stringify(request.parameters)), ); if (!isSensitive) { // 非敏感操作,直接放行 return { allowed: true, requiresApproval: false }; } // 2. 发送审批请求 const approvalId = crypto.randomUUID(); await this.sendApprovalNotification({ approvalId, toolName: request.toolName, parameters: this.redactParams(request.parameters), agentId: request.agentId, }); // 3. 等待审批结果 const decision = await this.waitForDecision(approvalId); return decision; } private async sendApprovalNotification(details: { approvalId: string; toolName: string; parameters: Record<string, unknown>; agentId: string; }): Promise<void> { // 发送到 Slack/Teams/Webhook const payload = { text: `🔐 *Agent 操作审批*\n` + `> Agent: \`${details.agentId}\`\n` + `> 工具: \`${details.toolName}\`\n` + `> 参数: \`${JSON.stringify(details.parameters)}\`\n` + `> 审批 ID: \`${details.approvalId}\`\n` + `> 回复 \`/approve ${details.approvalId}\` 批准\n` + `> 回复 \`/deny ${details.approvalId}\` 拒绝`, }; await fetch(this.config.proxyUrl + '/notify', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(payload), }); } private async waitForDecision(approvalId: string): Promise<ACPDecision> { const startTime = Date.now(); while (Date.now() - startTime < this.config.approvalTimeoutMs) { const response = await fetch( `${this.config.proxyUrl}/decisions/${approvalId}`, ); const data = await response.json(); if (data.status === 'approved') { return { allowed: true, requiresApproval: true, approvalId }; } if (data.status === 'denied') { return { allowed: false, requiresApproval: true, approvalId, reason: data.reason, }; } // 等待 1 秒后重试 await new Promise((resolve) => setTimeout(resolve, 1000)); } // 超时默认拒绝 return { allowed: false, requiresApproval: true, approvalId, reason: '审批超时', }; } private redactParams( params: Record<string, unknown>, ): Record<string, unknown> { const sensitive = new Set([ 'password', 'token', 'secret', 'key', 'credential', ]); return Object.fromEntries( Object.entries(params).map(([k, v]) => [ k, sensitive.has(k.toLowerCase()) ? '***REDACTED***' : v, ]), ); } } // 使用示例 const acp = new AgentConsentProxy({ proxyUrl: 'http://localhost:8080', sensitivePatterns: [ /git\s+push/i, /npm\s+install/i, /rm\s+-rf/i, /deploy/i, /database.*write/i, /send.*email/i, ], approvalTimeoutMs: 300_000, // 5 分钟 notificationChannel: 'slack', });

4. 工具级访问控制(RBAC / ABAC / TBAC)

访问控制模型对比

Agent 时代的访问控制不仅要管”谁能访问什么”,还要管”Agent 在什么任务上下文中能调用什么工具”。传统的 RBAC 和 ABAC 正在被 TBAC(Task-Based Access Control)补充。

三种访问控制模型对比 ┌─────────────────────────────────────────────────────────────┐ │ RBAC(基于角色的访问控制) │ │ │ │ 用户/Agent ──▶ 角色 ──▶ 权限 │ │ │ │ 示例: │ │ CodingAgent ──▶ developer 角色 ──▶ [file_read, file_write, │ │ git_commit, run_tests] │ │ │ │ 优点:简单直观,易于管理 │ │ 局限:粒度不够细,无法根据上下文动态调整 │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ ABAC(基于属性的访问控制) │ │ │ │ 决策 = f(主体属性, 资源属性, 环境属性, 操作属性) │ │ │ │ 示例: │ │ IF agent.trust_level >= 3 │ │ AND resource.sensitivity <= "medium" │ │ AND time.is_business_hours == true │ │ AND action.type == "read" │ │ THEN allow │ │ │ │ 优点:细粒度,上下文感知 │ │ 局限:策略复杂,难以调试 │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ TBAC(基于任务的访问控制)— Agent 时代新模型 │ │ │ │ 决策 = f(任务目标, 工具需求, 参数约束) │ │ │ │ 三个维度: │ │ ① 任务(Task):业务目标,如"修复 bug #123" │ │ ② 工具(Tool):允许使用的工具集 │ │ ③ 事务(Transaction):参数级约束 │ │ │ │ 示例: │ │ 任务 "修复 bug #123" 允许: │ │ - file_read(仅 src/ 目录) │ │ - file_write(仅 src/ 目录,排除 config/) │ │ - run_tests(无限制) │ │ - git_commit(仅 fix/ 分支) │ │ │ │ 优点:与 Agent 工作模式天然匹配 │ │ 代表:Traefik Hub MCP Gateway │ └─────────────────────────────────────────────────────────────┘

操作步骤:实现工具级访问控制

步骤 1:基于 RBAC 的工具权限管理

# tool_access_control.py — Agent 工具级访问控制 from dataclasses import dataclass, field from typing import Optional import re import time @dataclass class ToolPermission: tool_name: str allowed_actions: list[str] # 允许的操作类型 parameter_constraints: dict # 参数约束 rate_limit: Optional[int] = None # 每分钟最大调用次数 requires_approval: bool = False time_window: Optional[tuple[int, int]] = None # 允许的时间窗口 (start_hour, end_hour) @dataclass class AgentRole: name: str description: str tool_permissions: list[ToolPermission] = field(default_factory=list) max_concurrent_tools: int = 5 session_timeout_minutes: int = 60 class ToolAccessController: """Agent 工具级访问控制器(RBAC 模型)""" def __init__(self): self.roles: dict[str, AgentRole] = {} self.agent_roles: dict[str, str] = {} # agent_id -> role_name self.call_history: dict[str, list[float]] = {} # tool_name -> timestamps def define_role(self, role: AgentRole): """定义 Agent 角色""" self.roles[role.name] = role def assign_role(self, agent_id: str, role_name: str): """为 Agent 分配角色""" if role_name not in self.roles: raise ValueError(f"角色 {role_name} 不存在") self.agent_roles[agent_id] = role_name def check_access( self, agent_id: str, tool_name: str, action: str, parameters: dict, ) -> dict: """检查 Agent 是否有权调用指定工具""" # 1. 获取 Agent 角色 role_name = self.agent_roles.get(agent_id) if not role_name: return self._deny("Agent 未分配角色") role = self.roles.get(role_name) if not role: return self._deny("角色不存在") # 2. 查找工具权限 tool_perm = next( (tp for tp in role.tool_permissions if tp.tool_name == tool_name), None, ) if not tool_perm: return self._deny(f"角色 {role_name} 无权使用工具 {tool_name}") # 3. 检查操作类型 if action not in tool_perm.allowed_actions: return self._deny(f"操作 {action} 不在允许列表中") # 4. 检查参数约束 constraint_result = self._check_constraints( parameters, tool_perm.parameter_constraints ) if not constraint_result["passed"]: return self._deny(f"参数约束违反: {constraint_result['reason']}") # 5. 检查速率限制 if tool_perm.rate_limit: if not self._check_rate_limit(tool_name, tool_perm.rate_limit): return self._deny(f"超过速率限制: {tool_perm.rate_limit}/分钟") # 6. 检查时间窗口 if tool_perm.time_window: current_hour = time.localtime().tm_hour start, end = tool_perm.time_window if not (start <= current_hour < end): return self._deny(f"不在允许时间窗口内: {start}:00-{end}:00") # 7. 检查是否需要审批 if tool_perm.requires_approval: return { "allowed": True, "requires_approval": True, "reason": "此操作需要人工审批", } # 记录调用 self._record_call(tool_name) return {"allowed": True, "requires_approval": False} def _check_constraints(self, params: dict, constraints: dict) -> dict: """检查参数约束""" for key, constraint in constraints.items(): value = params.get(key) if value is None: continue # 正则匹配约束 if "pattern" in constraint: if not re.match(constraint["pattern"], str(value)): return { "passed": False, "reason": f"{key} 不匹配模式 {constraint['pattern']}", } # 白名单约束 if "allowed_values" in constraint: if value not in constraint["allowed_values"]: return { "passed": False, "reason": f"{key}={value} 不在允许值列表中", } # 黑名单约束 if "denied_values" in constraint: if value in constraint["denied_values"]: return { "passed": False, "reason": f"{key}={value} 在拒绝值列表中", } # 范围约束 if "max_length" in constraint: if len(str(value)) > constraint["max_length"]: return { "passed": False, "reason": f"{key} 超过最大长度 {constraint['max_length']}", } return {"passed": True} def _check_rate_limit(self, tool_name: str, limit: int) -> bool: history = self.call_history.get(tool_name, []) cutoff = time.time() - 60 recent = [t for t in history if t > cutoff] self.call_history[tool_name] = recent return len(recent) < limit def _record_call(self, tool_name: str): if tool_name not in self.call_history: self.call_history[tool_name] = [] self.call_history[tool_name].append(time.time()) def _deny(self, reason: str) -> dict: return {"allowed": False, "requires_approval": False, "reason": reason} # ===== 使用示例 ===== controller = ToolAccessController() # 定义角色 developer_role = AgentRole( name="developer", description="开发者 Agent 角色", tool_permissions=[ ToolPermission( tool_name="file_read", allowed_actions=["read"], parameter_constraints={ "path": { "pattern": r"^/workspace/project/.*", # 只允许项目目录 "denied_values": ["/workspace/project/.env"], }, }, rate_limit=100, ), ToolPermission( tool_name="file_write", allowed_actions=["write", "create"], parameter_constraints={ "path": { "pattern": r"^/workspace/project/src/.*", # 只允许 src 目录 "max_length": 200, }, }, rate_limit=50, ), ToolPermission( tool_name="terminal", allowed_actions=["execute"], parameter_constraints={ "command": { "pattern": r"^(npm|node|python|cargo|git\s+(status|diff|log)).*", }, }, rate_limit=30, ), ToolPermission( tool_name="git_push", allowed_actions=["execute"], parameter_constraints={}, requires_approval=True, # 推送代码需要审批 ), ], ) controller.define_role(developer_role) controller.assign_role("coding-agent-001", "developer") # 检查权限 result = controller.check_access( agent_id="coding-agent-001", tool_name="file_read", action="read", parameters={"path": "/workspace/project/src/main.py"}, ) print(result) # {"allowed": True, ...} result = controller.check_access( agent_id="coding-agent-001", tool_name="file_read", action="read", parameters={"path": "/workspace/project/.env"}, ) print(result) # {"allowed": False, "reason": "参数约束违反: ..."}

步骤 2:使用 Open Policy Agent (OPA) 实现 ABAC

OPA 是 CNCF 毕业项目,提供通用的策略引擎,可以用 Rego 语言编写细粒度的访问控制策略。

# agent_policy.rego — OPA 策略:Agent 工具访问控制 package agent.toolaccess import rego.v1 # 默认拒绝 default allow := false # 规则 1:开发者角色可以读取项目文件 allow if { input.agent.role == "developer" input.tool == "file_read" startswith(input.parameters.path, "/workspace/project/") not is_sensitive_path(input.parameters.path) } # 规则 2:开发者角色可以写入 src 目录 allow if { input.agent.role == "developer" input.tool == "file_write" startswith(input.parameters.path, "/workspace/project/src/") input.parameters.file_size_bytes <= 1048576 # 最大 1MB } # 规则 3:只在工作时间允许部署操作 allow if { input.agent.role == "deployer" input.tool == "deploy" is_business_hours input.environment != "production" # 非生产环境可自动部署 } # 规则 4:生产部署需要审批标记 allow if { input.agent.role == "deployer" input.tool == "deploy" input.environment == "production" input.approval.status == "approved" input.approval.approver_count >= 2 # 至少 2 人审批 } # 规则 5:数据库查询只允许只读操作 allow if { input.agent.role == "analyst" input.tool == "database_query" is_read_only_query(input.parameters.query) not contains_sensitive_table(input.parameters.query) } # 辅助函数 is_sensitive_path(path) if { sensitive_patterns := [".env", ".ssh", ".aws", ".key", ".pem"] some pattern in sensitive_patterns contains(path, pattern) } is_business_hours if { hour := time.clock(time.now_ns())[0] hour >= 9 hour < 18 } is_read_only_query(query) if { upper_query := upper(query) startswith(upper_query, "SELECT") not contains(upper_query, "INSERT") not contains(upper_query, "UPDATE") not contains(upper_query, "DELETE") not contains(upper_query, "DROP") not contains(upper_query, "ALTER") not contains(upper_query, "TRUNCATE") } sensitive_tables := ["users_pii", "payment_info", "credentials", "audit_log"] contains_sensitive_table(query) if { some table in sensitive_tables contains(lower(query), table) }
# opa_integration.py — OPA 集成示例 import aiohttp from typing import Any class OPAClient: """Open Policy Agent 客户端""" def __init__(self, opa_url: str = "http://localhost:8181"): self.opa_url = opa_url async def check_tool_access( self, agent_id: str, agent_role: str, tool: str, parameters: dict, context: dict | None = None, ) -> dict: """向 OPA 查询工具访问权限""" input_data = { "input": { "agent": { "id": agent_id, "role": agent_role, }, "tool": tool, "parameters": parameters, "context": context or {}, "timestamp": __import__("time").time(), } } async with aiohttp.ClientSession() as session: async with session.post( f"{self.opa_url}/v1/data/agent/toolaccess/allow", json=input_data, ) as resp: result = await resp.json() allowed = result.get("result", False) return { "allowed": allowed, "policy": "agent.toolaccess", "input": input_data["input"], } # 使用示例 async def main(): opa = OPAClient() result = await opa.check_tool_access( agent_id="coding-agent-001", agent_role="developer", tool="file_read", parameters={"path": "/workspace/project/src/main.py"}, ) print(result) # {"allowed": True, ...}

步骤 3:使用 Traefik Hub MCP Gateway 实现 TBAC

Traefik Hub 的 MCP Gateway 引入了 TBAC(Task-Based Access Control),从三个维度控制 Agent 的工具访问:任务(业务目标)、工具(系统访问)、事务(参数级约束)。

# traefik-mcp-gateway.yaml — Traefik Hub MCP Gateway TBAC 配置示例 apiVersion: hub.traefik.io/v1alpha1 kind: MCPGateway metadata: name: agent-gateway spec: # 任务定义:Agent 可以执行的业务任务 tasks: - name: "bug-fix" description: "修复代码缺陷" allowedTools: - toolRef: "file-reader" constraints: paths: ["/workspace/project/src/**"] - toolRef: "file-writer" constraints: paths: ["/workspace/project/src/**"] excludePaths: ["**/config/**", "**/.env"] - toolRef: "test-runner" constraints: commands: ["npm test", "cargo test"] - toolRef: "git-operations" constraints: operations: ["commit", "branch"] branchPattern: "fix/*" - name: "code-review" description: "代码审查(只读)" allowedTools: - toolRef: "file-reader" constraints: paths: ["/workspace/project/**"] - toolRef: "git-operations" constraints: operations: ["log", "diff", "blame"] # 注意:没有 commit/push 权限 - name: "deployment" description: "部署到生产环境" requiresApproval: true minApprovers: 2 allowedTools: - toolRef: "deploy-tool" constraints: environments: ["staging", "production"] requiresHealthCheck: true # 工具定义 tools: - name: "file-reader" mcpServer: "filesystem-mcp" toolName: "read_file" - name: "file-writer" mcpServer: "filesystem-mcp" toolName: "write_file" - name: "test-runner" mcpServer: "terminal-mcp" toolName: "execute_command" - name: "git-operations" mcpServer: "git-mcp" toolName: "git_*" # 身份验证:On-Behalf-Of 模式 authentication: mode: "on-behalf-of" # Agent 使用调用者的身份和权限 tokenPropagation: true # 审计日志 audit: enabled: true logLevel: "detailed" destination: "elasticsearch"

5. 权限提升防御

常见权限提升攻击模式

Agent 系统中的权限提升攻击比传统系统更隐蔽,因为攻击者可以通过语义层(自然语言)而非技术层发起攻击。

Agent 权限提升攻击路径 ┌──────────────────────────────────────────────────────────┐ │ 攻击模式 1:混淆代理人(Confused Deputy) │ │ │ │ 低权限用户 ──▶ 低权限 Agent ──▶ 高权限 Agent │ │ │ │ │ │ "请帮我查询 "Agent A 说需要 │ │ 所有用户数据" 查询用户数据" │ │ │ │ │ 高权限 Agent 未验证 │ │ 原始用户意图,直接执行 │ └──────────────────────────────────────────────────────────┘ ┌──────────────────────────────────────────────────────────┐ │ 攻击模式 2:渐进式权限提升 │ │ │ │ 步骤 1: "请读取 config.yaml" ✅ 允许(只读) │ │ 步骤 2: "请修改 config.yaml 的端口" ✅ 允许(写入) │ │ 步骤 3: "请修改 config.yaml 的权限设置" ⚠️ 应该拒绝 │ │ 步骤 4: "请重启服务使配置生效" ⚠️ 应该拒绝 │ │ │ │ 每一步看起来都合理,但组合起来实现了权限提升 │ └──────────────────────────────────────────────────────────┘ ┌──────────────────────────────────────────────────────────┐ │ 攻击模式 3:工具链利用 │ │ │ │ Agent 有权执行 "npm install" │ │ ↓ │ │ 安装包含 postinstall 脚本的恶意包 │ │ ↓ │ │ postinstall 脚本以 Agent 权限执行任意代码 │ │ ↓ │ │ 读取 .env、SSH 密钥、AWS 凭证 │ └──────────────────────────────────────────────────────────┘

防御措施实现

// privilege_escalation_defense.ts — 权限提升防御 interface ActionRecord { timestamp: number; tool: string; action: string; parameters: Record<string, unknown>; riskScore: number; } interface EscalationAlert { type: string; description: string; actions: ActionRecord[]; recommendedAction: 'block' | 'require_approval' | 'alert'; } class PrivilegeEscalationDetector { private actionHistory: ActionRecord[] = []; private readonly windowMs = 10 * 60 * 1000; // 10 分钟窗口 /** * 记录操作并检测权限提升模式 */ detectEscalation(action: ActionRecord): EscalationAlert | null { this.actionHistory.push(action); this.pruneOldActions(); // 检测模式 1:累积风险分数 const cumulativeRisk = this.actionHistory.reduce( (sum, a) => sum + a.riskScore, 0, ); if (cumulativeRisk > 10) { return { type: 'cumulative_risk', description: `会话累积风险分数 ${cumulativeRisk} 超过阈值 10`, actions: this.actionHistory.slice(-5), recommendedAction: 'require_approval', }; } // 检测模式 2:权限升级序列 const escalationPattern = this.detectEscalationSequence(); if (escalationPattern) { return escalationPattern; } // 检测模式 3:异常工具组合 const anomaly = this.detectAnomalousToolCombination(); if (anomaly) { return anomaly; } return null; } private detectEscalationSequence(): EscalationAlert | null { const recent = this.actionHistory.slice(-5); if (recent.length < 3) return null; // 检测:读取配置 → 修改配置 → 重启服务 const hasConfigRead = recent.some( (a) => a.tool === 'file_read' && /config/i.test(String(a.parameters.path)), ); const hasConfigWrite = recent.some( (a) => a.tool === 'file_write' && /config/i.test(String(a.parameters.path)), ); const hasRestart = recent.some( (a) => a.tool === 'terminal' && /restart|reload/i.test(String(a.parameters.command)), ); if (hasConfigRead && hasConfigWrite && hasRestart) { return { type: 'config_escalation', description: '检测到配置修改 + 服务重启序列,可能是权限提升尝试', actions: recent, recommendedAction: 'block', }; } return null; } private detectAnomalousToolCombination(): EscalationAlert | null { const recent = this.actionHistory.slice(-5); const toolSet = new Set(recent.map((a) => a.tool)); // 危险组合:文件读取 + 网络请求(可能是数据外泄) if (toolSet.has('file_read') && toolSet.has('http_request')) { const fileReads = recent.filter((a) => a.tool === 'file_read'); const httpRequests = recent.filter((a) => a.tool === 'http_request'); // 检查是否读取了敏感文件后发起外部请求 const readsSensitive = fileReads.some((a) => /\.(env|key|pem|credentials)/i.test(String(a.parameters.path)), ); if (readsSensitive) { return { type: 'data_exfiltration', description: '检测到敏感文件读取后发起网络请求,可能是数据外泄', actions: [...fileReads, ...httpRequests], recommendedAction: 'block', }; } } return null; } private pruneOldActions(): void { const cutoff = Date.now() - this.windowMs; this.actionHistory = this.actionHistory.filter( (a) => a.timestamp > cutoff, ); } }

实战案例:为多 Agent 系统构建完整权限控制

场景描述

一个电商平台部署了三个协作 Agent:

  • ProductAgent:管理商品信息(读写商品数据库)
  • OrderAgent:处理订单(读写订单数据库、调用支付 API)
  • SupportAgent:客户支持(读取用户信息、发送邮件)

需要设计一套权限控制方案,确保每个 Agent 只能访问其职责范围内的资源。

完整实现

# multi_agent_permission.py — 多 Agent 权限控制完整示例 from dataclasses import dataclass, field from enum import Enum from typing import Optional import time import json class AgentType(Enum): PRODUCT = "product_agent" ORDER = "order_agent" SUPPORT = "support_agent" @dataclass class PermissionPolicy: """Agent 权限策略""" agent_type: AgentType allowed_databases: dict[str, list[str]] # db_name -> [allowed_tables] allowed_apis: list[str] allowed_actions: dict[str, list[str]] # tool -> [actions] denied_fields: list[str] # 禁止访问的字段 requires_approval: list[str] # 需要审批的操作 rate_limits: dict[str, int] # tool -> max_calls_per_minute # 定义各 Agent 的权限策略 POLICIES: dict[AgentType, PermissionPolicy] = { AgentType.PRODUCT: PermissionPolicy( agent_type=AgentType.PRODUCT, allowed_databases={ "ecommerce": ["products", "categories", "inventory"], }, allowed_apis=["image_upload", "search_index"], allowed_actions={ "database": ["select", "insert", "update"], "file": ["read", "write"], "api": ["call"], }, denied_fields=["user_email", "user_phone", "payment_info"], requires_approval=["delete_product", "bulk_update"], rate_limits={"database": 60, "api": 30}, ), AgentType.ORDER: PermissionPolicy( agent_type=AgentType.ORDER, allowed_databases={ "ecommerce": ["orders", "order_items", "products"], # products 只读 }, allowed_apis=["payment_gateway", "shipping_api"], allowed_actions={ "database": ["select", "insert", "update"], "payment": ["charge", "refund"], "api": ["call"], }, denied_fields=["user_password", "user_ssn"], requires_approval=["refund", "cancel_order", "bulk_update"], rate_limits={"database": 100, "payment": 10}, ), AgentType.SUPPORT: PermissionPolicy( agent_type=AgentType.SUPPORT, allowed_databases={ "ecommerce": ["users", "orders", "tickets"], # 全部只读 }, allowed_apis=["email_service", "ticket_system"], allowed_actions={ "database": ["select"], # 只读! "email": ["send"], "ticket": ["create", "update", "close"], }, denied_fields=["user_password", "payment_info", "user_ssn"], requires_approval=["send_bulk_email", "close_ticket"], rate_limits={"database": 50, "email": 20}, ), } class MultiAgentPermissionGateway: """多 Agent 权限网关""" def __init__(self): self.policies = POLICIES self.audit_log: list[dict] = [] self.call_counts: dict[str, list[float]] = {} def authorize( self, agent_type: AgentType, tool: str, action: str, resource: str, parameters: dict, ) -> dict: """授权检查入口""" policy = self.policies.get(agent_type) if not policy: return self._deny(agent_type, tool, action, "未知的 Agent 类型") # 1. 检查工具权限 if tool not in policy.allowed_actions: return self._deny(agent_type, tool, action, f"无权使用工具: {tool}") # 2. 检查操作权限 if action not in policy.allowed_actions[tool]: return self._deny( agent_type, tool, action, f"无权执行操作: {tool}.{action}" ) # 3. 检查数据库表权限 if tool == "database": db_name = parameters.get("database", "") table = parameters.get("table", "") allowed_tables = policy.allowed_databases.get(db_name, []) if table not in allowed_tables: return self._deny( agent_type, tool, action, f"无权访问表: {db_name}.{table}" ) # 4. 检查字段过滤 if tool == "database" and "fields" in parameters: for field_name in parameters["fields"]: if field_name in policy.denied_fields: return self._deny( agent_type, tool, action, f"禁止访问字段: {field_name}" ) # 5. 检查 API 权限 if tool == "api": api_name = parameters.get("api_name", "") if api_name not in policy.allowed_apis: return self._deny( agent_type, tool, action, f"无权调用 API: {api_name}" ) # 6. 检查速率限制 rate_key = f"{agent_type.value}:{tool}" limit = policy.rate_limits.get(tool, 100) if not self._check_rate(rate_key, limit): return self._deny( agent_type, tool, action, f"超过速率限制: {limit}/分钟" ) # 7. 检查是否需要审批 action_key = f"{action}_{resource}" if resource else action if action_key in policy.requires_approval: self._audit(agent_type, tool, action, "requires_approval") return { "allowed": True, "requires_approval": True, "message": f"操作 {action_key} 需要人工审批", } self._audit(agent_type, tool, action, "allowed") return {"allowed": True, "requires_approval": False} def _check_rate(self, key: str, limit: int) -> bool: now = time.time() if key not in self.call_counts: self.call_counts[key] = [] self.call_counts[key] = [ t for t in self.call_counts[key] if t > now - 60 ] if len(self.call_counts[key]) >= limit: return False self.call_counts[key].append(now) return True def _deny(self, agent_type, tool, action, reason) -> dict: self._audit(agent_type, tool, action, f"denied: {reason}") return {"allowed": False, "requires_approval": False, "reason": reason} def _audit(self, agent_type, tool, action, result): self.audit_log.append({ "timestamp": time.time(), "agent": agent_type.value, "tool": tool, "action": action, "result": result, }) # ===== 使用示例 ===== gateway = MultiAgentPermissionGateway() # ProductAgent 读取商品 — 允许 r1 = gateway.authorize( AgentType.PRODUCT, "database", "select", "products", {"database": "ecommerce", "table": "products", "fields": ["name", "price"]}, ) print(f"ProductAgent 读取商品: {r1}") # {"allowed": True, ...} # SupportAgent 尝试写入订单 — 拒绝 r2 = gateway.authorize( AgentType.SUPPORT, "database", "update", "orders", {"database": "ecommerce", "table": "orders"}, ) print(f"SupportAgent 写入订单: {r2}") # {"allowed": False, "reason": "无权执行操作: database.update"} # OrderAgent 尝试退款 — 需要审批 r3 = gateway.authorize( AgentType.ORDER, "payment", "refund", "refund", {"order_id": "ORD-12345", "amount": 99.99}, ) print(f"OrderAgent 退款: {r3}") # {"allowed": True, "requires_approval": True, ...} # SupportAgent 尝试访问密码字段 — 拒绝 r4 = gateway.authorize( AgentType.SUPPORT, "database", "select", "users", {"database": "ecommerce", "table": "users", "fields": ["name", "user_password"]}, ) print(f"SupportAgent 访问密码: {r4}") # {"allowed": False, "reason": "禁止访问字段: user_password"}

案例分析

这个多 Agent 权限控制方案体现了几个关键设计决策:

  1. 职责分离:每个 Agent 只能访问其职责范围内的数据库表和 API。SupportAgent 只有数据库只读权限,无法修改任何数据。

  2. 字段级过滤:即使 Agent 有权访问某张表,也不能访问敏感字段(密码、支付信息、SSN)。这防止了 Agent 被诱导泄露敏感数据。

  3. 分级审批:常规操作自动执行,高风险操作(退款、批量更新、批量邮件)需要人工审批。

  4. 速率限制:防止 Agent 在被入侵后大量读取数据或发起请求。

  5. 完整审计:每次授权检查都记录日志,便于事后分析和异常检测。


避坑指南

❌ 常见错误

  1. 使用黑名单而非白名单

    • 问题:试图列出所有”不允许”的操作(如禁止 rm -rf /、禁止 curl),但攻击者总能找到绕过方式(如用 wget 替代 curl,用 perl -e 替代 python -c)。
    • 正确做法:采用白名单策略,只允许已知安全的操作。未在白名单中的操作默认拒绝(deny-by-default)。
  2. 沙箱配置使用默认参数

    • 问题:使用默认 Docker 配置运行 Agent,没有移除 capabilities、没有启用只读根文件系统、没有限制网络。默认 Docker 容器拥有大量不必要的权限。
    • 正确做法:使用 --cap-drop ALL 移除所有 Linux capabilities,启用 --read-only 只读根文件系统,设置 --network none 禁止网络(除非必要),限制 --pids-limit--memory
  3. Agent 间通信不验证身份

    • 问题:在多 Agent 系统中,Agent A 向 Agent B 发送请求时,B 直接信任并执行,不验证 A 的身份和权限。被入侵的低权限 Agent 可以冒充高权限 Agent。
    • 正确做法:Agent 间通信必须有签名验证。高权限 Agent 收到请求时,必须重新验证原始用户意图(On-Behalf-Of 模式),而非信任中间 Agent 的转述。
  4. 审批工作流超时后自动批准

    • 问题:为了”不影响用户体验”,将审批超时的默认行为设为”自动批准”。攻击者只需等待超时即可绕过审批。
    • 正确做法:审批超时的默认行为必须是”拒绝”(fail-closed)。如果审批者不可用,操作应该被阻止而非自动放行。
  5. 权限策略硬编码在代码中

    • 问题:将 Agent 权限规则直接写在应用代码中,修改权限需要重新部署。无法快速响应安全事件。
    • 正确做法:使用外部策略引擎(如 OPA)或声明式配置文件管理权限。权限变更应该能在不重启服务的情况下生效。支持紧急权限撤销。
  6. 忽视工具链的间接权限

    • 问题:Agent 有权执行 npm install,看起来只是安装依赖。但 npm 包的 postinstall 脚本可以执行任意代码,等于间接授予了 RCE 权限。
    • 正确做法:审计工具链的间接权限。npm install 应该在沙箱中执行,且使用 --ignore-scripts 禁止安装脚本。对所有包管理器操作进行安全审查。

✅ 最佳实践

  1. Deny-by-Default:所有权限默认拒绝,只显式授予必要的权限。这是最小权限原则的基础。
  2. 分层防御:不要依赖单一权限控制机制。结合 RBAC(角色)+ ABAC(属性)+ 沙箱(隔离)+ 审批(人工)构建多层防御。
  3. 权限定期审计:每月审查 Agent 权限配置,移除不再需要的权限。使用自动化工具检测权限漂移。
  4. 紧急撤销机制:建立一键撤销 Agent 所有权限的紧急机制,在检测到安全事件时立即生效。
  5. 权限即代码:将权限策略作为代码管理(Policy-as-Code),纳入版本控制和 CI/CD 流程,每次变更都有审查记录。

相关资源与延伸阅读


参考来源


📖 返回 总览与导航 | 上一节:22a-AI安全概览 | 下一节:22c-数据隐私与合规

Last updated on