11d - RAG 系统构建教程
本文是《AI Agent 实战手册》第 11 章第 4 节。 上一节:11c-向量数据库对比 | 下一节:11e-高级RAG技术
概述
本节是一份从零构建生产级 RAG 系统的逐步教程。我们将从环境搭建开始,经历文档摄入、分块、嵌入、向量存储、检索、重排序到最终生成的完整流程,分别提供 Python(LlamaIndex/LangChain)和 TypeScript(Vercel AI SDK)两套完整代码实现。教程不仅覆盖”能跑起来”的原型,更关注生产环境中的错误处理、性能优化、成本控制和监控评估,帮助你构建一个真正可部署的 RAG 系统。
1. 环境准备与技术选型
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| LlamaIndex | RAG 专用框架(Python) | 免费(MIT 开源) | 文档密集型 RAG,快速原型到生产 |
| LangChain | 通用 LLM 编排框架(Python) | 免费(MIT 开源) | 复杂多步骤工作流,Agent + RAG |
| Vercel AI SDK | TypeScript AI 开发工具包 | 免费(开源) | Next.js 全栈 RAG 应用 |
| Qdrant | 高性能向量数据库 | 免费(自托管);云版 ~$10/月起 | 生产级向量存储,过滤搜索优秀 |
| ChromaDB | 轻量级向量数据库 | 免费(开源) | 本地开发、原型验证 |
| pgvector | PostgreSQL 向量扩展 | 免费(随 PostgreSQL) | 已有 PostgreSQL 基础设施 |
| OpenAI Embeddings | 文本嵌入 API | text-embedding-3-small: $0.02/百万 tokens | 快速集成,高质量通用嵌入 |
| Cohere Rerank | 重排序 API | 免费层 100 次/分钟;$1/千次查询 | 提升检索精度 10-30% |
| RAGAS | RAG 评估框架 | 免费(开源) | 自动化评估 RAG 系统质量 |
| Unstructured | 文档解析 | 免费(开源);API 版 $0.01/页起 | 复杂文档格式解析 |
操作步骤
步骤 1:确定技术栈
根据你的项目需求选择技术栈:
你的主要语言是什么?
│
├── Python
│ ├── 纯 RAG 应用 ──▶ LlamaIndex(API 最简洁)
│ ├── RAG + Agent + 复杂工作流 ──▶ LangChain(生态最丰富)
│ └── 学术研究/实验 ──▶ Haystack(管线抽象清晰)
│
└── TypeScript / JavaScript
├── Next.js 全栈应用 ──▶ Vercel AI SDK + pgvector
└── Node.js 后端服务 ──▶ LangChain.js + Qdrant步骤 2:Python 环境搭建
# 创建项目目录
mkdir rag-production && cd rag-production
# 创建虚拟环境
python -m venv .venv
source .venv/bin/activate # Linux/Mac
# .venv\Scripts\activate # Windows
# 安装核心依赖(LlamaIndex 方案)
pip install llama-index openai chromadb qdrant-client
pip install llama-index-vector-stores-qdrant
pip install llama-index-embeddings-openai
pip install llama-index-postprocessor-cohere-rerank
pip install unstructured # 文档解析
# 或安装核心依赖(LangChain 方案)
pip install langchain langchain-openai langchain-community
pip install chromadb qdrant-client cohere
pip install ragas # 评估框架步骤 3:TypeScript 环境搭建
# 创建 Next.js 项目
npx create-next-app@latest rag-app --typescript --tailwind --app
cd rag-app
# 安装 AI SDK 和向量数据库依赖
npm install ai @ai-sdk/openai
npm install @neondatabase/serverless # Neon PostgreSQL
npm install drizzle-orm # ORM
npm install openai # 嵌入 API
# 开发依赖
npm install -D drizzle-kit dotenv步骤 4:配置环境变量
# .env 文件
OPENAI_API_KEY=sk-your-openai-api-key
COHERE_API_KEY=your-cohere-api-key # 可选,用于重排序
QDRANT_URL=http://localhost:6333 # 本地 Qdrant
QDRANT_API_KEY= # 云端 Qdrant 需要
DATABASE_URL=postgresql://... # TypeScript 方案用提示词模板
请帮我评估以下 RAG 项目的技术选型是否合理:
## 项目信息
- 业务场景:[内部知识库问答 / 客服机器人 / 代码搜索 / ...]
- 文档规模:[文档数量和总大小]
- 文档类型:[PDF / Markdown / 代码 / 混合]
- 查询量:[日均查询次数]
- 团队技术栈:[Python / TypeScript / 两者都有]
- 月预算:[金额]
## 当前选型
- 框架:[LlamaIndex / LangChain / Vercel AI SDK]
- 向量数据库:[选择的数据库]
- 嵌入模型:[选择的模型]
- 生成模型:[选择的 LLM]
## 请评估
1. 选型是否匹配业务需求?
2. 有哪些潜在风险?
3. 是否有更优方案?2. Python 完整实现:LlamaIndex + Qdrant
本节提供一个完整的生产级 RAG 系统实现,使用 LlamaIndex 作为 RAG 框架,Qdrant 作为向量数据库,OpenAI 提供嵌入和生成能力,Cohere 提供重排序。
操作步骤
步骤 1:项目结构
rag-production/
├── src/
│ ├── __init__.py
│ ├── config.py # 配置管理
│ ├── ingestion.py # 文档摄入与索引
│ ├── retrieval.py # 检索与查询
│ ├── evaluation.py # 质量评估
│ └── api.py # FastAPI 服务
├── data/ # 原始文档目录
├── tests/
│ └── test_rag.py
├── .env
├── requirements.txt
└── docker-compose.yml # Qdrant 本地部署步骤 2:启动 Qdrant(Docker)
# docker-compose.yml
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333" # REST API
- "6334:6334" # gRPC
volumes:
- qdrant_data:/qdrant/storage
environment:
- QDRANT__SERVICE__GRPC_PORT=6334
volumes:
qdrant_data:docker compose up -d
# 验证:访问 http://localhost:6333/dashboard步骤 3:配置管理
# src/config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class RAGConfig:
"""RAG 系统配置"""
# 嵌入模型
embed_model: str = "text-embedding-3-small"
embed_dimensions: int = 512 # 维度缩减,节省存储
# 生成模型
llm_model: str = "gpt-4o-mini"
llm_temperature: float = 0.1 # 低温度减少幻觉
# 分块参数
chunk_size: int = 512
chunk_overlap: int = 100
# 检索参数
similarity_top_k: int = 20 # 初始召回数量
rerank_top_n: int = 5 # 重排序后保留数量
# 向量数据库
qdrant_url: str = os.getenv("QDRANT_URL", "http://localhost:6333")
qdrant_api_key: str | None = os.getenv("QDRANT_API_KEY")
collection_name: str = "knowledge_base"
# API Keys
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
cohere_api_key: str = os.getenv("COHERE_API_KEY", "")
config = RAGConfig()步骤 4:文档摄入与索引构建
# src/ingestion.py
"""文档摄入管线:解析 → 分块 → 嵌入 → 存储"""
from pathlib import Path
from llama_index.core import (
VectorStoreIndex,
SimpleDirectoryReader,
Settings,
StorageContext,
Document,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from qdrant_client import QdrantClient
from .config import config
def init_settings():
"""初始化全局 LLM 和嵌入模型设置"""
Settings.llm = OpenAI(
model=config.llm_model,
temperature=config.llm_temperature,
api_key=config.openai_api_key,
)
Settings.embed_model = OpenAIEmbedding(
model=config.embed_model,
dimensions=config.embed_dimensions,
api_key=config.openai_api_key,
)
def create_vector_store() -> QdrantVectorStore:
"""创建 Qdrant 向量存储连接"""
client = QdrantClient(
url=config.qdrant_url,
api_key=config.qdrant_api_key,
)
return QdrantVectorStore(
client=client,
collection_name=config.collection_name,
)
def load_documents(data_dir: str = "./data") -> list[Document]:
"""加载多格式文档"""
reader = SimpleDirectoryReader(
input_dir=data_dir,
recursive=True,
required_exts=[".md", ".txt", ".pdf", ".py", ".ts", ".html"],
filename_as_id=True, # 用文件名作为文档 ID,支持增量更新
)
documents = reader.load_data()
print(f"✅ 加载了 {len(documents)} 个文档")
return documents
def build_index(documents: list[Document]) -> VectorStoreIndex:
"""构建向量索引(完整流程:分块 → 嵌入 → 存储)"""
init_settings()
# 配置分块器
splitter = SentenceSplitter(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap,
paragraph_separator="\n\n",
)
# 配置向量存储
vector_store = create_vector_store()
storage_context = StorageContext.from_defaults(
vector_store=vector_store,
)
# 构建索引(自动完成分块 → 嵌入 → 存储)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
transformations=[splitter],
show_progress=True,
)
print(f"✅ 索引构建完成,共 {len(documents)} 个文档")
return index
def load_existing_index() -> VectorStoreIndex:
"""加载已有索引(不重新构建)"""
init_settings()
vector_store = create_vector_store()
return VectorStoreIndex.from_vector_store(vector_store)
# ============================================
# 入口:运行索引构建
# ============================================
if __name__ == "__main__":
docs = load_documents("./data")
index = build_index(docs)
print("🎉 索引构建完成!")步骤 5:检索与查询引擎
# src/retrieval.py
"""检索与查询引擎:混合检索 + 重排序 + 带引用生成"""
from llama_index.core import VectorStoreIndex
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import get_response_synthesizer
from .config import config
from .ingestion import load_existing_index
def create_reranker():
"""创建重排序器(优先使用 Cohere API,备选本地模型)"""
if config.cohere_api_key:
return CohereRerank(
model="rerank-v3.5",
top_n=config.rerank_top_n,
api_key=config.cohere_api_key,
)
# 备选:本地开源重排序模型(无需 API Key)
return SentenceTransformerRerank(
model="BAAI/bge-reranker-v2-m3",
top_n=config.rerank_top_n,
)
def create_query_engine(index: VectorStoreIndex | None = None):
"""创建生产级查询引擎"""
if index is None:
index = load_existing_index()
reranker = create_reranker()
# 配置响应合成器
response_synthesizer = get_response_synthesizer(
response_mode="tree_summarize", # 树状摘要,适合多文档
use_async=True,
)
# 创建查询引擎
query_engine = index.as_query_engine(
similarity_top_k=config.similarity_top_k,
node_postprocessors=[reranker],
response_synthesizer=response_synthesizer,
)
return query_engine
def query(question: str, engine=None) -> dict:
"""执行 RAG 查询,返回回答和来源"""
if engine is None:
engine = create_query_engine()
response = engine.query(question)
# 提取来源信息
sources = []
for node in response.source_nodes:
sources.append({
"file": node.metadata.get("file_name", "未知"),
"score": round(node.score, 4) if node.score else None,
"text_preview": node.text[:200] + "...",
})
return {
"answer": str(response),
"sources": sources,
"source_count": len(sources),
}
# ============================================
# 入口:交互式查询
# ============================================
if __name__ == "__main__":
engine = create_query_engine()
print("🔍 RAG 查询系统已就绪,输入 'quit' 退出\n")
while True:
question = input("你的问题: ").strip()
if question.lower() in ("quit", "exit", "q"):
break
if not question:
continue
result = query(question, engine)
print(f"\n📝 回答:\n{result['answer']}")
print(f"\n📚 来源 ({result['source_count']} 个):")
for src in result["sources"]:
print(f" - {src['file']} (相关度: {src['score']})")
print("-" * 60)步骤 6:FastAPI 服务化
# src/api.py
"""FastAPI 服务:将 RAG 系统暴露为 REST API"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from contextlib import asynccontextmanager
from .ingestion import load_existing_index
from .retrieval import create_query_engine, query
# 全局查询引擎(应用启动时初始化一次)
query_engine = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
global query_engine
print("🚀 正在初始化 RAG 查询引擎...")
index = load_existing_index()
query_engine = create_query_engine(index)
print("✅ RAG 查询引擎就绪")
yield
print("👋 RAG 服务关闭")
app = FastAPI(title="RAG API", version="1.0.0", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class QueryRequest(BaseModel):
question: str
top_k: int = 5
class SourceInfo(BaseModel):
file: str
score: float | None
text_preview: str
class QueryResponse(BaseModel):
answer: str
sources: list[SourceInfo]
source_count: int
@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
"""处理 RAG 查询请求"""
if not request.question.strip():
raise HTTPException(status_code=400, detail="问题不能为空")
try:
result = query(request.question, query_engine)
return QueryResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}")
@app.get("/health")
async def health_check():
return {"status": "healthy", "engine_ready": query_engine is not None}# 启动服务
uvicorn src.api:app --host 0.0.0.0 --port 8000 --reload
# 测试查询
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "RAG 系统的核心组件有哪些?"}'3. Python 完整实现:LangChain + pgvector
对于已有 PostgreSQL 基础设施的团队,LangChain + pgvector 是一个更轻量的选择,无需引入额外的向量数据库。
操作步骤
步骤 1:安装依赖
pip install langchain langchain-openai langchain-postgres
pip install psycopg2-binary # PostgreSQL 驱动
pip install cohere # 重排序(可选)步骤 2:启用 pgvector
-- 在 PostgreSQL 中启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;步骤 3:完整实现
"""LangChain + pgvector 生产级 RAG 实现"""
import os
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
# ============================================
# 1. 配置
# ============================================
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL") # postgresql://user:pass@host:5432/db
COLLECTION_NAME = "knowledge_base"
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
api_key=OPENAI_API_KEY,
)
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
api_key=OPENAI_API_KEY,
)
# ============================================
# 2. 文档加载与分块
# ============================================
def ingest_documents(data_dir: str = "./data"):
"""加载文档并构建向量索引"""
# 加载文档
loader = DirectoryLoader(
data_dir,
glob="**/*.md",
loader_cls=TextLoader,
loader_kwargs={"encoding": "utf-8"},
)
documents = loader.load()
print(f"加载了 {len(documents)} 个文档")
# 分块(中文优化分隔符)
splitter = RecursiveCharacterTextSplitter(
chunk_size=600,
chunk_overlap=120,
separators=["\n\n\n", "\n\n", "\n", "。", ";", ",", ".", " ", ""],
)
chunks = splitter.split_documents(documents)
print(f"分块后共 {len(chunks)} 个片段")
# 存入 pgvector
vectorstore = PGVector.from_documents(
documents=chunks,
embedding=embeddings,
collection_name=COLLECTION_NAME,
connection=DATABASE_URL,
pre_delete_collection=True, # 首次构建时清空旧数据
)
print("✅ 向量索引构建完成")
return vectorstore
# ============================================
# 3. 检索与生成
# ============================================
RAG_PROMPT = PromptTemplate(
template="""你是一个基于知识库的问答助手。请严格根据以下上下文信息回答问题。
## 规则
1. 只使用提供的上下文信息回答
2. 在回答中标注信息来源
3. 如果上下文中没有相关信息,明确说明
4. 不要编造不存在的信息
## 上下文
{context}
## 问题
{question}
## 回答(附带来源引用):""",
input_variables=["context", "question"],
)
def create_rag_chain():
"""创建 RAG 查询链"""
# 连接已有向量存储
vectorstore = PGVector(
embeddings=embeddings,
collection_name=COLLECTION_NAME,
connection=DATABASE_URL,
)
# 创建检索器(混合搜索)
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 10},
)
# 创建 RAG 链
chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": RAG_PROMPT},
)
return chain
def query_rag(question: str) -> dict:
"""执行 RAG 查询"""
chain = create_rag_chain()
result = chain.invoke({"query": question})
sources = [
{
"source": doc.metadata.get("source", "未知"),
"content_preview": doc.page_content[:150] + "...",
}
for doc in result.get("source_documents", [])
]
return {
"answer": result["result"],
"sources": sources,
}
# ============================================
# 入口
# ============================================
if __name__ == "__main__":
# 首次运行:构建索引
# ingest_documents("./data")
# 查询
result = query_rag("项目的主要功能是什么?")
print(f"回答: {result['answer']}")
for src in result["sources"]:
print(f" 来源: {src['source']}")4. TypeScript 完整实现:Vercel AI SDK + pgvector
对于 TypeScript/Next.js 开发者,Vercel AI SDK 提供了一流的 RAG 开发体验,结合 Neon(Serverless PostgreSQL)和 pgvector 可以快速构建全栈 RAG 应用。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Vercel AI SDK | TypeScript AI 开发工具包 | 免费(开源) | Next.js 全栈 RAG 应用 |
| Neon | Serverless PostgreSQL | 免费层可用;Pro $19/月起 | 零运维 PostgreSQL + pgvector |
| Drizzle ORM | TypeScript ORM | 免费(开源) | 类型安全的数据库操作 |
| OpenAI SDK | 嵌入和生成 API | 按量计费 | 嵌入生成和 LLM 调用 |
操作步骤
步骤 1:数据库 Schema 定义
// src/lib/db/schema.ts
import { pgTable, serial, text, vector, timestamp, jsonb } from "drizzle-orm/pg-core";
export const documents = pgTable("documents", {
id: serial("id").primaryKey(),
content: text("content").notNull(),
metadata: jsonb("metadata").$type<{
source: string;
title?: string;
chunk_index?: number;
}>(),
embedding: vector("embedding", { dimensions: 1536 }),
createdAt: timestamp("created_at").defaultNow(),
});步骤 2:文档摄入与嵌入
// src/lib/rag/ingestion.ts
import { openai } from "@ai-sdk/openai";
import { embed } from "ai";
import { db } from "@/lib/db";
import { documents } from "@/lib/db/schema";
/**
* 将文本分块(递归分块策略)
*/
function chunkText(text: string, chunkSize = 600, overlap = 120): string[] {
const chunks: string[] = [];
const separators = ["\n\n\n", "\n\n", "\n", "。", ".", " "];
function splitRecursive(text: string, sepIndex: number): string[] {
if (text.length <= chunkSize) return [text];
if (sepIndex >= separators.length) {
// 最后手段:按字符数硬切
const result: string[] = [];
for (let i = 0; i < text.length; i += chunkSize - overlap) {
result.push(text.slice(i, i + chunkSize));
}
return result;
}
const sep = separators[sepIndex];
const parts = text.split(sep);
const result: string[] = [];
let current = "";
for (const part of parts) {
if ((current + sep + part).length > chunkSize && current) {
result.push(current.trim());
// 保留重叠部分
const words = current.split(" ");
current = words.slice(-Math.floor(words.length * 0.2)).join(" ") + sep + part;
} else {
current = current ? current + sep + part : part;
}
}
if (current.trim()) result.push(current.trim());
// 对超长块递归使用下一级分隔符
return result.flatMap((chunk) =>
chunk.length > chunkSize ? splitRecursive(chunk, sepIndex + 1) : [chunk]
);
}
return splitRecursive(text, 0).filter((c) => c.length > 50);
}
/**
* 生成文本嵌入向量
*/
async function generateEmbedding(text: string): Promise<number[]> {
const { embedding } = await embed({
model: openai.embedding("text-embedding-3-small"),
value: text,
});
return embedding;
}
/**
* 摄入文档:分块 → 嵌入 → 存储
*/
export async function ingestDocument(
content: string,
source: string,
title?: string
) {
const chunks = chunkText(content);
console.log(`📄 ${source}: 分为 ${chunks.length} 个块`);
// 批量生成嵌入(减少 API 调用)
const results = await Promise.all(
chunks.map(async (chunk, index) => {
const embeddingVector = await generateEmbedding(chunk);
return {
content: chunk,
metadata: { source, title, chunk_index: index },
embedding: embeddingVector,
};
})
);
// 批量写入数据库
await db.insert(documents).values(results);
console.log(`✅ ${source}: ${results.length} 个块已存入数据库`);
}步骤 3:检索与生成
// src/lib/rag/retrieval.ts
import { openai } from "@ai-sdk/openai";
import { embed, generateText } from "ai";
import { db } from "@/lib/db";
import { documents } from "@/lib/db/schema";
import { cosineDistance, desc, gt, sql } from "drizzle-orm";
/**
* 语义检索:查找与查询最相关的文档块
*/
export async function retrieveRelevantChunks(
query: string,
topK = 10,
minSimilarity = 0.3
) {
// 生成查询嵌入
const { embedding: queryEmbedding } = await embed({
model: openai.embedding("text-embedding-3-small"),
value: query,
});
// 向量相似度搜索
const similarity = sql<number>`1 - (${cosineDistance(
documents.embedding,
queryEmbedding
)})`;
const results = await db
.select({
id: documents.id,
content: documents.content,
metadata: documents.metadata,
similarity,
})
.from(documents)
.where(gt(similarity, minSimilarity))
.orderBy(desc(similarity))
.limit(topK);
return results;
}
/**
* RAG 查询:检索 + 生成
*/
export async function ragQuery(question: string) {
// 1. 检索相关文档
const relevantChunks = await retrieveRelevantChunks(question, 10);
if (relevantChunks.length === 0) {
return {
answer: "根据现有知识库,我无法找到与您问题相关的信息。",
sources: [],
};
}
// 2. 组装上下文
const context = relevantChunks
.map(
(chunk, i) =>
`[来源 ${i + 1}: ${chunk.metadata?.source ?? "未知"}]\n${chunk.content}`
)
.join("\n\n---\n\n");
// 3. 生成回答
const { text } = await generateText({
model: openai("gpt-4o-mini"),
system: `你是一个基于知识库的问答助手。
规则:
1. 只使用提供的上下文信息回答问题
2. 在回答中用 [来源 N] 标注信息出处
3. 如果上下文中没有相关信息,明确说明
4. 不要编造上下文中不存在的信息`,
prompt: `## 上下文信息\n${context}\n\n## 用户问题\n${question}\n\n## 请回答(附带来源引用):`,
temperature: 0.1,
});
return {
answer: text,
sources: relevantChunks.map((chunk) => ({
source: chunk.metadata?.source ?? "未知",
similarity: chunk.similarity,
preview: chunk.content.slice(0, 150) + "...",
})),
};
}步骤 4:Next.js API 路由
// src/app/api/query/route.ts
import { NextRequest, NextResponse } from "next/server";
import { ragQuery } from "@/lib/rag/retrieval";
export async function POST(request: NextRequest) {
try {
const { question } = await request.json();
if (!question?.trim()) {
return NextResponse.json(
{ error: "问题不能为空" },
{ status: 400 }
);
}
const result = await ragQuery(question);
return NextResponse.json(result);
} catch (error) {
console.error("RAG 查询失败:", error);
return NextResponse.json(
{ error: "查询处理失败,请稍后重试" },
{ status: 500 }
);
}
}步骤 5:流式聊天界面(Vercel AI SDK)
// src/app/api/chat/route.ts
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
import { retrieveRelevantChunks } from "@/lib/rag/retrieval";
export async function POST(req: Request) {
const { messages } = await req.json();
const lastMessage = messages[messages.length - 1]?.content ?? "";
// 检索相关上下文
const chunks = await retrieveRelevantChunks(lastMessage, 5);
const context = chunks
.map((c, i) => `[来源 ${i + 1}] ${c.content}`)
.join("\n\n");
const result = streamText({
model: openai("gpt-4o-mini"),
system: `你是知识库问答助手。基于以下上下文回答问题,标注来源。
如果上下文不包含相关信息,明确说明。
## 上下文
${context}`,
messages,
temperature: 0.1,
});
return result.toDataStreamResponse();
}// src/app/page.tsx — 前端聊天组件
"use client";
import { useChat } from "@ai-sdk/react";
export default function ChatPage() {
const { messages, input, handleInputChange, handleSubmit, isLoading } =
useChat({ api: "/api/chat" });
return (
<div className="max-w-2xl mx-auto p-4">
<h1 className="text-2xl font-bold mb-4">📚 知识库问答</h1>
<div className="space-y-4 mb-4">
{messages.map((m) => (
<div
key={m.id}
className={`p-3 rounded-lg ${
m.role === "user" ? "bg-blue-100 ml-8" : "bg-gray-100 mr-8"
}`}
>
<p className="text-sm font-medium mb-1">
{m.role === "user" ? "你" : "AI 助手"}
</p>
<p className="whitespace-pre-wrap">{m.content}</p>
</div>
))}
</div>
<form onSubmit={handleSubmit} className="flex gap-2">
<input
value={input}
onChange={handleInputChange}
placeholder="输入你的问题..."
className="flex-1 p-2 border rounded-lg"
disabled={isLoading}
/>
<button
type="submit"
disabled={isLoading}
className="px-4 py-2 bg-blue-500 text-white rounded-lg disabled:opacity-50"
>
{isLoading ? "思考中..." : "发送"}
</button>
</form>
</div>
);
}5. 质量评估与监控
上线前必须建立评估体系,否则无法量化 RAG 系统的回答质量,出问题时也无法定位是检索问题还是生成问题。
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| RAGAS | RAG 评估框架 | 免费(开源) | 自动化评估 faithfulness、relevance 等指标 |
| LangSmith | LLM 可观测性平台 | 免费层可用;Plus $39/月起 | Trace 追踪、评估、调试 |
| Langfuse | 开源 LLM 可观测性 | 免费(自托管);Cloud 免费层可用 | 自托管监控、成本追踪 |
| DeepEval | LLM 测试框架 | 免费(开源) | 单元测试风格的 RAG 评估 |
| Arize Phoenix | AI 可观测性 | 免费(开源) | 本地 Trace 分析和评估 |
操作步骤
步骤 1:使用 RAGAS 评估 RAG 质量
# src/evaluation.py
"""使用 RAGAS 框架评估 RAG 系统质量"""
from ragas import evaluate
from ragas.metrics import (
faithfulness, # 忠实度:回答是否基于检索到的上下文
answer_relevancy, # 回答相关性:回答是否切题
context_precision, # 上下文精度:检索到的内容是否相关
context_recall, # 上下文召回:是否检索到了所有需要的信息
)
from ragas import EvaluationDataset, SingleTurnSample
def create_test_dataset() -> EvaluationDataset:
"""创建评估数据集(需要人工标注的参考答案)"""
samples = [
SingleTurnSample(
user_input="RAG 系统的核心组件有哪些?",
response="RAG 系统的核心组件包括...", # RAG 系统生成的回答
retrieved_contexts=[
"RAG 系统由检索器、生成器和知识库组成...",
"向量数据库是 RAG 的存储核心...",
],
reference="RAG 系统的核心组件包括:检索器(Retriever)、"
"生成器(Generator/LLM)、知识库(Knowledge Base)、"
"向量数据库(Vector Database)和嵌入模型(Embedding Model)。",
),
# 添加更多测试样本...
]
return EvaluationDataset(samples=samples)
def run_evaluation():
"""运行 RAGAS 评估"""
dataset = create_test_dataset()
result = evaluate(
dataset=dataset,
metrics=[
faithfulness,
answer_relevancy,
context_precision,
context_recall,
],
)
print("📊 RAG 评估结果:")
print(f" 忠实度 (Faithfulness): {result['faithfulness']:.4f}")
print(f" 回答相关性 (Relevancy): {result['answer_relevancy']:.4f}")
print(f" 上下文精度 (Precision): {result['context_precision']:.4f}")
print(f" 上下文召回 (Recall): {result['context_recall']:.4f}")
# 生产环境基准线
thresholds = {
"faithfulness": 0.85,
"answer_relevancy": 0.80,
"context_precision": 0.75,
"context_recall": 0.70,
}
for metric, threshold in thresholds.items():
score = result[metric]
status = "✅" if score >= threshold else "⚠️"
print(f" {status} {metric}: {score:.4f} (阈值: {threshold})")
return result步骤 2:生产监控指标
"""生产环境关键监控指标"""
import time
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class RAGMetrics:
"""RAG 系统运行时指标收集器"""
query_count: int = 0
total_latency_ms: float = 0
error_count: int = 0
empty_retrieval_count: int = 0 # 检索结果为空的次数
latency_buckets: list = field(default_factory=list)
def record_query(self, latency_ms: float, retrieved_count: int, success: bool):
self.query_count += 1
self.total_latency_ms += latency_ms
self.latency_buckets.append(latency_ms)
if not success:
self.error_count += 1
if retrieved_count == 0:
self.empty_retrieval_count += 1
@property
def avg_latency_ms(self) -> float:
return self.total_latency_ms / max(self.query_count, 1)
@property
def p95_latency_ms(self) -> float:
if not self.latency_buckets:
return 0
sorted_latencies = sorted(self.latency_buckets)
idx = int(len(sorted_latencies) * 0.95)
return sorted_latencies[min(idx, len(sorted_latencies) - 1)]
@property
def success_rate(self) -> float:
return (self.query_count - self.error_count) / max(self.query_count, 1)
@property
def empty_retrieval_rate(self) -> float:
return self.empty_retrieval_count / max(self.query_count, 1)
def report(self) -> dict:
return {
"total_queries": self.query_count,
"avg_latency_ms": round(self.avg_latency_ms, 2),
"p95_latency_ms": round(self.p95_latency_ms, 2),
"success_rate": f"{self.success_rate:.2%}",
"empty_retrieval_rate": f"{self.empty_retrieval_rate:.2%}",
"error_count": self.error_count,
}
# 使用示例
metrics = RAGMetrics()
def monitored_query(question: str, engine) -> dict:
"""带监控的查询包装器"""
start = time.time()
try:
result = query(question, engine)
latency = (time.time() - start) * 1000
metrics.record_query(latency, result["source_count"], success=True)
return result
except Exception as e:
latency = (time.time() - start) * 1000
metrics.record_query(latency, 0, success=False)
raise提示词模板
请帮我分析以下 RAG 系统的评估结果,并提出优化建议:
## 评估指标
- Faithfulness(忠实度):[分数]
- Answer Relevancy(回答相关性):[分数]
- Context Precision(上下文精度):[分数]
- Context Recall(上下文召回):[分数]
## 系统配置
- 分块大小:[chunk_size]
- 嵌入模型:[模型名]
- 检索 Top-K:[数量]
- 是否使用重排序:[是/否]
## 请分析
1. 哪个指标是当前瓶颈?
2. 瓶颈的可能原因是什么?
3. 推荐的优化方向和具体操作?
4. 优化后预期的指标提升幅度?6. 生产部署与成本优化
工具推荐
| 工具 | 用途 | 价格 | 适用场景 |
|---|---|---|---|
| Docker + Docker Compose | 容器化部署 | 免费 | 本地和服务器部署 |
| Vercel | Next.js 托管 | 免费层可用;Pro $20/月 | TypeScript RAG 前端 |
| Railway | 应用托管 | $5/月起 | Python RAG 后端 |
| Neon | Serverless PostgreSQL | 免费层可用;Pro $19/月起 | pgvector 托管 |
| Qdrant Cloud | 托管向量数据库 | ~$10/月起 | 零运维向量存储 |
| Redis | 语义缓存 | 免费(自托管);Cloud $5/月起 | 减少重复查询成本 60-80% |
操作步骤
步骤 1:Docker 化部署
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential && rm -rf /var/lib/apt/lists/*
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY src/ ./src/
# 暴露端口
EXPOSE 8000
# 启动服务
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.prod.yml
version: '3.8'
services:
rag-api:
build: .
ports:
- "8000:8000"
env_file: .env
depends_on:
- qdrant
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333"
volumes:
- qdrant_data:/qdrant/storage
restart: unless-stopped
deploy:
resources:
limits:
memory: 4G
volumes:
qdrant_data:步骤 2:语义缓存(降低成本 60-80%)
"""语义缓存:相似查询复用已有回答,大幅降低 API 调用成本"""
import hashlib
import json
import numpy as np
from functools import lru_cache
class SemanticCache:
"""基于向量相似度的语义缓存"""
def __init__(self, similarity_threshold: float = 0.95):
self.cache: dict[str, dict] = {} # hash -> {embedding, response}
self.threshold = similarity_threshold
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
a_np, b_np = np.array(a), np.array(b)
return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))
def get(self, query_embedding: list[float]) -> dict | None:
"""查找语义相似的缓存结果"""
for key, entry in self.cache.items():
similarity = self._cosine_similarity(query_embedding, entry["embedding"])
if similarity >= self.threshold:
return entry["response"]
return None
def set(self, query_embedding: list[float], response: dict):
"""缓存查询结果"""
key = hashlib.md5(json.dumps(query_embedding[:10]).encode()).hexdigest()
self.cache[key] = {
"embedding": query_embedding,
"response": response,
}
@property
def hit_rate_info(self) -> str:
return f"缓存条目数: {len(self.cache)}"
# 集成到查询流程
cache = SemanticCache(similarity_threshold=0.95)
def cached_query(question: str, engine) -> dict:
"""带语义缓存的查询"""
# 生成查询嵌入
query_embedding = get_embedding(question)
# 检查缓存
cached = cache.get(query_embedding)
if cached:
cached["from_cache"] = True
return cached
# 缓存未命中,执行实际查询
result = query(question, engine)
cache.set(query_embedding, result)
result["from_cache"] = False
return result步骤 3:成本估算模板
┌─────────────────────────────────────────────────────────┐
│ RAG 系统月度成本估算 │
├─────────────────────────────────────────────────────────┤
│ │
│ 假设:10 万文档,日均 1000 次查询 │
│ │
│ 一次性成本(索引构建) │
│ ├── 嵌入 API:10 万文档 × 500 tokens/文档 │
│ │ = 5000 万 tokens × $0.02/百万 = $1.00 │
│ └── 合计:~$1(仅首次构建) │
│ │
│ 月度运行成本 │
│ ├── 嵌入 API(查询):1000 次/天 × 30 天 │
│ │ = 3 万次 × 100 tokens × $0.02/百万 = $0.06 │
│ ├── LLM 生成:3 万次 × 2000 tokens(含上下文) │
│ │ = 6000 万 tokens × $0.15/百万 = $9.00 │
│ ├── 重排序 API:3 万次 × $1/千次 = $30.00 │
│ ├── 向量数据库:Qdrant Cloud ~$10-25/月 │
│ └── 服务器/托管:Railway/Vercel ~$5-20/月 │
│ │
│ 月度总计:~$55-85(不含缓存优化) │
│ 缓存优化后:~$25-45(节省 40-60%) │
│ │
│ 成本优化杠杆 │
│ ├── 语义缓存:减少 60-80% 重复查询 │
│ ├── 维度缩减:嵌入从 1536→512,存储减少 3x │
│ ├── 模型降级:非关键查询用 gpt-4o-mini │
│ └── 批量嵌入:减少 API 调用次数 │
│ │
└─────────────────────────────────────────────────────────┘实战案例:企业内部知识库问答系统
场景描述
一家 200 人的技术公司需要构建内部知识库问答系统,覆盖以下数据源:
- 技术文档(Markdown):500+ 篇
- API 文档(OpenAPI/Swagger):30+ 个服务
- 代码仓库 README 和注释:50+ 个仓库
- 会议纪要和决策记录(PDF):200+ 份
技术选型决策
| 决策点 | 选择 | 理由 |
|---|---|---|
| RAG 框架 | LlamaIndex | 文档密集型场景,LlamaIndex 的数据连接器和索引优化更成熟 |
| 向量数据库 | Qdrant Cloud | 团队无 DevOps 资源,需要托管服务;过滤搜索性能优秀(按团队/项目过滤) |
| 嵌入模型 | text-embedding-3-small(512 维) | 性价比最优,维度缩减后存储成本降低 3x |
| 生成模型 | gpt-4o-mini | 成本低,质量足够内部使用 |
| 重排序 | Cohere rerank-v3.5 | API 即用,多语言支持好 |
| 部署 | Docker + Railway | 简单可靠,团队熟悉 |
案例分析
关键设计决策:
-
多数据源统一索引:使用 LlamaIndex 的
SimpleDirectoryReader统一加载 Markdown、PDF 和代码文件,通过元数据(source_type、team、project)区分来源,支持按团队和项目过滤检索 -
分块策略差异化:
- 技术文档:递归分块,512 tokens,按标题层级切分
- 代码文件:
CodeSplitter按函数/类切分,保留语法完整性 - PDF 会议纪要:先用 LlamaParse 解析,再按段落分块
-
增量更新机制:使用
filename_as_id=True让文件名作为文档 ID,更新文档时只需重新索引变更的文件,无需全量重建 -
成本控制:
- 语义缓存命中率达到 35%(内部用户查询重复度高)
- 维度缩减从 1536→512,向量存储成本降低 67%
- 月度总成本约 $60(500 用户,日均 800 次查询)
-
评估体系:
- 上线前用 50 个标注样本做 RAGAS 评估,faithfulness > 0.85
- 上线后收集用户反馈(👍/👎),每周分析低评分查询并优化
避坑指南
❌ 常见错误
-
一开始就追求完美架构,迟迟不上线
- 问题:花数周搭建 Modular RAG + 知识图谱 + 多路召回,产品还没有用户验证
- 正确做法:先用 Naive RAG(LlamaIndex + ChromaDB)在 1 天内跑通原型,验证业务价值后再逐步迭代到 Advanced RAG。“能用的 RAG 比完美的 PPT 有价值 100 倍”
-
忽略文档解析质量,直接进入分块
- 问题:PDF 中的表格变成乱码、代码块丢失格式、标题层级混乱,导致检索到的内容不可用
- 正确做法:在流水线中加入解析质量抽检步骤。对复杂 PDF 使用 LlamaParse,对代码文件使用语法感知分块。解析质量是”垃圾进垃圾出”的第一道防线
-
没有区分文档嵌入和查询嵌入的 input_type
- 问题:部分嵌入模型(Voyage AI、Cohere)对文档和查询使用不同编码策略,混用导致检索质量下降 10-20%
- 正确做法:文档嵌入用
input_type="document",查询嵌入用input_type="query"。始终查阅嵌入模型官方文档确认
-
生产环境不做缓存,每次查询都调用全套 API
- 问题:内部知识库场景下,30-40% 的查询是相似或重复的,不缓存导致成本翻倍
- 正确做法:实现语义缓存(相似度 > 0.95 的查询复用结果),可降低 API 成本 40-60%
-
没有建立评估体系就上线
- 问题:上线后用户反馈”回答不准确”,但无法定位是检索问题还是生成问题
- 正确做法:上线前用 RAGAS 建立基线指标(faithfulness、relevance、precision、recall),上线后持续监控。详见 11f-RAG评估与优化
-
将所有文档塞入一个集合,不做元数据过滤
- 问题:知识库增长后,检索到的内容来自不相关的领域,回答质量下降
- 正确做法:为每个文档块添加丰富的元数据(来源、团队、项目、日期、文档类型),查询时通过元数据过滤缩小检索范围
✅ 最佳实践
- 遵循”1 天原型 → 1 周生产 → 持续迭代”的节奏——先验证价值,再优化质量
- 从第一天就封装向量数据库抽象层——方便后续从 ChromaDB 迁移到 Qdrant/Milvus
- 批量嵌入而非逐条调用——减少 API 调用次数,降低延迟和成本
- 为每个文档块添加丰富元数据——来源、日期、类别、作者,支持过滤检索
- 分别评估检索质量和生成质量——定位瓶颈后针对性优化,避免盲目调参
- 建立用户反馈闭环——收集 👍/👎 反馈,定期分析低评分查询,持续改进
相关资源与延伸阅读
- LlamaIndex 官方文档 — 从零构建 RAG :最全面的 RAG 框架文档,包含从入门到生产的完整教程和 API 参考
- LangChain RAG 教程 :LangChain 官方 RAG 实现指南,适合需要复杂工作流的场景
- Vercel AI SDK — RAG 指南 :TypeScript 开发者的 RAG 构建指南,与 Next.js 深度集成
- RAGAS 评估框架文档 :RAG 系统评估的事实标准框架,提供 faithfulness、relevance 等自动化评估指标
- Qdrant 官方教程 :Qdrant 向量数据库的部署、配置和性能调优指南
- Pinecone RAG 学习中心 :从概念到实践的 RAG 系列文章,适合入门学习
- LangSmith 文档 :LLM 应用的可观测性平台,支持 Trace 追踪和评估
- Neon + pgvector 快速入门 :Serverless PostgreSQL 上使用 pgvector 的完整指南
- Athenic — 从零到生产构建 RAG 知识库 :端到端的生产级 RAG 构建指南,包含架构决策和性能优化
- RAG 工程检查清单(The Data Sensei) :部署前的 25 项检查清单,覆盖摄入、嵌入、检索、缓存、评估和部署
参考来源
- Building Production-Ready RAG Applications: Complete Guide — DasRoot (2026-01)
- Building Production-Ready RAG Systems — Athenic (2025-08)
- Production-Ready RAG Systems: End to End Guide — Saumil Srivastava (2025-05)
- Build a Production-Ready RAG Pipeline with LangChain and Pinecone — Reintech (2025-12)
- Complete Guide for AI Engineers: Building Production RAG Systems — Zen van Riel (2025-12)
- Production-Ready RAG: A Practical Guide for Engineers — DoorToOnline (2025-09)
- The RAG Engineering Checklist — The Data Sensei (2025-12)
- Scaling RAG for Production — Superlinked (2025-12)
- RAG with Vercel AI SDK — Vercel Examples (2025)
- Best RAG Frameworks 2025: LangChain vs LlamaIndex vs Haystack — LangCopilot (2025-11)
- Engineering Guidelines for Scalable RAG Systems — Netguru (2025-08)
📖 返回 总览与导航 | 上一节:11c-向量数据库对比 | 下一节:11e-高级RAG技术