Skip to Content

29d - ORM与查询优化

本文是《AI Agent 实战手册》第 29 章第 4 节。 上一节:29c-迁移文件生成与审查 | 下一节:29e-数据库Prompt模板与反模式

概述

ORM(Object-Relational Mapping)是现代应用开发中连接业务逻辑与数据库的核心桥梁,而查询性能则直接决定用户体验和系统可扩展性。2025-2026 年,AI 编码助手(Claude Code、Cursor、Kiro)能够自动生成类型安全的 ORM 模型代码,同时 AI 驱动的查询优化工具(Prisma Optimize、PlanetScale Insights、pganalyze、AI2sql)可以自动检测 N+1 查询、推荐索引策略、分析慢查询执行计划。本节系统覆盖五大主流 ORM(Prisma、Drizzle、SQLAlchemy、TypeORM、Diesel)的模型代码生成模式,深入讲解 AI 辅助查询优化的完整工作流——从索引建议到慢查询分析、从 N+1 检测到连接池调优,提供完整的提示词模板和实战案例。


1. ORM 生态全景与 AI 辅助能力对比

1.1 主流 ORM 对比(2025-2026)

ORM语言/生态类型安全AI 友好度查询优化工具价格适用场景
PrismaTypeScript/JavaScript⭐⭐⭐⭐⭐ 自动生成类型⭐⭐⭐⭐⭐ Schema DSL 清晰Prisma Optimize(AI 驱动)免费(开源)/ Optimize $0 起 / Accelerate $0.20/100K opsNode.js 全栈项目,类型安全优先
DrizzleTypeScript/JavaScript⭐⭐⭐⭐⭐ TypeScript 原生⭐⭐⭐⭐ SQL-like 语法无内置(配合外部工具)免费(开源)/ Drizzle Studio 免费轻量 TypeScript 项目,SQL 熟练者
SQLAlchemyPython⭐⭐⭐⭐ 类型注解支持⭐⭐⭐⭐ 灵活可定制无内置(配合 pganalyze 等)免费(开源)Python/FastAPI/Flask 项目
TypeORMTypeScript/JavaScript⭐⭐⭐ 装饰器类型⭐⭐⭐ Entity 模式成熟无内置(配合外部工具)免费(开源)NestJS/Express 项目,ActiveRecord 风格
DieselRust⭐⭐⭐⭐⭐ 编译时验证⭐⭐⭐ 宏系统复杂无内置(编译时安全)免费(开源)Rust 后端项目,零运行时开销
Django ORMPython (Django)⭐⭐⭐ 动态类型⭐⭐⭐⭐ 成熟稳定Django Debug Toolbar / Silk免费(开源)Django 项目
SeaORMRust⭐⭐⭐⭐ 异步原生⭐⭐⭐ 较新生态无内置免费(开源)Rust 异步后端项目
Drizzle + KyselyTypeScript⭐⭐⭐⭐⭐ 类型推断⭐⭐⭐⭐ SQL 构建器无内置免费(开源)复杂查询场景

1.2 AI 辅助 ORM 开发的能力边界

┌─────────────────────────────────────────────────────────────────┐ │ AI 辅助 ORM 开发能力矩阵 │ ├─────────────────────┬───────────────────────────────────────────┤ │ ✅ AI 擅长 │ ⚠️ AI 需要人工审查 │ │ │ │ │ • Schema → Model │ • 复杂关联查询优化 │ │ 代码生成 │ • 分布式事务设计 │ │ • CRUD 操作生成 │ • 连接池参数调优 │ │ • 基础关联定义 │ • 缓存失效策略 │ │ • 类型定义生成 │ • 读写分离路由 │ │ • 索引建议 │ │ │ • N+1 检测 │ │ │ • 查询重写建议 │ │ ├─────────────────────┼───────────────────────────────────────────┤ │ ❌ AI 容易犯错 │ 🔴 AI 不应自主决策 │ │ │ │ │ • 过度 eager load │ • 生产数据库索引变更 │ │ • 忽略查询复杂度 │ • 大表 Schema 迁移 │ │ • 不当使用原生 SQL │ • 数据库引擎选择 │ │ • 忽略并发场景 │ • 分库分表策略 │ │ • 缓存一致性问题 │ • 备份与恢复策略 │ └─────────────────────┴───────────────────────────────────────────┘

2. Prisma:类型安全的 ORM 模型生成与 AI 优化

2.1 Prisma 模型代码生成

Prisma 采用 Schema-first 方式:开发者定义 schema.prisma,Prisma 自动生成完全类型安全的 Client 代码。AI 编码助手对 Prisma Schema DSL 的理解能力极强,是 2025 年 AI 友好度最高的 ORM。

Schema 定义与 Client 生成

// prisma/schema.prisma generator client { provider = "prisma-client-js" } datasource db { provider = "postgresql" url = env("DATABASE_URL") } model User { id String @id @default(uuid()) email String @unique name String avatarUrl String? @map("avatar_url") isActive Boolean @default(true) @map("is_active") role Role @default(USER) createdAt DateTime @default(now()) @map("created_at") updatedAt DateTime @updatedAt @map("updated_at") posts Post[] comments Comment[] profile Profile? @@map("users") @@index([email]) @@index([createdAt]) } model Post { id String @id @default(uuid()) title String content String? slug String @unique published Boolean @default(false) publishedAt DateTime? @map("published_at") authorId String @map("author_id") categoryId String? @map("category_id") createdAt DateTime @default(now()) @map("created_at") updatedAt DateTime @updatedAt @map("updated_at") author User @relation(fields: [authorId], references: [id], onDelete: Cascade) category Category? @relation(fields: [categoryId], references: [id]) comments Comment[] tags Tag[] @@map("posts") @@index([authorId]) @@index([categoryId]) @@index([publishedAt]) @@index([slug]) } model Category { id String @id @default(uuid()) name String @unique slug String @unique parentId String? @map("parent_id") parent Category? @relation("CategoryTree", fields: [parentId], references: [id]) children Category[] @relation("CategoryTree") posts Post[] @@map("categories") } model Comment { id String @id @default(uuid()) content String authorId String @map("author_id") postId String @map("post_id") parentId String? @map("parent_id") createdAt DateTime @default(now()) @map("created_at") author User @relation(fields: [authorId], references: [id], onDelete: Cascade) post Post @relation(fields: [postId], references: [id], onDelete: Cascade) parent Comment? @relation("CommentThread", fields: [parentId], references: [id]) replies Comment[] @relation("CommentThread") @@map("comments") @@index([postId]) @@index([authorId]) } model Tag { id String @id @default(uuid()) name String @unique posts Post[] @@map("tags") } model Profile { id String @id @default(uuid()) bio String? userId String @unique @map("user_id") user User @relation(fields: [userId], references: [id], onDelete: Cascade) @@map("profiles") } enum Role { USER ADMIN MODERATOR }
# 生成 Prisma Client(自动生成类型安全的查询 API) npx prisma generate # 生成的类型文件位于 node_modules/.prisma/client/ # 包含所有模型的 TypeScript 类型定义

AI 生成的类型安全查询模式

// src/repositories/user.repository.ts import { PrismaClient, Prisma } from '@prisma/client'; const prisma = new PrismaClient(); // ✅ 基础 CRUD 操作(AI 可以完美生成) export class UserRepository { // 创建用户(含关联 Profile) async create(data: { email: string; name: string; bio?: string; }) { return prisma.user.create({ data: { email: data.email, name: data.name, profile: data.bio ? { create: { bio: data.bio } } : undefined, }, include: { profile: true, }, }); } // 分页查询(含筛选和排序) async findMany(params: { page: number; pageSize: number; search?: string; role?: 'USER' | 'ADMIN' | 'MODERATOR'; orderBy?: 'createdAt' | 'name' | 'email'; order?: 'asc' | 'desc'; }) { const { page, pageSize, search, role, orderBy = 'createdAt', order = 'desc' } = params; const where: Prisma.UserWhereInput = { isActive: true, ...(search && { OR: [ { name: { contains: search, mode: 'insensitive' } }, { email: { contains: search, mode: 'insensitive' } }, ], }), ...(role && { role }), }; const [users, total] = await Promise.all([ prisma.user.findMany({ where, skip: (page - 1) * pageSize, take: pageSize, orderBy: { [orderBy]: order }, include: { profile: true, _count: { select: { posts: true, comments: true } }, }, }), prisma.user.count({ where }), ]); return { data: users, pagination: { page, pageSize, total, totalPages: Math.ceil(total / pageSize), }, }; } // 带关联的详情查询 async findByIdWithPosts(id: string) { return prisma.user.findUnique({ where: { id }, include: { profile: true, posts: { where: { published: true }, orderBy: { publishedAt: 'desc' }, take: 10, include: { category: true, tags: true, _count: { select: { comments: true } }, }, }, }, }); } // 事务操作:转移用户所有文章 async transferPosts(fromUserId: string, toUserId: string) { return prisma.$transaction(async (tx) => { const posts = await tx.post.findMany({ where: { authorId: fromUserId }, select: { id: true }, }); await tx.post.updateMany({ where: { authorId: fromUserId }, data: { authorId: toUserId }, }); return { transferredCount: posts.length }; }); } }

2.2 Prisma Optimize:AI 驱动的查询优化

Prisma Optimize 是 Prisma 官方推出的 AI 驱动查询分析工具,能够自动检测性能瓶颈并提供优化建议。

工具信息

特性详情
类型SaaS 查询分析平台
集成方式Prisma Client 扩展
核心能力N+1 检测、缺失索引建议、全表扫描告警、查询延迟分析
价格免费(基础)/ $29/月(Pro)/ 自定义(Enterprise)
支持数据库PostgreSQL、MySQL、SQLite、MongoDB

集成 Prisma Optimize

// src/lib/prisma.ts import { PrismaClient } from '@prisma/client'; import { withOptimize } from '@prisma/extension-optimize'; const prisma = new PrismaClient().$extends( withOptimize({ apiKey: process.env.PRISMA_OPTIMIZE_API_KEY!, }) ); export default prisma;

Prisma Optimize 检测的问题类型

┌─────────────────────────────────────────────────────────────────┐ │ Prisma Optimize 检测能力 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 🔍 N+1 查询检测 │ │ ├── 检测循环中的重复查询 │ │ ├── 建议使用 include/select 预加载 │ │ └── 提供具体的代码修改建议 │ │ │ │ 📊 缺失索引建议 │ │ ├── 分析 WHERE 子句中的字段 │ │ ├── 检测 ORDER BY 未索引字段 │ │ └── 推荐复合索引组合 │ │ │ │ ⚡ 全表扫描告警 │ │ ├── 检测缺少 WHERE 条件的大表查询 │ │ ├── 建议添加 LIMIT 限制 │ │ └── 标记潜在的性能瓶颈 │ │ │ │ 📈 查询延迟分析 │ │ ├── P50/P95/P99 延迟统计 │ │ ├── 查询分组和模式识别 │ │ └── 历史趋势对比 │ │ │ │ 🔄 不必要的数据获取 │ │ ├── 检测 SELECT * 模式 │ │ ├── 建议使用 select 精确选择字段 │ │ └── 减少网络传输和内存占用 │ │ │ └─────────────────────────────────────────────────────────────────┘

2.3 Prisma N+1 查询检测与修复

N+1 查询是 ORM 中最常见的性能问题。以下是 Prisma 中 N+1 问题的典型场景和修复方法:

// ❌ N+1 问题:循环中逐个查询作者信息 async function getPostsWithAuthors_BAD() { const posts = await prisma.post.findMany({ where: { published: true }, take: 20, }); // N+1:每篇文章都会触发一次额外查询获取作者 const postsWithAuthors = await Promise.all( posts.map(async (post) => { const author = await prisma.user.findUnique({ where: { id: post.authorId }, }); return { ...post, author }; }) ); return postsWithAuthors; } // 总查询数:1(获取文章)+ 20(获取每个作者)= 21 次查询! // ✅ 修复方案 1:使用 include 预加载 async function getPostsWithAuthors_GOOD_1() { return prisma.post.findMany({ where: { published: true }, take: 20, include: { author: { select: { id: true, name: true, avatarUrl: true }, }, }, }); } // 总查询数:1 次(Prisma 自动 JOIN 或批量查询) // ✅ 修复方案 2:使用 select 精确控制返回字段 async function getPostsWithAuthors_GOOD_2() { return prisma.post.findMany({ where: { published: true }, take: 20, select: { id: true, title: true, slug: true, publishedAt: true, author: { select: { id: true, name: true, avatarUrl: true }, }, _count: { select: { comments: true } }, }, }); } // 总查询数:1 次,且只返回需要的字段 // ✅ 修复方案 3:复杂场景使用 $queryRaw async function getPostStats() { return prisma.$queryRaw` SELECT p.id, p.title, u.name AS author_name, COUNT(c.id) AS comment_count, COUNT(DISTINCT t.id) AS tag_count FROM posts p JOIN users u ON p.author_id = u.id LEFT JOIN comments c ON c.post_id = p.id LEFT JOIN "_PostToTag" pt ON pt."A" = p.id LEFT JOIN tags t ON t.id = pt."B" WHERE p.published = true GROUP BY p.id, p.title, u.name ORDER BY comment_count DESC LIMIT 10 `; }

2.4 Prisma 查询优化提示词模板

提示词模板:生成优化的 Prisma 查询

你是一位 Prisma ORM 专家。请根据以下需求生成高性能的 Prisma 查询代码。 ## Prisma Schema [粘贴相关的 schema.prisma 模型定义] ## 查询需求 [描述需要实现的查询功能] ## 性能要求 - 避免 N+1 查询(使用 include 或 select 预加载关联数据) - 只返回必要的字段(使用 select 而非返回整个模型) - 大列表查询必须分页(使用 skip/take) - 复杂聚合考虑使用 $queryRaw - 写操作使用事务($transaction)保证一致性 ## 输出要求 1. TypeScript 代码(含完整类型定义) 2. 预估查询次数和复杂度 3. 建议的索引(如果当前 Schema 缺少) 4. 潜在的性能风险和优化建议

提示词模板:审查 Prisma 查询性能

你是一位数据库性能专家。请审查以下 Prisma 查询代码的性能问题。 ## 代码 [粘贴 Prisma 查询代码] ## Schema [粘贴相关 Schema] ## 数据规模 - [表名] 约 [行数] 行 - 日均查询量:[次数] ## 审查清单 1. 是否存在 N+1 查询? 2. 是否有不必要的数据获取(SELECT *)? 3. 是否缺少分页? 4. 关联查询是否可以优化? 5. 是否需要添加索引? 6. 是否应该使用 $queryRaw 替代 ORM 查询? 7. 事务使用是否合理? ## 输出 - 🔴 严重问题(必须修复) - 🟡 性能隐患(建议修复) - 🟢 可接受 - 每个问题提供修复后的代码

3. Drizzle ORM:SQL-Like 的类型安全查询

3.1 Drizzle 模型定义与查询模式

Drizzle 的核心理念是”如果你懂 SQL,你就懂 Drizzle”。它提供了最接近原生 SQL 的 TypeScript ORM 体验,同时保持完整的类型安全。

Schema 定义

// src/db/schema.ts import { pgTable, uuid, varchar, text, boolean, timestamp, integer, pgEnum, index, uniqueIndex, primaryKey } from 'drizzle-orm/pg-core'; import { relations } from 'drizzle-orm'; // 枚举定义 export const roleEnum = pgEnum('role', ['USER', 'ADMIN', 'MODERATOR']); // 用户表 export const users = pgTable('users', { id: uuid('id').primaryKey().defaultRandom(), email: varchar('email', { length: 255 }).notNull().unique(), name: varchar('name', { length: 100 }).notNull(), avatarUrl: text('avatar_url'), isActive: boolean('is_active').notNull().default(true), role: roleEnum('role').notNull().default('USER'), createdAt: timestamp('created_at').notNull().defaultNow(), updatedAt: timestamp('updated_at').notNull().defaultNow(), }, (table) => [ index('users_email_idx').on(table.email), index('users_created_at_idx').on(table.createdAt), ]); // 文章表 export const posts = pgTable('posts', { id: uuid('id').primaryKey().defaultRandom(), title: varchar('title', { length: 255 }).notNull(), content: text('content'), slug: varchar('slug', { length: 255 }).notNull().unique(), published: boolean('published').notNull().default(false), publishedAt: timestamp('published_at'), authorId: uuid('author_id').notNull().references(() => users.id, { onDelete: 'cascade' }), categoryId: uuid('category_id').references(() => categories.id), createdAt: timestamp('created_at').notNull().defaultNow(), updatedAt: timestamp('updated_at').notNull().defaultNow(), }, (table) => [ index('posts_author_id_idx').on(table.authorId), index('posts_category_id_idx').on(table.categoryId), index('posts_published_at_idx').on(table.publishedAt), uniqueIndex('posts_slug_idx').on(table.slug), ]); // 分类表(自引用树形结构) export const categories = pgTable('categories', { id: uuid('id').primaryKey().defaultRandom(), name: varchar('name', { length: 100 }).notNull().unique(), slug: varchar('slug', { length: 100 }).notNull().unique(), parentId: uuid('parent_id'), }); // 评论表 export const comments = pgTable('comments', { id: uuid('id').primaryKey().defaultRandom(), content: text('content').notNull(), authorId: uuid('author_id').notNull().references(() => users.id, { onDelete: 'cascade' }), postId: uuid('post_id').notNull().references(() => posts.id, { onDelete: 'cascade' }), parentId: uuid('parent_id'), createdAt: timestamp('created_at').notNull().defaultNow(), }, (table) => [ index('comments_post_id_idx').on(table.postId), index('comments_author_id_idx').on(table.authorId), ]); // 标签表(多对多) export const tags = pgTable('tags', { id: uuid('id').primaryKey().defaultRandom(), name: varchar('name', { length: 50 }).notNull().unique(), }); export const postsToTags = pgTable('posts_to_tags', { postId: uuid('post_id').notNull().references(() => posts.id, { onDelete: 'cascade' }), tagId: uuid('tag_id').notNull().references(() => tags.id, { onDelete: 'cascade' }), }, (table) => [ primaryKey({ columns: [table.postId, table.tagId] }), ]); // 用户资料表 export const profiles = pgTable('profiles', { id: uuid('id').primaryKey().defaultRandom(), bio: text('bio'), userId: uuid('user_id').notNull().unique().references(() => users.id, { onDelete: 'cascade' }), });

关系定义

// src/db/relations.ts import { relations } from 'drizzle-orm'; import { users, posts, comments, tags, postsToTags, profiles, categories } from './schema'; export const usersRelations = relations(users, ({ one, many }) => ({ profile: one(profiles, { fields: [users.id], references: [profiles.userId], }), posts: many(posts), comments: many(comments), })); export const postsRelations = relations(posts, ({ one, many }) => ({ author: one(users, { fields: [posts.authorId], references: [users.id], }), category: one(categories, { fields: [posts.categoryId], references: [categories.id], }), comments: many(comments), postsToTags: many(postsToTags), })); export const commentsRelations = relations(comments, ({ one }) => ({ author: one(users, { fields: [comments.authorId], references: [users.id], }), post: one(posts, { fields: [comments.postId], references: [posts.id], }), })); export const postsToTagsRelations = relations(postsToTags, ({ one }) => ({ post: one(posts, { fields: [postsToTags.postId], references: [posts.id], }), tag: one(tags, { fields: [postsToTags.tagId], references: [tags.id], }), })); export const tagsRelations = relations(tags, ({ many }) => ({ postsToTags: many(postsToTags), })); export const categoriesRelations = relations(categories, ({ many }) => ({ posts: many(posts), })); export const profilesRelations = relations(profiles, ({ one }) => ({ user: one(users, { fields: [profiles.userId], references: [users.id], }), }));

3.2 Drizzle 查询模式与优化

// src/repositories/post.repository.ts import { db } from '../db'; import { posts, users, comments, tags, postsToTags, categories } from '../db/schema'; import { eq, and, or, like, desc, asc, sql, count, ilike, isNotNull } from 'drizzle-orm'; // ✅ 关系查询 API(推荐:自动处理 JOIN,避免 N+1) async function getPostsWithRelations() { return db.query.posts.findMany({ where: eq(posts.published, true), limit: 20, orderBy: [desc(posts.publishedAt)], with: { author: { columns: { id: true, name: true, avatarUrl: true }, }, category: { columns: { id: true, name: true, slug: true }, }, comments: { columns: { id: true }, // 只计数,不返回完整评论 }, postsToTags: { with: { tag: true, }, }, }, columns: { id: true, title: true, slug: true, publishedAt: true, }, }); } // ✅ SQL-like 查询构建器(适合复杂查询) async function getPostStats() { return db .select({ postId: posts.id, title: posts.title, authorName: users.name, commentCount: count(comments.id).as('comment_count'), }) .from(posts) .innerJoin(users, eq(posts.authorId, users.id)) .leftJoin(comments, eq(comments.postId, posts.id)) .where(eq(posts.published, true)) .groupBy(posts.id, posts.title, users.name) .orderBy(desc(sql`comment_count`)) .limit(10); } // ✅ 分页查询(含搜索和筛选) async function searchPosts(params: { search?: string; categoryId?: string; page: number; pageSize: number; }) { const { search, categoryId, page, pageSize } = params; const conditions = [eq(posts.published, true)]; if (search) { conditions.push( or( ilike(posts.title, `%${search}%`), ilike(posts.content, `%${search}%`) )! ); } if (categoryId) { conditions.push(eq(posts.categoryId, categoryId)); } const whereClause = and(...conditions); const [data, totalResult] = await Promise.all([ db.query.posts.findMany({ where: whereClause, limit: pageSize, offset: (page - 1) * pageSize, orderBy: [desc(posts.publishedAt)], with: { author: { columns: { id: true, name: true } }, category: { columns: { id: true, name: true } }, }, }), db.select({ count: count() }).from(posts).where(whereClause), ]); return { data, pagination: { page, pageSize, total: totalResult[0].count, totalPages: Math.ceil(totalResult[0].count / pageSize), }, }; } // ✅ 子查询和 CTE async function getActiveAuthorsWithPostCount() { const postCountSubquery = db .select({ authorId: posts.authorId, postCount: count(posts.id).as('post_count'), }) .from(posts) .where(eq(posts.published, true)) .groupBy(posts.authorId) .as('post_counts'); return db .select({ userId: users.id, userName: users.name, email: users.email, postCount: postCountSubquery.postCount, }) .from(users) .innerJoin(postCountSubquery, eq(users.id, postCountSubquery.authorId)) .where(eq(users.isActive, true)) .orderBy(desc(postCountSubquery.postCount)); } // ✅ 事务操作 async function createPostWithTags(data: { title: string; content: string; slug: string; authorId: string; categoryId?: string; tagIds: string[]; }) { return db.transaction(async (tx) => { const [post] = await tx.insert(posts).values({ title: data.title, content: data.content, slug: data.slug, authorId: data.authorId, categoryId: data.categoryId, }).returning(); if (data.tagIds.length > 0) { await tx.insert(postsToTags).values( data.tagIds.map((tagId) => ({ postId: post.id, tagId, })) ); } return post; }); }

3.3 Drizzle N+1 检测与修复

// ❌ N+1 问题:使用 select 构建器时手动循环查询 async function getPostsWithAuthors_BAD() { const allPosts = await db.select().from(posts).where(eq(posts.published, true)).limit(20); // N+1:每篇文章单独查询作者 const result = await Promise.all( allPosts.map(async (post) => { const [author] = await db .select({ name: users.name, avatarUrl: users.avatarUrl }) .from(users) .where(eq(users.id, post.authorId)); return { ...post, author }; }) ); return result; } // 总查询:1 + 20 = 21 次 // ✅ 修复方案 1:使用 JOIN async function getPostsWithAuthors_GOOD_1() { return db .select({ post: posts, authorName: users.name, authorAvatar: users.avatarUrl, }) .from(posts) .innerJoin(users, eq(posts.authorId, users.id)) .where(eq(posts.published, true)) .limit(20); } // 总查询:1 次 // ✅ 修复方案 2:使用关系查询 API async function getPostsWithAuthors_GOOD_2() { return db.query.posts.findMany({ where: eq(posts.published, true), limit: 20, with: { author: { columns: { name: true, avatarUrl: true }, }, }, }); } // 总查询:Drizzle 自动优化为最少查询次数

4. SQLAlchemy:Python 生态的 ORM 标准

4.1 SQLAlchemy 模型代码生成

SQLAlchemy 2.0+ 采用声明式映射(Declarative Mapping),支持完整的类型注解,AI 编码助手可以生成类型安全的 Python 模型代码。

模型定义(SQLAlchemy 2.0+ 风格)

# app/models/base.py from datetime import datetime from uuid import uuid4 from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, Index, Enum as SAEnum from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import ( DeclarativeBase, Mapped, mapped_column, relationship, MappedAsDataclass ) import enum class Base(DeclarativeBase): pass class Role(str, enum.Enum): USER = "USER" ADMIN = "ADMIN" MODERATOR = "MODERATOR"
# app/models/user.py from __future__ import annotations from typing import Optional, List from sqlalchemy import String, Text, Boolean, DateTime, Index from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column, relationship from datetime import datetime from uuid import uuid4 from .base import Base, Role class User(Base): __tablename__ = "users" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) name: Mapped[str] = mapped_column(String(100), nullable=False) avatar_url: Mapped[Optional[str]] = mapped_column(Text, nullable=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) role: Mapped[Role] = mapped_column( SAEnum(Role, name="role"), default=Role.USER, nullable=False ) created_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # 关系 profile: Mapped[Optional["Profile"]] = relationship( back_populates="user", cascade="all, delete-orphan", uselist=False ) posts: Mapped[List["Post"]] = relationship( back_populates="author", cascade="all, delete-orphan" ) comments: Mapped[List["Comment"]] = relationship( back_populates="author", cascade="all, delete-orphan" ) __table_args__ = ( Index("ix_users_email", "email"), Index("ix_users_created_at", "created_at"), ) def __repr__(self) -> str: return f"<User(id={self.id}, email={self.email})>"
# app/models/post.py from __future__ import annotations from typing import Optional, List from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, Index, Table, Column from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column, relationship from datetime import datetime from uuid import uuid4 from .base import Base # 多对多关联表 posts_tags = Table( "posts_tags", Base.metadata, Column("post_id", UUID(as_uuid=False), ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True), Column("tag_id", UUID(as_uuid=False), ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True), ) class Post(Base): __tablename__ = "posts" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) title: Mapped[str] = mapped_column(String(255), nullable=False) content: Mapped[Optional[str]] = mapped_column(Text, nullable=True) slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) published_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) author_id: Mapped[str] = mapped_column( UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False ) category_id: Mapped[Optional[str]] = mapped_column( UUID(as_uuid=False), ForeignKey("categories.id"), nullable=True ) created_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # 关系 author: Mapped["User"] = relationship(back_populates="posts") category: Mapped[Optional["Category"]] = relationship(back_populates="posts") comments: Mapped[List["Comment"]] = relationship( back_populates="post", cascade="all, delete-orphan" ) tags: Mapped[List["Tag"]] = relationship( secondary=posts_tags, back_populates="posts" ) __table_args__ = ( Index("ix_posts_author_id", "author_id"), Index("ix_posts_category_id", "category_id"), Index("ix_posts_published_at", "published_at"), Index("ix_posts_slug", "slug"), ) class Category(Base): __tablename__ = "categories" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) parent_id: Mapped[Optional[str]] = mapped_column( UUID(as_uuid=False), ForeignKey("categories.id"), nullable=True ) parent: Mapped[Optional["Category"]] = relationship( remote_side="Category.id", back_populates="children" ) children: Mapped[List["Category"]] = relationship(back_populates="parent") posts: Mapped[List["Post"]] = relationship(back_populates="category") class Comment(Base): __tablename__ = "comments" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) content: Mapped[str] = mapped_column(Text, nullable=False) author_id: Mapped[str] = mapped_column( UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False ) post_id: Mapped[str] = mapped_column( UUID(as_uuid=False), ForeignKey("posts.id", ondelete="CASCADE"), nullable=False ) parent_id: Mapped[Optional[str]] = mapped_column( UUID(as_uuid=False), ForeignKey("comments.id"), nullable=True ) created_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, nullable=False ) author: Mapped["User"] = relationship(back_populates="comments") post: Mapped["Post"] = relationship(back_populates="comments") __table_args__ = ( Index("ix_comments_post_id", "post_id"), Index("ix_comments_author_id", "author_id"), ) class Tag(Base): __tablename__ = "tags" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) posts: Mapped[List["Post"]] = relationship( secondary=posts_tags, back_populates="tags" ) class Profile(Base): __tablename__ = "profiles" id: Mapped[str] = mapped_column( UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()) ) bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True) user_id: Mapped[str] = mapped_column( UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), unique=True, nullable=False ) user: Mapped["User"] = relationship(back_populates="profile")

4.2 SQLAlchemy 查询优化模式

# app/repositories/post_repository.py from sqlalchemy import select, func, and_, or_, desc from sqlalchemy.orm import selectinload, joinedload, subqueryload, load_only from sqlalchemy.ext.asyncio import AsyncSession from app.models.post import Post, Comment, Tag, Category from app.models.user import User class PostRepository: def __init__(self, session: AsyncSession): self.session = session # ✅ 使用 selectinload 避免 N+1(推荐用于一对多关系) async def get_posts_with_authors(self, limit: int = 20): stmt = ( select(Post) .where(Post.published == True) .options( joinedload(Post.author).load_only(User.id, User.name, User.avatar_url), selectinload(Post.tags), joinedload(Post.category), ) .order_by(desc(Post.published_at)) .limit(limit) ) result = await self.session.execute(stmt) return result.scalars().unique().all() # ✅ 分页查询(含搜索和筛选) async def search_posts( self, page: int = 1, page_size: int = 20, search: str | None = None, category_id: str | None = None, ): conditions = [Post.published == True] if search: conditions.append( or_( Post.title.ilike(f"%{search}%"), Post.content.ilike(f"%{search}%"), ) ) if category_id: conditions.append(Post.category_id == category_id) where_clause = and_(*conditions) # 数据查询 data_stmt = ( select(Post) .where(where_clause) .options( joinedload(Post.author).load_only(User.id, User.name), joinedload(Post.category).load_only(Category.id, Category.name), ) .order_by(desc(Post.published_at)) .offset((page - 1) * page_size) .limit(page_size) ) # 计数查询 count_stmt = select(func.count(Post.id)).where(where_clause) data_result = await self.session.execute(data_stmt) count_result = await self.session.execute(count_stmt) posts = data_result.scalars().unique().all() total = count_result.scalar_one() return { "data": posts, "pagination": { "page": page, "page_size": page_size, "total": total, "total_pages": (total + page_size - 1) // page_size, }, } # ✅ 聚合查询(使用原生 SQL 表达式) async def get_post_stats(self): stmt = ( select( Post.id, Post.title, User.name.label("author_name"), func.count(Comment.id).label("comment_count"), ) .join(User, Post.author_id == User.id) .outerjoin(Comment, Comment.post_id == Post.id) .where(Post.published == True) .group_by(Post.id, Post.title, User.name) .order_by(desc(func.count(Comment.id))) .limit(10) ) result = await self.session.execute(stmt) return result.all()

4.3 SQLAlchemy N+1 检测与修复

# ❌ N+1 问题:默认 lazy loading async def get_posts_bad(session: AsyncSession): stmt = select(Post).where(Post.published == True).limit(20) result = await session.execute(stmt) posts = result.scalars().all() # 每次访问 post.author 都会触发一次额外查询! for post in posts: print(f"{post.title} by {post.author.name}") # N+1! return posts # ✅ 修复方案 1:joinedload(LEFT JOIN,适合一对一/多对一) async def get_posts_good_1(session: AsyncSession): stmt = ( select(Post) .where(Post.published == True) .options(joinedload(Post.author)) .limit(20) ) result = await session.execute(stmt) posts = result.scalars().unique().all() for post in posts: print(f"{post.title} by {post.author.name}") # 无额外查询 return posts # ✅ 修复方案 2:selectinload(SELECT IN,适合一对多) async def get_posts_good_2(session: AsyncSession): stmt = ( select(Post) .where(Post.published == True) .options( joinedload(Post.author), # 多对一:用 JOIN selectinload(Post.comments), # 一对多:用 SELECT IN selectinload(Post.tags), # 多对多:用 SELECT IN ) .limit(20) ) result = await session.execute(stmt) return result.scalars().unique().all() # ✅ 修复方案 3:subqueryload(子查询,适合大数据集) async def get_posts_good_3(session: AsyncSession): stmt = ( select(Post) .where(Post.published == True) .options( subqueryload(Post.comments), # 使用子查询加载 ) .limit(20) ) result = await session.execute(stmt) return result.scalars().all()

SQLAlchemy 加载策略对比

策略SQL 模式适用关系优点缺点
joinedloadLEFT JOIN多对一、一对一单次查询大结果集时数据重复
selectinloadSELECT … WHERE id IN (…)一对多、多对多无数据重复需要两次查询
subqueryload子查询一对多(大数据集)适合复杂过滤子查询可能较慢
lazyload按需查询任何简单N+1 风险
raiseload禁止加载任何强制显式加载需要预先规划

5. TypeORM:Entity-First 的 ORM 模型生成

5.1 TypeORM Entity 定义与代码生成

TypeORM 采用装饰器(Decorator)模式定义 Entity,AI 编码助手可以根据数据库 Schema 或业务需求自动生成完整的 Entity 代码。

Entity 定义

// src/entities/User.ts import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn, Index, OneToMany, OneToOne, JoinColumn } from 'typeorm'; import { Post } from './Post'; import { Comment } from './Comment'; import { Profile } from './Profile'; export enum Role { USER = 'USER', ADMIN = 'ADMIN', MODERATOR = 'MODERATOR', } @Entity('users') export class User { @PrimaryGeneratedColumn('uuid') id: string; @Column({ type: 'varchar', length: 255, unique: true }) @Index('idx_users_email') email: string; @Column({ type: 'varchar', length: 100 }) name: string; @Column({ type: 'text', nullable: true, name: 'avatar_url' }) avatarUrl: string | null; @Column({ type: 'boolean', default: true, name: 'is_active' }) isActive: boolean; @Column({ type: 'enum', enum: Role, default: Role.USER }) role: Role; @CreateDateColumn({ name: 'created_at' }) @Index('idx_users_created_at') createdAt: Date; @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; @OneToMany(() => Post, (post) => post.author, { cascade: true }) posts: Post[]; @OneToMany(() => Comment, (comment) => comment.author, { cascade: true }) comments: Comment[]; @OneToOne(() => Profile, (profile) => profile.user, { cascade: true }) profile: Profile; }
// src/entities/Post.ts import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn, Index, ManyToOne, OneToMany, ManyToMany, JoinColumn, JoinTable } from 'typeorm'; import { User } from './User'; import { Category } from './Category'; import { Comment } from './Comment'; import { Tag } from './Tag'; @Entity('posts') export class Post { @PrimaryGeneratedColumn('uuid') id: string; @Column({ type: 'varchar', length: 255 }) title: string; @Column({ type: 'text', nullable: true }) content: string | null; @Column({ type: 'varchar', length: 255, unique: true }) @Index('idx_posts_slug') slug: string; @Column({ type: 'boolean', default: false }) published: boolean; @Column({ type: 'timestamp', nullable: true, name: 'published_at' }) @Index('idx_posts_published_at') publishedAt: Date | null; @Column({ name: 'author_id' }) @Index('idx_posts_author_id') authorId: string; @Column({ name: 'category_id', nullable: true }) @Index('idx_posts_category_id') categoryId: string | null; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; @ManyToOne(() => User, (user) => user.posts, { onDelete: 'CASCADE' }) @JoinColumn({ name: 'author_id' }) author: User; @ManyToOne(() => Category, (category) => category.posts) @JoinColumn({ name: 'category_id' }) category: Category | null; @OneToMany(() => Comment, (comment) => comment.post, { cascade: true }) comments: Comment[]; @ManyToMany(() => Tag, (tag) => tag.posts, { cascade: true }) @JoinTable({ name: 'posts_tags', joinColumn: { name: 'post_id', referencedColumnName: 'id' }, inverseJoinColumn: { name: 'tag_id', referencedColumnName: 'id' }, }) tags: Tag[]; }

5.2 TypeORM QueryBuilder 查询优化

// src/repositories/PostRepository.ts import { Repository, DataSource, Brackets } from 'typeorm'; import { Post } from '../entities/Post'; export class PostRepository { private repo: Repository<Post>; constructor(dataSource: DataSource) { this.repo = dataSource.getRepository(Post); } // ✅ 使用 QueryBuilder 避免 N+1 async getPostsWithRelations(limit: number = 20) { return this.repo .createQueryBuilder('post') .leftJoinAndSelect('post.author', 'author') .leftJoinAndSelect('post.category', 'category') .leftJoinAndSelect('post.tags', 'tag') .loadRelationCountAndMap('post.commentCount', 'post.comments') .where('post.published = :published', { published: true }) .orderBy('post.publishedAt', 'DESC') .take(limit) .getMany(); } // ✅ 分页搜索查询 async searchPosts(params: { search?: string; categoryId?: string; page: number; pageSize: number; }) { const { search, categoryId, page, pageSize } = params; const qb = this.repo .createQueryBuilder('post') .leftJoinAndSelect('post.author', 'author') .leftJoinAndSelect('post.category', 'category') .where('post.published = :published', { published: true }); if (search) { qb.andWhere( new Brackets((sub) => { sub .where('post.title ILIKE :search', { search: `%${search}%` }) .orWhere('post.content ILIKE :search', { search: `%${search}%` }); }) ); } if (categoryId) { qb.andWhere('post.categoryId = :categoryId', { categoryId }); } const [data, total] = await qb .orderBy('post.publishedAt', 'DESC') .skip((page - 1) * pageSize) .take(pageSize) .getManyAndCount(); return { data, pagination: { page, pageSize, total, totalPages: Math.ceil(total / pageSize), }, }; } // ✅ 聚合查询 async getPostStats() { return this.repo .createQueryBuilder('post') .select('post.id', 'postId') .addSelect('post.title', 'title') .addSelect('author.name', 'authorName') .addSelect('COUNT(comment.id)', 'commentCount') .innerJoin('post.author', 'author') .leftJoin('post.comments', 'comment') .where('post.published = :published', { published: true }) .groupBy('post.id') .addGroupBy('post.title') .addGroupBy('author.name') .orderBy('COUNT(comment.id)', 'DESC') .limit(10) .getRawMany(); } // ✅ 事务操作 async createPostWithTags( dataSource: DataSource, data: { title: string; content: string; slug: string; authorId: string; categoryId?: string; tagIds: string[]; } ) { return dataSource.transaction(async (manager) => { const post = manager.create(Post, { title: data.title, content: data.content, slug: data.slug, authorId: data.authorId, categoryId: data.categoryId, }); const savedPost = await manager.save(post); if (data.tagIds.length > 0) { await manager .createQueryBuilder() .relation(Post, 'tags') .of(savedPost.id) .add(data.tagIds); } return savedPost; }); } }

5.3 TypeORM N+1 检测与修复

// ❌ N+1 问题:使用 find 时未指定 relations async function getPostsBad(repo: Repository<Post>) { const posts = await repo.find({ where: { published: true }, take: 20, }); // 每次访问 post.author 都会触发 lazy loading(如果配置了 lazy) // 或者返回 undefined(如果没配置 lazy) for (const post of posts) { console.log(post.author?.name); // 可能是 undefined 或触发 N+1 } } // ✅ 修复方案 1:使用 relations 选项 async function getPostsGood1(repo: Repository<Post>) { return repo.find({ where: { published: true }, take: 20, relations: { author: true, category: true, tags: true, }, select: { id: true, title: true, slug: true, publishedAt: true, author: { id: true, name: true, avatarUrl: true }, category: { id: true, name: true }, }, }); } // ✅ 修复方案 2:使用 QueryBuilder(更灵活) async function getPostsGood2(repo: Repository<Post>) { return repo .createQueryBuilder('post') .leftJoinAndSelect('post.author', 'author') .leftJoinAndSelect('post.category', 'category') .leftJoinAndSelect('post.tags', 'tag') .select([ 'post.id', 'post.title', 'post.slug', 'post.publishedAt', 'author.id', 'author.name', 'author.avatarUrl', 'category.id', 'category.name', 'tag.id', 'tag.name', ]) .where('post.published = :published', { published: true }) .orderBy('post.publishedAt', 'DESC') .take(20) .getMany(); }

6. Diesel:Rust 生态的编译时安全 ORM

6.1 Diesel 模型代码生成(Queryable/Insertable 宏)

Diesel 的核心优势是编译时 Schema 验证——通过 QueryableInsertableSelectable 等 derive 宏,Diesel 在编译阶段就能确保 Rust 结构体与数据库 Schema 完全匹配。

Schema 定义(自动生成)

// src/schema.rs(由 diesel print-schema 自动生成) diesel::table! { users (id) { id -> Uuid, email -> Varchar, name -> Varchar, avatar_url -> Nullable<Text>, is_active -> Bool, role -> Varchar, created_at -> Timestamp, updated_at -> Timestamp, } } diesel::table! { posts (id) { id -> Uuid, title -> Varchar, content -> Nullable<Text>, slug -> Varchar, published -> Bool, published_at -> Nullable<Timestamp>, author_id -> Uuid, category_id -> Nullable<Uuid>, created_at -> Timestamp, updated_at -> Timestamp, } } diesel::table! { categories (id) { id -> Uuid, name -> Varchar, slug -> Varchar, parent_id -> Nullable<Uuid>, } } diesel::table! { comments (id) { id -> Uuid, content -> Text, author_id -> Uuid, post_id -> Uuid, parent_id -> Nullable<Uuid>, created_at -> Timestamp, } } diesel::table! { tags (id) { id -> Uuid, name -> Varchar, } } diesel::table! { posts_tags (post_id, tag_id) { post_id -> Uuid, tag_id -> Uuid, } } diesel::table! { profiles (id) { id -> Uuid, bio -> Nullable<Text>, user_id -> Uuid, } } diesel::joinable!(posts -> users (author_id)); diesel::joinable!(posts -> categories (category_id)); diesel::joinable!(comments -> users (author_id)); diesel::joinable!(comments -> posts (post_id)); diesel::joinable!(posts_tags -> posts (post_id)); diesel::joinable!(posts_tags -> tags (tag_id)); diesel::joinable!(profiles -> users (user_id)); diesel::allow_tables_to_appear_in_same_query!( users, posts, categories, comments, tags, posts_tags, profiles, );

Model 结构体定义

// src/models/user.rs use diesel::prelude::*; use uuid::Uuid; use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; /// 查询用:从数据库读取完整用户记录 #[derive(Queryable, Selectable, Debug, Serialize)] #[diesel(table_name = crate::schema::users)] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct User { pub id: Uuid, pub email: String, pub name: String, pub avatar_url: Option<String>, pub is_active: bool, pub role: String, pub created_at: NaiveDateTime, pub updated_at: NaiveDateTime, } /// 插入用:创建新用户 #[derive(Insertable, Debug, Deserialize)] #[diesel(table_name = crate::schema::users)] pub struct NewUser<'a> { pub email: &'a str, pub name: &'a str, pub avatar_url: Option<&'a str>, pub role: Option<&'a str>, } /// 更新用:部分更新用户信息 #[derive(AsChangeset, Debug, Deserialize)] #[diesel(table_name = crate::schema::users)] pub struct UpdateUser<'a> { pub name: Option<&'a str>, pub avatar_url: Option<Option<&'a str>>, // Option<Option<>> 支持设置为 NULL pub is_active: Option<bool>, pub role: Option<&'a str>, } /// 精简查询用:只返回必要字段(避免 SELECT *) #[derive(Queryable, Selectable, Debug, Serialize)] #[diesel(table_name = crate::schema::users)] pub struct UserSummary { pub id: Uuid, pub name: String, pub avatar_url: Option<String>, }
// src/models/post.rs use diesel::prelude::*; use uuid::Uuid; use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; #[derive(Queryable, Selectable, Debug, Serialize, Associations, Identifiable)] #[diesel(table_name = crate::schema::posts)] #[diesel(belongs_to(crate::models::user::User, foreign_key = author_id))] pub struct Post { pub id: Uuid, pub title: String, pub content: Option<String>, pub slug: String, pub published: bool, pub published_at: Option<NaiveDateTime>, pub author_id: Uuid, pub category_id: Option<Uuid>, pub created_at: NaiveDateTime, pub updated_at: NaiveDateTime, } #[derive(Insertable, Debug, Deserialize)] #[diesel(table_name = crate::schema::posts)] pub struct NewPost<'a> { pub title: &'a str, pub content: Option<&'a str>, pub slug: &'a str, pub author_id: Uuid, pub category_id: Option<Uuid>, } /// 文章摘要(列表展示用) #[derive(Queryable, Selectable, Debug, Serialize)] #[diesel(table_name = crate::schema::posts)] pub struct PostSummary { pub id: Uuid, pub title: String, pub slug: String, pub published_at: Option<NaiveDateTime>, pub author_id: Uuid, }

6.2 Diesel 查询模式与优化

// src/repositories/post_repository.rs use diesel::prelude::*; use diesel::pg::PgConnection; use diesel::dsl::count; use uuid::Uuid; use crate::schema::{posts, users, comments, categories}; use crate::models::post::{Post, PostSummary, NewPost}; use crate::models::user::{User, UserSummary}; /// 获取已发布文章(含作者信息,使用 JOIN 避免 N+1) pub fn get_published_posts_with_authors( conn: &mut PgConnection, limit: i64, ) -> QueryResult<Vec<(PostSummary, UserSummary)>> { posts::table .inner_join(users::table.on(posts::author_id.eq(users::id))) .filter(posts::published.eq(true)) .order(posts::published_at.desc()) .limit(limit) .select((PostSummary::as_select(), UserSummary::as_select())) .load::<(PostSummary, UserSummary)>(conn) } /// 分页查询(含搜索) pub fn search_posts( conn: &mut PgConnection, search: Option<&str>, category_id: Option<Uuid>, page: i64, page_size: i64, ) -> QueryResult<(Vec<(PostSummary, UserSummary)>, i64)> { let mut query = posts::table .inner_join(users::table.on(posts::author_id.eq(users::id))) .filter(posts::published.eq(true)) .into_boxed(); if let Some(search_term) = search { let pattern = format!("%{}%", search_term); query = query.filter( posts::title.ilike(pattern.clone()) .or(posts::content.ilike(pattern)) ); } if let Some(cat_id) = category_id { query = query.filter(posts::category_id.eq(cat_id)); } // 计数查询 let total = query .count() .get_result::<i64>(conn)?; // 数据查询 let data = query .order(posts::published_at.desc()) .offset((page - 1) * page_size) .limit(page_size) .select((PostSummary::as_select(), UserSummary::as_select())) .load::<(PostSummary, UserSummary)>(conn)?; Ok((data, total)) } /// 聚合查询:文章评论统计 pub fn get_post_comment_stats( conn: &mut PgConnection, limit: i64, ) -> QueryResult<Vec<(Uuid, String, String, i64)>> { posts::table .inner_join(users::table.on(posts::author_id.eq(users::id))) .left_join(comments::table.on(comments::post_id.eq(posts::id))) .filter(posts::published.eq(true)) .group_by((posts::id, posts::title, users::name)) .select(( posts::id, posts::title, users::name, count(comments::id), )) .order(count(comments::id).desc()) .limit(limit) .load::<(Uuid, String, String, i64)>(conn) } /// 事务操作:创建文章并关联标签 pub fn create_post_with_tags( conn: &mut PgConnection, new_post: &NewPost, tag_ids: &[Uuid], ) -> QueryResult<Post> { conn.transaction(|conn| { // 插入文章 let post = diesel::insert_into(posts::table) .values(new_post) .returning(Post::as_returning()) .get_result(conn)?; // 关联标签 if !tag_ids.is_empty() { use crate::schema::posts_tags; let values: Vec<_> = tag_ids .iter() .map(|tag_id| { ( posts_tags::post_id.eq(post.id), posts_tags::tag_id.eq(*tag_id), ) }) .collect(); diesel::insert_into(posts_tags::table) .values(&values) .execute(conn)?; } Ok(post) }) }

6.3 Diesel 的编译时安全优势

// ✅ 编译时类型检查:如果 Schema 和 Model 不匹配,编译直接报错 // 假设 schema.rs 中 users 表没有 phone 字段 #[derive(Queryable, Selectable)] #[diesel(table_name = crate::schema::users)] pub struct UserWithPhone { pub id: Uuid, pub name: String, pub phone: String, // ❌ 编译错误:users 表没有 phone 列! } // 假设 schema.rs 中 email 是 Varchar,但 Model 定义为 i32 #[derive(Queryable, Selectable)] #[diesel(table_name = crate::schema::users)] pub struct UserBadType { pub id: Uuid, pub email: i32, // ❌ 编译错误:类型不匹配!Varchar != i32 pub name: String, } // ✅ 正确:Model 与 Schema 完全匹配 #[derive(Queryable, Selectable)] #[diesel(table_name = crate::schema::users)] pub struct UserCorrect { pub id: Uuid, pub email: String, pub name: String, } // ✅ 编译通过:类型安全保证

7. AI 辅助查询优化工具全景

7.1 查询优化工具对比

工具类型核心能力支持数据库AI 功能价格适用场景
Prisma OptimizeSaaS 平台查询延迟分析、N+1 检测、索引建议、全表扫描告警PostgreSQL、MySQL、SQLite、MongoDBAI 驱动的优化建议免费(基础)/ $29/月(Pro)Prisma 项目专用
PlanetScale InsightsSaaS 平台查询分析、索引建议、慢查询检测、连接监控MySQL、PostgreSQLAI 驱动的索引建议包含在 PlanetScale 计划中($39/月起)PlanetScale 托管数据库
pganalyzeSaaS/自托管EXPLAIN 计划分析、索引建议、查询统计、等待事件分析PostgreSQLQuery Advisor(AI 查询优化)$150/月起(Starter)/ 自定义(Enterprise)PostgreSQL 生产环境
AI2sqlSaaS 平台自然语言→SQL、SQL 优化、索引建议、查询解释PostgreSQL、MySQL、SQLite、SQL Server全 AI 驱动免费(基础)/ $12/月(Pro)/ $24/月(Business)SQL 编写和优化
Datadog DBMSaaS 平台查询性能监控、执行计划分析、等待分析、资源关联PostgreSQL、MySQL、SQL Server、OracleAI 异常检测$70/主机/月(DBM)企业级数据库监控
MetisSaaS 平台SQL 审查、性能预测、Schema 变更影响分析PostgreSQL、MySQLAI SQL 审查免费(开源 Guardian)/ 付费(Platform)CI/CD 集成 SQL 审查
EverSQLSaaS 平台SQL 优化、索引建议、Schema 优化PostgreSQL、MySQL、MariaDBAI 驱动优化免费(基础)/ $29/月(Pro)SQL 查询优化
Aiven AI DBOptimizerSaaS 平台查询优化、索引建议、配置调优PostgreSQL、MySQLAI 驱动包含在 Aiven 计划中Aiven 托管数据库

7.2 AI 查询优化工作流

┌─────────────────────────────────────────────────────────────────────┐ │ AI 辅助查询优化完整工作流 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ① 查询性能监控 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Prisma Optimize / pganalyze / Datadog DBM │ │ │ │ • 收集查询延迟数据(P50/P95/P99) │ │ │ │ • 识别慢查询(>100ms) │ │ │ │ • 检测 N+1 查询模式 │ │ │ │ • 监控连接池使用率 │ │ │ └──────────────────────┬──────────────────────────────────┘ │ │ ▼ │ │ ② AI 分析与诊断 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ AI 编码助手 / AI2sql / pganalyze Query Advisor │ │ │ │ • 分析 EXPLAIN ANALYZE 输出 │ │ │ │ • 识别全表扫描和低效 JOIN │ │ │ │ • 检测缺失索引 │ │ │ │ • 评估查询复杂度 │ │ │ └──────────────────────┬──────────────────────────────────┘ │ │ ▼ │ │ ③ 优化建议生成 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ • 索引创建建议(含 CREATE INDEX 语句) │ │ │ │ • 查询重写建议(含优化后的 SQL/ORM 代码) │ │ │ │ • N+1 修复方案(含具体代码修改) │ │ │ │ • 连接池参数调优建议 │ │ │ └──────────────────────┬──────────────────────────────────┘ │ │ ▼ │ │ ④ 人工审查与实施 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ • 审查 AI 建议的索引(评估写入性能影响) │ │ │ │ • 在测试环境验证查询优化效果 │ │ │ │ • 使用 EXPLAIN ANALYZE 对比优化前后 │ │ │ │ • 部署到生产环境并持续监控 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘

8. AI 辅助索引优化

8.1 索引类型与适用场景

索引类型PostgreSQLMySQL适用场景AI 建议频率
B-Tree 索引✅ 默认✅ 默认等值查询、范围查询、排序⭐⭐⭐⭐⭐ 最常见
Hash 索引纯等值查询⭐⭐ 较少使用
GIN 索引全文搜索、JSONB、数组⭐⭐⭐ 特定场景
GiST 索引地理空间、范围类型⭐⭐ 特定场景
BRIN 索引大表时间序列数据⭐⭐⭐ 大数据场景
部分索引只索引满足条件的行⭐⭐⭐⭐ AI 常推荐
覆盖索引✅ INCLUDE避免回表查询⭐⭐⭐⭐ AI 常推荐
复合索引多列查询条件⭐⭐⭐⭐⭐ 最常见
全文索引✅ tsvector✅ FULLTEXT文本搜索⭐⭐⭐ 搜索场景

8.2 AI 辅助索引建议实战

使用 AI 编码助手分析查询并建议索引

-- 场景:电商平台订单查询 -- 慢查询:按用户 ID + 状态 + 时间范围查询订单 SELECT o.id, o.total_amount, o.status, o.created_at, oi.product_name, oi.quantity, oi.unit_price FROM orders o JOIN order_items oi ON oi.order_id = o.id WHERE o.user_id = '550e8400-e29b-41d4-a716-446655440000' AND o.status = 'completed' AND o.created_at >= '2025-01-01' AND o.created_at < '2025-07-01' ORDER BY o.created_at DESC LIMIT 20;

EXPLAIN ANALYZE 输出分析

-- 执行 EXPLAIN ANALYZE 获取查询计划 EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) SELECT o.id, o.total_amount, o.status, o.created_at, oi.product_name, oi.quantity, oi.unit_price FROM orders o JOIN order_items oi ON oi.order_id = o.id WHERE o.user_id = '550e8400-e29b-41d4-a716-446655440000' AND o.status = 'completed' AND o.created_at >= '2025-01-01' AND o.created_at < '2025-07-01' ORDER BY o.created_at DESC LIMIT 20; -- 输出示例(优化前): -- Limit (cost=15234.56..15234.61 rows=20 width=128) (actual time=892.345..892.367 rows=20 loops=1) -- -> Sort (cost=15234.56..15267.89 rows=13332 width=128) (actual time=892.343..892.358 rows=20 loops=1) -- Sort Key: o.created_at DESC -- Sort Method: top-N heapsort Memory: 28kB -- -> Hash Join (cost=4567.89..14890.12 rows=13332 width=128) (actual time=456.789..878.901 rows=15234 loops=1) -- Hash Cond: (oi.order_id = o.id) -- -> Seq Scan on order_items oi (cost=0.00..8901.23 rows=500000 width=64) (actual time=0.012..234.567 rows=500000 loops=1) -- -> Hash (cost=4234.56..4234.56 rows=13332 width=64) (actual time=456.123..456.123 rows=15234 loops=1) -- Buckets: 16384 Batches: 1 Memory Usage: 1234kB -- -> Seq Scan on orders o (cost=0.00..4234.56 rows=13332 width=64) (actual time=0.023..445.678 rows=15234 loops=1) -- Filter: ((user_id = '550e8400...'::uuid) AND (status = 'completed') AND (created_at >= '2025-01-01') AND (created_at < '2025-07-01')) -- Rows Removed by Filter: 984766 -- Planning Time: 0.234 ms -- Execution Time: 892.456 ms

AI 建议的索引优化方案

-- AI 分析结果: -- 🔴 问题 1:orders 表全表扫描(Seq Scan),过滤掉 98% 的行 -- 🔴 问题 2:order_items 表全表扫描 -- 🟡 问题 3:排序操作在内存中进行,数据量大时可能溢出到磁盘 -- ✅ 优化方案 1:创建复合索引(覆盖 WHERE + ORDER BY) CREATE INDEX CONCURRENTLY idx_orders_user_status_created ON orders (user_id, status, created_at DESC); -- 效果:将 orders 表的 Seq Scan 变为 Index Scan -- 复合索引列顺序:等值条件在前,范围条件在后 -- ✅ 优化方案 2:为 order_items 创建外键索引 CREATE INDEX CONCURRENTLY idx_order_items_order_id ON order_items (order_id); -- 效果:将 order_items 的 Seq Scan 变为 Index Scan -- ✅ 优化方案 3:覆盖索引(避免回表) CREATE INDEX CONCURRENTLY idx_orders_user_status_created_covering ON orders (user_id, status, created_at DESC) INCLUDE (id, total_amount); -- 效果:Index Only Scan,无需回表读取数据页 -- 优化后的 EXPLAIN ANALYZE 预期: -- Limit (cost=12.34..56.78 rows=20 width=128) (actual time=2.345..2.567 rows=20 loops=1) -- -> Nested Loop (cost=12.34..1234.56 rows=267 width=128) (actual time=2.340..2.558 rows=20 loops=1) -- -> Index Only Scan using idx_orders_user_status_created_covering on orders o -- (cost=0.42..45.67 rows=267 width=64) (actual time=0.034..0.123 rows=20 loops=1) -- Index Cond: ((user_id = '550e8400...'::uuid) AND (status = 'completed') AND ...) -- Heap Fetches: 0 -- -> Index Scan using idx_order_items_order_id on order_items oi -- (cost=0.42..4.56 rows=3 width=64) (actual time=0.008..0.012 rows=3 loops=20) -- Index Cond: (order_id = o.id) -- Execution Time: 2.678 ms (从 892ms 优化到 2.7ms,提升 330 倍!)

8.3 索引优化提示词模板

提示词模板:分析 EXPLAIN ANALYZE 并建议索引

你是一位 PostgreSQL 性能调优专家。请分析以下 EXPLAIN ANALYZE 输出, 识别性能瓶颈并提供索引优化建议。 ## 查询 SQL [粘贴原始 SQL 查询] ## EXPLAIN ANALYZE 输出 [粘贴完整的 EXPLAIN ANALYZE 输出] ## 表结构和数据规模 - [表名]:约 [行数] 行,[大小] GB - 现有索引:[列出现有索引] ## 分析要求 1. 逐行解读执行计划,标注性能瓶颈 2. 识别全表扫描(Seq Scan)和低效操作 3. 计算实际过滤比例(Rows Removed by Filter / 总行数) 4. 评估排序和 JOIN 操作的效率 ## 优化建议要求 1. 提供具体的 CREATE INDEX 语句 2. 使用 CONCURRENTLY 避免锁表 3. 考虑复合索引的列顺序(等值在前,范围在后) 4. 评估覆盖索引(INCLUDE)的适用性 5. 评估部分索引(WHERE)的适用性 6. 估算索引大小和写入性能影响 7. 提供优化后的预期执行计划 ## 输出格式 - 🔴 严重瓶颈(必须优化) - 🟡 潜在问题(建议优化) - 🟢 可接受 - 每个建议包含 SQL 语句和预期效果

9. N+1 查询检测与修复全攻略

9.1 N+1 问题的本质

N+1 查询是 ORM 中最常见也最隐蔽的性能问题。它的本质是:获取 N 条主记录后,对每条记录分别执行 1 次关联查询,总共产生 N+1 次数据库查询。

┌─────────────────────────────────────────────────────────────────┐ │ N+1 查询问题图解 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 请求:获取 20 篇文章及其作者信息 │ │ │ │ ❌ N+1 模式(21 次查询): │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ Query 1: SELECT * FROM posts LIMIT 20 │ │ │ │ Query 2: SELECT * FROM users WHERE id = 'author_1' │ │ │ │ Query 3: SELECT * FROM users WHERE id = 'author_2' │ │ │ │ Query 4: SELECT * FROM users WHERE id = 'author_3' │ │ │ │ ... │ │ │ │ Query 21: SELECT * FROM users WHERE id = 'author_20'│ │ │ └──────────────────────────────────────────────────────┘ │ │ 总耗时:20ms + 20 × 5ms = 120ms │ │ │ │ ✅ 优化后(1-2 次查询): │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 方案 A - JOIN(1 次查询): │ │ │ │ SELECT p.*, u.name FROM posts p │ │ │ │ JOIN users u ON p.author_id = u.id LIMIT 20 │ │ │ │ │ │ │ │ 方案 B - Batch Load(2 次查询): │ │ │ │ Query 1: SELECT * FROM posts LIMIT 20 │ │ │ │ Query 2: SELECT * FROM users WHERE id IN (...) │ │ │ └──────────────────────────────────────────────────────┘ │ │ 总耗时:25ms(方案 A)或 30ms(方案 B) │ │ │ └─────────────────────────────────────────────────────────────────┘

9.2 各 ORM 的 N+1 检测与修复速查表

ORMN+1 检测方式预加载方案批量加载方案推荐工具
PrismaPrisma Optimize 自动检测include: { author: true }Prisma 自动批量Prisma Optimize
Drizzle手动审查 / 日志分析with: { author: true }(关系 API)innerJoin()查询日志
SQLAlchemySQLAlchemy 日志 / echo=Truejoinedload() / selectinload()subqueryload()pganalyze
TypeORMTypeORM 日志 / logging: truerelations: { author: true }QueryBuilder JOINDatadog DBM
Diesel编译时关联检查inner_join() / left_join()belonging_to() 批量编译器
DjangoDjango Debug Toolbar / Silkselect_related()prefetch_related()django-silk

9.3 Django ORM 的 N+1 检测与修复

Django ORM 是 N+1 问题的”重灾区”,因为默认使用 lazy loading。以下是完整的检测和修复方案:

# Django Models from django.db import models class Author(models.Model): name = models.CharField(max_length=100) email = models.EmailField(unique=True) class Meta: db_table = 'authors' class Post(models.Model): title = models.CharField(max_length=255) content = models.TextField() author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name='posts') published = models.BooleanField(default=False) published_at = models.DateTimeField(null=True) tags = models.ManyToManyField('Tag', related_name='posts') class Meta: db_table = 'posts' class Tag(models.Model): name = models.CharField(max_length=50, unique=True) class Meta: db_table = 'tags' class Comment(models.Model): content = models.TextField() author = models.ForeignKey(Author, on_delete=models.CASCADE) post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments') created_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = 'comments'
# ❌ N+1 问题示例 def get_posts_bad(request): posts = Post.objects.filter(published=True)[:20] result = [] for post in posts: result.append({ 'title': post.title, 'author': post.author.name, # N+1:每次访问触发查询 'tags': [t.name for t in post.tags.all()], # N+1:每次访问触发查询 'comment_count': post.comments.count(), # N+1:每次访问触发查询 }) return result # 总查询:1 + 20 + 20 + 20 = 61 次! # ✅ 修复方案:select_related + prefetch_related + annotate from django.db.models import Count, Prefetch def get_posts_good(request): posts = ( Post.objects .filter(published=True) .select_related('author') # FK 关系:用 JOIN(1 次查询) .prefetch_related('tags') # M2M 关系:用 SELECT IN(1 次额外查询) .prefetch_related( # 自定义 prefetch Prefetch( 'comments', queryset=Comment.objects.select_related('author'), ) ) .annotate(comment_count=Count('comments')) # 聚合:在 SQL 层计算 .order_by('-published_at')[:20] ) result = [] for post in posts: result.append({ 'title': post.title, 'author': post.author.name, # 无额外查询(已 JOIN) 'tags': [t.name for t in post.tags.all()], # 无额外查询(已 prefetch) 'comment_count': post.comment_count, # 无额外查询(已 annotate) }) return result # 总查询:3 次(posts JOIN authors + tags IN + comments IN)
┌─────────────────────────────────────────────────────────────────┐ │ Django 预加载策略选择决策树 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 关系类型? │ │ ├── ForeignKey / OneToOne(多对一/一对一) │ │ │ └── ✅ 使用 select_related() │ │ │ • 生成 SQL JOIN │ │ │ • 单次查询完成 │ │ │ • 适合:author, category, profile │ │ │ │ │ ├── ManyToMany / reverse ForeignKey(多对多/一对多) │ │ │ └── ✅ 使用 prefetch_related() │ │ │ • 生成 SELECT ... WHERE id IN (...) │ │ │ • 两次查询完成 │ │ │ • 适合:tags, comments, posts │ │ │ │ │ └── 需要过滤/排序关联数据? │ │ └── ✅ 使用 Prefetch() 对象 │ │ • 自定义 queryset │ │ • 可以 filter/order_by/select_related │ │ • 适合:最近 5 条评论、已发布的文章 │ │ │ └─────────────────────────────────────────────────────────────────┘

9.4 N+1 检测提示词模板

你是一位 ORM 性能专家。请审查以下代码中的 N+1 查询问题。 ## ORM 框架 [Prisma / Drizzle / SQLAlchemy / TypeORM / Diesel / Django] ## 代码 [粘贴需要审查的代码] ## 数据模型关系 [描述模型之间的关系:一对一、一对多、多对多] ## 审查要求 1. 识别所有 N+1 查询点(标注具体代码行) 2. 计算当前代码的总查询次数(假设主查询返回 N 条记录) 3. 对每个 N+1 问题提供修复方案: - 使用该 ORM 的预加载/JOIN 语法 - 修复后的完整代码 - 修复后的预期查询次数 4. 如果有些关联数据不需要预加载(如只在条件分支中使用),请说明 ## 输出格式 - 🔴 N+1 问题(标注行号和影响) - ✅ 修复代码 - 📊 优化前后查询次数对比

10. 慢查询分析与 EXPLAIN ANALYZE 深度解读

10.1 慢查询识别流程

┌─────────────────────────────────────────────────────────────────┐ │ 慢查询识别与分析流程 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ① 启用慢查询日志 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ PostgreSQL: │ │ │ │ log_min_duration_statement = 100 -- 记录 >100ms │ │ │ │ log_statement = 'none' │ │ │ │ log_duration = off │ │ │ │ │ │ │ │ MySQL: │ │ │ │ slow_query_log = ON │ │ │ │ long_query_time = 0.1 -- 记录 >100ms │ │ │ │ log_queries_not_using_indexes = ON │ │ │ └──────────────────────┬──────────────────────────────┘ │ │ ▼ │ │ ② 收集慢查询 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ • pg_stat_statements(PostgreSQL 扩展) │ │ │ │ • performance_schema(MySQL) │ │ │ │ • pganalyze / Datadog DBM / PlanetScale Insights │ │ │ │ • ORM 查询日志(Prisma logging / SQLAlchemy echo) │ │ │ └──────────────────────┬──────────────────────────────┘ │ │ ▼ │ │ ③ 分析执行计划 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) <query> │ │ │ │ • 识别 Seq Scan(全表扫描) │ │ │ │ • 识别 Nested Loop(嵌套循环 JOIN) │ │ │ │ • 识别 Sort(内存/磁盘排序) │ │ │ │ • 识别 Hash Join vs Merge Join │ │ │ └──────────────────────┬──────────────────────────────┘ │ │ ▼ │ │ ④ AI 辅助优化 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 将 EXPLAIN 输出提供给 AI 编码助手: │ │ │ │ • 索引建议 │ │ │ │ • 查询重写 │ │ │ │ • ORM 代码优化 │ │ │ │ • 连接池调优 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘

10.2 PostgreSQL EXPLAIN 关键节点解读

节点类型含义性能影响优化方向
Seq Scan全表扫描🔴 大表时极慢添加索引
Index Scan索引扫描 + 回表🟢 高效考虑覆盖索引
Index Only Scan仅索引扫描(无回表)🟢 最高效理想状态
Bitmap Index Scan位图索引扫描🟡 中等多条件 OR 查询
Nested Loop嵌套循环 JOIN🟡 小数据集高效大数据集考虑 Hash Join
Hash Join哈希 JOIN🟢 大数据集高效需要足够 work_mem
Merge Join归并 JOIN🟢 已排序数据高效需要索引排序
Sort排序操作🟡 可能溢出磁盘添加排序索引
HashAggregate哈希聚合🟡 需要内存调整 work_mem
Materialize物化子查询🟡 缓存中间结果考虑 CTE 或临时表

10.3 EXPLAIN ANALYZE 实战分析示例

-- 场景:博客平台首页查询(获取热门文章 + 作者 + 评论数 + 标签) EXPLAIN (ANALYZE, BUFFERS, COSTS, FORMAT TEXT) SELECT p.id, p.title, p.slug, p.published_at, u.name AS author_name, u.avatar_url, c.name AS category_name, comment_stats.comment_count, ARRAY_AGG(DISTINCT t.name) AS tag_names FROM posts p INNER JOIN users u ON p.author_id = u.id LEFT JOIN categories c ON p.category_id = c.id LEFT JOIN posts_tags pt ON pt.post_id = p.id LEFT JOIN tags t ON t.id = pt.tag_id LEFT JOIN LATERAL ( SELECT COUNT(*) AS comment_count FROM comments cm WHERE cm.post_id = p.id ) comment_stats ON true WHERE p.published = true AND p.published_at >= NOW() - INTERVAL '30 days' GROUP BY p.id, p.title, p.slug, p.published_at, u.name, u.avatar_url, c.name, comment_stats.comment_count ORDER BY comment_stats.comment_count DESC, p.published_at DESC LIMIT 20;

AI 辅助分析输出示例

## EXPLAIN ANALYZE 分析报告 ### 🔴 严重问题 1. **posts 表 Seq Scan**(第 12 行) - 扫描 100,000 行,过滤后仅保留 2,345 行(97.7% 被丢弃) - 原因:缺少 (published, published_at) 复合索引 - 修复: CREATE INDEX CONCURRENTLY idx_posts_published_date ON posts (published, published_at DESC) WHERE published = true; 2. **comments 子查询 Seq Scan**(第 18 行) - 对每篇文章都执行全表扫描计算评论数 - 原因:LATERAL 子查询 + 缺少 post_id 索引 - 修复: CREATE INDEX CONCURRENTLY idx_comments_post_id ON comments (post_id); ### 🟡 潜在问题 3. **Sort 操作使用磁盘**(第 5 行) - Sort Method: external merge Disk: 4096kB - 原因:work_mem 不足,排序溢出到磁盘 - 修复:SET work_mem = '16MB';(会话级别) - 或在索引中包含排序列 4. **ARRAY_AGG 聚合**(第 3 行) - 对每行都执行数组聚合 - 建议:如果标签数量固定,考虑在应用层处理 ### 🟢 可接受 5. **users 表 Index Scan**(第 8 行) - 使用主键索引,效率良好 ### 优化后预期 - 执行时间:从 ~450ms 降至 ~15ms - 查询次数:保持 1 次

10.4 慢查询分析提示词模板

你是一位数据库性能调优专家。请帮我分析以下慢查询并提供优化方案。 ## 数据库类型 [PostgreSQL / MySQL] ## 慢查询 SQL [粘贴完整的 SQL 查询] ## EXPLAIN ANALYZE 输出 [粘贴完整的 EXPLAIN ANALYZE 输出] ## 表结构 [粘贴相关表的 CREATE TABLE 语句] ## 现有索引 [粘贴 \di 或 SHOW INDEX 的输出] ## 数据规模 - [表名]:[行数] 行 - 日均查询频率:[次数] - 当前平均执行时间:[毫秒] - 目标执行时间:[毫秒] ## 分析要求 1. 逐节点解读执行计划 2. 识别所有性能瓶颈(Seq Scan、磁盘排序、低效 JOIN) 3. 提供索引优化建议(含完整 CREATE INDEX 语句) 4. 提供查询重写建议(如果 SQL 本身可以优化) 5. 评估 ORM 层面的优化可能性 6. 估算优化后的执行时间 ## 输出格式 按严重程度排序:🔴 严重 → 🟡 中等 → 🟢 轻微 每个问题包含:问题描述、根因分析、修复方案、预期效果

11. 连接池优化

11.1 连接池基础概念

数据库连接是昂贵的资源——每个连接需要 TCP 握手、认证、内存分配。连接池通过复用连接来减少开销。

┌─────────────────────────────────────────────────────────────────┐ │ 连接池工作原理 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 应用服务器 连接池 数据库 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 请求 1 │──获取连接──→ │ 连接 A ✅ │──复用──→ │ │ │ │ │ 请求 2 │──获取连接──→ │ 连接 B ✅ │──复用──→ │ PostgreSQL│ │ │ │ 请求 3 │──等待... │ 连接 C ✅ │──复用──→ │ │ │ │ │ 请求 4 │──等待... │ (空闲池) │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ 关键参数: │ │ • min_connections:最小连接数(预热) │ │ • max_connections:最大连接数(上限) │ │ • idle_timeout:空闲连接超时(回收) │ │ • connection_timeout:获取连接超时(排队等待) │ │ • max_lifetime:连接最大生命周期(防止泄漏) │ │ │ └─────────────────────────────────────────────────────────────────┘

11.2 各 ORM 连接池配置

Prisma 连接池

// schema.prisma datasource db { provider = "postgresql" url = env("DATABASE_URL") // Prisma 内置连接池参数通过 URL 配置 // postgresql://user:pass@host:5432/db?connection_limit=20&pool_timeout=10 }
// 或使用 Prisma Accelerate(全球边缘连接池) // schema.prisma datasource db { provider = "postgresql" url = env("DATABASE_URL") directUrl = env("DIRECT_DATABASE_URL") // 直连(迁移用) } // Prisma Accelerate 自动管理连接池 // 价格:$0.20/100K 操作(Starter)

Drizzle 连接池(使用 node-postgres)

// src/db/index.ts import { drizzle } from 'drizzle-orm/node-postgres'; import { Pool } from 'pg'; import * as schema from './schema'; const pool = new Pool({ connectionString: process.env.DATABASE_URL, max: 20, // 最大连接数 min: 5, // 最小连接数 idleTimeoutMillis: 30000, // 空闲超时 30s connectionTimeoutMillis: 5000, // 连接超时 5s maxUses: 7500, // 每个连接最大使用次数 }); // 监控连接池状态 pool.on('connect', () => console.log('New connection created')); pool.on('remove', () => console.log('Connection removed')); pool.on('error', (err) => console.error('Pool error:', err)); export const db = drizzle(pool, { schema }); // 健康检查 export async function checkDbHealth() { const client = await pool.connect(); try { await client.query('SELECT 1'); return { healthy: true, totalCount: pool.totalCount, idleCount: pool.idleCount, waitingCount: pool.waitingCount, }; } finally { client.release(); } }

SQLAlchemy 连接池(异步)

# app/db.py from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession engine = create_async_engine( "postgresql+asyncpg://user:pass@localhost:5432/mydb", pool_size=20, # 连接池大小 max_overflow=10, # 超出 pool_size 的最大溢出连接数 pool_timeout=30, # 获取连接超时(秒) pool_recycle=3600, # 连接回收时间(秒,防止数据库端超时) pool_pre_ping=True, # 使用前 ping 检查连接是否存活 echo=False, # 生产环境关闭 SQL 日志 ) async_session = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, ) # 依赖注入(FastAPI) async def get_session() -> AsyncSession: async with async_session() as session: try: yield session await session.commit() except Exception: await session.rollback() raise

外部连接池:PgBouncer

对于高并发场景,推荐在应用和数据库之间部署 PgBouncer 作为外部连接池:

# pgbouncer.ini [databases] mydb = host=localhost port=5432 dbname=mydb [pgbouncer] listen_addr = 0.0.0.0 listen_port = 6432 auth_type = md5 auth_file = /etc/pgbouncer/userlist.txt # 连接池模式 pool_mode = transaction # 事务级复用(推荐) # pool_mode = session # 会话级复用 # pool_mode = statement # 语句级复用(限制最多) # 连接池参数 default_pool_size = 25 # 每个用户/数据库对的默认连接数 min_pool_size = 5 # 最小连接数 max_client_conn = 200 # 最大客户端连接数 max_db_connections = 50 # 最大数据库连接数 # 超时设置 server_idle_timeout = 600 # 服务端空闲超时 client_idle_timeout = 0 # 客户端空闲超时(0=不限) query_timeout = 30 # 查询超时

11.3 连接池参数调优指南

参数推荐值计算公式说明
max_connections20-50CPU 核数 × 2 + 磁盘数PostgreSQL 官方建议
pool_size10-25max_connections / 应用实例数每个应用实例的连接数
max_overflow5-10pool_size × 0.5突发流量缓冲
idle_timeout30-300s根据流量模式低流量可设短,高流量设长
connection_timeout5-10s根据 SLA超时后返回错误而非无限等待
max_lifetime1800-3600s根据数据库配置防止连接泄漏

连接池优化提示词模板

你是一位数据库连接池调优专家。请根据以下信息优化连接池配置。 ## 应用信息 - 框架:[Next.js / FastAPI / NestJS / Actix-web] - ORM:[Prisma / Drizzle / SQLAlchemy / TypeORM / Diesel] - 部署方式:[单实例 / 多实例(N 个)/ Serverless] - 数据库:[PostgreSQL / MySQL],最大连接数:[数量] ## 流量特征 - 日均请求量:[数量] - 峰值 QPS:[数量] - 平均查询耗时:[毫秒] - 长查询比例:[百分比] ## 当前配置 [粘贴当前连接池配置] ## 当前问题 [描述遇到的问题:连接超时、连接耗尽、空闲连接过多等] ## 优化要求 1. 提供优化后的连接池参数 2. 解释每个参数的选择理由 3. 是否需要外部连接池(PgBouncer) 4. Serverless 场景的特殊处理 5. 监控指标和告警阈值建议

12. 查询重写与 AI 辅助优化模式

12.1 常见查询重写模式

模式 1:子查询 → JOIN

-- ❌ 低效:相关子查询(每行执行一次子查询) SELECT p.id, p.title, (SELECT COUNT(*) FROM comments c WHERE c.post_id = p.id) AS comment_count FROM posts p WHERE p.published = true; -- ✅ 优化:LEFT JOIN + GROUP BY SELECT p.id, p.title, COUNT(c.id) AS comment_count FROM posts p LEFT JOIN comments c ON c.post_id = p.id WHERE p.published = true GROUP BY p.id, p.title; -- ✅ 更优:使用窗口函数(如果需要保留所有列) SELECT p.*, COUNT(c.id) OVER (PARTITION BY p.id) AS comment_count FROM posts p LEFT JOIN comments c ON c.post_id = p.id WHERE p.published = true;

模式 2:EXISTS 替代 IN

-- ❌ 低效:IN 子查询(大数据集时性能差) SELECT * FROM users WHERE id IN ( SELECT author_id FROM posts WHERE published = true AND published_at >= '2025-01-01' ); -- ✅ 优化:EXISTS(短路求值,找到即停止) SELECT * FROM users u WHERE EXISTS ( SELECT 1 FROM posts p WHERE p.author_id = u.id AND p.published = true AND p.published_at >= '2025-01-01' );

模式 3:分页优化(Keyset Pagination)

-- ❌ 低效:OFFSET 分页(深页性能差) SELECT * FROM posts WHERE published = true ORDER BY published_at DESC LIMIT 20 OFFSET 10000; -- OFFSET 10000 需要扫描并丢弃 10000 行! -- ✅ 优化:Keyset 分页(游标分页) SELECT * FROM posts WHERE published = true AND (published_at, id) < ('2025-06-01T12:00:00', 'last-seen-id') ORDER BY published_at DESC, id DESC LIMIT 20; -- 直接从上次位置开始,无需扫描前面的行

模式 4:批量操作优化

-- ❌ 低效:逐条插入 INSERT INTO tags (name) VALUES ('rust'); INSERT INTO tags (name) VALUES ('python'); INSERT INTO tags (name) VALUES ('typescript'); -- 3 次网络往返 -- ✅ 优化:批量插入 INSERT INTO tags (name) VALUES ('rust'), ('python'), ('typescript'); -- 1 次网络往返 -- ✅ 更优:COPY 命令(PostgreSQL,大批量数据) COPY tags (name) FROM STDIN WITH (FORMAT csv); rust python typescript \.

模式 5:条件聚合替代多次查询

-- ❌ 低效:多次查询获取不同状态的计数 SELECT COUNT(*) FROM orders WHERE status = 'pending'; SELECT COUNT(*) FROM orders WHERE status = 'completed'; SELECT COUNT(*) FROM orders WHERE status = 'cancelled'; -- 3 次查询 -- ✅ 优化:条件聚合(1 次查询) SELECT COUNT(*) FILTER (WHERE status = 'pending') AS pending_count, COUNT(*) FILTER (WHERE status = 'completed') AS completed_count, COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_count FROM orders; -- PostgreSQL 语法 -- MySQL 等效写法 SELECT SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) AS pending_count, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed_count, SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) AS cancelled_count FROM orders;

12.2 ORM 层面的查询重写

// Prisma:使用 groupBy 替代多次 count // ❌ 低效:3 次查询 const pending = await prisma.order.count({ where: { status: 'PENDING' } }); const completed = await prisma.order.count({ where: { status: 'COMPLETED' } }); const cancelled = await prisma.order.count({ where: { status: 'CANCELLED' } }); // ✅ 优化:1 次查询 const stats = await prisma.order.groupBy({ by: ['status'], _count: { id: true }, }); // 返回:[{ status: 'PENDING', _count: { id: 150 } }, ...]
# SQLAlchemy:使用 case 表达式替代多次查询 from sqlalchemy import select, func, case # ❌ 低效:3 次查询 pending = await session.scalar( select(func.count()).where(Order.status == 'pending') ) completed = await session.scalar( select(func.count()).where(Order.status == 'completed') ) # ✅ 优化:1 次查询 stmt = select( func.count().filter(Order.status == 'pending').label('pending'), func.count().filter(Order.status == 'completed').label('completed'), func.count().filter(Order.status == 'cancelled').label('cancelled'), ) result = await session.execute(stmt) stats = result.one()

13. Kiro Hooks 自动化查询审查

13.1 配置 Kiro Hooks 自动检测查询问题

Kiro 的 Hooks 系统可以在开发过程中自动检测 ORM 查询的常见问题:

Hook:检测 N+1 查询模式

# .kiro/hooks/detect-n-plus-one.yaml name: "N+1 查询检测" description: "检测 ORM 代码中的 N+1 查询模式" trigger: event: fileEdited patterns: - "**/*.repository.ts" - "**/*.service.ts" - "**/repositories/**/*.ts" - "**/repositories/**/*.py" action: type: askAgent prompt: | 请审查修改的文件中是否存在 N+1 查询问题。 检查以下模式: 1. 循环中调用 ORM 查询(for/map/forEach 中的 findUnique/findFirst) 2. Promise.all 中的重复查询 3. 缺少 include/select/joinedload 的关联数据访问 4. Django 中缺少 select_related/prefetch_related 如果发现问题,提供具体的修复建议。 如果没有问题,简短确认即可。

Hook:审查新增索引

# .kiro/hooks/review-index-changes.yaml name: "索引变更审查" description: "审查迁移文件中的索引变更" trigger: event: fileCreated patterns: - "**/migrations/**/*.sql" - "**/migrations/**/*.ts" - "**/migrations/**/*.py" action: type: askAgent prompt: | 请审查新创建的迁移文件中的索引变更。 检查以下问题: 1. CREATE INDEX 是否使用了 CONCURRENTLY(PostgreSQL)? 2. 复合索引的列顺序是否合理(等值条件在前,范围条件在后)? 3. 是否有冗余索引(新索引是否被现有索引覆盖)? 4. 索引是否会显著影响写入性能? 5. 是否考虑了部分索引(WHERE 子句)的适用性? 提供具体的改进建议。

Hook:Prisma Schema 变更审查

# .kiro/hooks/prisma-schema-review.yaml name: "Prisma Schema 审查" description: "审查 Prisma Schema 变更的查询性能影响" trigger: event: fileEdited patterns: - "**/schema.prisma" action: type: askAgent prompt: | 请审查 Prisma Schema 的变更,关注查询性能影响。 检查以下问题: 1. 新增的关系是否需要索引? 2. 是否缺少常用查询字段的索引(@@index)? 3. 多对多关系是否使用了隐式关系表(性能影响)? 4. 是否有不必要的 @unique 约束(每个 unique 都会创建索引)? 5. 枚举类型是否合理(vs 关联表)? 提供具体的优化建议。

14. ORM 模型代码生成提示词模板库

14.1 通用 ORM 模型生成提示词

你是一位 [Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel] 专家。 请根据以下业务需求生成完整的 ORM 模型代码。 ## 业务需求 [描述业务实体和关系] ## 技术要求 - 数据库:[PostgreSQL/MySQL/SQLite] - ORM:[具体 ORM 名称和版本] - 语言:[TypeScript/Python/Rust] ## 代码规范 1. 使用 UUID 作为主键 2. 所有表包含 created_at 和 updated_at 时间戳 3. 字符串字段指定合理的长度限制 4. 外键关系指定 ON DELETE 行为 5. 为常用查询字段添加索引 6. 使用 snake_case 数据库列名,camelCase 代码属性名 7. 包含完整的类型定义 ## 输出要求 1. 完整的模型/Schema 定义代码 2. 关系定义(一对一、一对多、多对多) 3. 建议的索引列表及理由 4. 基础 CRUD 操作示例 5. 常见查询模式示例(分页、搜索、关联查询)

14.2 查询优化审查提示词

你是一位数据库查询优化专家。请审查以下 ORM 查询代码的性能。 ## ORM 框架 [Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel/Django] ## 代码 [粘贴需要审查的查询代码] ## 数据模型 [粘贴 Schema/Model 定义] ## 数据规模 [描述各表的数据量] ## 审查维度 1. **N+1 检测**:是否存在循环查询? 2. **索引使用**:查询条件是否有索引支持? 3. **数据获取**:是否获取了不必要的字段? 4. **分页**:大列表是否有分页? 5. **事务**:写操作是否使用了事务? 6. **并发**:是否有竞态条件风险? 7. **缓存**:是否有适合缓存的查询? ## 输出 - 问题列表(按严重程度排序) - 每个问题的修复代码 - 优化前后的查询次数对比 - 建议的索引变更

14.3 从 SQL 到 ORM 代码转换提示词

你是一位 ORM 专家。请将以下 SQL 查询转换为 [ORM 名称] 代码。 ## 原始 SQL [粘贴 SQL 查询] ## 目标 ORM [Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel] ## 已有模型定义 [粘贴现有的 ORM 模型/Schema] ## 转换要求 1. 保持查询语义完全一致 2. 利用 ORM 的类型安全特性 3. 使用 ORM 的关系查询 API(而非原生 SQL) 4. 如果 ORM 无法表达某些操作,使用原生查询并说明原因 5. 确保转换后的查询性能不低于原始 SQL ## 输出 1. 转换后的 ORM 代码 2. 类型定义(如果需要) 3. 性能对比说明 4. 无法用 ORM 表达的部分(如果有)

实战案例:电商平台查询优化全流程

案例背景

一个中型电商平台(日活 5 万用户,商品 10 万+,订单 500 万+)使用 Prisma + PostgreSQL,首页加载时间从 200ms 恶化到 2.5s。

步骤 1:使用 Prisma Optimize 识别问题

// 集成 Prisma Optimize import { PrismaClient } from '@prisma/client'; import { withOptimize } from '@prisma/extension-optimize'; const prisma = new PrismaClient().$extends( withOptimize({ apiKey: process.env.PRISMA_OPTIMIZE_KEY! }) ); // Prisma Optimize 报告发现以下问题: // 🔴 N+1 查询:首页商品列表查询触发 50+ 次额外查询 // 🔴 缺失索引:products 表的 category_id + is_active 缺少复合索引 // 🟡 全表扫描:热门商品查询扫描全表 // 🟡 不必要的数据获取:返回了商品的完整描述(TEXT 字段)

步骤 2:修复 N+1 查询

// ❌ 原始代码(首页商品列表) async function getHomepageProducts() { const products = await prisma.product.findMany({ where: { isActive: true }, take: 50, orderBy: { salesCount: 'desc' }, }); // N+1:每个商品单独查询分类、图片、评价统计 return Promise.all(products.map(async (product) => ({ ...product, category: await prisma.category.findUnique({ where: { id: product.categoryId }, }), images: await prisma.productImage.findMany({ where: { productId: product.id }, take: 3, }), rating: await prisma.review.aggregate({ where: { productId: product.id }, _avg: { rating: true }, _count: true, }), }))); } // 总查询:1 + 50 + 50 + 50 = 151 次! // ✅ 优化后 async function getHomepageProductsOptimized() { return prisma.product.findMany({ where: { isActive: true }, take: 50, orderBy: { salesCount: 'desc' }, select: { id: true, name: true, slug: true, price: true, originalPrice: true, salesCount: true, // 不返回 description(大 TEXT 字段) category: { select: { id: true, name: true, slug: true }, }, images: { select: { url: true, alt: true }, take: 3, orderBy: { sortOrder: 'asc' }, }, _count: { select: { reviews: true }, }, }, }); } // 总查询:Prisma 自动优化为 2-3 次批量查询

步骤 3:添加索引

-- 基于 Prisma Optimize 和 EXPLAIN ANALYZE 的建议 -- 1. 商品列表查询索引 CREATE INDEX CONCURRENTLY idx_products_active_sales ON products (is_active, sales_count DESC) WHERE is_active = true; -- 2. 分类筛选索引 CREATE INDEX CONCURRENTLY idx_products_category_active ON products (category_id, is_active) WHERE is_active = true; -- 3. 商品图片查询索引 CREATE INDEX CONCURRENTLY idx_product_images_product_sort ON product_images (product_id, sort_order); -- 4. 评价统计索引 CREATE INDEX CONCURRENTLY idx_reviews_product_rating ON reviews (product_id, rating);

步骤 4:验证优化效果

-- 优化前 EXPLAIN ANALYZE SELECT ... -- Execution Time: 2,345.678 ms -- 优化后 EXPLAIN ANALYZE SELECT ... -- Execution Time: 12.345 ms -- 性能提升:190 倍!

案例分析

指标优化前优化后提升
数据库查询次数151 次3 次50x
平均查询耗时2,345ms12ms190x
首页加载时间2.5s180ms14x
数据库 CPU 使用率85%15%5.7x
网络传输量4.2MB0.3MB14x

关键决策点:

  1. 使用 Prisma Optimize 快速定位问题,而非手动分析日志
  2. 优先修复 N+1 查询(影响最大),再优化索引
  3. 使用 select 精确控制返回字段,减少网络传输
  4. 使用部分索引(WHERE is_active = true)减少索引大小
  5. 使用 CONCURRENTLY 创建索引,避免锁表影响线上服务

避坑指南

❌ 常见错误

  1. 盲目添加索引

    • 问题:为每个查询条件都创建索引,导致写入性能下降 50%+,存储空间翻倍
    • 正确做法:基于 EXPLAIN ANALYZE 和实际查询频率决定索引策略,定期清理未使用的索引(pg_stat_user_indexesidx_scan = 0 的索引)
  2. 忽略 N+1 查询

    • 问题:开发环境数据少时感知不到,上线后数据量增长导致性能急剧下降
    • 正确做法:在开发阶段就启用查询日志或 Prisma Optimize,CI 中集成 N+1 检测
  3. 在生产环境使用 ORM 的 synchronize/auto-migrate

    • 问题:TypeORM 的 synchronize: true 或 Django 的 migrate 在启动时自动修改 Schema,可能导致数据丢失
    • 正确做法:生产环境只使用版本化的迁移文件,禁用自动同步
  4. 过度使用原生 SQL

    • 问题:绕过 ORM 的类型安全和参数化查询,引入 SQL 注入风险
    • 正确做法:优先使用 ORM 查询 API,只在 ORM 无法表达时使用参数化的原生查询
  5. 连接池配置不当

    • 问题:连接数设置过高导致数据库内存耗尽,或设置过低导致请求排队超时
    • 正确做法:根据公式 CPU 核数 × 2 + 磁盘数 设置数据库最大连接数,应用连接池大小 = 最大连接数 / 应用实例数
  6. 忽略查询返回的数据量

    • 问题:使用 SELECT * 或 ORM 默认返回所有字段,包括大 TEXT/BLOB 字段
    • 正确做法:使用 select(Prisma)、load_only(SQLAlchemy)、columns(Drizzle)精确控制返回字段
  7. OFFSET 分页在深页时性能崩溃

    • 问题:OFFSET 100000 需要扫描并丢弃 10 万行数据
    • 正确做法:使用 Keyset 分页(游标分页),基于上一页最后一条记录的排序键定位
  8. AI 生成的查询未经审查直接上线

    • 问题:AI 可能生成看似正确但性能极差的查询(如不必要的子查询、错误的 JOIN 顺序)
    • 正确做法:所有 AI 生成的查询必须经过 EXPLAIN ANALYZE 验证,在测试环境用生产级数据量测试

✅ 最佳实践

  1. 开发阶段就启用查询监控:集成 Prisma Optimize、Django Debug Toolbar 或 SQLAlchemy echo,在开发时就发现性能问题
  2. 建立查询性能基线:记录关键查询的 P50/P95/P99 延迟,设置告警阈值
  3. 索引策略文档化:在 Schema 注释或专门文档中记录每个索引的用途和对应的查询
  4. 定期审查慢查询日志:每周审查 Top 10 慢查询,持续优化
  5. 使用 AI 辅助但人工审查:让 AI 生成初始查询和索引建议,但必须经过 EXPLAIN ANALYZE 验证
  6. 连接池监控:监控连接池使用率、等待队列长度、连接创建/销毁频率
  7. Keyset 分页作为默认:除非业务需要跳页,否则默认使用 Keyset 分页

相关资源与延伸阅读

  1. Prisma Optimize 官方文档  — Prisma AI 驱动查询优化工具的完整使用指南
  2. Drizzle ORM 官方文档  — Drizzle 查询 API 和性能优化最佳实践
  3. SQLAlchemy 2.0 文档 - Loading Relationships  — SQLAlchemy 关系加载策略详解
  4. pganalyze Query Advisor  — PostgreSQL AI 驱动的查询优化顾问
  5. PlanetScale AI Index Suggestions  — AI 驱动的 PostgreSQL 索引建议功能
  6. AI2sql 查询优化指南  — AI 辅助 SQL 查询优化完整指南
  7. Diesel ORM 官方文档  — Rust Diesel ORM 查询构建和性能指南
  8. Use The Index, Luke  — SQL 索引设计经典教程,覆盖 B-Tree 原理和优化策略
  9. TypeORM QueryBuilder 文档  — TypeORM 高级查询构建器使用指南
  10. PostgreSQL EXPLAIN 可视化工具 - explain.dalibo.com  — 在线 EXPLAIN ANALYZE 可视化分析工具

参考来源


📖 返回 总览与导航 | 上一节:29c-迁移文件生成与审查 | 下一节:29e-数据库Prompt模板与反模式

Last updated on