29d - ORM与查询优化
本文是《AI Agent 实战手册》第 29 章第 4 节。 上一节:29c-迁移文件生成与审查 | 下一节:29e-数据库Prompt模板与反模式
概述
ORM(Object-Relational Mapping)是现代应用开发中连接业务逻辑与数据库的核心桥梁,而查询性能则直接决定用户体验和系统可扩展性。2025-2026 年,AI 编码助手(Claude Code、Cursor、Kiro)能够自动生成类型安全的 ORM 模型代码,同时 AI 驱动的查询优化工具(Prisma Optimize、PlanetScale Insights、pganalyze、AI2sql)可以自动检测 N+1 查询、推荐索引策略、分析慢查询执行计划。本节系统覆盖五大主流 ORM(Prisma、Drizzle、SQLAlchemy、TypeORM、Diesel)的模型代码生成模式,深入讲解 AI 辅助查询优化的完整工作流——从索引建议到慢查询分析、从 N+1 检测到连接池调优,提供完整的提示词模板和实战案例。
1. ORM 生态全景与 AI 辅助能力对比
1.1 主流 ORM 对比(2025-2026)
| ORM | 语言/生态 | 类型安全 | AI 友好度 | 查询优化工具 | 价格 | 适用场景 |
|---|---|---|---|---|---|---|
| Prisma | TypeScript/JavaScript | ⭐⭐⭐⭐⭐ 自动生成类型 | ⭐⭐⭐⭐⭐ Schema DSL 清晰 | Prisma Optimize(AI 驱动) | 免费(开源)/ Optimize $0 起 / Accelerate $0.20/100K ops | Node.js 全栈项目,类型安全优先 |
| Drizzle | TypeScript/JavaScript | ⭐⭐⭐⭐⭐ TypeScript 原生 | ⭐⭐⭐⭐ SQL-like 语法 | 无内置(配合外部工具) | 免费(开源)/ Drizzle Studio 免费 | 轻量 TypeScript 项目,SQL 熟练者 |
| SQLAlchemy | Python | ⭐⭐⭐⭐ 类型注解支持 | ⭐⭐⭐⭐ 灵活可定制 | 无内置(配合 pganalyze 等) | 免费(开源) | Python/FastAPI/Flask 项目 |
| TypeORM | TypeScript/JavaScript | ⭐⭐⭐ 装饰器类型 | ⭐⭐⭐ Entity 模式成熟 | 无内置(配合外部工具) | 免费(开源) | NestJS/Express 项目,ActiveRecord 风格 |
| Diesel | Rust | ⭐⭐⭐⭐⭐ 编译时验证 | ⭐⭐⭐ 宏系统复杂 | 无内置(编译时安全) | 免费(开源) | Rust 后端项目,零运行时开销 |
| Django ORM | Python (Django) | ⭐⭐⭐ 动态类型 | ⭐⭐⭐⭐ 成熟稳定 | Django Debug Toolbar / Silk | 免费(开源) | Django 项目 |
| SeaORM | Rust | ⭐⭐⭐⭐ 异步原生 | ⭐⭐⭐ 较新生态 | 无内置 | 免费(开源) | Rust 异步后端项目 |
| Drizzle + Kysely | TypeScript | ⭐⭐⭐⭐⭐ 类型推断 | ⭐⭐⭐⭐ SQL 构建器 | 无内置 | 免费(开源) | 复杂查询场景 |
1.2 AI 辅助 ORM 开发的能力边界
┌─────────────────────────────────────────────────────────────────┐
│ AI 辅助 ORM 开发能力矩阵 │
├─────────────────────┬───────────────────────────────────────────┤
│ ✅ AI 擅长 │ ⚠️ AI 需要人工审查 │
│ │ │
│ • Schema → Model │ • 复杂关联查询优化 │
│ 代码生成 │ • 分布式事务设计 │
│ • CRUD 操作生成 │ • 连接池参数调优 │
│ • 基础关联定义 │ • 缓存失效策略 │
│ • 类型定义生成 │ • 读写分离路由 │
│ • 索引建议 │ │
│ • N+1 检测 │ │
│ • 查询重写建议 │ │
├─────────────────────┼───────────────────────────────────────────┤
│ ❌ AI 容易犯错 │ 🔴 AI 不应自主决策 │
│ │ │
│ • 过度 eager load │ • 生产数据库索引变更 │
│ • 忽略查询复杂度 │ • 大表 Schema 迁移 │
│ • 不当使用原生 SQL │ • 数据库引擎选择 │
│ • 忽略并发场景 │ • 分库分表策略 │
│ • 缓存一致性问题 │ • 备份与恢复策略 │
└─────────────────────┴───────────────────────────────────────────┘2. Prisma:类型安全的 ORM 模型生成与 AI 优化
2.1 Prisma 模型代码生成
Prisma 采用 Schema-first 方式:开发者定义 schema.prisma,Prisma 自动生成完全类型安全的 Client 代码。AI 编码助手对 Prisma Schema DSL 的理解能力极强,是 2025 年 AI 友好度最高的 ORM。
Schema 定义与 Client 生成
// prisma/schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model User {
id String @id @default(uuid())
email String @unique
name String
avatarUrl String? @map("avatar_url")
isActive Boolean @default(true) @map("is_active")
role Role @default(USER)
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
posts Post[]
comments Comment[]
profile Profile?
@@map("users")
@@index([email])
@@index([createdAt])
}
model Post {
id String @id @default(uuid())
title String
content String?
slug String @unique
published Boolean @default(false)
publishedAt DateTime? @map("published_at")
authorId String @map("author_id")
categoryId String? @map("category_id")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
author User @relation(fields: [authorId], references: [id], onDelete: Cascade)
category Category? @relation(fields: [categoryId], references: [id])
comments Comment[]
tags Tag[]
@@map("posts")
@@index([authorId])
@@index([categoryId])
@@index([publishedAt])
@@index([slug])
}
model Category {
id String @id @default(uuid())
name String @unique
slug String @unique
parentId String? @map("parent_id")
parent Category? @relation("CategoryTree", fields: [parentId], references: [id])
children Category[] @relation("CategoryTree")
posts Post[]
@@map("categories")
}
model Comment {
id String @id @default(uuid())
content String
authorId String @map("author_id")
postId String @map("post_id")
parentId String? @map("parent_id")
createdAt DateTime @default(now()) @map("created_at")
author User @relation(fields: [authorId], references: [id], onDelete: Cascade)
post Post @relation(fields: [postId], references: [id], onDelete: Cascade)
parent Comment? @relation("CommentThread", fields: [parentId], references: [id])
replies Comment[] @relation("CommentThread")
@@map("comments")
@@index([postId])
@@index([authorId])
}
model Tag {
id String @id @default(uuid())
name String @unique
posts Post[]
@@map("tags")
}
model Profile {
id String @id @default(uuid())
bio String?
userId String @unique @map("user_id")
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@map("profiles")
}
enum Role {
USER
ADMIN
MODERATOR
}# 生成 Prisma Client(自动生成类型安全的查询 API)
npx prisma generate
# 生成的类型文件位于 node_modules/.prisma/client/
# 包含所有模型的 TypeScript 类型定义AI 生成的类型安全查询模式
// src/repositories/user.repository.ts
import { PrismaClient, Prisma } from '@prisma/client';
const prisma = new PrismaClient();
// ✅ 基础 CRUD 操作(AI 可以完美生成)
export class UserRepository {
// 创建用户(含关联 Profile)
async create(data: {
email: string;
name: string;
bio?: string;
}) {
return prisma.user.create({
data: {
email: data.email,
name: data.name,
profile: data.bio ? {
create: { bio: data.bio }
} : undefined,
},
include: {
profile: true,
},
});
}
// 分页查询(含筛选和排序)
async findMany(params: {
page: number;
pageSize: number;
search?: string;
role?: 'USER' | 'ADMIN' | 'MODERATOR';
orderBy?: 'createdAt' | 'name' | 'email';
order?: 'asc' | 'desc';
}) {
const { page, pageSize, search, role, orderBy = 'createdAt', order = 'desc' } = params;
const where: Prisma.UserWhereInput = {
isActive: true,
...(search && {
OR: [
{ name: { contains: search, mode: 'insensitive' } },
{ email: { contains: search, mode: 'insensitive' } },
],
}),
...(role && { role }),
};
const [users, total] = await Promise.all([
prisma.user.findMany({
where,
skip: (page - 1) * pageSize,
take: pageSize,
orderBy: { [orderBy]: order },
include: {
profile: true,
_count: { select: { posts: true, comments: true } },
},
}),
prisma.user.count({ where }),
]);
return {
data: users,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}
// 带关联的详情查询
async findByIdWithPosts(id: string) {
return prisma.user.findUnique({
where: { id },
include: {
profile: true,
posts: {
where: { published: true },
orderBy: { publishedAt: 'desc' },
take: 10,
include: {
category: true,
tags: true,
_count: { select: { comments: true } },
},
},
},
});
}
// 事务操作:转移用户所有文章
async transferPosts(fromUserId: string, toUserId: string) {
return prisma.$transaction(async (tx) => {
const posts = await tx.post.findMany({
where: { authorId: fromUserId },
select: { id: true },
});
await tx.post.updateMany({
where: { authorId: fromUserId },
data: { authorId: toUserId },
});
return { transferredCount: posts.length };
});
}
}2.2 Prisma Optimize:AI 驱动的查询优化
Prisma Optimize 是 Prisma 官方推出的 AI 驱动查询分析工具,能够自动检测性能瓶颈并提供优化建议。
工具信息
| 特性 | 详情 |
|---|---|
| 类型 | SaaS 查询分析平台 |
| 集成方式 | Prisma Client 扩展 |
| 核心能力 | N+1 检测、缺失索引建议、全表扫描告警、查询延迟分析 |
| 价格 | 免费(基础)/ $29/月(Pro)/ 自定义(Enterprise) |
| 支持数据库 | PostgreSQL、MySQL、SQLite、MongoDB |
集成 Prisma Optimize
// src/lib/prisma.ts
import { PrismaClient } from '@prisma/client';
import { withOptimize } from '@prisma/extension-optimize';
const prisma = new PrismaClient().$extends(
withOptimize({
apiKey: process.env.PRISMA_OPTIMIZE_API_KEY!,
})
);
export default prisma;Prisma Optimize 检测的问题类型
┌─────────────────────────────────────────────────────────────────┐
│ Prisma Optimize 检测能力 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 🔍 N+1 查询检测 │
│ ├── 检测循环中的重复查询 │
│ ├── 建议使用 include/select 预加载 │
│ └── 提供具体的代码修改建议 │
│ │
│ 📊 缺失索引建议 │
│ ├── 分析 WHERE 子句中的字段 │
│ ├── 检测 ORDER BY 未索引字段 │
│ └── 推荐复合索引组合 │
│ │
│ ⚡ 全表扫描告警 │
│ ├── 检测缺少 WHERE 条件的大表查询 │
│ ├── 建议添加 LIMIT 限制 │
│ └── 标记潜在的性能瓶颈 │
│ │
│ 📈 查询延迟分析 │
│ ├── P50/P95/P99 延迟统计 │
│ ├── 查询分组和模式识别 │
│ └── 历史趋势对比 │
│ │
│ 🔄 不必要的数据获取 │
│ ├── 检测 SELECT * 模式 │
│ ├── 建议使用 select 精确选择字段 │
│ └── 减少网络传输和内存占用 │
│ │
└─────────────────────────────────────────────────────────────────┘2.3 Prisma N+1 查询检测与修复
N+1 查询是 ORM 中最常见的性能问题。以下是 Prisma 中 N+1 问题的典型场景和修复方法:
// ❌ N+1 问题:循环中逐个查询作者信息
async function getPostsWithAuthors_BAD() {
const posts = await prisma.post.findMany({
where: { published: true },
take: 20,
});
// N+1:每篇文章都会触发一次额外查询获取作者
const postsWithAuthors = await Promise.all(
posts.map(async (post) => {
const author = await prisma.user.findUnique({
where: { id: post.authorId },
});
return { ...post, author };
})
);
return postsWithAuthors;
}
// 总查询数:1(获取文章)+ 20(获取每个作者)= 21 次查询!
// ✅ 修复方案 1:使用 include 预加载
async function getPostsWithAuthors_GOOD_1() {
return prisma.post.findMany({
where: { published: true },
take: 20,
include: {
author: {
select: { id: true, name: true, avatarUrl: true },
},
},
});
}
// 总查询数:1 次(Prisma 自动 JOIN 或批量查询)
// ✅ 修复方案 2:使用 select 精确控制返回字段
async function getPostsWithAuthors_GOOD_2() {
return prisma.post.findMany({
where: { published: true },
take: 20,
select: {
id: true,
title: true,
slug: true,
publishedAt: true,
author: {
select: { id: true, name: true, avatarUrl: true },
},
_count: { select: { comments: true } },
},
});
}
// 总查询数:1 次,且只返回需要的字段
// ✅ 修复方案 3:复杂场景使用 $queryRaw
async function getPostStats() {
return prisma.$queryRaw`
SELECT
p.id,
p.title,
u.name AS author_name,
COUNT(c.id) AS comment_count,
COUNT(DISTINCT t.id) AS tag_count
FROM posts p
JOIN users u ON p.author_id = u.id
LEFT JOIN comments c ON c.post_id = p.id
LEFT JOIN "_PostToTag" pt ON pt."A" = p.id
LEFT JOIN tags t ON t.id = pt."B"
WHERE p.published = true
GROUP BY p.id, p.title, u.name
ORDER BY comment_count DESC
LIMIT 10
`;
}2.4 Prisma 查询优化提示词模板
提示词模板:生成优化的 Prisma 查询
你是一位 Prisma ORM 专家。请根据以下需求生成高性能的 Prisma 查询代码。
## Prisma Schema
[粘贴相关的 schema.prisma 模型定义]
## 查询需求
[描述需要实现的查询功能]
## 性能要求
- 避免 N+1 查询(使用 include 或 select 预加载关联数据)
- 只返回必要的字段(使用 select 而非返回整个模型)
- 大列表查询必须分页(使用 skip/take)
- 复杂聚合考虑使用 $queryRaw
- 写操作使用事务($transaction)保证一致性
## 输出要求
1. TypeScript 代码(含完整类型定义)
2. 预估查询次数和复杂度
3. 建议的索引(如果当前 Schema 缺少)
4. 潜在的性能风险和优化建议提示词模板:审查 Prisma 查询性能
你是一位数据库性能专家。请审查以下 Prisma 查询代码的性能问题。
## 代码
[粘贴 Prisma 查询代码]
## Schema
[粘贴相关 Schema]
## 数据规模
- [表名] 约 [行数] 行
- 日均查询量:[次数]
## 审查清单
1. 是否存在 N+1 查询?
2. 是否有不必要的数据获取(SELECT *)?
3. 是否缺少分页?
4. 关联查询是否可以优化?
5. 是否需要添加索引?
6. 是否应该使用 $queryRaw 替代 ORM 查询?
7. 事务使用是否合理?
## 输出
- 🔴 严重问题(必须修复)
- 🟡 性能隐患(建议修复)
- 🟢 可接受
- 每个问题提供修复后的代码3. Drizzle ORM:SQL-Like 的类型安全查询
3.1 Drizzle 模型定义与查询模式
Drizzle 的核心理念是”如果你懂 SQL,你就懂 Drizzle”。它提供了最接近原生 SQL 的 TypeScript ORM 体验,同时保持完整的类型安全。
Schema 定义
// src/db/schema.ts
import {
pgTable, uuid, varchar, text, boolean, timestamp,
integer, pgEnum, index, uniqueIndex, primaryKey
} from 'drizzle-orm/pg-core';
import { relations } from 'drizzle-orm';
// 枚举定义
export const roleEnum = pgEnum('role', ['USER', 'ADMIN', 'MODERATOR']);
// 用户表
export const users = pgTable('users', {
id: uuid('id').primaryKey().defaultRandom(),
email: varchar('email', { length: 255 }).notNull().unique(),
name: varchar('name', { length: 100 }).notNull(),
avatarUrl: text('avatar_url'),
isActive: boolean('is_active').notNull().default(true),
role: roleEnum('role').notNull().default('USER'),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
}, (table) => [
index('users_email_idx').on(table.email),
index('users_created_at_idx').on(table.createdAt),
]);
// 文章表
export const posts = pgTable('posts', {
id: uuid('id').primaryKey().defaultRandom(),
title: varchar('title', { length: 255 }).notNull(),
content: text('content'),
slug: varchar('slug', { length: 255 }).notNull().unique(),
published: boolean('published').notNull().default(false),
publishedAt: timestamp('published_at'),
authorId: uuid('author_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
categoryId: uuid('category_id').references(() => categories.id),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
}, (table) => [
index('posts_author_id_idx').on(table.authorId),
index('posts_category_id_idx').on(table.categoryId),
index('posts_published_at_idx').on(table.publishedAt),
uniqueIndex('posts_slug_idx').on(table.slug),
]);
// 分类表(自引用树形结构)
export const categories = pgTable('categories', {
id: uuid('id').primaryKey().defaultRandom(),
name: varchar('name', { length: 100 }).notNull().unique(),
slug: varchar('slug', { length: 100 }).notNull().unique(),
parentId: uuid('parent_id'),
});
// 评论表
export const comments = pgTable('comments', {
id: uuid('id').primaryKey().defaultRandom(),
content: text('content').notNull(),
authorId: uuid('author_id').notNull().references(() => users.id, { onDelete: 'cascade' }),
postId: uuid('post_id').notNull().references(() => posts.id, { onDelete: 'cascade' }),
parentId: uuid('parent_id'),
createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => [
index('comments_post_id_idx').on(table.postId),
index('comments_author_id_idx').on(table.authorId),
]);
// 标签表(多对多)
export const tags = pgTable('tags', {
id: uuid('id').primaryKey().defaultRandom(),
name: varchar('name', { length: 50 }).notNull().unique(),
});
export const postsToTags = pgTable('posts_to_tags', {
postId: uuid('post_id').notNull().references(() => posts.id, { onDelete: 'cascade' }),
tagId: uuid('tag_id').notNull().references(() => tags.id, { onDelete: 'cascade' }),
}, (table) => [
primaryKey({ columns: [table.postId, table.tagId] }),
]);
// 用户资料表
export const profiles = pgTable('profiles', {
id: uuid('id').primaryKey().defaultRandom(),
bio: text('bio'),
userId: uuid('user_id').notNull().unique().references(() => users.id, { onDelete: 'cascade' }),
});关系定义
// src/db/relations.ts
import { relations } from 'drizzle-orm';
import { users, posts, comments, tags, postsToTags, profiles, categories } from './schema';
export const usersRelations = relations(users, ({ one, many }) => ({
profile: one(profiles, {
fields: [users.id],
references: [profiles.userId],
}),
posts: many(posts),
comments: many(comments),
}));
export const postsRelations = relations(posts, ({ one, many }) => ({
author: one(users, {
fields: [posts.authorId],
references: [users.id],
}),
category: one(categories, {
fields: [posts.categoryId],
references: [categories.id],
}),
comments: many(comments),
postsToTags: many(postsToTags),
}));
export const commentsRelations = relations(comments, ({ one }) => ({
author: one(users, {
fields: [comments.authorId],
references: [users.id],
}),
post: one(posts, {
fields: [comments.postId],
references: [posts.id],
}),
}));
export const postsToTagsRelations = relations(postsToTags, ({ one }) => ({
post: one(posts, {
fields: [postsToTags.postId],
references: [posts.id],
}),
tag: one(tags, {
fields: [postsToTags.tagId],
references: [tags.id],
}),
}));
export const tagsRelations = relations(tags, ({ many }) => ({
postsToTags: many(postsToTags),
}));
export const categoriesRelations = relations(categories, ({ many }) => ({
posts: many(posts),
}));
export const profilesRelations = relations(profiles, ({ one }) => ({
user: one(users, {
fields: [profiles.userId],
references: [users.id],
}),
}));3.2 Drizzle 查询模式与优化
// src/repositories/post.repository.ts
import { db } from '../db';
import { posts, users, comments, tags, postsToTags, categories } from '../db/schema';
import { eq, and, or, like, desc, asc, sql, count, ilike, isNotNull } from 'drizzle-orm';
// ✅ 关系查询 API(推荐:自动处理 JOIN,避免 N+1)
async function getPostsWithRelations() {
return db.query.posts.findMany({
where: eq(posts.published, true),
limit: 20,
orderBy: [desc(posts.publishedAt)],
with: {
author: {
columns: { id: true, name: true, avatarUrl: true },
},
category: {
columns: { id: true, name: true, slug: true },
},
comments: {
columns: { id: true },
// 只计数,不返回完整评论
},
postsToTags: {
with: {
tag: true,
},
},
},
columns: {
id: true,
title: true,
slug: true,
publishedAt: true,
},
});
}
// ✅ SQL-like 查询构建器(适合复杂查询)
async function getPostStats() {
return db
.select({
postId: posts.id,
title: posts.title,
authorName: users.name,
commentCount: count(comments.id).as('comment_count'),
})
.from(posts)
.innerJoin(users, eq(posts.authorId, users.id))
.leftJoin(comments, eq(comments.postId, posts.id))
.where(eq(posts.published, true))
.groupBy(posts.id, posts.title, users.name)
.orderBy(desc(sql`comment_count`))
.limit(10);
}
// ✅ 分页查询(含搜索和筛选)
async function searchPosts(params: {
search?: string;
categoryId?: string;
page: number;
pageSize: number;
}) {
const { search, categoryId, page, pageSize } = params;
const conditions = [eq(posts.published, true)];
if (search) {
conditions.push(
or(
ilike(posts.title, `%${search}%`),
ilike(posts.content, `%${search}%`)
)!
);
}
if (categoryId) {
conditions.push(eq(posts.categoryId, categoryId));
}
const whereClause = and(...conditions);
const [data, totalResult] = await Promise.all([
db.query.posts.findMany({
where: whereClause,
limit: pageSize,
offset: (page - 1) * pageSize,
orderBy: [desc(posts.publishedAt)],
with: {
author: { columns: { id: true, name: true } },
category: { columns: { id: true, name: true } },
},
}),
db.select({ count: count() }).from(posts).where(whereClause),
]);
return {
data,
pagination: {
page,
pageSize,
total: totalResult[0].count,
totalPages: Math.ceil(totalResult[0].count / pageSize),
},
};
}
// ✅ 子查询和 CTE
async function getActiveAuthorsWithPostCount() {
const postCountSubquery = db
.select({
authorId: posts.authorId,
postCount: count(posts.id).as('post_count'),
})
.from(posts)
.where(eq(posts.published, true))
.groupBy(posts.authorId)
.as('post_counts');
return db
.select({
userId: users.id,
userName: users.name,
email: users.email,
postCount: postCountSubquery.postCount,
})
.from(users)
.innerJoin(postCountSubquery, eq(users.id, postCountSubquery.authorId))
.where(eq(users.isActive, true))
.orderBy(desc(postCountSubquery.postCount));
}
// ✅ 事务操作
async function createPostWithTags(data: {
title: string;
content: string;
slug: string;
authorId: string;
categoryId?: string;
tagIds: string[];
}) {
return db.transaction(async (tx) => {
const [post] = await tx.insert(posts).values({
title: data.title,
content: data.content,
slug: data.slug,
authorId: data.authorId,
categoryId: data.categoryId,
}).returning();
if (data.tagIds.length > 0) {
await tx.insert(postsToTags).values(
data.tagIds.map((tagId) => ({
postId: post.id,
tagId,
}))
);
}
return post;
});
}3.3 Drizzle N+1 检测与修复
// ❌ N+1 问题:使用 select 构建器时手动循环查询
async function getPostsWithAuthors_BAD() {
const allPosts = await db.select().from(posts).where(eq(posts.published, true)).limit(20);
// N+1:每篇文章单独查询作者
const result = await Promise.all(
allPosts.map(async (post) => {
const [author] = await db
.select({ name: users.name, avatarUrl: users.avatarUrl })
.from(users)
.where(eq(users.id, post.authorId));
return { ...post, author };
})
);
return result;
}
// 总查询:1 + 20 = 21 次
// ✅ 修复方案 1:使用 JOIN
async function getPostsWithAuthors_GOOD_1() {
return db
.select({
post: posts,
authorName: users.name,
authorAvatar: users.avatarUrl,
})
.from(posts)
.innerJoin(users, eq(posts.authorId, users.id))
.where(eq(posts.published, true))
.limit(20);
}
// 总查询:1 次
// ✅ 修复方案 2:使用关系查询 API
async function getPostsWithAuthors_GOOD_2() {
return db.query.posts.findMany({
where: eq(posts.published, true),
limit: 20,
with: {
author: {
columns: { name: true, avatarUrl: true },
},
},
});
}
// 总查询:Drizzle 自动优化为最少查询次数4. SQLAlchemy:Python 生态的 ORM 标准
4.1 SQLAlchemy 模型代码生成
SQLAlchemy 2.0+ 采用声明式映射(Declarative Mapping),支持完整的类型注解,AI 编码助手可以生成类型安全的 Python 模型代码。
模型定义(SQLAlchemy 2.0+ 风格)
# app/models/base.py
from datetime import datetime
from uuid import uuid4
from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, Index, Enum as SAEnum
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship,
MappedAsDataclass
)
import enum
class Base(DeclarativeBase):
pass
class Role(str, enum.Enum):
USER = "USER"
ADMIN = "ADMIN"
MODERATOR = "MODERATOR"# app/models/user.py
from __future__ import annotations
from typing import Optional, List
from sqlalchemy import String, Text, Boolean, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from uuid import uuid4
from .base import Base, Role
class User(Base):
__tablename__ = "users"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
name: Mapped[str] = mapped_column(String(100), nullable=False)
avatar_url: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
role: Mapped[Role] = mapped_column(
SAEnum(Role, name="role"), default=Role.USER, nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
# 关系
profile: Mapped[Optional["Profile"]] = relationship(
back_populates="user", cascade="all, delete-orphan", uselist=False
)
posts: Mapped[List["Post"]] = relationship(
back_populates="author", cascade="all, delete-orphan"
)
comments: Mapped[List["Comment"]] = relationship(
back_populates="author", cascade="all, delete-orphan"
)
__table_args__ = (
Index("ix_users_email", "email"),
Index("ix_users_created_at", "created_at"),
)
def __repr__(self) -> str:
return f"<User(id={self.id}, email={self.email})>"# app/models/post.py
from __future__ import annotations
from typing import Optional, List
from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, Index, Table, Column
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from uuid import uuid4
from .base import Base
# 多对多关联表
posts_tags = Table(
"posts_tags",
Base.metadata,
Column("post_id", UUID(as_uuid=False), ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", UUID(as_uuid=False), ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
title: Mapped[str] = mapped_column(String(255), nullable=False)
content: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
published_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
author_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
category_id: Mapped[Optional[str]] = mapped_column(
UUID(as_uuid=False), ForeignKey("categories.id"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
# 关系
author: Mapped["User"] = relationship(back_populates="posts")
category: Mapped[Optional["Category"]] = relationship(back_populates="posts")
comments: Mapped[List["Comment"]] = relationship(
back_populates="post", cascade="all, delete-orphan"
)
tags: Mapped[List["Tag"]] = relationship(
secondary=posts_tags, back_populates="posts"
)
__table_args__ = (
Index("ix_posts_author_id", "author_id"),
Index("ix_posts_category_id", "category_id"),
Index("ix_posts_published_at", "published_at"),
Index("ix_posts_slug", "slug"),
)
class Category(Base):
__tablename__ = "categories"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
parent_id: Mapped[Optional[str]] = mapped_column(
UUID(as_uuid=False), ForeignKey("categories.id"), nullable=True
)
parent: Mapped[Optional["Category"]] = relationship(
remote_side="Category.id", back_populates="children"
)
children: Mapped[List["Category"]] = relationship(back_populates="parent")
posts: Mapped[List["Post"]] = relationship(back_populates="category")
class Comment(Base):
__tablename__ = "comments"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
content: Mapped[str] = mapped_column(Text, nullable=False)
author_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
post_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("posts.id", ondelete="CASCADE"), nullable=False
)
parent_id: Mapped[Optional[str]] = mapped_column(
UUID(as_uuid=False), ForeignKey("comments.id"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
)
author: Mapped["User"] = relationship(back_populates="comments")
post: Mapped["Post"] = relationship(back_populates="comments")
__table_args__ = (
Index("ix_comments_post_id", "post_id"),
Index("ix_comments_author_id", "author_id"),
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
posts: Mapped[List["Post"]] = relationship(
secondary=posts_tags, back_populates="tags"
)
class Profile(Base):
__tablename__ = "profiles"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4())
)
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
user_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"),
unique=True, nullable=False
)
user: Mapped["User"] = relationship(back_populates="profile")4.2 SQLAlchemy 查询优化模式
# app/repositories/post_repository.py
from sqlalchemy import select, func, and_, or_, desc
from sqlalchemy.orm import selectinload, joinedload, subqueryload, load_only
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.post import Post, Comment, Tag, Category
from app.models.user import User
class PostRepository:
def __init__(self, session: AsyncSession):
self.session = session
# ✅ 使用 selectinload 避免 N+1(推荐用于一对多关系)
async def get_posts_with_authors(self, limit: int = 20):
stmt = (
select(Post)
.where(Post.published == True)
.options(
joinedload(Post.author).load_only(User.id, User.name, User.avatar_url),
selectinload(Post.tags),
joinedload(Post.category),
)
.order_by(desc(Post.published_at))
.limit(limit)
)
result = await self.session.execute(stmt)
return result.scalars().unique().all()
# ✅ 分页查询(含搜索和筛选)
async def search_posts(
self,
page: int = 1,
page_size: int = 20,
search: str | None = None,
category_id: str | None = None,
):
conditions = [Post.published == True]
if search:
conditions.append(
or_(
Post.title.ilike(f"%{search}%"),
Post.content.ilike(f"%{search}%"),
)
)
if category_id:
conditions.append(Post.category_id == category_id)
where_clause = and_(*conditions)
# 数据查询
data_stmt = (
select(Post)
.where(where_clause)
.options(
joinedload(Post.author).load_only(User.id, User.name),
joinedload(Post.category).load_only(Category.id, Category.name),
)
.order_by(desc(Post.published_at))
.offset((page - 1) * page_size)
.limit(page_size)
)
# 计数查询
count_stmt = select(func.count(Post.id)).where(where_clause)
data_result = await self.session.execute(data_stmt)
count_result = await self.session.execute(count_stmt)
posts = data_result.scalars().unique().all()
total = count_result.scalar_one()
return {
"data": posts,
"pagination": {
"page": page,
"page_size": page_size,
"total": total,
"total_pages": (total + page_size - 1) // page_size,
},
}
# ✅ 聚合查询(使用原生 SQL 表达式)
async def get_post_stats(self):
stmt = (
select(
Post.id,
Post.title,
User.name.label("author_name"),
func.count(Comment.id).label("comment_count"),
)
.join(User, Post.author_id == User.id)
.outerjoin(Comment, Comment.post_id == Post.id)
.where(Post.published == True)
.group_by(Post.id, Post.title, User.name)
.order_by(desc(func.count(Comment.id)))
.limit(10)
)
result = await self.session.execute(stmt)
return result.all()4.3 SQLAlchemy N+1 检测与修复
# ❌ N+1 问题:默认 lazy loading
async def get_posts_bad(session: AsyncSession):
stmt = select(Post).where(Post.published == True).limit(20)
result = await session.execute(stmt)
posts = result.scalars().all()
# 每次访问 post.author 都会触发一次额外查询!
for post in posts:
print(f"{post.title} by {post.author.name}") # N+1!
return posts
# ✅ 修复方案 1:joinedload(LEFT JOIN,适合一对一/多对一)
async def get_posts_good_1(session: AsyncSession):
stmt = (
select(Post)
.where(Post.published == True)
.options(joinedload(Post.author))
.limit(20)
)
result = await session.execute(stmt)
posts = result.scalars().unique().all()
for post in posts:
print(f"{post.title} by {post.author.name}") # 无额外查询
return posts
# ✅ 修复方案 2:selectinload(SELECT IN,适合一对多)
async def get_posts_good_2(session: AsyncSession):
stmt = (
select(Post)
.where(Post.published == True)
.options(
joinedload(Post.author), # 多对一:用 JOIN
selectinload(Post.comments), # 一对多:用 SELECT IN
selectinload(Post.tags), # 多对多:用 SELECT IN
)
.limit(20)
)
result = await session.execute(stmt)
return result.scalars().unique().all()
# ✅ 修复方案 3:subqueryload(子查询,适合大数据集)
async def get_posts_good_3(session: AsyncSession):
stmt = (
select(Post)
.where(Post.published == True)
.options(
subqueryload(Post.comments), # 使用子查询加载
)
.limit(20)
)
result = await session.execute(stmt)
return result.scalars().all()SQLAlchemy 加载策略对比
| 策略 | SQL 模式 | 适用关系 | 优点 | 缺点 |
|---|---|---|---|---|
joinedload | LEFT JOIN | 多对一、一对一 | 单次查询 | 大结果集时数据重复 |
selectinload | SELECT … WHERE id IN (…) | 一对多、多对多 | 无数据重复 | 需要两次查询 |
subqueryload | 子查询 | 一对多(大数据集) | 适合复杂过滤 | 子查询可能较慢 |
lazyload | 按需查询 | 任何 | 简单 | N+1 风险 |
raiseload | 禁止加载 | 任何 | 强制显式加载 | 需要预先规划 |
5. TypeORM:Entity-First 的 ORM 模型生成
5.1 TypeORM Entity 定义与代码生成
TypeORM 采用装饰器(Decorator)模式定义 Entity,AI 编码助手可以根据数据库 Schema 或业务需求自动生成完整的 Entity 代码。
Entity 定义
// src/entities/User.ts
import {
Entity, PrimaryGeneratedColumn, Column, CreateDateColumn,
UpdateDateColumn, Index, OneToMany, OneToOne, JoinColumn
} from 'typeorm';
import { Post } from './Post';
import { Comment } from './Comment';
import { Profile } from './Profile';
export enum Role {
USER = 'USER',
ADMIN = 'ADMIN',
MODERATOR = 'MODERATOR',
}
@Entity('users')
export class User {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'varchar', length: 255, unique: true })
@Index('idx_users_email')
email: string;
@Column({ type: 'varchar', length: 100 })
name: string;
@Column({ type: 'text', nullable: true, name: 'avatar_url' })
avatarUrl: string | null;
@Column({ type: 'boolean', default: true, name: 'is_active' })
isActive: boolean;
@Column({ type: 'enum', enum: Role, default: Role.USER })
role: Role;
@CreateDateColumn({ name: 'created_at' })
@Index('idx_users_created_at')
createdAt: Date;
@UpdateDateColumn({ name: 'updated_at' })
updatedAt: Date;
@OneToMany(() => Post, (post) => post.author, { cascade: true })
posts: Post[];
@OneToMany(() => Comment, (comment) => comment.author, { cascade: true })
comments: Comment[];
@OneToOne(() => Profile, (profile) => profile.user, { cascade: true })
profile: Profile;
}// src/entities/Post.ts
import {
Entity, PrimaryGeneratedColumn, Column, CreateDateColumn,
UpdateDateColumn, Index, ManyToOne, OneToMany, ManyToMany,
JoinColumn, JoinTable
} from 'typeorm';
import { User } from './User';
import { Category } from './Category';
import { Comment } from './Comment';
import { Tag } from './Tag';
@Entity('posts')
export class Post {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'varchar', length: 255 })
title: string;
@Column({ type: 'text', nullable: true })
content: string | null;
@Column({ type: 'varchar', length: 255, unique: true })
@Index('idx_posts_slug')
slug: string;
@Column({ type: 'boolean', default: false })
published: boolean;
@Column({ type: 'timestamp', nullable: true, name: 'published_at' })
@Index('idx_posts_published_at')
publishedAt: Date | null;
@Column({ name: 'author_id' })
@Index('idx_posts_author_id')
authorId: string;
@Column({ name: 'category_id', nullable: true })
@Index('idx_posts_category_id')
categoryId: string | null;
@CreateDateColumn({ name: 'created_at' })
createdAt: Date;
@UpdateDateColumn({ name: 'updated_at' })
updatedAt: Date;
@ManyToOne(() => User, (user) => user.posts, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'author_id' })
author: User;
@ManyToOne(() => Category, (category) => category.posts)
@JoinColumn({ name: 'category_id' })
category: Category | null;
@OneToMany(() => Comment, (comment) => comment.post, { cascade: true })
comments: Comment[];
@ManyToMany(() => Tag, (tag) => tag.posts, { cascade: true })
@JoinTable({
name: 'posts_tags',
joinColumn: { name: 'post_id', referencedColumnName: 'id' },
inverseJoinColumn: { name: 'tag_id', referencedColumnName: 'id' },
})
tags: Tag[];
}5.2 TypeORM QueryBuilder 查询优化
// src/repositories/PostRepository.ts
import { Repository, DataSource, Brackets } from 'typeorm';
import { Post } from '../entities/Post';
export class PostRepository {
private repo: Repository<Post>;
constructor(dataSource: DataSource) {
this.repo = dataSource.getRepository(Post);
}
// ✅ 使用 QueryBuilder 避免 N+1
async getPostsWithRelations(limit: number = 20) {
return this.repo
.createQueryBuilder('post')
.leftJoinAndSelect('post.author', 'author')
.leftJoinAndSelect('post.category', 'category')
.leftJoinAndSelect('post.tags', 'tag')
.loadRelationCountAndMap('post.commentCount', 'post.comments')
.where('post.published = :published', { published: true })
.orderBy('post.publishedAt', 'DESC')
.take(limit)
.getMany();
}
// ✅ 分页搜索查询
async searchPosts(params: {
search?: string;
categoryId?: string;
page: number;
pageSize: number;
}) {
const { search, categoryId, page, pageSize } = params;
const qb = this.repo
.createQueryBuilder('post')
.leftJoinAndSelect('post.author', 'author')
.leftJoinAndSelect('post.category', 'category')
.where('post.published = :published', { published: true });
if (search) {
qb.andWhere(
new Brackets((sub) => {
sub
.where('post.title ILIKE :search', { search: `%${search}%` })
.orWhere('post.content ILIKE :search', { search: `%${search}%` });
})
);
}
if (categoryId) {
qb.andWhere('post.categoryId = :categoryId', { categoryId });
}
const [data, total] = await qb
.orderBy('post.publishedAt', 'DESC')
.skip((page - 1) * pageSize)
.take(pageSize)
.getManyAndCount();
return {
data,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}
// ✅ 聚合查询
async getPostStats() {
return this.repo
.createQueryBuilder('post')
.select('post.id', 'postId')
.addSelect('post.title', 'title')
.addSelect('author.name', 'authorName')
.addSelect('COUNT(comment.id)', 'commentCount')
.innerJoin('post.author', 'author')
.leftJoin('post.comments', 'comment')
.where('post.published = :published', { published: true })
.groupBy('post.id')
.addGroupBy('post.title')
.addGroupBy('author.name')
.orderBy('COUNT(comment.id)', 'DESC')
.limit(10)
.getRawMany();
}
// ✅ 事务操作
async createPostWithTags(
dataSource: DataSource,
data: {
title: string;
content: string;
slug: string;
authorId: string;
categoryId?: string;
tagIds: string[];
}
) {
return dataSource.transaction(async (manager) => {
const post = manager.create(Post, {
title: data.title,
content: data.content,
slug: data.slug,
authorId: data.authorId,
categoryId: data.categoryId,
});
const savedPost = await manager.save(post);
if (data.tagIds.length > 0) {
await manager
.createQueryBuilder()
.relation(Post, 'tags')
.of(savedPost.id)
.add(data.tagIds);
}
return savedPost;
});
}
}5.3 TypeORM N+1 检测与修复
// ❌ N+1 问题:使用 find 时未指定 relations
async function getPostsBad(repo: Repository<Post>) {
const posts = await repo.find({
where: { published: true },
take: 20,
});
// 每次访问 post.author 都会触发 lazy loading(如果配置了 lazy)
// 或者返回 undefined(如果没配置 lazy)
for (const post of posts) {
console.log(post.author?.name); // 可能是 undefined 或触发 N+1
}
}
// ✅ 修复方案 1:使用 relations 选项
async function getPostsGood1(repo: Repository<Post>) {
return repo.find({
where: { published: true },
take: 20,
relations: {
author: true,
category: true,
tags: true,
},
select: {
id: true,
title: true,
slug: true,
publishedAt: true,
author: { id: true, name: true, avatarUrl: true },
category: { id: true, name: true },
},
});
}
// ✅ 修复方案 2:使用 QueryBuilder(更灵活)
async function getPostsGood2(repo: Repository<Post>) {
return repo
.createQueryBuilder('post')
.leftJoinAndSelect('post.author', 'author')
.leftJoinAndSelect('post.category', 'category')
.leftJoinAndSelect('post.tags', 'tag')
.select([
'post.id', 'post.title', 'post.slug', 'post.publishedAt',
'author.id', 'author.name', 'author.avatarUrl',
'category.id', 'category.name',
'tag.id', 'tag.name',
])
.where('post.published = :published', { published: true })
.orderBy('post.publishedAt', 'DESC')
.take(20)
.getMany();
}6. Diesel:Rust 生态的编译时安全 ORM
6.1 Diesel 模型代码生成(Queryable/Insertable 宏)
Diesel 的核心优势是编译时 Schema 验证——通过 Queryable、Insertable、Selectable 等 derive 宏,Diesel 在编译阶段就能确保 Rust 结构体与数据库 Schema 完全匹配。
Schema 定义(自动生成)
// src/schema.rs(由 diesel print-schema 自动生成)
diesel::table! {
users (id) {
id -> Uuid,
email -> Varchar,
name -> Varchar,
avatar_url -> Nullable<Text>,
is_active -> Bool,
role -> Varchar,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
diesel::table! {
posts (id) {
id -> Uuid,
title -> Varchar,
content -> Nullable<Text>,
slug -> Varchar,
published -> Bool,
published_at -> Nullable<Timestamp>,
author_id -> Uuid,
category_id -> Nullable<Uuid>,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
diesel::table! {
categories (id) {
id -> Uuid,
name -> Varchar,
slug -> Varchar,
parent_id -> Nullable<Uuid>,
}
}
diesel::table! {
comments (id) {
id -> Uuid,
content -> Text,
author_id -> Uuid,
post_id -> Uuid,
parent_id -> Nullable<Uuid>,
created_at -> Timestamp,
}
}
diesel::table! {
tags (id) {
id -> Uuid,
name -> Varchar,
}
}
diesel::table! {
posts_tags (post_id, tag_id) {
post_id -> Uuid,
tag_id -> Uuid,
}
}
diesel::table! {
profiles (id) {
id -> Uuid,
bio -> Nullable<Text>,
user_id -> Uuid,
}
}
diesel::joinable!(posts -> users (author_id));
diesel::joinable!(posts -> categories (category_id));
diesel::joinable!(comments -> users (author_id));
diesel::joinable!(comments -> posts (post_id));
diesel::joinable!(posts_tags -> posts (post_id));
diesel::joinable!(posts_tags -> tags (tag_id));
diesel::joinable!(profiles -> users (user_id));
diesel::allow_tables_to_appear_in_same_query!(
users, posts, categories, comments, tags, posts_tags, profiles,
);Model 结构体定义
// src/models/user.rs
use diesel::prelude::*;
use uuid::Uuid;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
/// 查询用:从数据库读取完整用户记录
#[derive(Queryable, Selectable, Debug, Serialize)]
#[diesel(table_name = crate::schema::users)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct User {
pub id: Uuid,
pub email: String,
pub name: String,
pub avatar_url: Option<String>,
pub is_active: bool,
pub role: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
}
/// 插入用:创建新用户
#[derive(Insertable, Debug, Deserialize)]
#[diesel(table_name = crate::schema::users)]
pub struct NewUser<'a> {
pub email: &'a str,
pub name: &'a str,
pub avatar_url: Option<&'a str>,
pub role: Option<&'a str>,
}
/// 更新用:部分更新用户信息
#[derive(AsChangeset, Debug, Deserialize)]
#[diesel(table_name = crate::schema::users)]
pub struct UpdateUser<'a> {
pub name: Option<&'a str>,
pub avatar_url: Option<Option<&'a str>>, // Option<Option<>> 支持设置为 NULL
pub is_active: Option<bool>,
pub role: Option<&'a str>,
}
/// 精简查询用:只返回必要字段(避免 SELECT *)
#[derive(Queryable, Selectable, Debug, Serialize)]
#[diesel(table_name = crate::schema::users)]
pub struct UserSummary {
pub id: Uuid,
pub name: String,
pub avatar_url: Option<String>,
}// src/models/post.rs
use diesel::prelude::*;
use uuid::Uuid;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
#[derive(Queryable, Selectable, Debug, Serialize, Associations, Identifiable)]
#[diesel(table_name = crate::schema::posts)]
#[diesel(belongs_to(crate::models::user::User, foreign_key = author_id))]
pub struct Post {
pub id: Uuid,
pub title: String,
pub content: Option<String>,
pub slug: String,
pub published: bool,
pub published_at: Option<NaiveDateTime>,
pub author_id: Uuid,
pub category_id: Option<Uuid>,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
}
#[derive(Insertable, Debug, Deserialize)]
#[diesel(table_name = crate::schema::posts)]
pub struct NewPost<'a> {
pub title: &'a str,
pub content: Option<&'a str>,
pub slug: &'a str,
pub author_id: Uuid,
pub category_id: Option<Uuid>,
}
/// 文章摘要(列表展示用)
#[derive(Queryable, Selectable, Debug, Serialize)]
#[diesel(table_name = crate::schema::posts)]
pub struct PostSummary {
pub id: Uuid,
pub title: String,
pub slug: String,
pub published_at: Option<NaiveDateTime>,
pub author_id: Uuid,
}6.2 Diesel 查询模式与优化
// src/repositories/post_repository.rs
use diesel::prelude::*;
use diesel::pg::PgConnection;
use diesel::dsl::count;
use uuid::Uuid;
use crate::schema::{posts, users, comments, categories};
use crate::models::post::{Post, PostSummary, NewPost};
use crate::models::user::{User, UserSummary};
/// 获取已发布文章(含作者信息,使用 JOIN 避免 N+1)
pub fn get_published_posts_with_authors(
conn: &mut PgConnection,
limit: i64,
) -> QueryResult<Vec<(PostSummary, UserSummary)>> {
posts::table
.inner_join(users::table.on(posts::author_id.eq(users::id)))
.filter(posts::published.eq(true))
.order(posts::published_at.desc())
.limit(limit)
.select((PostSummary::as_select(), UserSummary::as_select()))
.load::<(PostSummary, UserSummary)>(conn)
}
/// 分页查询(含搜索)
pub fn search_posts(
conn: &mut PgConnection,
search: Option<&str>,
category_id: Option<Uuid>,
page: i64,
page_size: i64,
) -> QueryResult<(Vec<(PostSummary, UserSummary)>, i64)> {
let mut query = posts::table
.inner_join(users::table.on(posts::author_id.eq(users::id)))
.filter(posts::published.eq(true))
.into_boxed();
if let Some(search_term) = search {
let pattern = format!("%{}%", search_term);
query = query.filter(
posts::title.ilike(pattern.clone())
.or(posts::content.ilike(pattern))
);
}
if let Some(cat_id) = category_id {
query = query.filter(posts::category_id.eq(cat_id));
}
// 计数查询
let total = query
.count()
.get_result::<i64>(conn)?;
// 数据查询
let data = query
.order(posts::published_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.select((PostSummary::as_select(), UserSummary::as_select()))
.load::<(PostSummary, UserSummary)>(conn)?;
Ok((data, total))
}
/// 聚合查询:文章评论统计
pub fn get_post_comment_stats(
conn: &mut PgConnection,
limit: i64,
) -> QueryResult<Vec<(Uuid, String, String, i64)>> {
posts::table
.inner_join(users::table.on(posts::author_id.eq(users::id)))
.left_join(comments::table.on(comments::post_id.eq(posts::id)))
.filter(posts::published.eq(true))
.group_by((posts::id, posts::title, users::name))
.select((
posts::id,
posts::title,
users::name,
count(comments::id),
))
.order(count(comments::id).desc())
.limit(limit)
.load::<(Uuid, String, String, i64)>(conn)
}
/// 事务操作:创建文章并关联标签
pub fn create_post_with_tags(
conn: &mut PgConnection,
new_post: &NewPost,
tag_ids: &[Uuid],
) -> QueryResult<Post> {
conn.transaction(|conn| {
// 插入文章
let post = diesel::insert_into(posts::table)
.values(new_post)
.returning(Post::as_returning())
.get_result(conn)?;
// 关联标签
if !tag_ids.is_empty() {
use crate::schema::posts_tags;
let values: Vec<_> = tag_ids
.iter()
.map(|tag_id| {
(
posts_tags::post_id.eq(post.id),
posts_tags::tag_id.eq(*tag_id),
)
})
.collect();
diesel::insert_into(posts_tags::table)
.values(&values)
.execute(conn)?;
}
Ok(post)
})
}6.3 Diesel 的编译时安全优势
// ✅ 编译时类型检查:如果 Schema 和 Model 不匹配,编译直接报错
// 假设 schema.rs 中 users 表没有 phone 字段
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::users)]
pub struct UserWithPhone {
pub id: Uuid,
pub name: String,
pub phone: String, // ❌ 编译错误:users 表没有 phone 列!
}
// 假设 schema.rs 中 email 是 Varchar,但 Model 定义为 i32
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::users)]
pub struct UserBadType {
pub id: Uuid,
pub email: i32, // ❌ 编译错误:类型不匹配!Varchar != i32
pub name: String,
}
// ✅ 正确:Model 与 Schema 完全匹配
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::users)]
pub struct UserCorrect {
pub id: Uuid,
pub email: String,
pub name: String,
}
// ✅ 编译通过:类型安全保证7. AI 辅助查询优化工具全景
7.1 查询优化工具对比
| 工具 | 类型 | 核心能力 | 支持数据库 | AI 功能 | 价格 | 适用场景 |
|---|---|---|---|---|---|---|
| Prisma Optimize | SaaS 平台 | 查询延迟分析、N+1 检测、索引建议、全表扫描告警 | PostgreSQL、MySQL、SQLite、MongoDB | AI 驱动的优化建议 | 免费(基础)/ $29/月(Pro) | Prisma 项目专用 |
| PlanetScale Insights | SaaS 平台 | 查询分析、索引建议、慢查询检测、连接监控 | MySQL、PostgreSQL | AI 驱动的索引建议 | 包含在 PlanetScale 计划中($39/月起) | PlanetScale 托管数据库 |
| pganalyze | SaaS/自托管 | EXPLAIN 计划分析、索引建议、查询统计、等待事件分析 | PostgreSQL | Query Advisor(AI 查询优化) | $150/月起(Starter)/ 自定义(Enterprise) | PostgreSQL 生产环境 |
| AI2sql | SaaS 平台 | 自然语言→SQL、SQL 优化、索引建议、查询解释 | PostgreSQL、MySQL、SQLite、SQL Server | 全 AI 驱动 | 免费(基础)/ $12/月(Pro)/ $24/月(Business) | SQL 编写和优化 |
| Datadog DBM | SaaS 平台 | 查询性能监控、执行计划分析、等待分析、资源关联 | PostgreSQL、MySQL、SQL Server、Oracle | AI 异常检测 | $70/主机/月(DBM) | 企业级数据库监控 |
| Metis | SaaS 平台 | SQL 审查、性能预测、Schema 变更影响分析 | PostgreSQL、MySQL | AI SQL 审查 | 免费(开源 Guardian)/ 付费(Platform) | CI/CD 集成 SQL 审查 |
| EverSQL | SaaS 平台 | SQL 优化、索引建议、Schema 优化 | PostgreSQL、MySQL、MariaDB | AI 驱动优化 | 免费(基础)/ $29/月(Pro) | SQL 查询优化 |
| Aiven AI DBOptimizer | SaaS 平台 | 查询优化、索引建议、配置调优 | PostgreSQL、MySQL | AI 驱动 | 包含在 Aiven 计划中 | Aiven 托管数据库 |
7.2 AI 查询优化工作流
┌─────────────────────────────────────────────────────────────────────┐
│ AI 辅助查询优化完整工作流 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ① 查询性能监控 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Prisma Optimize / pganalyze / Datadog DBM │ │
│ │ • 收集查询延迟数据(P50/P95/P99) │ │
│ │ • 识别慢查询(>100ms) │ │
│ │ • 检测 N+1 查询模式 │ │
│ │ • 监控连接池使用率 │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ ▼ │
│ ② AI 分析与诊断 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ AI 编码助手 / AI2sql / pganalyze Query Advisor │ │
│ │ • 分析 EXPLAIN ANALYZE 输出 │ │
│ │ • 识别全表扫描和低效 JOIN │ │
│ │ • 检测缺失索引 │ │
│ │ • 评估查询复杂度 │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ ▼ │
│ ③ 优化建议生成 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ • 索引创建建议(含 CREATE INDEX 语句) │ │
│ │ • 查询重写建议(含优化后的 SQL/ORM 代码) │ │
│ │ • N+1 修复方案(含具体代码修改) │ │
│ │ • 连接池参数调优建议 │ │
│ └──────────────────────┬──────────────────────────────────┘ │
│ ▼ │
│ ④ 人工审查与实施 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ • 审查 AI 建议的索引(评估写入性能影响) │ │
│ │ • 在测试环境验证查询优化效果 │ │
│ │ • 使用 EXPLAIN ANALYZE 对比优化前后 │ │
│ │ • 部署到生产环境并持续监控 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘8. AI 辅助索引优化
8.1 索引类型与适用场景
| 索引类型 | PostgreSQL | MySQL | 适用场景 | AI 建议频率 |
|---|---|---|---|---|
| B-Tree 索引 | ✅ 默认 | ✅ 默认 | 等值查询、范围查询、排序 | ⭐⭐⭐⭐⭐ 最常见 |
| Hash 索引 | ✅ | ✅ | 纯等值查询 | ⭐⭐ 较少使用 |
| GIN 索引 | ✅ | ❌ | 全文搜索、JSONB、数组 | ⭐⭐⭐ 特定场景 |
| GiST 索引 | ✅ | ❌ | 地理空间、范围类型 | ⭐⭐ 特定场景 |
| BRIN 索引 | ✅ | ❌ | 大表时间序列数据 | ⭐⭐⭐ 大数据场景 |
| 部分索引 | ✅ | ❌ | 只索引满足条件的行 | ⭐⭐⭐⭐ AI 常推荐 |
| 覆盖索引 | ✅ INCLUDE | ✅ | 避免回表查询 | ⭐⭐⭐⭐ AI 常推荐 |
| 复合索引 | ✅ | ✅ | 多列查询条件 | ⭐⭐⭐⭐⭐ 最常见 |
| 全文索引 | ✅ tsvector | ✅ FULLTEXT | 文本搜索 | ⭐⭐⭐ 搜索场景 |
8.2 AI 辅助索引建议实战
使用 AI 编码助手分析查询并建议索引
-- 场景:电商平台订单查询
-- 慢查询:按用户 ID + 状态 + 时间范围查询订单
SELECT o.id, o.total_amount, o.status, o.created_at,
oi.product_name, oi.quantity, oi.unit_price
FROM orders o
JOIN order_items oi ON oi.order_id = o.id
WHERE o.user_id = '550e8400-e29b-41d4-a716-446655440000'
AND o.status = 'completed'
AND o.created_at >= '2025-01-01'
AND o.created_at < '2025-07-01'
ORDER BY o.created_at DESC
LIMIT 20;EXPLAIN ANALYZE 输出分析
-- 执行 EXPLAIN ANALYZE 获取查询计划
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT o.id, o.total_amount, o.status, o.created_at,
oi.product_name, oi.quantity, oi.unit_price
FROM orders o
JOIN order_items oi ON oi.order_id = o.id
WHERE o.user_id = '550e8400-e29b-41d4-a716-446655440000'
AND o.status = 'completed'
AND o.created_at >= '2025-01-01'
AND o.created_at < '2025-07-01'
ORDER BY o.created_at DESC
LIMIT 20;
-- 输出示例(优化前):
-- Limit (cost=15234.56..15234.61 rows=20 width=128) (actual time=892.345..892.367 rows=20 loops=1)
-- -> Sort (cost=15234.56..15267.89 rows=13332 width=128) (actual time=892.343..892.358 rows=20 loops=1)
-- Sort Key: o.created_at DESC
-- Sort Method: top-N heapsort Memory: 28kB
-- -> Hash Join (cost=4567.89..14890.12 rows=13332 width=128) (actual time=456.789..878.901 rows=15234 loops=1)
-- Hash Cond: (oi.order_id = o.id)
-- -> Seq Scan on order_items oi (cost=0.00..8901.23 rows=500000 width=64) (actual time=0.012..234.567 rows=500000 loops=1)
-- -> Hash (cost=4234.56..4234.56 rows=13332 width=64) (actual time=456.123..456.123 rows=15234 loops=1)
-- Buckets: 16384 Batches: 1 Memory Usage: 1234kB
-- -> Seq Scan on orders o (cost=0.00..4234.56 rows=13332 width=64) (actual time=0.023..445.678 rows=15234 loops=1)
-- Filter: ((user_id = '550e8400...'::uuid) AND (status = 'completed') AND (created_at >= '2025-01-01') AND (created_at < '2025-07-01'))
-- Rows Removed by Filter: 984766
-- Planning Time: 0.234 ms
-- Execution Time: 892.456 msAI 建议的索引优化方案
-- AI 分析结果:
-- 🔴 问题 1:orders 表全表扫描(Seq Scan),过滤掉 98% 的行
-- 🔴 问题 2:order_items 表全表扫描
-- 🟡 问题 3:排序操作在内存中进行,数据量大时可能溢出到磁盘
-- ✅ 优化方案 1:创建复合索引(覆盖 WHERE + ORDER BY)
CREATE INDEX CONCURRENTLY idx_orders_user_status_created
ON orders (user_id, status, created_at DESC);
-- 效果:将 orders 表的 Seq Scan 变为 Index Scan
-- 复合索引列顺序:等值条件在前,范围条件在后
-- ✅ 优化方案 2:为 order_items 创建外键索引
CREATE INDEX CONCURRENTLY idx_order_items_order_id
ON order_items (order_id);
-- 效果:将 order_items 的 Seq Scan 变为 Index Scan
-- ✅ 优化方案 3:覆盖索引(避免回表)
CREATE INDEX CONCURRENTLY idx_orders_user_status_created_covering
ON orders (user_id, status, created_at DESC)
INCLUDE (id, total_amount);
-- 效果:Index Only Scan,无需回表读取数据页
-- 优化后的 EXPLAIN ANALYZE 预期:
-- Limit (cost=12.34..56.78 rows=20 width=128) (actual time=2.345..2.567 rows=20 loops=1)
-- -> Nested Loop (cost=12.34..1234.56 rows=267 width=128) (actual time=2.340..2.558 rows=20 loops=1)
-- -> Index Only Scan using idx_orders_user_status_created_covering on orders o
-- (cost=0.42..45.67 rows=267 width=64) (actual time=0.034..0.123 rows=20 loops=1)
-- Index Cond: ((user_id = '550e8400...'::uuid) AND (status = 'completed') AND ...)
-- Heap Fetches: 0
-- -> Index Scan using idx_order_items_order_id on order_items oi
-- (cost=0.42..4.56 rows=3 width=64) (actual time=0.008..0.012 rows=3 loops=20)
-- Index Cond: (order_id = o.id)
-- Execution Time: 2.678 ms (从 892ms 优化到 2.7ms,提升 330 倍!)8.3 索引优化提示词模板
提示词模板:分析 EXPLAIN ANALYZE 并建议索引
你是一位 PostgreSQL 性能调优专家。请分析以下 EXPLAIN ANALYZE 输出,
识别性能瓶颈并提供索引优化建议。
## 查询 SQL
[粘贴原始 SQL 查询]
## EXPLAIN ANALYZE 输出
[粘贴完整的 EXPLAIN ANALYZE 输出]
## 表结构和数据规模
- [表名]:约 [行数] 行,[大小] GB
- 现有索引:[列出现有索引]
## 分析要求
1. 逐行解读执行计划,标注性能瓶颈
2. 识别全表扫描(Seq Scan)和低效操作
3. 计算实际过滤比例(Rows Removed by Filter / 总行数)
4. 评估排序和 JOIN 操作的效率
## 优化建议要求
1. 提供具体的 CREATE INDEX 语句
2. 使用 CONCURRENTLY 避免锁表
3. 考虑复合索引的列顺序(等值在前,范围在后)
4. 评估覆盖索引(INCLUDE)的适用性
5. 评估部分索引(WHERE)的适用性
6. 估算索引大小和写入性能影响
7. 提供优化后的预期执行计划
## 输出格式
- 🔴 严重瓶颈(必须优化)
- 🟡 潜在问题(建议优化)
- 🟢 可接受
- 每个建议包含 SQL 语句和预期效果9. N+1 查询检测与修复全攻略
9.1 N+1 问题的本质
N+1 查询是 ORM 中最常见也最隐蔽的性能问题。它的本质是:获取 N 条主记录后,对每条记录分别执行 1 次关联查询,总共产生 N+1 次数据库查询。
┌─────────────────────────────────────────────────────────────────┐
│ N+1 查询问题图解 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 请求:获取 20 篇文章及其作者信息 │
│ │
│ ❌ N+1 模式(21 次查询): │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Query 1: SELECT * FROM posts LIMIT 20 │ │
│ │ Query 2: SELECT * FROM users WHERE id = 'author_1' │ │
│ │ Query 3: SELECT * FROM users WHERE id = 'author_2' │ │
│ │ Query 4: SELECT * FROM users WHERE id = 'author_3' │ │
│ │ ... │ │
│ │ Query 21: SELECT * FROM users WHERE id = 'author_20'│ │
│ └──────────────────────────────────────────────────────┘ │
│ 总耗时:20ms + 20 × 5ms = 120ms │
│ │
│ ✅ 优化后(1-2 次查询): │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 方案 A - JOIN(1 次查询): │ │
│ │ SELECT p.*, u.name FROM posts p │ │
│ │ JOIN users u ON p.author_id = u.id LIMIT 20 │ │
│ │ │ │
│ │ 方案 B - Batch Load(2 次查询): │ │
│ │ Query 1: SELECT * FROM posts LIMIT 20 │ │
│ │ Query 2: SELECT * FROM users WHERE id IN (...) │ │
│ └──────────────────────────────────────────────────────┘ │
│ 总耗时:25ms(方案 A)或 30ms(方案 B) │
│ │
└─────────────────────────────────────────────────────────────────┘9.2 各 ORM 的 N+1 检测与修复速查表
| ORM | N+1 检测方式 | 预加载方案 | 批量加载方案 | 推荐工具 |
|---|---|---|---|---|
| Prisma | Prisma Optimize 自动检测 | include: { author: true } | Prisma 自动批量 | Prisma Optimize |
| Drizzle | 手动审查 / 日志分析 | with: { author: true }(关系 API) | innerJoin() | 查询日志 |
| SQLAlchemy | SQLAlchemy 日志 / echo=True | joinedload() / selectinload() | subqueryload() | pganalyze |
| TypeORM | TypeORM 日志 / logging: true | relations: { author: true } | QueryBuilder JOIN | Datadog DBM |
| Diesel | 编译时关联检查 | inner_join() / left_join() | belonging_to() 批量 | 编译器 |
| Django | Django Debug Toolbar / Silk | select_related() | prefetch_related() | django-silk |
9.3 Django ORM 的 N+1 检测与修复
Django ORM 是 N+1 问题的”重灾区”,因为默认使用 lazy loading。以下是完整的检测和修复方案:
# Django Models
from django.db import models
class Author(models.Model):
name = models.CharField(max_length=100)
email = models.EmailField(unique=True)
class Meta:
db_table = 'authors'
class Post(models.Model):
title = models.CharField(max_length=255)
content = models.TextField()
author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name='posts')
published = models.BooleanField(default=False)
published_at = models.DateTimeField(null=True)
tags = models.ManyToManyField('Tag', related_name='posts')
class Meta:
db_table = 'posts'
class Tag(models.Model):
name = models.CharField(max_length=50, unique=True)
class Meta:
db_table = 'tags'
class Comment(models.Model):
content = models.TextField()
author = models.ForeignKey(Author, on_delete=models.CASCADE)
post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments')
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'comments'# ❌ N+1 问题示例
def get_posts_bad(request):
posts = Post.objects.filter(published=True)[:20]
result = []
for post in posts:
result.append({
'title': post.title,
'author': post.author.name, # N+1:每次访问触发查询
'tags': [t.name for t in post.tags.all()], # N+1:每次访问触发查询
'comment_count': post.comments.count(), # N+1:每次访问触发查询
})
return result
# 总查询:1 + 20 + 20 + 20 = 61 次!
# ✅ 修复方案:select_related + prefetch_related + annotate
from django.db.models import Count, Prefetch
def get_posts_good(request):
posts = (
Post.objects
.filter(published=True)
.select_related('author') # FK 关系:用 JOIN(1 次查询)
.prefetch_related('tags') # M2M 关系:用 SELECT IN(1 次额外查询)
.prefetch_related( # 自定义 prefetch
Prefetch(
'comments',
queryset=Comment.objects.select_related('author'),
)
)
.annotate(comment_count=Count('comments')) # 聚合:在 SQL 层计算
.order_by('-published_at')[:20]
)
result = []
for post in posts:
result.append({
'title': post.title,
'author': post.author.name, # 无额外查询(已 JOIN)
'tags': [t.name for t in post.tags.all()], # 无额外查询(已 prefetch)
'comment_count': post.comment_count, # 无额外查询(已 annotate)
})
return result
# 总查询:3 次(posts JOIN authors + tags IN + comments IN)Django select_related vs prefetch_related 决策
┌─────────────────────────────────────────────────────────────────┐
│ Django 预加载策略选择决策树 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 关系类型? │
│ ├── ForeignKey / OneToOne(多对一/一对一) │
│ │ └── ✅ 使用 select_related() │
│ │ • 生成 SQL JOIN │
│ │ • 单次查询完成 │
│ │ • 适合:author, category, profile │
│ │ │
│ ├── ManyToMany / reverse ForeignKey(多对多/一对多) │
│ │ └── ✅ 使用 prefetch_related() │
│ │ • 生成 SELECT ... WHERE id IN (...) │
│ │ • 两次查询完成 │
│ │ • 适合:tags, comments, posts │
│ │ │
│ └── 需要过滤/排序关联数据? │
│ └── ✅ 使用 Prefetch() 对象 │
│ • 自定义 queryset │
│ • 可以 filter/order_by/select_related │
│ • 适合:最近 5 条评论、已发布的文章 │
│ │
└─────────────────────────────────────────────────────────────────┘9.4 N+1 检测提示词模板
你是一位 ORM 性能专家。请审查以下代码中的 N+1 查询问题。
## ORM 框架
[Prisma / Drizzle / SQLAlchemy / TypeORM / Diesel / Django]
## 代码
[粘贴需要审查的代码]
## 数据模型关系
[描述模型之间的关系:一对一、一对多、多对多]
## 审查要求
1. 识别所有 N+1 查询点(标注具体代码行)
2. 计算当前代码的总查询次数(假设主查询返回 N 条记录)
3. 对每个 N+1 问题提供修复方案:
- 使用该 ORM 的预加载/JOIN 语法
- 修复后的完整代码
- 修复后的预期查询次数
4. 如果有些关联数据不需要预加载(如只在条件分支中使用),请说明
## 输出格式
- 🔴 N+1 问题(标注行号和影响)
- ✅ 修复代码
- 📊 优化前后查询次数对比10. 慢查询分析与 EXPLAIN ANALYZE 深度解读
10.1 慢查询识别流程
┌─────────────────────────────────────────────────────────────────┐
│ 慢查询识别与分析流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ① 启用慢查询日志 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PostgreSQL: │ │
│ │ log_min_duration_statement = 100 -- 记录 >100ms │ │
│ │ log_statement = 'none' │ │
│ │ log_duration = off │ │
│ │ │ │
│ │ MySQL: │ │
│ │ slow_query_log = ON │ │
│ │ long_query_time = 0.1 -- 记录 >100ms │ │
│ │ log_queries_not_using_indexes = ON │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ ▼ │
│ ② 收集慢查询 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • pg_stat_statements(PostgreSQL 扩展) │ │
│ │ • performance_schema(MySQL) │ │
│ │ • pganalyze / Datadog DBM / PlanetScale Insights │ │
│ │ • ORM 查询日志(Prisma logging / SQLAlchemy echo) │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ ▼ │
│ ③ 分析执行计划 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) <query> │ │
│ │ • 识别 Seq Scan(全表扫描) │ │
│ │ • 识别 Nested Loop(嵌套循环 JOIN) │ │
│ │ • 识别 Sort(内存/磁盘排序) │ │
│ │ • 识别 Hash Join vs Merge Join │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ ▼ │
│ ④ AI 辅助优化 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 将 EXPLAIN 输出提供给 AI 编码助手: │ │
│ │ • 索引建议 │ │
│ │ • 查询重写 │ │
│ │ • ORM 代码优化 │ │
│ │ • 连接池调优 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘10.2 PostgreSQL EXPLAIN 关键节点解读
| 节点类型 | 含义 | 性能影响 | 优化方向 |
|---|---|---|---|
| Seq Scan | 全表扫描 | 🔴 大表时极慢 | 添加索引 |
| Index Scan | 索引扫描 + 回表 | 🟢 高效 | 考虑覆盖索引 |
| Index Only Scan | 仅索引扫描(无回表) | 🟢 最高效 | 理想状态 |
| Bitmap Index Scan | 位图索引扫描 | 🟡 中等 | 多条件 OR 查询 |
| Nested Loop | 嵌套循环 JOIN | 🟡 小数据集高效 | 大数据集考虑 Hash Join |
| Hash Join | 哈希 JOIN | 🟢 大数据集高效 | 需要足够 work_mem |
| Merge Join | 归并 JOIN | 🟢 已排序数据高效 | 需要索引排序 |
| Sort | 排序操作 | 🟡 可能溢出磁盘 | 添加排序索引 |
| HashAggregate | 哈希聚合 | 🟡 需要内存 | 调整 work_mem |
| Materialize | 物化子查询 | 🟡 缓存中间结果 | 考虑 CTE 或临时表 |
10.3 EXPLAIN ANALYZE 实战分析示例
-- 场景:博客平台首页查询(获取热门文章 + 作者 + 评论数 + 标签)
EXPLAIN (ANALYZE, BUFFERS, COSTS, FORMAT TEXT)
SELECT
p.id,
p.title,
p.slug,
p.published_at,
u.name AS author_name,
u.avatar_url,
c.name AS category_name,
comment_stats.comment_count,
ARRAY_AGG(DISTINCT t.name) AS tag_names
FROM posts p
INNER JOIN users u ON p.author_id = u.id
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN posts_tags pt ON pt.post_id = p.id
LEFT JOIN tags t ON t.id = pt.tag_id
LEFT JOIN LATERAL (
SELECT COUNT(*) AS comment_count
FROM comments cm
WHERE cm.post_id = p.id
) comment_stats ON true
WHERE p.published = true
AND p.published_at >= NOW() - INTERVAL '30 days'
GROUP BY p.id, p.title, p.slug, p.published_at,
u.name, u.avatar_url, c.name, comment_stats.comment_count
ORDER BY comment_stats.comment_count DESC, p.published_at DESC
LIMIT 20;AI 辅助分析输出示例
## EXPLAIN ANALYZE 分析报告
### 🔴 严重问题
1. **posts 表 Seq Scan**(第 12 行)
- 扫描 100,000 行,过滤后仅保留 2,345 行(97.7% 被丢弃)
- 原因:缺少 (published, published_at) 复合索引
- 修复:
CREATE INDEX CONCURRENTLY idx_posts_published_date
ON posts (published, published_at DESC)
WHERE published = true;
2. **comments 子查询 Seq Scan**(第 18 行)
- 对每篇文章都执行全表扫描计算评论数
- 原因:LATERAL 子查询 + 缺少 post_id 索引
- 修复:
CREATE INDEX CONCURRENTLY idx_comments_post_id
ON comments (post_id);
### 🟡 潜在问题
3. **Sort 操作使用磁盘**(第 5 行)
- Sort Method: external merge Disk: 4096kB
- 原因:work_mem 不足,排序溢出到磁盘
- 修复:SET work_mem = '16MB';(会话级别)
- 或在索引中包含排序列
4. **ARRAY_AGG 聚合**(第 3 行)
- 对每行都执行数组聚合
- 建议:如果标签数量固定,考虑在应用层处理
### 🟢 可接受
5. **users 表 Index Scan**(第 8 行)
- 使用主键索引,效率良好
### 优化后预期
- 执行时间:从 ~450ms 降至 ~15ms
- 查询次数:保持 1 次10.4 慢查询分析提示词模板
你是一位数据库性能调优专家。请帮我分析以下慢查询并提供优化方案。
## 数据库类型
[PostgreSQL / MySQL]
## 慢查询 SQL
[粘贴完整的 SQL 查询]
## EXPLAIN ANALYZE 输出
[粘贴完整的 EXPLAIN ANALYZE 输出]
## 表结构
[粘贴相关表的 CREATE TABLE 语句]
## 现有索引
[粘贴 \di 或 SHOW INDEX 的输出]
## 数据规模
- [表名]:[行数] 行
- 日均查询频率:[次数]
- 当前平均执行时间:[毫秒]
- 目标执行时间:[毫秒]
## 分析要求
1. 逐节点解读执行计划
2. 识别所有性能瓶颈(Seq Scan、磁盘排序、低效 JOIN)
3. 提供索引优化建议(含完整 CREATE INDEX 语句)
4. 提供查询重写建议(如果 SQL 本身可以优化)
5. 评估 ORM 层面的优化可能性
6. 估算优化后的执行时间
## 输出格式
按严重程度排序:🔴 严重 → 🟡 中等 → 🟢 轻微
每个问题包含:问题描述、根因分析、修复方案、预期效果11. 连接池优化
11.1 连接池基础概念
数据库连接是昂贵的资源——每个连接需要 TCP 握手、认证、内存分配。连接池通过复用连接来减少开销。
┌─────────────────────────────────────────────────────────────────┐
│ 连接池工作原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 应用服务器 连接池 数据库 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 请求 1 │──获取连接──→ │ 连接 A ✅ │──复用──→ │ │ │
│ │ 请求 2 │──获取连接──→ │ 连接 B ✅ │──复用──→ │ PostgreSQL│ │
│ │ 请求 3 │──等待... │ 连接 C ✅ │──复用──→ │ │ │
│ │ 请求 4 │──等待... │ (空闲池) │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 关键参数: │
│ • min_connections:最小连接数(预热) │
│ • max_connections:最大连接数(上限) │
│ • idle_timeout:空闲连接超时(回收) │
│ • connection_timeout:获取连接超时(排队等待) │
│ • max_lifetime:连接最大生命周期(防止泄漏) │
│ │
└─────────────────────────────────────────────────────────────────┘11.2 各 ORM 连接池配置
Prisma 连接池
// schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
// Prisma 内置连接池参数通过 URL 配置
// postgresql://user:pass@host:5432/db?connection_limit=20&pool_timeout=10
}// 或使用 Prisma Accelerate(全球边缘连接池)
// schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
directUrl = env("DIRECT_DATABASE_URL") // 直连(迁移用)
}
// Prisma Accelerate 自动管理连接池
// 价格:$0.20/100K 操作(Starter)Drizzle 连接池(使用 node-postgres)
// src/db/index.ts
import { drizzle } from 'drizzle-orm/node-postgres';
import { Pool } from 'pg';
import * as schema from './schema';
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20, // 最大连接数
min: 5, // 最小连接数
idleTimeoutMillis: 30000, // 空闲超时 30s
connectionTimeoutMillis: 5000, // 连接超时 5s
maxUses: 7500, // 每个连接最大使用次数
});
// 监控连接池状态
pool.on('connect', () => console.log('New connection created'));
pool.on('remove', () => console.log('Connection removed'));
pool.on('error', (err) => console.error('Pool error:', err));
export const db = drizzle(pool, { schema });
// 健康检查
export async function checkDbHealth() {
const client = await pool.connect();
try {
await client.query('SELECT 1');
return {
healthy: true,
totalCount: pool.totalCount,
idleCount: pool.idleCount,
waitingCount: pool.waitingCount,
};
} finally {
client.release();
}
}SQLAlchemy 连接池(异步)
# app/db.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost:5432/mydb",
pool_size=20, # 连接池大小
max_overflow=10, # 超出 pool_size 的最大溢出连接数
pool_timeout=30, # 获取连接超时(秒)
pool_recycle=3600, # 连接回收时间(秒,防止数据库端超时)
pool_pre_ping=True, # 使用前 ping 检查连接是否存活
echo=False, # 生产环境关闭 SQL 日志
)
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
# 依赖注入(FastAPI)
async def get_session() -> AsyncSession:
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise外部连接池:PgBouncer
对于高并发场景,推荐在应用和数据库之间部署 PgBouncer 作为外部连接池:
# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
# 连接池模式
pool_mode = transaction # 事务级复用(推荐)
# pool_mode = session # 会话级复用
# pool_mode = statement # 语句级复用(限制最多)
# 连接池参数
default_pool_size = 25 # 每个用户/数据库对的默认连接数
min_pool_size = 5 # 最小连接数
max_client_conn = 200 # 最大客户端连接数
max_db_connections = 50 # 最大数据库连接数
# 超时设置
server_idle_timeout = 600 # 服务端空闲超时
client_idle_timeout = 0 # 客户端空闲超时(0=不限)
query_timeout = 30 # 查询超时11.3 连接池参数调优指南
| 参数 | 推荐值 | 计算公式 | 说明 |
|---|---|---|---|
| max_connections | 20-50 | CPU 核数 × 2 + 磁盘数 | PostgreSQL 官方建议 |
| pool_size | 10-25 | max_connections / 应用实例数 | 每个应用实例的连接数 |
| max_overflow | 5-10 | pool_size × 0.5 | 突发流量缓冲 |
| idle_timeout | 30-300s | 根据流量模式 | 低流量可设短,高流量设长 |
| connection_timeout | 5-10s | 根据 SLA | 超时后返回错误而非无限等待 |
| max_lifetime | 1800-3600s | 根据数据库配置 | 防止连接泄漏 |
连接池优化提示词模板
你是一位数据库连接池调优专家。请根据以下信息优化连接池配置。
## 应用信息
- 框架:[Next.js / FastAPI / NestJS / Actix-web]
- ORM:[Prisma / Drizzle / SQLAlchemy / TypeORM / Diesel]
- 部署方式:[单实例 / 多实例(N 个)/ Serverless]
- 数据库:[PostgreSQL / MySQL],最大连接数:[数量]
## 流量特征
- 日均请求量:[数量]
- 峰值 QPS:[数量]
- 平均查询耗时:[毫秒]
- 长查询比例:[百分比]
## 当前配置
[粘贴当前连接池配置]
## 当前问题
[描述遇到的问题:连接超时、连接耗尽、空闲连接过多等]
## 优化要求
1. 提供优化后的连接池参数
2. 解释每个参数的选择理由
3. 是否需要外部连接池(PgBouncer)
4. Serverless 场景的特殊处理
5. 监控指标和告警阈值建议12. 查询重写与 AI 辅助优化模式
12.1 常见查询重写模式
模式 1:子查询 → JOIN
-- ❌ 低效:相关子查询(每行执行一次子查询)
SELECT p.id, p.title,
(SELECT COUNT(*) FROM comments c WHERE c.post_id = p.id) AS comment_count
FROM posts p
WHERE p.published = true;
-- ✅ 优化:LEFT JOIN + GROUP BY
SELECT p.id, p.title, COUNT(c.id) AS comment_count
FROM posts p
LEFT JOIN comments c ON c.post_id = p.id
WHERE p.published = true
GROUP BY p.id, p.title;
-- ✅ 更优:使用窗口函数(如果需要保留所有列)
SELECT p.*, COUNT(c.id) OVER (PARTITION BY p.id) AS comment_count
FROM posts p
LEFT JOIN comments c ON c.post_id = p.id
WHERE p.published = true;模式 2:EXISTS 替代 IN
-- ❌ 低效:IN 子查询(大数据集时性能差)
SELECT * FROM users
WHERE id IN (
SELECT author_id FROM posts
WHERE published = true AND published_at >= '2025-01-01'
);
-- ✅ 优化:EXISTS(短路求值,找到即停止)
SELECT * FROM users u
WHERE EXISTS (
SELECT 1 FROM posts p
WHERE p.author_id = u.id
AND p.published = true
AND p.published_at >= '2025-01-01'
);模式 3:分页优化(Keyset Pagination)
-- ❌ 低效:OFFSET 分页(深页性能差)
SELECT * FROM posts
WHERE published = true
ORDER BY published_at DESC
LIMIT 20 OFFSET 10000;
-- OFFSET 10000 需要扫描并丢弃 10000 行!
-- ✅ 优化:Keyset 分页(游标分页)
SELECT * FROM posts
WHERE published = true
AND (published_at, id) < ('2025-06-01T12:00:00', 'last-seen-id')
ORDER BY published_at DESC, id DESC
LIMIT 20;
-- 直接从上次位置开始,无需扫描前面的行模式 4:批量操作优化
-- ❌ 低效:逐条插入
INSERT INTO tags (name) VALUES ('rust');
INSERT INTO tags (name) VALUES ('python');
INSERT INTO tags (name) VALUES ('typescript');
-- 3 次网络往返
-- ✅ 优化:批量插入
INSERT INTO tags (name) VALUES
('rust'),
('python'),
('typescript');
-- 1 次网络往返
-- ✅ 更优:COPY 命令(PostgreSQL,大批量数据)
COPY tags (name) FROM STDIN WITH (FORMAT csv);
rust
python
typescript
\.模式 5:条件聚合替代多次查询
-- ❌ 低效:多次查询获取不同状态的计数
SELECT COUNT(*) FROM orders WHERE status = 'pending';
SELECT COUNT(*) FROM orders WHERE status = 'completed';
SELECT COUNT(*) FROM orders WHERE status = 'cancelled';
-- 3 次查询
-- ✅ 优化:条件聚合(1 次查询)
SELECT
COUNT(*) FILTER (WHERE status = 'pending') AS pending_count,
COUNT(*) FILTER (WHERE status = 'completed') AS completed_count,
COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_count
FROM orders;
-- PostgreSQL 语法
-- MySQL 等效写法
SELECT
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) AS pending_count,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed_count,
SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) AS cancelled_count
FROM orders;12.2 ORM 层面的查询重写
// Prisma:使用 groupBy 替代多次 count
// ❌ 低效:3 次查询
const pending = await prisma.order.count({ where: { status: 'PENDING' } });
const completed = await prisma.order.count({ where: { status: 'COMPLETED' } });
const cancelled = await prisma.order.count({ where: { status: 'CANCELLED' } });
// ✅ 优化:1 次查询
const stats = await prisma.order.groupBy({
by: ['status'],
_count: { id: true },
});
// 返回:[{ status: 'PENDING', _count: { id: 150 } }, ...]# SQLAlchemy:使用 case 表达式替代多次查询
from sqlalchemy import select, func, case
# ❌ 低效:3 次查询
pending = await session.scalar(
select(func.count()).where(Order.status == 'pending')
)
completed = await session.scalar(
select(func.count()).where(Order.status == 'completed')
)
# ✅ 优化:1 次查询
stmt = select(
func.count().filter(Order.status == 'pending').label('pending'),
func.count().filter(Order.status == 'completed').label('completed'),
func.count().filter(Order.status == 'cancelled').label('cancelled'),
)
result = await session.execute(stmt)
stats = result.one()13. Kiro Hooks 自动化查询审查
13.1 配置 Kiro Hooks 自动检测查询问题
Kiro 的 Hooks 系统可以在开发过程中自动检测 ORM 查询的常见问题:
Hook:检测 N+1 查询模式
# .kiro/hooks/detect-n-plus-one.yaml
name: "N+1 查询检测"
description: "检测 ORM 代码中的 N+1 查询模式"
trigger:
event: fileEdited
patterns:
- "**/*.repository.ts"
- "**/*.service.ts"
- "**/repositories/**/*.ts"
- "**/repositories/**/*.py"
action:
type: askAgent
prompt: |
请审查修改的文件中是否存在 N+1 查询问题。
检查以下模式:
1. 循环中调用 ORM 查询(for/map/forEach 中的 findUnique/findFirst)
2. Promise.all 中的重复查询
3. 缺少 include/select/joinedload 的关联数据访问
4. Django 中缺少 select_related/prefetch_related
如果发现问题,提供具体的修复建议。
如果没有问题,简短确认即可。Hook:审查新增索引
# .kiro/hooks/review-index-changes.yaml
name: "索引变更审查"
description: "审查迁移文件中的索引变更"
trigger:
event: fileCreated
patterns:
- "**/migrations/**/*.sql"
- "**/migrations/**/*.ts"
- "**/migrations/**/*.py"
action:
type: askAgent
prompt: |
请审查新创建的迁移文件中的索引变更。
检查以下问题:
1. CREATE INDEX 是否使用了 CONCURRENTLY(PostgreSQL)?
2. 复合索引的列顺序是否合理(等值条件在前,范围条件在后)?
3. 是否有冗余索引(新索引是否被现有索引覆盖)?
4. 索引是否会显著影响写入性能?
5. 是否考虑了部分索引(WHERE 子句)的适用性?
提供具体的改进建议。Hook:Prisma Schema 变更审查
# .kiro/hooks/prisma-schema-review.yaml
name: "Prisma Schema 审查"
description: "审查 Prisma Schema 变更的查询性能影响"
trigger:
event: fileEdited
patterns:
- "**/schema.prisma"
action:
type: askAgent
prompt: |
请审查 Prisma Schema 的变更,关注查询性能影响。
检查以下问题:
1. 新增的关系是否需要索引?
2. 是否缺少常用查询字段的索引(@@index)?
3. 多对多关系是否使用了隐式关系表(性能影响)?
4. 是否有不必要的 @unique 约束(每个 unique 都会创建索引)?
5. 枚举类型是否合理(vs 关联表)?
提供具体的优化建议。14. ORM 模型代码生成提示词模板库
14.1 通用 ORM 模型生成提示词
你是一位 [Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel] 专家。
请根据以下业务需求生成完整的 ORM 模型代码。
## 业务需求
[描述业务实体和关系]
## 技术要求
- 数据库:[PostgreSQL/MySQL/SQLite]
- ORM:[具体 ORM 名称和版本]
- 语言:[TypeScript/Python/Rust]
## 代码规范
1. 使用 UUID 作为主键
2. 所有表包含 created_at 和 updated_at 时间戳
3. 字符串字段指定合理的长度限制
4. 外键关系指定 ON DELETE 行为
5. 为常用查询字段添加索引
6. 使用 snake_case 数据库列名,camelCase 代码属性名
7. 包含完整的类型定义
## 输出要求
1. 完整的模型/Schema 定义代码
2. 关系定义(一对一、一对多、多对多)
3. 建议的索引列表及理由
4. 基础 CRUD 操作示例
5. 常见查询模式示例(分页、搜索、关联查询)14.2 查询优化审查提示词
你是一位数据库查询优化专家。请审查以下 ORM 查询代码的性能。
## ORM 框架
[Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel/Django]
## 代码
[粘贴需要审查的查询代码]
## 数据模型
[粘贴 Schema/Model 定义]
## 数据规模
[描述各表的数据量]
## 审查维度
1. **N+1 检测**:是否存在循环查询?
2. **索引使用**:查询条件是否有索引支持?
3. **数据获取**:是否获取了不必要的字段?
4. **分页**:大列表是否有分页?
5. **事务**:写操作是否使用了事务?
6. **并发**:是否有竞态条件风险?
7. **缓存**:是否有适合缓存的查询?
## 输出
- 问题列表(按严重程度排序)
- 每个问题的修复代码
- 优化前后的查询次数对比
- 建议的索引变更14.3 从 SQL 到 ORM 代码转换提示词
你是一位 ORM 专家。请将以下 SQL 查询转换为 [ORM 名称] 代码。
## 原始 SQL
[粘贴 SQL 查询]
## 目标 ORM
[Prisma/Drizzle/SQLAlchemy/TypeORM/Diesel]
## 已有模型定义
[粘贴现有的 ORM 模型/Schema]
## 转换要求
1. 保持查询语义完全一致
2. 利用 ORM 的类型安全特性
3. 使用 ORM 的关系查询 API(而非原生 SQL)
4. 如果 ORM 无法表达某些操作,使用原生查询并说明原因
5. 确保转换后的查询性能不低于原始 SQL
## 输出
1. 转换后的 ORM 代码
2. 类型定义(如果需要)
3. 性能对比说明
4. 无法用 ORM 表达的部分(如果有)实战案例:电商平台查询优化全流程
案例背景
一个中型电商平台(日活 5 万用户,商品 10 万+,订单 500 万+)使用 Prisma + PostgreSQL,首页加载时间从 200ms 恶化到 2.5s。
步骤 1:使用 Prisma Optimize 识别问题
// 集成 Prisma Optimize
import { PrismaClient } from '@prisma/client';
import { withOptimize } from '@prisma/extension-optimize';
const prisma = new PrismaClient().$extends(
withOptimize({ apiKey: process.env.PRISMA_OPTIMIZE_KEY! })
);
// Prisma Optimize 报告发现以下问题:
// 🔴 N+1 查询:首页商品列表查询触发 50+ 次额外查询
// 🔴 缺失索引:products 表的 category_id + is_active 缺少复合索引
// 🟡 全表扫描:热门商品查询扫描全表
// 🟡 不必要的数据获取:返回了商品的完整描述(TEXT 字段)步骤 2:修复 N+1 查询
// ❌ 原始代码(首页商品列表)
async function getHomepageProducts() {
const products = await prisma.product.findMany({
where: { isActive: true },
take: 50,
orderBy: { salesCount: 'desc' },
});
// N+1:每个商品单独查询分类、图片、评价统计
return Promise.all(products.map(async (product) => ({
...product,
category: await prisma.category.findUnique({
where: { id: product.categoryId },
}),
images: await prisma.productImage.findMany({
where: { productId: product.id },
take: 3,
}),
rating: await prisma.review.aggregate({
where: { productId: product.id },
_avg: { rating: true },
_count: true,
}),
})));
}
// 总查询:1 + 50 + 50 + 50 = 151 次!
// ✅ 优化后
async function getHomepageProductsOptimized() {
return prisma.product.findMany({
where: { isActive: true },
take: 50,
orderBy: { salesCount: 'desc' },
select: {
id: true,
name: true,
slug: true,
price: true,
originalPrice: true,
salesCount: true,
// 不返回 description(大 TEXT 字段)
category: {
select: { id: true, name: true, slug: true },
},
images: {
select: { url: true, alt: true },
take: 3,
orderBy: { sortOrder: 'asc' },
},
_count: {
select: { reviews: true },
},
},
});
}
// 总查询:Prisma 自动优化为 2-3 次批量查询步骤 3:添加索引
-- 基于 Prisma Optimize 和 EXPLAIN ANALYZE 的建议
-- 1. 商品列表查询索引
CREATE INDEX CONCURRENTLY idx_products_active_sales
ON products (is_active, sales_count DESC)
WHERE is_active = true;
-- 2. 分类筛选索引
CREATE INDEX CONCURRENTLY idx_products_category_active
ON products (category_id, is_active)
WHERE is_active = true;
-- 3. 商品图片查询索引
CREATE INDEX CONCURRENTLY idx_product_images_product_sort
ON product_images (product_id, sort_order);
-- 4. 评价统计索引
CREATE INDEX CONCURRENTLY idx_reviews_product_rating
ON reviews (product_id, rating);步骤 4:验证优化效果
-- 优化前
EXPLAIN ANALYZE SELECT ... -- Execution Time: 2,345.678 ms
-- 优化后
EXPLAIN ANALYZE SELECT ... -- Execution Time: 12.345 ms
-- 性能提升:190 倍!案例分析
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 数据库查询次数 | 151 次 | 3 次 | 50x |
| 平均查询耗时 | 2,345ms | 12ms | 190x |
| 首页加载时间 | 2.5s | 180ms | 14x |
| 数据库 CPU 使用率 | 85% | 15% | 5.7x |
| 网络传输量 | 4.2MB | 0.3MB | 14x |
关键决策点:
- 使用 Prisma Optimize 快速定位问题,而非手动分析日志
- 优先修复 N+1 查询(影响最大),再优化索引
- 使用
select精确控制返回字段,减少网络传输 - 使用部分索引(
WHERE is_active = true)减少索引大小 - 使用
CONCURRENTLY创建索引,避免锁表影响线上服务
避坑指南
❌ 常见错误
-
盲目添加索引
- 问题:为每个查询条件都创建索引,导致写入性能下降 50%+,存储空间翻倍
- 正确做法:基于 EXPLAIN ANALYZE 和实际查询频率决定索引策略,定期清理未使用的索引(
pg_stat_user_indexes中idx_scan = 0的索引)
-
忽略 N+1 查询
- 问题:开发环境数据少时感知不到,上线后数据量增长导致性能急剧下降
- 正确做法:在开发阶段就启用查询日志或 Prisma Optimize,CI 中集成 N+1 检测
-
在生产环境使用 ORM 的 synchronize/auto-migrate
- 问题:TypeORM 的
synchronize: true或 Django 的migrate在启动时自动修改 Schema,可能导致数据丢失 - 正确做法:生产环境只使用版本化的迁移文件,禁用自动同步
- 问题:TypeORM 的
-
过度使用原生 SQL
- 问题:绕过 ORM 的类型安全和参数化查询,引入 SQL 注入风险
- 正确做法:优先使用 ORM 查询 API,只在 ORM 无法表达时使用参数化的原生查询
-
连接池配置不当
- 问题:连接数设置过高导致数据库内存耗尽,或设置过低导致请求排队超时
- 正确做法:根据公式
CPU 核数 × 2 + 磁盘数设置数据库最大连接数,应用连接池大小 = 最大连接数 / 应用实例数
-
忽略查询返回的数据量
- 问题:使用
SELECT *或 ORM 默认返回所有字段,包括大 TEXT/BLOB 字段 - 正确做法:使用
select(Prisma)、load_only(SQLAlchemy)、columns(Drizzle)精确控制返回字段
- 问题:使用
-
OFFSET 分页在深页时性能崩溃
- 问题:
OFFSET 100000需要扫描并丢弃 10 万行数据 - 正确做法:使用 Keyset 分页(游标分页),基于上一页最后一条记录的排序键定位
- 问题:
-
AI 生成的查询未经审查直接上线
- 问题:AI 可能生成看似正确但性能极差的查询(如不必要的子查询、错误的 JOIN 顺序)
- 正确做法:所有 AI 生成的查询必须经过 EXPLAIN ANALYZE 验证,在测试环境用生产级数据量测试
✅ 最佳实践
- 开发阶段就启用查询监控:集成 Prisma Optimize、Django Debug Toolbar 或 SQLAlchemy echo,在开发时就发现性能问题
- 建立查询性能基线:记录关键查询的 P50/P95/P99 延迟,设置告警阈值
- 索引策略文档化:在 Schema 注释或专门文档中记录每个索引的用途和对应的查询
- 定期审查慢查询日志:每周审查 Top 10 慢查询,持续优化
- 使用 AI 辅助但人工审查:让 AI 生成初始查询和索引建议,但必须经过 EXPLAIN ANALYZE 验证
- 连接池监控:监控连接池使用率、等待队列长度、连接创建/销毁频率
- Keyset 分页作为默认:除非业务需要跳页,否则默认使用 Keyset 分页
相关资源与延伸阅读
- Prisma Optimize 官方文档 — Prisma AI 驱动查询优化工具的完整使用指南
- Drizzle ORM 官方文档 — Drizzle 查询 API 和性能优化最佳实践
- SQLAlchemy 2.0 文档 - Loading Relationships — SQLAlchemy 关系加载策略详解
- pganalyze Query Advisor — PostgreSQL AI 驱动的查询优化顾问
- PlanetScale AI Index Suggestions — AI 驱动的 PostgreSQL 索引建议功能
- AI2sql 查询优化指南 — AI 辅助 SQL 查询优化完整指南
- Diesel ORM 官方文档 — Rust Diesel ORM 查询构建和性能指南
- Use The Index, Luke — SQL 索引设计经典教程,覆盖 B-Tree 原理和优化策略
- TypeORM QueryBuilder 文档 — TypeORM 高级查询构建器使用指南
- PostgreSQL EXPLAIN 可视化工具 - explain.dalibo.com — 在线 EXPLAIN ANALYZE 可视化分析工具
参考来源
- Prisma Optimize GA 发布公告 (2024-10)
- PlanetScale AI-Powered Postgres Index Suggestions (2025-05)
- pganalyze Query Advisor GA (2025-05)
- AI2sql: How AI is Transforming SQL Query Optimization in 2025 (2025-06)
- Bytebase: Top TypeScript ORM 2025 (2025-05)
- Bytebase: Drizzle vs Prisma 2025 (2025-05)
- Drizzle ORM Benchmarks (2025)
- Datadog AI-Driven Observability (2025-05)
- Diesel ORM 官方文档 (2025)
- Top AI Tools for Database Query Optimization (2025-06)
📖 返回 总览与导航 | 上一节:29c-迁移文件生成与审查 | 下一节:29e-数据库Prompt模板与反模式