运维

运维相关知识和内容

大模型+SQL双引擎:数据库慢查询自动诊断与优化实战

大模型+SQL双引擎:数据库慢查询自动诊断与优化实战

背景:慢查询是DBA最大的时间杀手

调查显示,中级DBA平均每天花费3-4小时处理慢查询报警,重复处理相似问题约占其中的70%。这些工作高度模式化,非常适合AI自动化。

本文将构建一个完整的AI驱动慢查询诊断系统,实测可将处理时间从平均2小时缩短至5分钟。

系统架构

MySQL慢查询日志
    ↓
慢查询采集器(Python)
    ├── 解析slow_query.log
    ├── 去重和聚合
    └── 写入分析队列
    ↓
SQL分析引擎
    ├── EXPLAIN分析(执行计划解析)
    ├── 统计信息收集(表大小、索引使用率)
    └── 性能反模式识别
    ↓
LLM诊断引擎(DeepSeek V4-Flash)
    ├── 上下文组装
    ├── 智能诊断(理解业务含义)
    └── 优化建议生成
    ↓
通知与执行
    ├── 钉钉/飞书告警
    ├── 生成优化工单
    └── (可选)自动执行低风险优化

一、慢查询采集模块

1.1 MySQL配置

-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1;      -- 超过1秒记录
SET GLOBAL log_queries_not_using_indexes = 'ON';  -- 记录未使用索引的查询
SET GLOBAL log_throttle_queries_not_using_indexes = 10;  -- 每分钟最多记录10条
SET GLOBAL min_examined_row_limit = 100;  -- 至少扫描100行才记录

-- Performance Schema(更详细的统计)
UPDATE performance_schema.setup_consumers 
SET ENABLED = 'YES' 
WHERE NAME = 'events_statements_history_long';

1.2 日志采集器

import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Generator

@dataclass
class SlowQuery:
    """One parsed entry from the MySQL slow query log."""
    query_time: float    # total execution time, seconds
    lock_time: float     # time spent waiting on locks, seconds
    rows_sent: int       # rows returned to the client
    rows_examined: int   # rows scanned to produce the result
    sql: str             # original SQL text as logged
    query_hash: str      # hash of the parameterized SQL, used for dedup
    timestamp: datetime  # when the query was logged
    user: str            # MySQL account that ran the query
    db: str              # database the query ran against ("unknown" if absent)

class MySQLSlowLogParser:
    """Parser for the MySQL slow query log.

    Accepts both the legacy two-token timestamp ("240115 10:30:45") and the
    single-token ISO-8601 form ("2024-01-15T10:30:45.123456Z") that MySQL
    5.7+ writes — the original pattern required two tokens and so skipped
    every entry in modern logs.
    """

    # One log entry: header comment lines, optional "use db;", the
    # "SET timestamp=...;" line, then the SQL text up to the next entry.
    HEADER_PATTERN = re.compile(
        r"# Time: (\S+(?: \S+)?)\n"                               # 1: timestamp (1 or 2 tokens)
        r"# User@Host: (\S+)\[.*?\] @ .*?\[.*?\]\s+Id:\s+\d+\n"   # 2: user
        r"# Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+"        # 3, 4
        r"Rows_sent: (\d+)\s+Rows_examined: (\d+)\n"              # 5, 6
        r"(?:use (\w+);\n)?"                                      # 7: optional database
        r"SET timestamp=\d+;\n"
        r"([\s\S]+?)(?=# Time:|$)",                               # 8: SQL statement(s)
        re.MULTILINE
    )

    def parse(self, log_content: str) -> "Generator[SlowQuery, None, None]":
        """Yield a SlowQuery for every entry found in *log_content*."""
        for match in self.HEADER_PATTERN.finditer(log_content):
            sql = match.group(8).strip()

            # Strip literal values so identical query shapes share one hash.
            normalized_sql = self._normalize_sql(sql)
            query_hash = hashlib.md5(normalized_sql.encode()).hexdigest()

            yield SlowQuery(
                query_time=float(match.group(3)),
                lock_time=float(match.group(4)),
                rows_sent=int(match.group(5)),
                rows_examined=int(match.group(6)),
                sql=sql,
                query_hash=query_hash,
                timestamp=self._parse_timestamp(match.group(1)),
                user=match.group(2),
                db=match.group(7) or "unknown",
            )

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Parse a slow-log timestamp string.

        datetime.fromisoformat() rejects a trailing 'Z' before Python 3.11,
        so the UTC designator is rewritten as an explicit +00:00 offset.
        """
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))

    def _normalize_sql(self, sql: str) -> str:
        """Parameterize a SQL statement (remove literal values) for dedup."""
        # Replace bare integer literals with placeholders.
        sql = re.sub(r'\b\d+\b', '?', sql)
        # Replace quoted string literals.
        sql = re.sub(r"'[^']*'", "'?'", sql)
        # Collapse IN lists so list length does not change the hash.
        sql = re.sub(r'IN\s*\([^)]+\)', 'IN (?)', sql, flags=re.IGNORECASE)
        return sql.strip().upper()

二、SQL分析引擎

2.1 EXPLAIN执行计划解析

import mysql.connector
from typing import Optional

class SQLAnalyzer:
    """SQL analysis engine.

    Collects EXPLAIN plans and table statistics from a live MySQL server,
    and runs rule-based detection of common performance anti-patterns.
    """

    # Table references after FROM / JOIN / UPDATE / INTO, optionally
    # backtick-quoted. Best-effort: subqueries start with "(" and are skipped.
    _TABLE_PATTERN = re.compile(r'\b(?:FROM|JOIN|UPDATE|INTO)\s+`?(\w+)`?', re.IGNORECASE)

    def __init__(self, db_config: dict):
        # Connection parameters (host, user, password, ...); the target
        # database is supplied per call so one analyzer serves many schemas.
        self.db_config = db_config

    def get_explain(self, sql: str, db: str) -> dict:
        """Return the JSON EXPLAIN plan and EXPLAIN ANALYZE output for *sql*.

        On any failure (syntax error, missing table, pre-8.0 server without
        EXPLAIN ANALYZE) returns {"error": ...} instead of raising, so one
        bad statement does not abort a batch run.
        """
        conn = mysql.connector.connect(**self.db_config, database=db)
        cursor = conn.cursor(dictionary=True)

        try:
            # Structured plan (optimizer estimates).
            cursor.execute(f"EXPLAIN FORMAT=JSON {sql}")
            explain_json = cursor.fetchone()

            # Actual execution timings (MySQL 8.0+ only).
            cursor.execute(f"EXPLAIN ANALYZE {sql}")
            explain_analyze = cursor.fetchall()

            return {
                "explain_json": explain_json,
                "explain_analyze": explain_analyze
            }
        except Exception as e:
            return {"error": str(e)}
        finally:
            cursor.close()
            conn.close()

    def collect_table_stats(self, sql: str, db: str) -> dict:
        """Collect size, index, and column metadata for each table in *sql*."""
        tables = self._extract_tables(sql)

        conn = mysql.connector.connect(**self.db_config, database=db)
        cursor = conn.cursor(dictionary=True)

        # try/finally so the connection is released even if a query fails
        # (the original leaked it on any exception).
        try:
            stats = {}
            for table in tables:
                # Parameterized: never interpolate schema/table names into SQL text.
                cursor.execute(
                    """
                    SELECT
                        TABLE_ROWS,
                        DATA_LENGTH / 1024 / 1024 AS data_mb,
                        INDEX_LENGTH / 1024 / 1024 AS index_mb
                    FROM information_schema.TABLES
                    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                    """,
                    (db, table),
                )
                table_info = cursor.fetchone()

                # SHOW INDEX cannot take placeholders; `table` is safe here
                # because _extract_tables only yields \w+ identifiers.
                cursor.execute(f"SHOW INDEX FROM `{table}`")
                indexes = cursor.fetchall()

                cursor.execute(
                    """
                    SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY, COLUMN_COMMENT
                    FROM information_schema.COLUMNS
                    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                    ORDER BY ORDINAL_POSITION
                    """,
                    (db, table),
                )
                columns = cursor.fetchall()

                stats[table] = {
                    "row_count": table_info.get("TABLE_ROWS") if table_info else None,
                    "data_mb": table_info.get("data_mb") if table_info else None,
                    "indexes": indexes,
                    "columns": columns
                }

            return stats
        finally:
            cursor.close()
            conn.close()

    def _extract_tables(self, sql: str) -> list[str]:
        """Best-effort list of table names referenced by *sql*, deduplicated in order.

        (Called by collect_table_stats but missing from the original listing.)
        """
        seen: list[str] = []
        for name in self._TABLE_PATTERN.findall(sql):
            if name not in seen:
                seen.append(name)
        return seen

    def detect_antipatterns(self, explain_result: dict, slow_query: "SlowQuery") -> list[str]:
        """Return human-readable descriptions of SQL performance anti-patterns."""
        issues = []

        # Full table scan: MySQL's JSON plan marks it as "access_type": "ALL".
        if explain_result.get("explain_json"):
            plan = explain_result["explain_json"]
            if '"access_type": "ALL"' in str(plan):
                issues.append("FULL_TABLE_SCAN: 存在全表扫描,应添加合适索引")

        # Examined/sent ratio: a huge ratio means the index filters poorly.
        if slow_query.rows_examined > slow_query.rows_sent * 1000:
            ratio = slow_query.rows_examined / max(slow_query.rows_sent, 1)
            issues.append(
                f"INEFFICIENT_SCAN: 扫描行/返回行比率={ratio:.0f}x,"
                f"索引过滤效果极差"
            )

        # Lock wait dominating execution time signals contention.
        if slow_query.lock_time > slow_query.query_time * 0.3:
            issues.append(
                f"LOCK_WAIT: 锁等待时间占执行时间{slow_query.lock_time/slow_query.query_time*100:.0f}%,"
                f"存在锁竞争"
            )

        # Textual SQL heuristics.
        sql_upper = slow_query.sql.upper()

        if "SELECT *" in sql_upper:
            issues.append("SELECT_STAR: 使用SELECT *,应只查询需要的列")

        # Word boundary so ORDER BY / FOR do not trigger false positives
        # (the original substring test matched "OR" inside "ORDER").
        if re.search(r'\bOR\b', sql_upper) and "INDEX" in str(explain_result):
            issues.append("OR_CONDITION: OR条件可能导致索引失效,考虑改用UNION")

        if sql_upper.count("JOIN") >= 4:
            issues.append(f"MANY_JOINS: 包含{sql_upper.count('JOIN')}个JOIN,可能导致笛卡尔积或内存压力")

        if "LIKE '%" in sql_upper:
            issues.append("LEADING_WILDCARD: 前缀通配符 LIKE '%xxx' 无法使用索引")

        # The original looked for the literal text "FUNCTION(", which no real
        # SQL contains; detect common functions applied inside WHERE instead.
        if re.search(
            r'\bWHERE\b.*?\b(?:DATE|YEAR|MONTH|DAY|SUBSTRING|SUBSTR|UPPER|LOWER|CAST|CONVERT|DATE_FORMAT)\s*\(',
            sql_upper,
        ):
            issues.append("FUNCTION_IN_WHERE: WHERE子句中对列使用函数会导致索引失效")

        return issues

三、LLM诊断引擎

3.1 上下文组装与诊断

from openai import OpenAI

class LLMDiagnosticEngine:
    """LLM-driven SQL diagnosis engine (DeepSeek via OpenAI-compatible API)."""

    # Fenced ```sql blocks inside the model's markdown answer.
    _SQL_BLOCK_PATTERN = re.compile(r"```sql\s*([\s\S]*?)```", re.IGNORECASE)

    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )

    def diagnose(
        self,
        slow_query: "SlowQuery",
        explain_result: dict,
        table_stats: dict,
        antipatterns: list[str]
    ) -> dict:
        """Run a combined diagnosis with the LLM.

        Returns {"diagnosis": markdown text, "optimized_sql": extracted SQL
        snippets, "severity": CRITICAL/HIGH/MEDIUM/LOW}.
        """
        context = self._build_context(
            slow_query, explain_result, table_stats, antipatterns
        )

        response = self.client.chat.completions.create(
            model="deepseek-v4-flash",
            messages=[
                {
                    "role": "system",
                    "content": """你是一名资深MySQL DBA,擅长SQL性能优化。
请根据提供的慢查询信息给出专业诊断和优化方案。
你的分析需要:
1. 准确识别性能问题的根本原因
2. 提供可直接执行的优化SQL
3. 评估优化效果(预期提升百分比)
4. 指出潜在风险
输出格式使用Markdown,要简洁专业。"""
                },
                {
                    "role": "user",
                    "content": context
                }
            ],
            temperature=0.1,       # low temperature: diagnosis should be deterministic
            max_tokens=2048
        )

        diagnosis = response.choices[0].message.content

        # Pull any proposed optimization SQL out of the markdown answer.
        optimized_sql = self._extract_sql(diagnosis)

        return {
            "diagnosis": diagnosis,
            "optimized_sql": optimized_sql,
            "severity": self._assess_severity(slow_query, antipatterns)
        }

    def _build_context(self, sq, explain, stats, antipatterns) -> str:
        """Assemble the user-message context: metrics, SQL, plan, stats, issues.

        (The closing ``` fence and section headers were garbled in the
        published listing; restored here.)
        """
        return f"""
## 慢查询信息
- 执行时间:{sq.query_time:.2f}秒
- 锁等待:{sq.lock_time:.2f}秒
- 扫描行数:{sq.rows_examined:,}行
- 返回行数:{sq.rows_sent:,}行
- 数据库:{sq.db}

## SQL语句
```sql
{sq.sql}
```

## EXPLAIN执行计划

{json.dumps(explain.get('explain_json', {}), ensure_ascii=False, indent=2)[:2000]}

## 相关表统计

{self._format_table_stats(stats)}

## 已识别的性能问题

{chr(10).join(f'- {issue}' for issue in antipatterns)}

请给出:
1. 根本原因分析
2. 优化方案(包括具体的索引创建语句或改写的SQL)
3. 预期效果
4. 注意事项
"""

    def _extract_sql(self, diagnosis: str) -> list[str]:
        """Return the SQL snippets proposed by the model (```sql fence contents).

        (Called by diagnose but missing from the original listing.)
        """
        return [block.strip() for block in self._SQL_BLOCK_PATTERN.findall(diagnosis)]

    def _format_table_stats(self, stats: dict) -> str:
        """One summary line per table for the prompt context.

        (Called by _build_context but missing from the original listing.)
        """
        lines = []
        for table, info in stats.items():
            indexes = info.get("indexes") or []
            lines.append(
                f"- {table}: 行数={info.get('row_count')}, "
                f"数据量={info.get('data_mb')}MB, 索引数={len(indexes)}"
            )
        return "\n".join(lines)

# NOTE(review): this function takes `self` and is invoked as
# `self._assess_severity(...)` from LLMDiagnosticEngine.diagnose, so it
# appears to be a method of that class that lost its indentation when the
# article was published — confirm and re-indent it under the class.
def _assess_severity(self, sq: SlowQuery, antipatterns: list) -> str:
    """Map a slow query's cost to an alert severity level.

    Thresholds: >10s or >1M examined rows -> CRITICAL; >3s or >100K rows
    -> HIGH; >1s or 3+ detected anti-patterns -> MEDIUM; otherwise LOW.
    """
    if sq.query_time > 10 or sq.rows_examined > 1_000_000:
        return "CRITICAL"
    elif sq.query_time > 3 or sq.rows_examined > 100_000:
        return "HIGH"
    elif sq.query_time > 1 or len(antipatterns) >= 3:
        return "MEDIUM"
    return "LOW"
## 四、实战案例

### 案例:N+1查询自动识别

```sql
-- 问题SQL(循环执行,N+1问题)
-- 扫描行数:50,000,执行时间:8.3秒
SELECT * FROM orders WHERE user_id = 12345;
-- 然后对每个order再执行:
SELECT * FROM order_items WHERE order_id = ?;

-- LLM识别出N+1模式,给出优化方案
-- 优化SQL(一次JOIN查询)
SELECT 
    o.id, o.total_amount, o.status, o.created_at,
    oi.product_id, oi.quantity, oi.unit_price
FROM orders o
INNER JOIN order_items oi ON o.id = oi.order_id
WHERE o.user_id = 12345
ORDER BY o.created_at DESC;

-- 推荐添加的索引
CREATE INDEX idx_orders_user_created 
    ON orders(user_id, created_at DESC);
CREATE INDEX idx_order_items_order_id 
    ON order_items(order_id);

-- 优化效果:执行时间从8.3秒降至0.03秒(提升277倍)

五、部署与集成

# 作为定时任务运行
# 每5分钟分析一次慢查询日志
crontab -e
# */5 * * * * /usr/bin/python3 /opt/sql-analyzer/main.py >> /var/log/sql-analyzer.log 2>&1

# 或作为服务持续运行
systemctl enable sql-slow-analyzer
systemctl start sql-slow-analyzer

通过大模型+SQL分析引擎的组合,慢查询诊断实现了真正的自动化。在我们的实践中,系统可以自动处理约80%的常规慢查询告警,DBA只需关注剩余20%的复杂场景,运维效率大幅提升。