Skip to Content

09d - Agent 记忆系统

本文是《AI Agent 实战手册》第 9 章第 4 节。 上一节:09c-Guardrails实现 | 下一节:09e-Agent架构模板

概述

LLM 本质上是无状态的——每次 API 调用之间不保留任何信息。Agent 记忆系统通过在 Agent 执行链路中引入持久化和检索机制,让 Agent 具备跨会话的连续性、个性化适应和经验学习能力。2025-2026 年,随着 Mem0、Letta(MemGPT)等专用记忆框架的成熟和向量数据库市场突破 22 亿美元规模,Agent 记忆已从”实验特性”演变为生产系统的核心基础设施。本节系统讲解四大记忆类型:短期记忆(上下文窗口)、工作记忆(草稿本)、长期记忆(向量存储)和情景记忆(经验回放),并提供可落地的实现模式。


1. 记忆系统架构全景

1.1 记忆层次模型

Agent 记忆系统借鉴人类认知科学的记忆分类,形成四层架构:

1.2 记忆生命周期

1.3 工具推荐

工具用途价格适用场景
Mem0 通用 Agent 记忆层开源免费;云版 $49/月起需要即插即用记忆能力的 Agent
Letta (MemGPT) 虚拟上下文管理开源免费;云版联系销售需要自主管理记忆的有状态 Agent
Pinecone 托管向量数据库免费层可用;$70/月起大规模生产级向量检索
Weaviate 开源向量数据库开源免费;云版 $25/月起需要混合搜索的 RAG 系统
ChromaDB 轻量向量数据库开源免费;云版即将推出原型开发和小规模项目
pgvector PostgreSQL 向量扩展免费(随 PostgreSQL)已有 PostgreSQL 基础设施的团队
LangChain Memory Agent 记忆中间件开源免费LangChain 生态的记忆管理
Redis 高速缓存 + 向量搜索开源免费;云版 $7/月起低延迟会话记忆和缓存
Graphlit 语义基础设施平台免费层可用;$99/月起多数据源的企业级记忆

2. 短期记忆(Short-term / Context Window)

短期记忆是 Agent 最基础的记忆形式——即 LLM 的上下文窗口。所有对话历史、系统提示和工具调用结果都存放在这个有限的窗口中。

2.1 上下文窗口限制

模型上下文窗口约等于输入价格 (每百万 Token)
GPT-4o128K tokens~96K 字$2.50
Claude 4 Sonnet200K tokens~150K 字$3.00
Gemini 2.5 Pro1M tokens~750K 字$1.25(≤200K)/ $2.50(>200K)
DeepSeek V3128K tokens~96K 字$0.27
Llama 4 Scout10M tokens~7.5M 字自托管成本

2.2 对话历史管理策略

策略 1:滑动窗口(Sliding Window)

保留最近 N 轮对话,丢弃更早的消息。简单但会丢失重要上下文。

策略 2:摘要压缩(Summary Compression)

当对话超过阈值时,用 LLM 将旧对话压缩为摘要。

策略 3:Token 预算管理(Token Budget)

为不同内容类型分配固定 Token 预算,动态调整。

2.3 实现模式

Python 实现——对话历史管理器:

from dataclasses import dataclass, field from typing import Literal import tiktoken @dataclass class Message: role: Literal["system", "user", "assistant", "tool"] content: str token_count: int = 0 timestamp: float = 0.0 importance: float = 0.5 # 0-1,用于优先级淘汰 def __post_init__(self): if self.token_count == 0: enc = tiktoken.encoding_for_model("gpt-4o") self.token_count = len(enc.encode(self.content)) class ShortTermMemory: """短期记忆管理器——上下文窗口管理""" def __init__( self, max_tokens: int = 120000, system_budget: int = 4000, summary_threshold: float = 0.8, # 80% 容量时触发压缩 ): self.max_tokens = max_tokens self.system_budget = system_budget self.summary_threshold = summary_threshold self.messages: list[Message] = [] self.summary: str | None = None # 压缩后的历史摘要 @property def total_tokens(self) -> int: return sum(m.token_count for m in self.messages) def add_message(self, message: Message): """添加消息,必要时触发压缩""" self.messages.append(message) # 检查是否需要压缩 if self.total_tokens > self.max_tokens * self.summary_threshold: self._compress_history() def _compress_history(self): """压缩旧对话为摘要""" # 保留系统消息和最近 10 轮对话 system_msgs = [m for m in self.messages if m.role == "system"] recent_msgs = [m for m in self.messages if m.role != "system"][-20:] old_msgs = [m for m in self.messages if m.role != "system"][:-20] if not old_msgs: return # 生成摘要(实际使用时调用 LLM) old_content = "\n".join( f"{m.role}: {m.content[:200]}" for m in old_msgs ) summary_prompt = f"""请将以下对话历史压缩为简洁摘要,保留关键信息: {old_content}""" # 这里应调用 LLM 生成摘要,示例用占位 self.summary = f"[历史摘要] 之前讨论了 {len(old_msgs)} 条消息的内容" # 重建消息列表 self.messages = system_msgs if self.summary: self.messages.append(Message( role="system", content=f"之前的对话摘要:{self.summary}", importance=0.8, )) self.messages.extend(recent_msgs) def get_context(self) -> list[dict]: """获取当前上下文(用于 LLM 调用)""" return [ {"role": m.role, "content": m.content} for m in self.messages ] def get_stats(self) -> dict: return { "message_count": len(self.messages), "total_tokens": self.total_tokens, "utilization": f"{self.total_tokens / self.max_tokens:.1%}", "has_summary": self.summary is not None, }

TypeScript 实现——滑动窗口 + Token 预算:

interface ChatMessage { role: "system" | "user" | "assistant" | "tool"; content: string; tokenCount: number; timestamp: number; } interface TokenBudget { system: number; history: number; tools: number; response: number; } class ShortTermMemory { private messages: ChatMessage[] = []; private maxTokens: number; private budget: TokenBudget; constructor( maxTokens: number = 120000, budget: TokenBudget = { system: 4000, history: 80000, tools: 16000, response: 20000, } ) { this.maxTokens = maxTokens; this.budget = budget; } addMessage(message: ChatMessage): void { this.messages.push(message); this.enforceTokenBudget(); } private enforceTokenBudget(): void { const historyMessages = this.messages.filter( (m) => m.role !== "system" ); let historyTokens = historyMessages.reduce( (sum, m) => sum + m.tokenCount, 0 ); // 从最旧的非系统消息开始淘汰 while (historyTokens > this.budget.history && historyMessages.length > 2) { const removed = historyMessages.shift()!; historyTokens -= removed.tokenCount; const idx = this.messages.indexOf(removed); if (idx !== -1) this.messages.splice(idx, 1); } } getContext(): Array<{ role: string; content: string }> { return this.messages.map(({ role, content }) => ({ role, content })); } getTokenUsage(): { total: number; byRole: Record<string, number>; utilization: string; } { const byRole: Record<string, number> = {}; let total = 0; for (const msg of this.messages) { byRole[msg.role] = (byRole[msg.role] ?? 0) + msg.tokenCount; total += msg.tokenCount; } return { total, byRole, utilization: `${((total / this.maxTokens) * 100).toFixed(1)}%`, }; } }

2.4 提示词模板

你是一个具有记忆管理能力的 AI 助手。 ## 上下文管理规则 1. 当对话超过 [TOKEN_LIMIT] tokens 时,自动压缩早期对话 2. 保留所有包含 [关键决策/用户偏好/任务目标] 的消息 3. 工具调用结果只保留最终输出,丢弃中间步骤 4. 系统提示词始终保留在上下文最前面 ## 之前的对话摘要 [CONVERSATION_SUMMARY] ## 当前任务 [CURRENT_TASK]

3. 工作记忆(Working Memory / Scratchpad)

工作记忆是 Agent 的”草稿本”——用于存储当前任务的中间状态、推理步骤和临时数据。它比短期记忆更结构化,比长期记忆更短暂。

3.1 工作记忆的作用

3.2 Letta (MemGPT) 的记忆块设计

Letta 框架引入了”记忆块”(Memory Blocks)概念——将上下文窗口划分为功能性区块,Agent 可以主动编辑这些区块:

记忆块用途持久性大小限制
Core Memory - Human用户信息和偏好跨会话持久可配置
Core Memory - PersonaAgent 角色和行为规则跨会话持久可配置
Recall Memory最近对话历史会话内自动管理
Archival Memory长期知识存储永久无限(外部存储)

3.3 实现模式

Python 实现——Agent 工作记忆草稿本:

from dataclasses import dataclass, field from typing import Any from datetime import datetime import json @dataclass class ScratchpadEntry: key: str value: Any entry_type: str # "plan", "result", "state", "hypothesis" created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) ttl_seconds: int | None = None # 自动过期时间 def is_expired(self) -> bool: if self.ttl_seconds is None: return False elapsed = (datetime.now() - self.updated_at).total_seconds() return elapsed > self.ttl_seconds @dataclass class TaskPlan: goal: str steps: list[str] current_step: int = 0 status: str = "in_progress" # in_progress, completed, failed class WorkingMemory: """Agent 工作记忆——任务执行期间的草稿本""" def __init__(self, max_entries: int = 100): self.max_entries = max_entries self._store: dict[str, ScratchpadEntry] = {} self._plan: TaskPlan | None = None # ---- 计划管理 ---- def set_plan(self, goal: str, steps: list[str]): """设置当前任务计划""" self._plan = TaskPlan(goal=goal, steps=steps) self.write("current_plan", { "goal": goal, "steps": steps, "current_step": 0, }, entry_type="plan") def advance_plan(self) -> str | None: """推进到下一步""" if self._plan and self._plan.current_step < len(self._plan.steps): step = self._plan.steps[self._plan.current_step] self._plan.current_step += 1 return step return None # ---- 读写操作 ---- def write( self, key: str, value: Any, entry_type: str = "state", ttl_seconds: int | None = None, ): """写入工作记忆""" self._store[key] = ScratchpadEntry( key=key, value=value, entry_type=entry_type, ttl_seconds=ttl_seconds, ) self._evict_if_needed() def read(self, key: str) -> Any | None: """读取工作记忆""" entry = self._store.get(key) if entry is None or entry.is_expired(): if entry and entry.is_expired(): del self._store[key] return None return entry.value def delete(self, key: str): """删除条目""" self._store.pop(key, None) # ---- 中间结果管理 ---- def store_result(self, step_name: str, result: Any): """存储工具调用或推理的中间结果""" self.write( f"result_{step_name}", result, entry_type="result", ttl_seconds=3600, # 1 小时后过期 ) def get_all_results(self) -> dict[str, Any]: """获取所有中间结果""" return { k: v.value for k, v in self._store.items() if v.entry_type == "result" and not v.is_expired() } # ---- 序列化(注入上下文窗口)---- def to_context_string(self) -> str: """将工作记忆序列化为可注入上下文的字符串""" sections = [] if self._plan: sections.append( f"## 当前计划\n" f"目标: {self._plan.goal}\n" f"进度: {self._plan.current_step}/{len(self._plan.steps)}\n" f"当前步骤: {self._plan.steps[self._plan.current_step] if self._plan.current_step < len(self._plan.steps) else '已完成'}" ) results = self.get_all_results() if results: sections.append( "## 中间结果\n" + "\n".join(f"- {k}: {json.dumps(v, ensure_ascii=False)[:200]}" for k, v in results.items()) ) states = { k: v.value for k, v in self._store.items() if v.entry_type == "state" and not v.is_expired() } if states: sections.append( "## 任务状态\n" + "\n".join(f"- {k}: {v}" for k, v in states.items()) ) return "\n\n".join(sections) def _evict_if_needed(self): """淘汰过期和超量条目""" # 先清理过期条目 expired = [k for k, v in self._store.items() if v.is_expired()] for k in expired: del self._store[k] # 超量时按创建时间淘汰 if len(self._store) > self.max_entries: sorted_entries = sorted( self._store.items(), key=lambda x: x[1].created_at, ) to_remove = len(self._store) - self.max_entries for k, _ in sorted_entries[:to_remove]: del self._store[k] # 使用示例 memory = WorkingMemory() # 设置任务计划 memory.set_plan( goal="分析用户上传的 CSV 文件并生成报告", steps=[ "读取 CSV 文件", "数据清洗和验证", "统计分析", "生成可视化图表", "输出报告", ] ) # 执行步骤并存储中间结果 step = memory.advance_plan() # "读取 CSV 文件" memory.store_result("csv_read", {"rows": 1500, "columns": 12}) step = memory.advance_plan() # "数据清洗和验证" memory.store_result("data_clean", {"removed_rows": 23, "valid_rows": 1477}) # 注入上下文 context = memory.to_context_string() print(context)

TypeScript 实现——结构化工作记忆:

interface PlanStep { name: string; status: "pending" | "in_progress" | "completed" | "failed"; result?: unknown; } interface WorkingMemoryState { plan: { goal: string; steps: PlanStep[]; currentStep: number; } | null; scratchpad: Map<string, { value: unknown; expiresAt?: number }>; hypotheses: Array<{ claim: string; confidence: number; evidence: string[] }>; } class AgentWorkingMemory { private state: WorkingMemoryState = { plan: null, scratchpad: new Map(), hypotheses: [], }; // ---- 计划管理 ---- setPlan(goal: string, stepNames: string[]): void { this.state.plan = { goal, steps: stepNames.map((name) => ({ name, status: "pending" })), currentStep: 0, }; } advanceStep(result?: unknown): string | null { const plan = this.state.plan; if (!plan || plan.currentStep >= plan.steps.length) return null; const current = plan.steps[plan.currentStep]; current.status = "completed"; current.result = result; plan.currentStep++; const next = plan.steps[plan.currentStep]; if (next) { next.status = "in_progress"; return next.name; } return null; } // ---- 草稿本操作 ---- write(key: string, value: unknown, ttlMs?: number): void { this.state.scratchpad.set(key, { value, expiresAt: ttlMs ? Date.now() + ttlMs : undefined, }); } read(key: string): unknown | null { const entry = this.state.scratchpad.get(key); if (!entry) return null; if (entry.expiresAt && Date.now() > entry.expiresAt) { this.state.scratchpad.delete(key); return null; } return entry.value; } // ---- 假设管理(用于推理链)---- addHypothesis(claim: string, confidence: number, evidence: string[]): void { this.state.hypotheses.push({ claim, confidence, evidence }); } getBestHypothesis(): { claim: string; confidence: number } | null { if (this.state.hypotheses.length === 0) return null; return this.state.hypotheses.reduce((best, h) => h.confidence > best.confidence ? h : best ); } // ---- 序列化 ---- toContextString(): string { const parts: string[] = []; if (this.state.plan) { const { goal, steps, currentStep } = this.state.plan; parts.push( `## 当前计划\n目标: ${goal}\n进度: ${currentStep}/${steps.length}\n` + steps.map((s, i) => `${i === currentStep ? "→" : " "} [${s.status}] ${s.name}` ).join("\n") ); } if (this.state.scratchpad.size > 0) { const entries = Array.from(this.state.scratchpad.entries()) .filter(([, v]) => !v.expiresAt || Date.now() <= v.expiresAt) .map(([k, v]) => `- ${k}: ${JSON.stringify(v.value).slice(0, 200)}`); if (entries.length) { parts.push(`## 草稿本\n${entries.join("\n")}`); } } return parts.join("\n\n"); } }

4. 长期记忆(Long-term / Vector Store)

长期记忆是 Agent 的”永久知识库”——通过向量数据库和嵌入模型将信息持久化存储,支持语义检索。这是 Agent 实现跨会话知识积累的核心机制。

4.1 向量存储架构

4.2 向量数据库对比

数据库类型价格最大向量数混合搜索自托管适用场景
Pinecone托管 SaaS免费层 2GB;$70/月起数十亿大规模生产,零运维
Weaviate开源 + 云开源免费;云 $25/月起数十亿需要混合搜索和多模态
ChromaDB开源免费数百万原型开发,本地实验
pgvectorPG 扩展免费(随 PostgreSQL)数千万✅(配合全文搜索)已有 PG 基础设施
Qdrant开源 + 云开源免费;云 $25/月起数十亿高性能过滤检索
Milvus开源 + 云开源免费;Zilliz 云 $65/月起数百亿超大规模企业部署

4.3 嵌入模型对比

模型维度价格 (每百万 Token)MTEB 排名适用场景
OpenAI text-embedding-3-large3072$0.13通用高质量嵌入
OpenAI text-embedding-3-small1536$0.02成本敏感场景
Voyage-3.52048$0.06极高性价比最优选择
Cohere Embed v31024$0.10多语言场景
BGE-M3 (开源)1024自托管成本自托管,数据隐私
Gemini text-embedding-004768$0.006中高Google 生态,极低成本

4.4 实现模式

Python 实现——基于 ChromaDB 的长期记忆:

import chromadb from chromadb.config import Settings from datetime import datetime from typing import Any import json import hashlib class LongTermMemory: """基于向量数据库的长期记忆系统""" def __init__( self, collection_name: str = "agent_memory", persist_directory: str = "./memory_store", ): self.client = chromadb.PersistentClient( path=persist_directory, settings=Settings(anonymized_telemetry=False), ) self.collection = self.client.get_or_create_collection( name=collection_name, metadata={"hnsw:space": "cosine"}, ) def store( self, content: str, metadata: dict[str, Any] | None = None, memory_type: str = "knowledge", ) -> str: """存储记忆到向量数据库""" doc_id = hashlib.md5(content.encode()).hexdigest()[:16] meta = { "memory_type": memory_type, "created_at": datetime.now().isoformat(), "content_length": len(content), **(metadata or {}), } self.collection.upsert( ids=[doc_id], documents=[content], metadatas=[meta], ) return doc_id def retrieve( self, query: str, n_results: int = 5, memory_type: str | None = None, min_relevance: float = 0.7, ) -> list[dict]: """语义检索相关记忆""" where_filter = None if memory_type: where_filter = {"memory_type": memory_type} results = self.collection.query( query_texts=[query], n_results=n_results, where=where_filter, include=["documents", "metadatas", "distances"], ) memories = [] for i, doc in enumerate(results["documents"][0]): distance = results["distances"][0][i] relevance = 1 - distance # cosine distance → similarity if relevance >= min_relevance: memories.append({ "content": doc, "metadata": results["metadatas"][0][i], "relevance": round(relevance, 3), }) return memories def forget(self, doc_id: str): """删除指定记忆""" self.collection.delete(ids=[doc_id]) def search_by_metadata( self, filters: dict[str, Any], limit: int = 10, ) -> list[dict]: """按元数据过滤检索""" results = self.collection.get( where=filters, limit=limit, include=["documents", "metadatas"], ) return [ {"content": doc, "metadata": meta} for doc, meta in zip(results["documents"], results["metadatas"]) ] def get_stats(self) -> dict: return { "total_memories": self.collection.count(), "collection_name": self.collection.name, } # 使用示例 ltm = LongTermMemory() # 存储用户偏好 ltm.store( "用户偏好使用 TypeScript 而非 JavaScript,喜欢函数式编程风格", metadata={"user_id": "user_001", "category": "preference"}, memory_type="user_profile", ) # 存储项目知识 ltm.store( "项目使用 Next.js 14 + tRPC + Prisma 技术栈,部署在 Vercel 上", metadata={"project": "my-saas", "category": "tech_stack"}, memory_type="knowledge", ) # 检索相关记忆 results = ltm.retrieve("用户喜欢什么编程语言?", n_results=3) for r in results: print(f"[{r['relevance']}] {r['content']}")

TypeScript 实现——基于 pgvector 的长期记忆:

import { Pool } from "pg"; import OpenAI from "openai"; interface MemoryRecord { id: string; content: string; embedding?: number[]; memoryType: string; metadata: Record<string, unknown>; relevance?: number; createdAt: Date; } class PgVectorLongTermMemory { private pool: Pool; private openai: OpenAI; private tableName: string; constructor(connectionString: string, tableName = "agent_memories") { this.pool = new Pool({ connectionString }); this.openai = new OpenAI(); this.tableName = tableName; } async initialize(): Promise<void> { await this.pool.query("CREATE EXTENSION IF NOT EXISTS vector"); await this.pool.query(` CREATE TABLE IF NOT EXISTS ${this.tableName} ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), content TEXT NOT NULL, embedding vector(1536), memory_type VARCHAR(50) NOT NULL DEFAULT 'knowledge', metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) `); await this.pool.query(` CREATE INDEX IF NOT EXISTS idx_${this.tableName}_embedding ON ${this.tableName} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100) `); } private async embed(text: string): Promise<number[]> { const response = await this.openai.embeddings.create({ model: "text-embedding-3-small", input: text, }); return response.data[0].embedding; } async store( content: string, memoryType = "knowledge", metadata: Record<string, unknown> = {} ): Promise<string> { const embedding = await this.embed(content); const result = await this.pool.query( `INSERT INTO ${this.tableName} (content, embedding, memory_type, metadata) VALUES ($1, $2::vector, $3, $4) RETURNING id`, [content, JSON.stringify(embedding), memoryType, JSON.stringify(metadata)] ); return result.rows[0].id; } async retrieve( query: string, limit = 5, memoryType?: string, minRelevance = 0.7 ): Promise<MemoryRecord[]> { const queryEmbedding = await this.embed(query); const typeFilter = memoryType ? "AND memory_type = $3" : ""; const params: unknown[] = [ JSON.stringify(queryEmbedding), limit, ]; if (memoryType) params.push(memoryType); const result = await this.pool.query( `SELECT id, content, memory_type, metadata, created_at, 1 - (embedding <=> $1::vector) AS relevance FROM ${this.tableName} WHERE 1 - (embedding <=> $1::vector) >= ${minRelevance} ${typeFilter} ORDER BY embedding <=> $1::vector LIMIT $2`, params ); return result.rows.map((row) => ({ id: row.id, content: row.content, memoryType: row.memory_type, metadata: row.metadata, relevance: parseFloat(row.relevance), createdAt: row.created_at, })); } async forget(id: string): Promise<void> { await this.pool.query( `DELETE FROM ${this.tableName} WHERE id = $1`, [id] ); } }

4.5 使用 Mem0 实现智能记忆层

Mem0 提供了一个开箱即用的 Agent 记忆层,自动处理记忆的提取、去重、更新和检索:

from mem0 import Memory # 初始化 Mem0 config = { "llm": { "provider": "openai", "config": {"model": "gpt-4o-mini"}, }, "vector_store": { "provider": "chroma", "config": {"collection_name": "agent_memory"}, }, } memory = Memory.from_config(config) # 添加记忆(Mem0 自动提取关键信息) memory.add( "我正在开发一个 SaaS 产品,使用 Next.js 和 Supabase。" "我偏好 TypeScript 和函数式编程风格。" "项目预算有限,需要控制 API 调用成本。", user_id="developer_001", ) # 检索相关记忆 results = memory.search( "推荐什么技术栈?", user_id="developer_001", ) for r in results: print(f"[{r['score']:.2f}] {r['memory']}") # 获取用户所有记忆 all_memories = memory.get_all(user_id="developer_001") for m in all_memories: print(f"- {m['memory']}") # 更新记忆(Mem0 自动处理冲突和去重) memory.add( "我决定从 Supabase 切换到 PlanetScale 作为数据库。", user_id="developer_001", )

4.6 提示词模板

你是一个具有长期记忆的 AI 助手。在回答问题前,请先检索相关记忆。 ## 记忆检索结果 以下是与当前问题相关的历史记忆(按相关度排序): [RETRIEVED_MEMORIES] ## 使用规则 1. 优先使用记忆中的信息回答问题 2. 如果记忆中的信息与当前上下文矛盾,以最新信息为准 3. 如果记忆不足以回答问题,明确告知用户并提供你的最佳推测 4. 回答后,判断是否需要将新信息存入长期记忆 ## 记忆更新判断 回答完成后,请判断: - 用户是否提供了新的偏好或需求?→ 存储为 user_profile - 是否产生了新的项目知识?→ 存储为 knowledge - 是否有值得记录的决策?→ 存储为 decision

5. 情景记忆(Episodic Memory)

情景记忆记录 Agent 的”经历”——过去的交互会话、成功和失败的操作、以及从中学到的经验。它让 Agent 能够从历史中学习,避免重复犯错,并在类似场景中做出更好的决策。

5.1 情景记忆与其他记忆的区别

维度短期记忆长期记忆情景记忆
存储内容当前对话事实和知识具体经历和事件
时间范围当前会话永久永久(带时间戳)
检索方式顺序读取语义相似度时间 + 语义 + 情境
类比”我刚才说了什么""我知道什么""我经历过什么”
用途维持对话连贯知识问答经验学习、模式识别

5.2 情景记忆架构

5.3 实现模式

Python 实现——情景记忆系统:

from dataclasses import dataclass, field from datetime import datetime from typing import Any import json @dataclass class Episode: """一次完整的 Agent 交互情景""" episode_id: str timestamp: datetime task_description: str actions_taken: list[dict] # 执行的操作序列 outcome: str # "success", "partial", "failure" outcome_details: str lessons_learned: list[str] user_feedback: str | None = None context_tags: list[str] = field(default_factory=list) duration_seconds: float = 0.0 def to_summary(self) -> str: """生成情景摘要(用于注入上下文)""" action_summary = ", ".join( a.get("tool", "unknown") for a in self.actions_taken[:5] ) return ( f"[{self.timestamp.strftime('%Y-%m-%d')}] " f"任务: {self.task_description[:100]} | " f"操作: {action_summary} | " f"结果: {self.outcome} | " f"教训: {'; '.join(self.lessons_learned[:3])}" ) class EpisodicMemory: """情景记忆系统——记录和检索 Agent 的历史经历""" def __init__(self, long_term_memory=None): self.episodes: list[Episode] = [] self.ltm = long_term_memory # 可选:连接长期记忆做语义检索 self._pattern_cache: dict[str, list[str]] = {} def record_episode( self, task_description: str, actions: list[dict], outcome: str, outcome_details: str, lessons: list[str] | None = None, tags: list[str] | None = None, ) -> Episode: """记录一次完整的交互情景""" episode = Episode( episode_id=f"ep_{len(self.episodes):04d}", timestamp=datetime.now(), task_description=task_description, actions_taken=actions, outcome=outcome, outcome_details=outcome_details, lessons_learned=lessons or [], context_tags=tags or [], ) self.episodes.append(episode) # 同步到长期记忆(如果可用) if self.ltm: self.ltm.store( episode.to_summary(), metadata={ "episode_id": episode.episode_id, "outcome": outcome, "tags": json.dumps(tags or []), }, memory_type="episode", ) return episode def recall_similar( self, current_task: str, n_results: int = 3, outcome_filter: str | None = None, ) -> list[Episode]: """检索与当前任务相似的历史情景""" if self.ltm: # 使用向量检索 results = self.ltm.retrieve( current_task, n_results=n_results, memory_type="episode", ) episode_ids = [ r["metadata"].get("episode_id") for r in results ] return [ ep for ep in self.episodes if ep.episode_id in episode_ids and (outcome_filter is None or ep.outcome == outcome_filter) ] else: # 简单的关键词匹配回退 scored = [] for ep in self.episodes: if outcome_filter and ep.outcome != outcome_filter: continue # 简单相关度:共同词数 task_words = set(current_task.lower().split()) ep_words = set(ep.task_description.lower().split()) overlap = len(task_words & ep_words) if overlap > 0: scored.append((overlap, ep)) scored.sort(key=lambda x: x[0], reverse=True) return [ep for _, ep in scored[:n_results]] def get_lessons_for_task(self, task_description: str) -> list[str]: """获取与当前任务相关的历史教训""" similar = self.recall_similar(task_description, n_results=5) lessons = [] for ep in similar: lessons.extend(ep.lessons_learned) # 去重 return list(dict.fromkeys(lessons)) def get_success_patterns(self) -> dict[str, int]: """分析成功模式——哪些操作组合最常导致成功""" patterns: dict[str, int] = {} for ep in self.episodes: if ep.outcome == "success": tools_used = tuple( sorted(set(a.get("tool", "") for a in ep.actions_taken)) ) key = " + ".join(tools_used) patterns[key] = patterns.get(key, 0) + 1 return dict(sorted(patterns.items(), key=lambda x: x[1], reverse=True)) def to_context_string(self, current_task: str, max_episodes: int = 3) -> str: """生成可注入上下文的情景记忆摘要""" similar = self.recall_similar(current_task, n_results=max_episodes) if not similar: return "## 相关历史经验\n暂无相关历史经验。" lines = ["## 相关历史经验"] for ep in similar: lines.append(f"\n### {ep.episode_id} ({ep.outcome})") lines.append(f"任务: {ep.task_description[:150]}") lines.append(f"结果: {ep.outcome_details[:150]}") if ep.lessons_learned: lines.append("教训:") for lesson in ep.lessons_learned[:3]: lines.append(f" - {lesson}") return "\n".join(lines)

TypeScript 实现——情景记忆与经验回放:

interface AgentAction { tool: string; input: Record<string, unknown>; output: string; success: boolean; durationMs: number; } interface Episode { id: string; timestamp: Date; taskDescription: string; actions: AgentAction[]; outcome: "success" | "partial" | "failure"; outcomeDetails: string; lessonsLearned: string[]; tags: string[]; } class EpisodicMemory { private episodes: Episode[] = []; private maxEpisodes: number; constructor(maxEpisodes = 1000) { this.maxEpisodes = maxEpisodes; } record(params: { taskDescription: string; actions: AgentAction[]; outcome: Episode["outcome"]; outcomeDetails: string; lessons?: string[]; tags?: string[]; }): Episode { const episode: Episode = { id: `ep_${this.episodes.length.toString().padStart(4, "0")}`, timestamp: new Date(), taskDescription: params.taskDescription, actions: params.actions, outcome: params.outcome, outcomeDetails: params.outcomeDetails, lessonsLearned: params.lessons ?? [], tags: params.tags ?? [], }; this.episodes.push(episode); // 超量时淘汰最旧的失败情景 if (this.episodes.length > this.maxEpisodes) { const failIdx = this.episodes.findIndex((e) => e.outcome === "failure"); if (failIdx !== -1) { this.episodes.splice(failIdx, 1); } else { this.episodes.shift(); } } return episode; } recallByOutcome(outcome: Episode["outcome"], limit = 5): Episode[] { return this.episodes .filter((e) => e.outcome === outcome) .slice(-limit); } getFailurePatterns(): Array<{ tool: string; failCount: number; commonErrors: string[] }> { const toolFailures = new Map<string, { count: number; errors: string[] }>(); for (const ep of this.episodes) { if (ep.outcome === "failure") { for (const action of ep.actions) { if (!action.success) { const existing = toolFailures.get(action.tool) ?? { count: 0, errors: [] }; existing.count++; existing.errors.push(action.output.slice(0, 100)); toolFailures.set(action.tool, existing); } } } } return Array.from(toolFailures.entries()) .map(([tool, data]) => ({ tool, failCount: data.count, commonErrors: [...new Set(data.errors)].slice(0, 3), })) .sort((a, b) => b.failCount - a.failCount); } generateReflection(): string { const total = this.episodes.length; if (total === 0) return "暂无历史经验可供反思。"; const successRate = this.episodes.filter((e) => e.outcome === "success").length / total; const allLessons = this.episodes.flatMap((e) => e.lessonsLearned); const uniqueLessons = [...new Set(allLessons)].slice(0, 5); const failPatterns = this.getFailurePatterns().slice(0, 3); return [ `## Agent 经验反思`, `总交互次数: ${total},成功率: ${(successRate * 100).toFixed(1)}%`, ``, `### 关键教训`, ...uniqueLessons.map((l) => `- ${l}`), ``, `### 常见失败模式`, ...failPatterns.map( (p) => `- ${p.tool}: 失败 ${p.failCount} 次 (${p.commonErrors[0] ?? "未知错误"})` ), ].join("\n"); } }

5.4 Generative Agents 的反思机制

斯坦福 Generative Agents 论文提出了一种经典的情景记忆反思机制:Agent 定期回顾近期经历,提取高层次的洞察和行为模式。

反思流程:

提示词模板——反思生成:

基于以下近期经历,请提取 3-5 条高层次洞察: ## 近期经历 [RECENT_EPISODES] ## 反思要求 1. 识别重复出现的模式(成功模式和失败模式) 2. 提取可泛化的经验教训 3. 发现用户偏好的变化趋势 4. 建议未来行为的调整方向 请以 JSON 格式输出: { "insights": [ { "observation": "观察到的模式", "lesson": "从中学到的教训", "action": "建议的行为调整", "confidence": 0.0-1.0 } ] }

6. 记忆系统集成架构

6.1 四层记忆协同工作

将四种记忆类型整合为统一的 Agent 记忆系统:

6.2 统一记忆管理器实现

Python 实现——统一记忆管理器:

from dataclasses import dataclass from typing import Any @dataclass class MemoryQuery: text: str memory_types: list[str] | None = None # None = 搜索所有类型 max_results: int = 5 min_relevance: float = 0.6 @dataclass class MemoryResult: content: str source: str # "short_term", "working", "long_term", "episodic" relevance: float metadata: dict[str, Any] class UnifiedMemoryManager: """统一记忆管理器——协调四层记忆""" def __init__( self, short_term: "ShortTermMemory", working: "WorkingMemory", long_term: "LongTermMemory", episodic: "EpisodicMemory", ): self.short_term = short_term self.working = working self.long_term = long_term self.episodic = episodic def build_context(self, user_query: str) -> str: """构建完整的记忆增强上下文""" sections = [] # 1. 工作记忆(当前任务状态) working_ctx = self.working.to_context_string() if working_ctx: sections.append(working_ctx) # 2. 长期记忆检索 ltm_results = self.long_term.retrieve( user_query, n_results=3, min_relevance=0.7 ) if ltm_results: sections.append( "## 相关知识\n" + "\n".join(f"- {r['content'][:200]}" for r in ltm_results) ) # 3. 情景记忆检索 episodic_ctx = self.episodic.to_context_string( user_query, max_episodes=2 ) if "暂无" not in episodic_ctx: sections.append(episodic_ctx) return "\n\n".join(sections) def on_session_end(self, session_summary: dict): """会话结束时的记忆巩固""" # 1. 将重要对话内容存入长期记忆 if session_summary.get("key_decisions"): for decision in session_summary["key_decisions"]: self.long_term.store( decision, metadata={"type": "decision"}, memory_type="knowledge", ) # 2. 记录情景 self.episodic.record_episode( task_description=session_summary.get("task", ""), actions=session_summary.get("actions", []), outcome=session_summary.get("outcome", "unknown"), outcome_details=session_summary.get("details", ""), lessons=session_summary.get("lessons", []), ) # 3. 清理工作记忆 self.working = WorkingMemory() def search(self, query: MemoryQuery) -> list[MemoryResult]: """跨记忆层统一搜索""" results: list[MemoryResult] = [] types = query.memory_types or [ "long_term", "episodic" ] if "long_term" in types: for r in self.long_term.retrieve( query.text, n_results=query.max_results ): results.append(MemoryResult( content=r["content"], source="long_term", relevance=r["relevance"], metadata=r.get("metadata", {}), )) if "episodic" in types: episodes = self.episodic.recall_similar( query.text, n_results=query.max_results ) for ep in episodes: results.append(MemoryResult( content=ep.to_summary(), source="episodic", relevance=0.8, # 情景记忆默认高相关度 metadata={"episode_id": ep.episode_id}, )) # 按相关度排序 results.sort(key=lambda r: r.relevance, reverse=True) return results[:query.max_results]

6.3 记忆框架对比

框架记忆类型自主编辑向量存储图存储价格适用场景
Mem0短期 + 长期 + 语义✅ 自动提取✅ 内置✅ 可选开源免费;云 $49/月起快速集成,通用 Agent
Letta (MemGPT)核心 + 回忆 + 归档✅ Agent 自主✅ 内置开源免费;云版联系销售有状态 Agent,虚拟上下文
LangChain Memory缓冲 + 摘要 + 向量❌ 被动✅ 可配置开源免费LangChain 生态集成
Graphlit语义 + 情景 + 知识✅ 自动✅ 内置✅ 知识图谱免费层;$99/月起企业级多数据源
Zep会话 + 事实 + 图✅ 自动提取✅ 内置✅ 知识图谱开源免费;云版联系销售对话密集型 Agent

实战案例:构建具有完整记忆系统的编码助手

场景描述

构建一个编码助手 Agent,需要:

  • 记住用户的技术栈偏好和编码风格
  • 在多次会话中积累项目知识
  • 从过去的代码审查中学习常见问题
  • 在当前任务中利用历史经验

架构设计

关键实现

class CodingAssistantMemory: """编码助手的记忆系统""" def __init__(self): self.short_term = ShortTermMemory(max_tokens=120000) self.working = WorkingMemory() self.long_term = LongTermMemory(collection_name="coding_assistant") self.episodic = EpisodicMemory(long_term_memory=self.long_term) def on_new_session(self, user_id: str): """新会话开始——加载用户画像""" # 检索用户偏好 prefs = self.long_term.retrieve( f"user:{user_id} 编程偏好和技术栈", memory_type="user_profile", n_results=5, ) if prefs: pref_text = "\n".join(p["content"] for p in prefs) self.short_term.add_message(Message( role="system", content=f"用户偏好:\n{pref_text}", importance=0.9, )) def on_code_review(self, code: str, feedback: str, outcome: str): """代码审查完成——记录经验""" self.episodic.record_episode( task_description=f"代码审查: {code[:100]}...", actions=[{"tool": "code_review", "input": code[:200]}], outcome=outcome, outcome_details=feedback, lessons=[feedback] if outcome == "failure" else [], tags=["code_review"], ) def get_review_hints(self, code_snippet: str) -> list[str]: """基于历史经验获取代码审查提示""" return self.episodic.get_lessons_for_task( f"代码审查: {code_snippet[:200]}" )

案例分析

  1. 冷启动问题:新用户首次使用时没有历史记忆,Agent 需要通过主动提问快速建立用户画像
  2. 记忆冲突:用户技术栈变更时(如从 JavaScript 切换到 TypeScript),需要更新而非追加记忆
  3. 隐私边界:代码内容可能包含敏感信息,记忆存储前需要脱敏处理
  4. 成本控制:每次检索都消耗嵌入 API 调用,需要设置检索频率限制和缓存策略

避坑指南

❌ 常见错误

  1. 无限增长的记忆库

    • 问题:只存不删,向量数据库持续膨胀,检索质量下降,成本失控
    • 正确做法:实施记忆生命周期管理——设置 TTL、定期清理低价值记忆、合并重复条目。建议每月审计记忆库大小和检索命中率
  2. 过度依赖语义检索

    • 问题:所有记忆都通过向量相似度检索,忽略了时间顺序和精确匹配的需求
    • 正确做法:采用混合检索策略——语义搜索 + 关键词匹配 + 时间衰减。对于精确信息(API Key、配置值)使用元数据过滤而非语义搜索
  3. 幻觉写入(Hallucinated Writes)

    • 问题:LLM 在提取记忆时产生幻觉,将错误信息写入长期记忆,后续检索时放大错误
    • 正确做法:对写入长期记忆的内容进行验证——交叉检查来源、设置置信度阈值、关键记忆需要人工确认
  4. 上下文窗口塞满记忆

    • 问题:检索到大量相关记忆后全部注入上下文,挤占了用户输入和推理空间
    • 正确做法:严格控制记忆注入的 Token 预算(建议不超过总上下文的 20%),按相关度截断,摘要压缩长记忆
  5. 忽略记忆隐私

    • 问题:Agent 记忆中存储了用户的敏感信息(密码、密钥、个人数据),没有加密或访问控制
    • 正确做法:记忆存储前进行 PII 检测和脱敏,敏感记忆加密存储,实施用户级别的记忆隔离和删除权(GDPR “被遗忘权”)
  6. 检索漂移(Retrieval Drift)

    • 问题:随着记忆库增长,早期存储的记忆因嵌入模型版本变化或数据分布偏移而检索质量下降
    • 正确做法:定期重新嵌入旧记忆、监控检索命中率指标、设置记忆”新鲜度”权重

✅ 最佳实践

  1. 从简单开始:先实现短期记忆(对话历史管理),再逐步添加长期和情景记忆
  2. 记忆分层存储:热数据用 Redis,温数据用向量数据库,冷数据归档到对象存储
  3. 设置记忆预算:为每种记忆类型分配明确的 Token 和存储预算
  4. 实施遗忘策略:不是所有信息都值得记住,设置重要性评分和自动淘汰机制
  5. 监控记忆质量:跟踪检索命中率、用户满意度和记忆利用率指标

相关资源与延伸阅读

  1. Mem0 官方文档  — Agent 记忆层框架的完整使用指南
  2. Letta (MemGPT) 文档  — 虚拟上下文管理和有状态 Agent 框架
  3. LangChain Memory 模块  — LangChain 生态的记忆组件文档
  4. ChromaDB 快速入门  — 轻量向量数据库的入门教程
  5. pgvector GitHub  — PostgreSQL 向量扩展的安装和使用
  6. Pinecone 学习中心  — 向量数据库和语义搜索的系统教程
  7. Generative Agents 论文  — 斯坦福大学关于 Agent 记忆和反思机制的开创性研究
  8. MemGPT 论文  — 虚拟上下文管理的原始论文

参考来源


📖 返回 总览与导航 | 上一节:09c-Guardrails实现 | 下一节:09e-Agent架构模板

Last updated on