# LLM + SQL Dual Engine: Automated Slow-Query Diagnosis and Optimization in Practice
## Background: Slow Queries Are a DBA's Biggest Time Sink
Surveys suggest that a mid-level DBA spends an average of 3-4 hours per day handling slow-query alerts, and roughly 70% of that time goes to re-handling near-identical issues. This work is highly pattern-driven, which makes it well suited to AI automation.

This article builds a complete AI-driven slow-query diagnosis system; in our tests it cut average handling time from 2 hours down to 5 minutes.
## System Architecture

```text
MySQL slow query log
        ↓
Slow query collector (Python)
  ├── Parse slow_query.log
  ├── Deduplicate and aggregate
  └── Write to analysis queue
        ↓
SQL analysis engine
  ├── EXPLAIN analysis (execution plan parsing)
  ├── Statistics collection (table sizes, index usage)
  └── Performance anti-pattern detection
        ↓
LLM diagnosis engine (DeepSeek V4-Flash)
  ├── Context assembly
  ├── Intelligent diagnosis (understands business semantics)
  └── Optimization suggestion generation
        ↓
Notification and execution
  ├── DingTalk/Feishu alerts
  ├── Optimization ticket generation
  └── (Optional) auto-apply low-risk optimizations
```
## 1. Slow Query Collection Module

### 1.1 MySQL Configuration
```sql
-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1;  -- log queries slower than 1 second
SET GLOBAL log_queries_not_using_indexes = 'ON';  -- also log queries that use no index
SET GLOBAL log_throttle_queries_not_using_indexes = 10;  -- at most 10 such entries per minute
SET GLOBAL min_examined_row_limit = 100;  -- only log queries examining at least 100 rows

-- Performance Schema (more detailed statistics)
UPDATE performance_schema.setup_consumers
SET ENABLED = 'YES'
WHERE NAME = 'events_statements_history_long';
```
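Note that `SET GLOBAL` does not survive a server restart, so it is worth verifying the settings after applying them. The following sanity check is an addition to the original setup:

```sql
-- Verify the slow-log settings are active
SHOW GLOBAL VARIABLES LIKE 'slow_query_log%';
SHOW GLOBAL VARIABLES LIKE 'long_query_time';

-- On MySQL 8.0+, persist a setting across restarts
SET PERSIST long_query_time = 1;
```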
### 1.2 Log Collector
```python
import re
import hashlib
from datetime import datetime
from dataclasses import dataclass
from typing import Generator


@dataclass
class SlowQuery:
    query_time: float
    lock_time: float
    rows_sent: int
    rows_examined: int
    sql: str
    query_hash: str  # hash of the normalized SQL, used for deduplication
    timestamp: datetime
    user: str
    db: str


class MySQLSlowLogParser:
    """Parser for the MySQL slow query log."""

    # Matches the MySQL 8.0 header, where "# Time:" carries a single ISO-8601
    # token such as 2024-01-15T10:30:45.123456Z. Note: no re.MULTILINE here;
    # with it, the trailing $ would match at every line end and truncate
    # multi-line statements.
    HEADER_PATTERN = re.compile(
        r"# Time: (\S+)\n"
        r"# User@Host: (\S+)\[.*?\] @ .*?\[.*?\]\s+Id:\s+\d+\n"
        r"# Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+"
        r"Rows_sent: (\d+)\s+Rows_examined: (\d+)\n"
        r"(?:use (\w+);\n)?"
        r"SET timestamp=\d+;\n"
        r"([\s\S]+?)(?=# Time:|$)"
    )

    def parse(self, log_content: str) -> Generator[SlowQuery, None, None]:
        for match in self.HEADER_PATTERN.finditer(log_content):
            timestamp_str = match.group(1)
            user = match.group(2)
            query_time = float(match.group(3))
            lock_time = float(match.group(4))
            rows_sent = int(match.group(5))
            rows_examined = int(match.group(6))
            db = match.group(7) or "unknown"
            sql = match.group(8).strip()

            # Normalize the SQL (for deduplication)
            normalized_sql = self._normalize_sql(sql)
            query_hash = hashlib.md5(normalized_sql.encode()).hexdigest()

            yield SlowQuery(
                query_time=query_time,
                lock_time=lock_time,
                rows_sent=rows_sent,
                rows_examined=rows_examined,
                sql=sql,
                query_hash=query_hash,
                # strip the trailing Z so fromisoformat works on Python < 3.11
                timestamp=datetime.fromisoformat(timestamp_str.rstrip("Z")),
                user=user,
                db=db,
            )

    def _normalize_sql(self, sql: str) -> str:
        """Parameterize the SQL by stripping literal values."""
        # Replace numeric literals
        sql = re.sub(r'\b\d+\b', '?', sql)
        # Replace string literals
        sql = re.sub(r"'[^']*'", "'?'", sql)
        # Collapse IN lists
        sql = re.sub(r'IN\s*\([^)]+\)', 'IN (?)', sql, flags=re.IGNORECASE)
        return sql.strip().upper()
```
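The dedupe-and-aggregate step from the architecture diagram is not shown above; a minimal sketch of it, where the log path and the "analyze the slowest instance" policy are illustrative choices rather than part of the original system, might look like this:

```python
from collections import defaultdict

# Parse the log and keep one representative per normalized query,
# tracking how often each pattern fired and its worst execution time.
parser = MySQLSlowLogParser()
with open("/var/log/mysql/slow.log") as f:
    queries = list(parser.parse(f.read()))

aggregated: dict[str, dict] = defaultdict(lambda: {"count": 0, "worst": None})
for q in queries:
    bucket = aggregated[q.query_hash]
    bucket["count"] += 1
    if bucket["worst"] is None or q.query_time > bucket["worst"].query_time:
        bucket["worst"] = q  # analyze the slowest instance of each pattern

for h, b in sorted(aggregated.items(), key=lambda kv: -kv[1]["count"]):
    print(f"{h[:8]}  x{b['count']}  worst={b['worst'].query_time:.2f}s")
```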
## 2. SQL Analysis Engine

### 2.1 EXPLAIN Execution Plan Parsing
```python
import re

import mysql.connector


class SQLAnalyzer:
    """SQL analysis engine."""

    def __init__(self, db_config: dict):
        self.db_config = db_config

    def get_explain(self, sql: str, db: str) -> dict:
        """Fetch the execution plan for a statement."""
        conn = mysql.connector.connect(**self.db_config, database=db)
        cursor = conn.cursor(dictionary=True)
        try:
            # Standard EXPLAIN in JSON form
            cursor.execute(f"EXPLAIN FORMAT=JSON {sql}")
            explain_json = cursor.fetchone()

            # EXPLAIN ANALYZE actually executes the query (MySQL 8.0.18+),
            # so run it against a replica rather than the primary
            cursor.execute(f"EXPLAIN ANALYZE {sql}")
            explain_analyze = cursor.fetchall()

            return {
                "explain_json": explain_json,
                "explain_analyze": explain_analyze
            }
        except Exception as e:
            return {"error": str(e)}
        finally:
            cursor.close()
            conn.close()

    def collect_table_stats(self, sql: str, db: str) -> dict:
        """Collect statistics for every table referenced by the statement."""
        tables = self._extract_tables(sql)
        conn = mysql.connector.connect(**self.db_config, database=db)
        cursor = conn.cursor(dictionary=True)
        stats = {}
        for table in tables:
            # Basic table info (placeholders, not string interpolation,
            # to avoid SQL injection)
            cursor.execute("""
                SELECT
                    TABLE_ROWS,
                    DATA_LENGTH / 1024 / 1024 AS data_mb,
                    INDEX_LENGTH / 1024 / 1024 AS index_mb
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            """, (db, table))
            table_info = cursor.fetchone()

            # Index info
            cursor.execute(f"SHOW INDEX FROM `{table}`")
            indexes = cursor.fetchall()

            # Column info
            cursor.execute("""
                SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY, COLUMN_COMMENT
                FROM information_schema.COLUMNS
                WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
            """, (db, table))
            columns = cursor.fetchall()

            stats[table] = {
                "row_count": table_info.get("TABLE_ROWS") if table_info else None,
                "data_mb": table_info.get("data_mb") if table_info else None,
                "indexes": indexes,
                "columns": columns
            }
        cursor.close()
        conn.close()
        return stats

    def _extract_tables(self, sql: str) -> list[str]:
        """Naive table-name extraction from FROM/JOIN clauses. A real SQL
        parser (e.g. sqlglot) is more robust for subqueries and aliases."""
        names = re.findall(r'\b(?:FROM|JOIN)\s+`?(\w+)`?', sql, flags=re.IGNORECASE)
        return list(dict.fromkeys(names))  # dedupe while keeping order

    def detect_antipatterns(self, explain_result: dict, slow_query: SlowQuery) -> list[str]:
        """Detect common SQL performance anti-patterns."""
        issues = []

        # Full table scan
        if explain_result.get("explain_json"):
            plan = explain_result["explain_json"]
            if '"access_type": "ALL"' in str(plan):
                issues.append("FULL_TABLE_SCAN: full table scan detected; add a suitable index")

        # Far more rows examined than returned
        if slow_query.rows_examined > slow_query.rows_sent * 1000:
            ratio = slow_query.rows_examined / max(slow_query.rows_sent, 1)
            issues.append(
                f"INEFFICIENT_SCAN: examined/returned ratio = {ratio:.0f}x; "
                f"index filtering is extremely poor"
            )

        # Long lock waits
        if slow_query.lock_time > slow_query.query_time * 0.3:
            issues.append(
                f"LOCK_WAIT: lock wait is {slow_query.lock_time / slow_query.query_time * 100:.0f}% "
                f"of execution time; lock contention is likely"
            )

        # SQL-text heuristics
        sql_upper = slow_query.sql.upper()
        if "SELECT *" in sql_upper:
            issues.append("SELECT_STAR: uses SELECT *; select only the columns you need")
        # \b boundaries so we do not match ORDER, FOR, etc.
        if re.search(r'\bOR\b', sql_upper) and "INDEX" in str(explain_result):
            issues.append("OR_CONDITION: OR conditions can defeat index use; consider rewriting as UNION")
        if sql_upper.count("JOIN") >= 4:
            issues.append(f"MANY_JOINS: {sql_upper.count('JOIN')} JOINs; risk of Cartesian products or memory pressure")
        if "LIKE '%" in sql_upper:
            issues.append("LEADING_WILDCARD: LIKE '%xxx' with a leading wildcard cannot use an index")
        # Heuristic: common functions applied inside WHERE defeat index use
        if re.search(r"WHERE[^;]*\b(DATE|YEAR|MONTH|SUBSTRING|LOWER|UPPER|CAST)\s*\(", sql_upper):
            issues.append("FUNCTION_IN_WHERE: applying a function to a column in WHERE defeats index use")

        return issues
```
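Putting the analyzer to work on the queries collected in §1.2 looks roughly like this; the connection settings are placeholders:

```python
# Placeholder credentials; point these at a replica in production
db_config = {"host": "127.0.0.1", "user": "analyzer", "password": "***"}

analyzer = SQLAnalyzer(db_config)
worst = max(queries, key=lambda q: q.query_time)  # from the collector sketch above

explain_result = analyzer.get_explain(worst.sql, worst.db)
table_stats = analyzer.collect_table_stats(worst.sql, worst.db)
issues = analyzer.detect_antipatterns(explain_result, worst)
# e.g. ['FULL_TABLE_SCAN: ...', 'SELECT_STAR: ...']
```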
## 3. LLM Diagnosis Engine

### 3.1 Context Assembly and Diagnosis
````python
import json
import re

from openai import OpenAI


class LLMDiagnosticEngine:
    """LLM-driven SQL diagnosis engine."""

    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )

    def diagnose(
        self,
        slow_query: SlowQuery,
        explain_result: dict,
        table_stats: dict,
        antipatterns: list[str]
    ) -> dict:
        """Run a combined diagnosis through the LLM."""
        context = self._build_context(
            slow_query, explain_result, table_stats, antipatterns
        )
        response = self.client.chat.completions.create(
            model="deepseek-v4-flash",
            messages=[
                {
                    "role": "system",
                    "content": """You are a senior MySQL DBA specializing in SQL performance tuning.
Based on the slow-query information provided, give a professional diagnosis and optimization plan.
Your analysis must:
1. Accurately identify the root cause of the performance problem
2. Provide optimization SQL that can be executed as-is
3. Estimate the optimization impact (expected percentage improvement)
4. Point out potential risks
Format the output as Markdown; be concise and professional."""
                },
                {
                    "role": "user",
                    "content": context
                }
            ],
            temperature=0.1,
            max_tokens=2048
        )
        diagnosis = response.choices[0].message.content

        # Pull the optimization SQL out of the response
        optimized_sql = self._extract_sql(diagnosis)

        return {
            "diagnosis": diagnosis,
            "optimized_sql": optimized_sql,
            "severity": self._assess_severity(slow_query, antipatterns)
        }

    def _build_context(self, sq, explain, stats, antipatterns) -> str:
        return f"""
## Slow query info
- Execution time: {sq.query_time:.2f}s
- Lock wait: {sq.lock_time:.2f}s
- Rows examined: {sq.rows_examined:,}
- Rows returned: {sq.rows_sent:,}
- Database: {sq.db}

## SQL statement
```sql
{sq.sql}
```

## EXPLAIN execution plan
{json.dumps(explain.get('explain_json', {}), ensure_ascii=False, indent=2)[:2000]}

## Table statistics
{self._format_table_stats(stats)}

## Detected performance issues
{chr(10).join(f'- {issue}' for issue in antipatterns)}

Please provide:
1. Root cause analysis
2. Optimization plan (including concrete index DDL or a rewritten SQL statement)
3. Expected impact
4. Caveats
"""

    def _extract_sql(self, diagnosis: str) -> list[str]:
        """Minimal version of the helper referenced above: pull the ```sql
        fenced blocks out of the LLM's Markdown answer."""
        return re.findall(r"```sql\s*([\s\S]*?)```", diagnosis)

    def _format_table_stats(self, stats: dict) -> str:
        """Minimal version of the helper referenced above: one Markdown
        bullet per table from SQLAnalyzer.collect_table_stats."""
        lines = []
        for table, info in stats.items():
            lines.append(
                f"- `{table}`: ~{info['row_count']} rows, "
                f"{float(info['data_mb'] or 0):.1f} MB data, "
                f"{len(info['indexes'])} index entries"
            )
        return "\n".join(lines)

    def _assess_severity(self, sq: SlowQuery, antipatterns: list) -> str:
        if sq.query_time > 10 or sq.rows_examined > 1_000_000:
            return "CRITICAL"
        elif sq.query_time > 3 or sq.rows_examined > 100_000:
            return "HIGH"
        elif sq.query_time > 1 or len(antipatterns) >= 3:
            return "MEDIUM"
        return "LOW"
````
## 4. A Real-World Case

### Case: Automatic N+1 Query Detection
```sql
-- Problem SQL (executed in a loop: the N+1 pattern)
-- Rows examined: 50,000; execution time: 8.3s
SELECT * FROM orders WHERE user_id = 12345;
-- ...followed by one query per order:
SELECT * FROM order_items WHERE order_id = ?;

-- The LLM recognizes the N+1 pattern and proposes:
-- Optimized SQL (a single JOIN)
SELECT
    o.id, o.total_amount, o.status, o.created_at,
    oi.product_id, oi.quantity, oi.unit_price
FROM orders o
INNER JOIN order_items oi ON o.id = oi.order_id
WHERE o.user_id = 12345
ORDER BY o.created_at DESC;

-- Recommended indexes
CREATE INDEX idx_orders_user_created
    ON orders(user_id, created_at DESC);
CREATE INDEX idx_order_items_order_id
    ON order_items(order_id);

-- Result: execution time dropped from 8.3s to 0.03s (roughly 277x faster)
```
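For the diagnosis engine to see an N+1 pattern at all, the collector first has to surface the repetition. A minimal sketch of that pre-check, with an illustrative threshold that was not part of the original system:

```python
from collections import Counter

def find_n_plus_one_candidates(queries: list[SlowQuery], threshold: int = 100) -> list[str]:
    """Flag normalized statements that repeat suspiciously often in one log batch.

    Heavy repetition of the same parameterized query (same query_hash)
    within a short window is the tell-tale signature of an N+1 loop.
    """
    counts = Counter(q.query_hash for q in queries)
    return [h for h, n in counts.items() if n >= threshold]
```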
## 5. Deployment and Integration
```bash
# Run as a cron job: analyze the slow log every 5 minutes
crontab -e
# */5 * * * * /usr/bin/python3 /opt/sql-analyzer/main.py >> /var/log/sql-analyzer.log 2>&1

# Or run continuously as a service
systemctl enable sql-slow-analyzer
systemctl start sql-slow-analyzer
```
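The unit file behind `sql-slow-analyzer` is not shown in the original; a plausible minimal definition, with the paths and service user as assumptions, would be:

```ini
# /etc/systemd/system/sql-slow-analyzer.service  (illustrative paths and user)
[Unit]
Description=AI-driven slow query analyzer
After=network.target mysql.service

[Service]
Type=simple
User=sqlanalyzer
ExecStart=/usr/bin/python3 /opt/sql-analyzer/main.py
Restart=on-failure
RestartSec=30

[Install]
WantedBy=multi-user.target
```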
By pairing an LLM with a SQL analysis engine, slow-query diagnosis becomes genuinely automated. In our practice, the system handles roughly 80% of routine slow-query alerts on its own, leaving DBAs to focus on the remaining 20% of complex cases, which is a substantial gain in operational efficiency.