Static analysis tool that reads your ORM query patterns and predicts which database columns are missing indexes — before a slow query alert fires in production. Works by counting how often each column appears in .filter(), .where(), .order_by(), and JOIN conditions across your entire codebase, then cross-referencing model definitions to suppress columns already indexed.
- User.objects.filter(email=email) running 1,000× per minute causes a full table scan on every call when email is unindexed
- EXPLAIN ANALYZE only catches issues after the fact

| Access Pattern | Why It Matters |
|---|---|
| WHERE / filter() | Full table scan without index — O(n) per query |
| ORDER BY / order_by() | Sort without index reads all rows then sorts in memory |
| JOIN ON column | Nested-loop join without an index on the join column is O(n × m) for tables of n and m rows |
| UNIQUE constraint candidates | Columns looked up as unique keys need a unique index, which also enforces uniqueness |
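
To make the first row of the table concrete, here is a minimal sketch using Python's built-in sqlite3 module. The `users` table, the `idx_users_email` index name, and the `query_plan` helper are assumptions made up for this illustration; they are not part of the advisor script, and other databases word their plans differently.

```python
import sqlite3


def query_plan(conn, sql, params=()):
    """Return the detail column of EXPLAIN QUERY PLAN as one string."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return " | ".join(row[3] for row in rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO users (email, name) VALUES (?, ?)",
    [(f"user{i}@example.com", f"User {i}") for i in range(1000)],
)

lookup = "SELECT * FROM users WHERE email = ?"

# Without an index the planner has to scan every row.
plan_before = query_plan(conn, lookup, ("user42@example.com",))

# With an index on the filtered column the planner can seek directly.
conn.execute("CREATE INDEX idx_users_email ON users (email)")
plan_after = query_plan(conn, lookup, ("user42@example.com",))

print("before:", plan_before)
print("after: ", plan_after)
```

The exact wording of the plan depends on the SQLite version, but the first plan should report a scan of `users` and the second a search that names `idx_users_email`.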
| ORM | Language | Patterns Detected |
|---|---|---|
| Django ORM | Python | .filter(col=), .get(col=), .exclude(col=), .order_by('col'), Meta.ordering |
| SQLAlchemy | Python | .filter(Model.col ==), .filter_by(col=), .order_by(col), join(Model, on=) |
| Peewee | Python | .where(Model.col ==), .order_by(Model.col) |
| TypeORM | TypeScript | .where("t.col = :val"), findBy({col:}), .orderBy("t.col"), @JoinColumn({name: 'col'}) |
| Prisma | TypeScript | where: { col: }, orderBy: { col: }, include: { relation: } |
| Sequelize | TypeScript/JS | where: { col: }, order: [['col', 'ASC']] |
| GORM | Go | .Where("col = ?"), .Order("col"), .Joins("JOIN ... ON col") |
| ActiveRecord | Ruby | .where(col:), .find_by(col:), .order(:col), .joins() |
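
As a rough illustration of how patterns like those in the table can be turned into per-column counts, the sketch below runs two simplified regular expressions over a small Django snippet. The patterns `DJANGO_WHERE` and `DJANGO_ORDER_BY` and the `SAMPLE` source are assumptions written for this example; the script's own registry is broader and differs in detail.

```python
import re
from collections import Counter

# Simplified, hypothetical pattern: the first keyword argument of
# filter/get/exclude, with any lookup suffix such as __iexact stripped.
DJANGO_WHERE = re.compile(
    r"\.(?:filter|get|exclude)\s*\(\s*([A-Za-z_]\w*?)(?:__\w+)?\s*="
)
# Simplified, hypothetical pattern: the first field passed to order_by(),
# ignoring a leading "-" that marks descending order.
DJANGO_ORDER_BY = re.compile(r"\.order_by\s*\(\s*['\"]-?(\w+)['\"]")

SAMPLE = '''
user = User.objects.get(email=email)
active = User.objects.filter(email__iexact=addr, is_active=True)
orders = Order.objects.exclude(status="cancelled")
recent = Order.objects.filter(status="paid").order_by("-placed_on")
'''

where_counts = Counter(m.group(1) for m in DJANGO_WHERE.finditer(SAMPLE))
order_columns = DJANGO_ORDER_BY.findall(SAMPLE)

print(dict(where_counts))
print(order_columns)
```

Only the first keyword argument is captured here, so `is_active` goes uncounted. That is the kind of blind spot a regex-based scan accepts in exchange for not needing a parser for each language.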
The scanner reads existing index definitions so it doesn't recommend indexes that already exist:
| ORM | Where Indexes Are Found |
|---|---|
| Django | db_index=True on field, Meta.indexes, Meta.unique_together |
| SQLAlchemy | Column(index=True), Column(unique=True), Index(...) objects |
| TypeORM | @Index() decorator, @Column({index: true}), @Unique() |
| Prisma | @@index([col]), @@unique([col]), @unique on field |
| GORM | gorm:"index", gorm:"uniqueIndex" struct tags |
| ActiveRecord | add_index in migrations, index: true in column definition |
| SQL migrations | CREATE INDEX, CREATE UNIQUE INDEX statements |
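
The suppression step can be pictured as a set difference between queried columns and indexed columns. The sketch below assumes a Django model and uses a single simplified pattern, `INDEXED_FIELD`; the model source and the query counts are invented for illustration.

```python
import re

MODEL_SOURCE = '''
class User(models.Model):
    email = models.EmailField(max_length=254, db_index=True)
    username = models.CharField(max_length=150, unique=True)
    nickname = models.CharField(max_length=50)
'''

# Field assignments whose argument list contains db_index=True or unique=True.
INDEXED_FIELD = re.compile(
    r"(\w+)\s*=\s*models\.\w+Field\s*\([^)]*\b(?:db_index|unique)\s*=\s*True"
)

indexed = {m.group(1) for m in INDEXED_FIELD.finditer(MODEL_SOURCE)}

# Hypothetical query counts, as a scan of the codebase might produce them.
queried = {"email": 28, "nickname": 9, "username": 4}

# Keep only the columns that are queried but have no index definition.
missing = {col: n for col, n in queried.items() if col not in indexed}

print(sorted(indexed))
print(missing)
```

Matching by column name alone, as this sketch does, means an index on one model's `email` also hides an unindexed `email` on another model. The script below keys its index set by column name in the same way, so that trade-off applies to its results too.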
#!/usr/bin/env python3
"""
phy-db-index-advisor — ORM query pattern analyzer for missing indexes
Usage: python3 advisor.py [path] [--json] [--min-count N] [--ci]
"""
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# ─── Data structures ─────────────────────────────────────────────────────────
@dataclass
class QueryHit:
    file: str
    line: int
    pattern: str
    orm: str
    access_type: str  # WHERE, ORDER_BY, JOIN


@dataclass
class ColumnReport:
    table_hint: str  # Guessed model/table name
    column: str
    where_count: int = 0
    order_count: int = 0
    join_count: int = 0
    files: set = field(default_factory=set)
    hits: list = field(default_factory=list)
    already_indexed: bool = False

    @property
    def total_count(self) -> int:
        return self.where_count + self.order_count + self.join_count

    @property
    def priority(self) -> str:
        if self.already_indexed:
            return "INDEXED"
        if self.where_count >= 10 or self.total_count >= 15:
            return "CRITICAL"
        if self.where_count >= 5 or self.total_count >= 8:
            return "HIGH"
        if self.total_count >= 3:
            return "MEDIUM"
        return "LOW"


# ─── Query pattern registry ───────────────────────────────────────────────────
# (orm_name, access_type, regex, model_group_idx, col_group_idx)
QUERY_PATTERNS = [
    # ── Django ORM ──
    ("Django", "WHERE",
     re.compile(r'\.(?:filter|get|exclude|count|exists)\s*\([^)]*?(\w+)__?\w*\s*='),
     None, 1),
    ("Django", "WHERE",
     re.compile(r'\.(?:filter|get|exclude)\s*\(\s*(\w+)\s*='),
     None, 1),
    # The optional "-" (descending order) comes after the opening quote.
    ("Django", "ORDER_BY",
     re.compile(r'\.order_by\s*\(\s*[\'"]-?(\w+)[\'"]\s*\)'),
     None, 1),
    ("Django", "ORDER_BY",
     re.compile(r'ordering\s*=\s*\[[^\]]*?[\'"](\w+)[\'"]'),
     None, 1),
    # ── SQLAlchemy ──
    ("SQLAlchemy", "WHERE",
     re.compile(r'\.filter\s*\(\s*(\w+)\.(\w+)\s*=='),
     1, 2),
    ("SQLAlchemy", "WHERE",
     re.compile(r'\.filter_by\s*\([^)]*?(\w+)\s*='),
     None, 1),
    ("SQLAlchemy", "ORDER_BY",
     re.compile(r'\.order_by\s*\(\s*(\w+)\.(\w+)'),
     1, 2),
    ("SQLAlchemy", "ORDER_BY",
     re.compile(r'\.order_by\s*\(\s*(?:asc|desc)\s*\(\s*(\w+)\.(\w+)'),
     1, 2),
    # ── TypeORM ──
    ("TypeORM", "WHERE",
     re.compile(r'where\s*:\s*\{[^}]*?(\w+)\s*:'),
     None, 1),
    ("TypeORM", "WHERE",
     re.compile(r'\.where\s*\(\s*[\'"`](?:\w+\.)?(\w+)\s*(?:=|LIKE|IN|>|<)'),
     None, 1),
    ("TypeORM", "ORDER_BY",
     re.compile(r'\.orderBy\s*\(\s*[\'"`](?:\w+\.)?(\w+)[\'"`]'),
     None, 1),
    ("TypeORM", "ORDER_BY",
     re.compile(r'orderBy\s*:\s*\{[^}]*?(\w+)\s*:'),
     None, 1),
    # ── Prisma ──
    ("Prisma", "WHERE",
     re.compile(r'where\s*:\s*\{[^}]*?(\w+)\s*:'),
     None, 1),
    ("Prisma", "ORDER_BY",
     re.compile(r'orderBy\s*:\s*\{[^}]*?(\w+)\s*:'),
     None, 1),
    # ── Sequelize ──
    ("Sequelize", "WHERE",
     re.compile(r'where\s*:\s*\{[^}]*?(\w+)\s*:'),
     None, 1),
    ("Sequelize", "ORDER_BY",
     re.compile(r'order\s*:\s*\[\s*\[\s*[\'"`](\w+)[\'"`]'),
     None, 1),
    # ── GORM ──
    ("GORM", "WHERE",
     re.compile(r'\.(?:Where|Find|First|Last)\s*\([^,)]*?[\'"`](?:\w+\.)?(\w+)\s*(?:=|LIKE|IN|>|<|\?)'),
     None, 1),
    ("GORM", "ORDER_BY",
     re.compile(r'\.Order\s*\(\s*[\'"`](\w+)'),
     None, 1),
    # Only the left-hand column of the ON clause (group 1) is counted.
    ("GORM", "JOIN",
     re.compile(r'\.Joins\s*\([^)]*?ON\s+\w+\.(\w+)\s*=\s*\w+\.(\w+)'),
     None, 1),
    # ── ActiveRecord (Ruby) ──
    ("ActiveRecord", "WHERE",
     re.compile(r'\.where\s*\(\s*(\w+):\s*'),
     None, 1),
    ("ActiveRecord", "WHERE",
     re.compile(r'\.find_by\s*\(\s*(\w+):\s*'),
     None, 1),
    ("ActiveRecord", "ORDER_BY",
     re.compile(r'\.order\s*\(\s*:(\w+)\s*\)'),
     None, 1),
    ("ActiveRecord", "ORDER_BY",
     re.compile(r'\.order\s*\(\s*[\'"](\w+)'),
     None, 1),
]

# ─── Existing index detection ─────────────────────────────────────────────────
EXISTING_INDEX_PATTERNS = [
    # Django
    re.compile(r'(\w+)\s*=\s*\w+Field\s*\([^)]*\bdb_index\s*=\s*True'),
    re.compile(r'(\w+)\s*=\s*\w+Field\s*\([^)]*\bunique\s*=\s*True'),
    re.compile(r'models\.Index\s*\(\s*fields\s*=\s*\[([^\]]+)\]'),
    # SQLAlchemy
    re.compile(r'Column\s*\([^)]*\bindex\s*=\s*True[^)]*\).*?#.*?(\w+)'),
    re.compile(r'(\w+)\s*=\s*Column\s*\([^)]*\bindex\s*=\s*True'),
    re.compile(r'(\w+)\s*=\s*Column\s*\([^)]*\bunique\s*=\s*True'),
    re.compile(r'Index\s*\(\s*[\'"`]\w+[\'"`]\s*,\s*\w+\.(\w+)'),
    # TypeORM: capture the property name that follows the decorator
    # (the original patterns captured nothing, or the token after the type)
    re.compile(r'@(?:Index|Unique|Column)\s*\([^)]*\bindex\s*:\s*true[^)]*\)\s*(\w+)'),
    re.compile(r'@Column\s*\([^)]*\bunique\s*:\s*true[^)]*\)\s*(\w+)'),
    # Prisma
    re.compile(r'@@index\s*\(\s*\[([^\]]+)\]'),
    re.compile(r'@@unique\s*\(\s*\[([^\]]+)\]'),
    re.compile(r'(\w+)\s+\w+\s+@unique'),
    # GORM
    re.compile(r'(\w+)\s+\w+\s+`[^`]*gorm:"[^"]*(?:index|uniqueIndex)[^"]*"`'),
    # SQL migrations
    re.compile(r'CREATE\s+(?:UNIQUE\s+)?INDEX\s+\w+\s+ON\s+\w+\s*\(([^)]+)\)', re.IGNORECASE),
    re.compile(r'add_index\s+:\w+\s*,\s*:(\w+)'),  # ActiveRecord migration
]

# Columns to always skip (noise)
SKIP_COLUMNS = {
    "id", "pk", "uuid", "created_at", "updated_at", "deleted_at",
    "created_by", "updated_by", "None", "null", "true", "false",
    "True", "False", "self", "cls", "this", "kwargs", "args",
}

SKIP_DIRS = {".git", "node_modules", "vendor", "__pycache__", ".venv", "venv",
             "dist", "build", "target", "migrations", "alembic"}

FILE_EXTS = {".py", ".ts", ".js", ".rb", ".go"}


def collect_existing_indexes(root: Path) -> set[str]:
    """Return set of column names already indexed."""
    indexed = set()
    # Index definitions often live in migration directories, so do not skip
    # them here even though scan_queries() does.
    skip_dirs = SKIP_DIRS - {"migrations", "alembic"}
    # .sql covers raw SQL migrations; .prisma covers Prisma schema files.
    index_exts = FILE_EXTS | {".sql", ".prisma"}
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
        for fname in filenames:
            fpath = Path(dirpath) / fname
            if fpath.suffix.lower() not in index_exts:
                continue
            try:
                text = fpath.read_text(encoding="utf-8", errors="replace")
            except OSError:
                continue
            for pat in EXISTING_INDEX_PATTERNS:
                for m in pat.finditer(text):
                    for grp in m.groups():
                        if grp:
                            # A group may hold several columns, e.g. "a, b"
                            for col in re.split(r'[\s,\'"`]+', grp):
                                col = col.strip().strip('"\'`')
                                if col:
                                    indexed.add(col.lower())
    return indexed


def scan_queries(root: Path) -> dict[str, ColumnReport]:
    """Scan all source files and collect query patterns per column."""
    col_reports: dict[str, ColumnReport] = {}
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
        for fname in filenames:
            fpath = Path(dirpath) / fname
            if fpath.suffix.lower() not in FILE_EXTS:
                continue
            # Skip test files
            if any(x in fname.lower() for x in ("test", "spec", "mock", "fixture")):
                continue
            try:
                lines = fpath.read_text(encoding="utf-8", errors="replace").splitlines()
            except OSError:
                continue
            full_text = "\n".join(lines)
            rel_path = os.path.relpath(str(fpath))
            for (orm, access_type, pat, model_grp, col_grp) in QUERY_PATTERNS:
                for m in pat.finditer(full_text):
                    try:
                        col = m.group(col_grp)
                    except IndexError:
                        continue
                    if not col or col.lower() in SKIP_COLUMNS:
                        continue
                    if len(col) < 2 or not col.replace("_", "").isalpha():
                        continue
                    model = None
                    if model_grp:
                        try:
                            model = m.group(model_grp)
                        except IndexError:
                            pass
                    lineno = full_text[:m.start()].count("\n") + 1
                    key = col.lower()
                    if key not in col_reports:
                        col_reports[key] = ColumnReport(
                            table_hint=model or "",
                            column=col,
                        )
                    report = col_reports[key]
                    if model and not report.table_hint:
                        report.table_hint = model
                    report.files.add(rel_path)
                    report.hits.append(QueryHit(rel_path, lineno, m.group(0)[:80], orm, access_type))
                    if access_type == "WHERE":
                        report.where_count += 1
                    elif access_type == "ORDER_BY":
                        report.order_count += 1
                    elif access_type == "JOIN":
                        report.join_count += 1
    return col_reports


def format_report(reports: list[ColumnReport], existing_indexes: set[str],
                  min_count: int = 3) -> str:
    # Mark already-indexed
    for r in reports:
        if r.column.lower() in existing_indexes:
            r.already_indexed = True
    # Filter to only non-indexed columns with at least min_count total hits
    actionable = [r for r in reports if not r.already_indexed and r.total_count >= min_count]
    actionable.sort(key=lambda x: x.total_count, reverse=True)
    priority_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    # Stable sort: priority first, total count (descending) within a priority
    actionable.sort(key=lambda x: priority_order.get(x.priority, 4))
    already_indexed = [r for r in reports if r.already_indexed and r.total_count >= min_count]
    lines = [
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        " DB INDEX ADVISOR — Missing Index Analysis",
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        f" Columns queried: {len(reports)}",
        f" Missing indexes: {len(actionable)} ({sum(1 for r in actionable if r.priority=='CRITICAL')} CRITICAL)",
        f" Already indexed: {len(already_indexed)} (suppressed)",
        "",
    ]
    icons = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "⚪"}
    current_priority = None
    for r in actionable:
        p = r.priority
        if p != current_priority:
            current_priority = p
            lines.append(f"\n{icons.get(p, '⚪')} {p}")
            lines.append("")
        table = r.table_hint or "?"
        breakdown = []
        if r.where_count:
            breakdown.append(f"WHERE×{r.where_count}")
        if r.order_count:
            breakdown.append(f"ORDER_BY×{r.order_count}")
        if r.join_count:
            breakdown.append(f"JOIN×{r.join_count}")
        # Show up to 3 files containing call sites
        sample_files = sorted(r.files)[:3]
        samples_str = ", ".join(sample_files)
        if len(r.files) > 3:
            samples_str += f" (+{len(r.files)-3} more)"
        lines += [
            f" {table}.{r.column} [{' | '.join(breakdown)}] across {len(r.files)} file(s)",
            f" Files: {samples_str}",
            f" SQL: CREATE INDEX idx_{table.lower()}_{r.column.lower()} ON {table.lower()} ({r.column});",
            # The field class is a naive guess derived from the column name
            f" Django: {r.column} = models.{r.column.title()}Field(..., db_index=True)",
            f" SQLAlchemy: {r.column} = Column(String, index=True)",
            f" Prisma: @@index([{r.column}])",
            "",
        ]
    if not actionable:
        lines.append(" ✅ No missing indexes detected (all queried columns are already indexed)")
        lines.append("")
    if already_indexed:
        lines.append(f" ✅ Already indexed ({len(already_indexed)} columns): "
                     + ", ".join(r.column for r in already_indexed[:8])
                     + ("..." if len(already_indexed) > 8 else ""))
        lines.append("")
    critical_count = sum(1 for r in actionable if r.priority == "CRITICAL")
    lines += [
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        f" CI gate: {'exit 1 — missing critical indexes' if critical_count else 'exit 0'}",
        " Runtime verification: EXPLAIN ANALYZE your most frequent queries",
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
    ]
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(description="DB index advisor — finds missing indexes from ORM patterns")
    parser.add_argument("path", nargs="?", default=".", help="Root directory to scan")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--min-count", type=int, default=3,
                        help="Minimum query count to report (default: 3)")
    parser.add_argument("--ci", action="store_true", help="Exit 1 if CRITICAL indexes missing")
    args = parser.parse_args()
    root = Path(args.path).resolve()
    existing_indexes = collect_existing_indexes(root)
    col_reports_dict = scan_queries(root)
    reports = list(col_reports_dict.values())
    for r in reports:
        if r.column.lower() in existing_indexes:
            r.already_indexed = True
    actionable = sorted(
        [r for r in reports if not r.already_indexed and r.total_count >= args.min_count],
        key=lambda x: x.total_count,
        reverse=True,
    )
    if args.json:
        import dataclasses
        output = []
        for r in actionable:
            d = dataclasses.asdict(r)
            d["priority"] = r.priority
            d["total_count"] = r.total_count
            d["files"] = list(r.files)  # sets are not JSON-serializable
            output.append(d)
        print(json.dumps(output, indent=2))
    else:
        # Pass --min-count through so the text report honors it as well
        print(format_report(reports, existing_indexes, min_count=args.min_count))
    if args.ci:
        has_critical = any(r.priority == "CRITICAL" for r in actionable)
        sys.exit(1 if has_critical else 0)


if __name__ == "__main__":
    main()
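
The priority thresholds in ColumnReport.priority are easier to reason about with a few worked cases. The standalone `priority` function below is a restatement written for this example, not a separate API of the script; it mirrors the same thresholds.

```python
def priority(where: int, order: int = 0, join: int = 0, indexed: bool = False) -> str:
    """Mirror of the thresholds used by ColumnReport.priority."""
    total = where + order + join
    if indexed:
        return "INDEXED"
    if where >= 10 or total >= 15:
        return "CRITICAL"
    if where >= 5 or total >= 8:
        return "HIGH"
    if total >= 3:
        return "MEDIUM"
    return "LOW"


cases = {
    "28 filters": priority(28),
    "9 filters": priority(9),
    "4 filters, 2 sorts, 2 joins": priority(4, order=2, join=2),
    "2 filters, 1 sort": priority(2, order=1),
    "1 filter": priority(1),
    "50 filters, already indexed": priority(50, indexed=True),
}

for label, level in cases.items():
    print(f"{label}: {level}")
```

WHERE hits carry extra weight: ten filters alone reach CRITICAL, while sorts and joins only count toward the combined total.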
# Scan current project
python3 advisor.py
# Scan a specific path
python3 advisor.py ~/projects/myapp
# Only show columns queried 5+ times
python3 advisor.py --min-count 5
# CI fail-gate (exits 1 if CRITICAL missing indexes found)
python3 advisor.py --ci
# JSON output for dashboard/ticketing
python3 advisor.py --json | jq '[.[] | select(.priority == "CRITICAL")]'
# GitHub Actions
- name: DB Index Advisor
  run: python3 .claude/skills/phy-db-index-advisor/advisor.py --ci
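
Where jq is not available, the --json output can be post-processed in Python instead. The sketch below assumes the field names the script emits (the ColumnReport fields plus priority and total_count); the `SAMPLE_JSON` payload and the `critical_columns` helper are hypothetical and exist only for this example.

```python
import json

# Hypothetical payload shaped like the script's --json output.
SAMPLE_JSON = '''
[
  {"table_hint": "", "column": "email", "where_count": 28, "order_count": 0,
   "join_count": 0, "files": ["api/auth.py"], "hits": [],
   "already_indexed": false, "priority": "CRITICAL", "total_count": 28},
  {"table_hint": "Session", "column": "token", "where_count": 9, "order_count": 0,
   "join_count": 0, "files": ["api/session.py"], "hits": [],
   "already_indexed": false, "priority": "HIGH", "total_count": 9}
]
'''


def critical_columns(payload: str) -> list[str]:
    """Return 'table.column' labels for every CRITICAL entry."""
    reports = json.loads(payload)
    return [
        f"{r['table_hint'] or '?'}.{r['column']}"
        for r in reports
        if r["priority"] == "CRITICAL"
    ]


print(critical_columns(SAMPLE_JSON))
```

In practice the payload would come from the script, for example by reading sys.stdin after `python3 advisor.py --json |`. An empty table_hint means the scan could not infer a model name, so the label falls back to "?".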
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
DB INDEX ADVISOR — Missing Index Analysis
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Columns queried: 24
Missing indexes: 6 (2 CRITICAL)
Already indexed: 8 (suppressed)
🔴 CRITICAL
User.email [WHERE×28] across 7 file(s)
Files: api/auth.py, api/users.py, services/notifications.py (+4 more)
SQL: CREATE INDEX idx_user_email ON user (email);
Django: email = models.EmailField(..., db_index=True)
SQLAlchemy: email = Column(String, index=True)
Prisma: @@index([email])
Order.user_id [WHERE×19 | JOIN×6] across 5 file(s)
Files: api/orders.py, services/billing.py, reports/revenue.py (+2 more)
SQL: CREATE INDEX idx_order_user_id ON order (user_id);
Django: user_id = models.ForeignKey(..., db_index=True)
SQLAlchemy: user_id = Column(Integer, ForeignKey('user.id'), index=True)
Prisma: @@index([userId])
🟠 HIGH
Product.category_id [WHERE×8 | ORDER_BY×4] across 4 file(s)
SQL: CREATE INDEX idx_product_category_id ON product (category_id);
Session.token [WHERE×9] across 3 file(s)
SQL: CREATE INDEX idx_session_token ON session (token);
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
CI gate: exit 1 — missing critical indexes
Runtime verification: EXPLAIN ANALYZE your most frequent queries
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
phy-db-index-advisor vs. phy-sql-explainer:

| Skill | Input | Output | When to Use |
|---|---|---|---|
| phy-db-index-advisor | Source code (ORM patterns) | Missing index recommendations | Pre-deployment: catch before slow queries appear |
| phy-sql-explainer | EXPLAIN ANALYZE output | Query plan diagnosis | Post-deployment: diagnose an existing slow query |
Use both: this skill prevents index gaps from being deployed; phy-sql-explainer diagnoses what got through anyway.
- Dynamic filters such as .filter(**kwargs) cannot be statically analyzed; run with --min-count 5 to focus on confirmed hot paths
- id is always excluded (databases auto-index primary keys)

| Skill | Use Together For |
|---|---|
| phy-sql-explainer | Pre + post deployment DB performance sweep |
| phy-db-migration-auditor | Safe migration before applying index additions |
| phy-concurrency-audit | Race conditions + missing indexes both cause data integrity failures |