NL2SQL Agent 是一个基于自然语言的数据库查询和分析系统,能够将用户的自然语言查询自动转换为 SQL 语句,执行查询并生成易于理解的中文分析报告。系统具备错误自愈能力,支持多轮对话,并可部署到 AWS AgentCore 平台。
- 自然语言转 SQL: 将用户的中文查询自动转换为准确的 SQL 语句
- 智能执行: 自动执行 SQL 查询并返回结果
- 错误自愈: 当 SQL 执行失败时,自动分析错误并修正,最多重试 3 次
- 结果分析: 使用 AI 分析查询结果,生成易懂的中文报告
- 多轮对话: 支持上下文理解,可以进行连续的对话式查询
- 记忆功能: 集成 AgentCore Memory Service,记录查询历史和用户偏好
- 框架: Strands Agent Framework
- AI 模型: AWS Bedrock Claude 4 Sonnet
- 数据库: MySQL (支持 AWS RDS)
- 部署平台: AWS AgentCore Runtime
- 记忆服务: AgentCore Memory Service
- 日志: CloudWatch Logs
.
├── agent.py # AgentCore 入口文件
├── local_test.py # 本地测试脚本
├── setup_memory.py # Memory 资源创建脚本
├── requirements.txt # Python 依赖
├── nl2sql_agent/ # 核心业务逻辑包
│ ├── __init__.py
│ ├── agent.py # NL2SQL Agent 核心类
│ ├── executor.py # SQL 查询执行器
│ ├── analyzer.py # 结果分析器
│ ├── memory_hook.py # Memory Service 集成
│ ├── tools.py # Strands 自定义工具
│ ├── models.py # 数据模型
│ ├── llm_client.py # LLM 客户端
│ ├── error_handler.py # 错误处理器
│ └── utils.py # 工具函数
├── rds_demo/ # 数据库示例
│ ├── db_config.txt # 数据库配置
│ ├── init_schema.sql # 数据库表结构
│ └── generate_data.py # 测试数据生成
└── tests/ # 测试文件
├── test_agent.py
├── test_executor.py
├── test_analyzer.py
└── test_tools.py
- Python 3.9 或更高版本
- MySQL 数据库(本地或 RDS)
- AWS 账号和凭证(用于访问 Bedrock 服务)
- (可选)AgentCore Memory ID(用于多轮对话功能)
git clone <repository-url>
cd sql-analysis-agentpython -m venv .venv
source .venv/bin/activate # Linux/Mac
# 或
.venv\Scripts\activate # Windowspip install -r requirements.txt创建或编辑 rds_demo/db_config.txt 文件:
endpoint=localhost
database=demodb
username=root
password=your_password
port=3306
region=us-west-2
export DB_ENDPOINT=localhost
export DB_PORT=3306
export DB_NAME=demodb
export DB_USER=root
export DB_PASSWORD=your_password
export AWS_REGION=us-west-2# 连接到 MySQL 并创建数据库
mysql -u root -p < rds_demo/init_schema.sql
# 生成测试数据
python rds_demo/generate_data.py# 配置 AWS CLI(如果还没有配置)
aws configure
# 或者设置环境变量
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-west-2如果需要使用多轮对话功能,需要创建 AgentCore Memory 资源:
python setup_memory.py记录输出的 Memory ID,后续使用时需要:
export MEMORY_ID=<your-memory-id># 单次查询
python local_test.py "查询所有客户的数量"
# 交互模式
python local_test.py --interactive
# 使用 Memory(支持多轮对话)
python local_test.py --memory-id <your-memory-id> --interactive
# 使用 Mock LLM(不调用 Bedrock,用于快速测试)
python local_test.py --mock "查询所有客户的数量"- 已安装 AgentCore CLI
- 已配置 AWS 凭证
- 已创建 Memory 资源(可选)
- 数据库可从 AWS 访问(建议使用 RDS)
pip install bedrock-agentcore-cli创建 .env 文件或设置环境变量:
# 数据库配置
export DB_ENDPOINT=<your-rds-endpoint>
export DB_PORT=3306
export DB_NAME=demodb
export DB_USER=admin
export DB_PASSWORD=<your-db-password>
# AWS 配置
export AWS_REGION=us-west-2
# 模型配置
export MODEL_ID=us.anthropic.claude-sonnet-4-20250514-v1:0
# Memory 配置(可选)
export MEMORY_ID=<your-memory-id>
# 其他配置
export MAX_RETRIES=3
export LOG_LEVEL=INFOagentcore configure -e agent.py这将创建 AgentCore 配置文件,包含:
- 入口文件路径
- 依赖包列表
- 环境变量
- IAM 权限
agentcore launch部署过程包括:
- 打包应用代码和依赖
- 上传到 AWS
- 创建 AgentCore Runtime 资源
- 配置 IAM 角色和权限
- 启动 Agent
# 单次调用
agentcore invoke '{"prompt": "查询所有客户的数量"}'
# 指定会话 ID(用于多轮对话)
agentcore invoke '{"prompt": "查询订单总金额超过 1000 的客户"}' --session-id my-session
# 后续查询(基于上下文)
agentcore invoke '{"prompt": "这些客户都来自哪些城市?"}' --session-id my-session# 查看最近的日志
agentcore logs
# 实时查看日志
agentcore logs --follow
# 查看特定时间范围的日志
agentcore logs --start-time "2025-11-02T10:00:00" --end-time "2025-11-02T11:00:00"用户查询:
查询所有客户的数量
系统处理流程:
- 生成 SQL:
SELECT COUNT(*) as customer_count FROM customers - 执行查询
- 分析结果
输出:
查询分析报告
================
根据查询结果,数据库中共有 150 位客户。
查询结果概述:
系统成功从 customers 表中统计了客户总数。
关键发现:
- 客户总数:150 位
- 这表明数据库中有活跃的客户记录
执行的 SQL 语句:
```sql
SELECT COUNT(*) as customer_count FROM customers
执行时间: 0.12 秒
---
### 示例 2: 分组聚合查询
**用户查询:**
查询每个城市的客户数量,按数量降序排列,只显示前5个城市
**系统处理流程:**
1. 生成 SQL: `SELECT city, COUNT(*) as customer_count FROM customers GROUP BY city ORDER BY customer_count DESC LIMIT 5`
2. 执行查询
3. 分析结果并生成报告
**输出:**
根据查询结果,客户分布最多的前5个城市如下:
查询结果概述: 系统分析了客户在不同城市的分布情况,并按客户数量进行了排序。
关键发现:
- 北京:45 位客户(30%)
- 上海:38 位客户(25.3%)
- 深圳:32 位客户(21.3%)
- 广州:28 位客户(18.7%)
- 杭州:25 位客户(16.7%)
数据特征分析:
- 前5个城市共有 168 位客户,占总客户数的约 89%
- 显示出明显的地域集中特征
- 一线城市(北京、上海、深圳、广州)占据主导地位
- 客户分布呈现明显的梯度递减趋势
业务建议:
- 可以考虑在这些重点城市加强市场投入
- 针对不同城市的客户特点制定差异化营销策略
- 关注其他城市的市场开发潜力
执行的 SQL 语句:
SELECT city, COUNT(*) as customer_count
FROM customers
GROUP BY city
ORDER BY customer_count DESC
LIMIT 5执行时间: 0.18 秒
---
### 示例 3: 关联查询
**用户查询:**
查询销售额最高的前10个产品及其分类
**系统处理流程:**
1. 生成 SQL(包含 JOIN)
2. 执行查询
3. 分析结果
**输出:**
查询结果概述: 系统分析了产品销售数据,识别出销售额最高的10个产品及其所属分类。
关键发现: 销售额前10的产品:
- iPhone 14 Pro (电子产品) - ¥128,500
- MacBook Air M2 (电子产品) - ¥95,200
- AirPods Pro (电子产品) - ¥78,900
- iPad Air (电子产品) - ¥65,400
- Apple Watch Series 8 (电子产品) - ¥52,300
- Sony WH-1000XM5 (电子产品) - ¥45,600
- Samsung Galaxy S23 (电子产品) - ¥42,800
- Dell XPS 13 (电子产品) - ¥38,900
- Nintendo Switch (游戏设备) - ¥35,200
- Kindle Paperwhite (电子产品) - ¥28,700
数据特征分析:
- 电子产品类别占据了前10名中的9个位置
- 总销售额:¥611,500
- 平均单品销售额:¥61,150
- 销售额分布呈现明显的头部效应
业务建议:
- 电子产品是核心销售品类,应继续保持库存充足
- 考虑扩大电子产品的产品线
- 关注游戏设备类别的增长潜力
执行的 SQL 语句:
SELECT
p.product_name,
c.category_name,
SUM(oi.quantity * oi.unit_price) as total_sales
FROM products p
JOIN categories c ON p.category_id = c.category_id
JOIN order_items oi ON p.product_id = oi.product_id
GROUP BY p.product_id, p.product_name, c.category_name
ORDER BY total_sales DESC
LIMIT 10执行时间: 0.25 秒
---
### 示例 4: 多轮对话场景
**第 1 轮对话:**
**用户:** 查询订单总金额超过 1000 的客户
**系统输出:**
查询结果概述: 系统识别出订单总金额超过 1000 元的高价值客户。
关键发现: 共有 15 位客户的订单总金额超过 1000 元:
- 张伟 (北京) - 总订单金额:¥3,250
- 李娜 (上海) - 总订单金额:¥2,890
- 王芳 (深圳) - 总订单金额:¥2,650
- 刘强 (广州) - 总订单金额:¥2,420
- 陈静 (杭州) - 总订单金额:¥2,180 ... (还有10位客户)
数据特征分析:
- 高价值客户总数:15 位
- 平均订单金额:¥2,156
- 总订单金额:¥32,340
- 这些客户占总客户数的 10%,但贡献了约 45% 的销售额
执行的 SQL 语句:
SELECT
c.name,
c.city,
SUM(o.total_amount) as total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.city
HAVING SUM(o.total_amount) > 1000
ORDER BY total_spent DESC执行时间: 0.22 秒
---
**第 2 轮对话:**
**用户:** 这些客户都来自哪些城市?
**系统理解:** "这些客户" 指的是第 1 轮查询中订单总金额超过 1000 的客户
**系统输出:**
查询结果概述: 基于上一次查询的高价值客户,系统分析了他们的城市分布情况。
关键发现: 高价值客户的城市分布:
- 北京:5 位客户 (33.3%)
- 上海:4 位客户 (26.7%)
- 深圳:3 位客户 (20.0%)
- 广州:2 位客户 (13.3%)
- 杭州:1 位客户 (6.7%)
数据特征分析:
- 高价值客户主要集中在一线城市
- 北京和上海合计占比达到 60%
- 城市分布与整体客户分布基本一致
- 显示出经济发达地区的消费能力更强
业务建议:
- 在一线城市加强高端产品的推广
- 针对这些城市的客户提供专属服务
- 考虑在这些城市开设线下体验店
执行的 SQL 语句:
SELECT
c.city,
COUNT(DISTINCT c.customer_id) as customer_count,
ROUND(COUNT(DISTINCT c.customer_id) * 100.0 / 15, 1) as percentage
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.city
HAVING SUM(o.total_amount) > 1000
ORDER BY customer_count DESC执行时间: 0.19 秒
---
**第 3 轮对话:**
**用户:** 他们的平均订单金额是多少?
**系统理解:** "他们" 指的是前面提到的高价值客户
**系统输出:**
查询结果概述: 系统计算了高价值客户(订单总金额超过 1000 元)的平均订单金额。
关键发现:
- 平均订单金额:¥456.80
- 订单总数:71 笔
- 客户总数:15 位
- 人均订单数:4.7 笔
对比分析:
- 全体客户的平均订单金额:¥285.50
- 高价值客户的平均订单金额比整体高出 60%
- 显示出高价值客户的单笔消费能力更强
数据特征分析:
- 高价值客户不仅订单总额高,单笔订单金额也显著高于平均水平
- 这些客户的购买频次也较高(人均 4.7 笔)
- 表明这些客户既有消费能力,也有较高的品牌忠诚度
业务建议:
- 为这些高价值客户提供 VIP 服务
- 推送高端产品和限量版商品
- 建立客户关系管理系统,定期跟进
- 考虑推出会员积分和专属优惠
执行的 SQL 语句:
SELECT
AVG(o.total_amount) as avg_order_amount,
COUNT(o.order_id) as total_orders,
COUNT(DISTINCT o.customer_id) as total_customers,
ROUND(COUNT(o.order_id) * 1.0 / COUNT(DISTINCT o.customer_id), 1) as orders_per_customer
FROM orders o
WHERE o.customer_id IN (
SELECT c.customer_id
FROM customers c
JOIN orders o2 ON c.customer_id = o2.customer_id
GROUP BY c.customer_id
HAVING SUM(o2.total_amount) > 1000
)执行时间: 0.21 秒
---
### 示例 5: 错误自愈场景
**用户查询:**
查询产品表中的所有商品名称
**第 1 次尝试(失败):**
系统生成的 SQL:
```sql
SELECT product_name FORM products
错误信息:
SQL 语法错误: You have an error in your SQL syntax near 'FORM'
系统自动修正:
系统分析错误,识别为拼写错误(FORM → FROM),生成修正后的 SQL:
SELECT product_name FROM products第 2 次尝试(成功):
最终输出:
查询分析报告
================
查询结果概述:
系统成功查询了产品表中的所有商品名称。
关键发现:
数据库中共有 50 个产品,包括:
电子产品类:
- iPhone 14 Pro
- MacBook Air M2
- AirPods Pro
- iPad Air
- Apple Watch Series 8
... (还有 15 个电子产品)
家居用品类:
- 智能扫地机器人
- 空气净化器
- 电动牙刷
... (还有 12 个家居用品)
服装类:
- 运动鞋
- T恤
- 牛仔裤
... (还有 10 个服装产品)
数据特征分析:
- 产品种类丰富,覆盖多个品类
- 电子产品占比最高(约 40%)
- 产品命名规范,便于识别
注意事项:
⚠️ 系统自动修正了 SQL 语法错误(FORM → FROM)
✓ 查询已成功执行
执行的 SQL 语句:
```sql
SELECT product_name FROM products
重试次数: 1 执行时间: 0.15 秒
---
### 示例 6: 时间范围查询
**用户查询:**
查询最近30天的订单数量和总金额
**系统输出:**
查询结果概述: 系统分析了最近30天(2025-10-03 至 2025-11-02)的订单数据。
关键发现:
- 订单总数:245 笔
- 订单总金额:¥68,950
- 平均订单金额:¥281.43
- 日均订单数:8.2 笔
- 日均销售额:¥2,298.33
趋势分析:
- 订单数量呈现稳定增长趋势
- 周末订单量明显高于工作日(约高出 35%)
- 最高单日订单数:15 笔(2025-10-28,周六)
- 最低单日订单数:3 笔(2025-10-15,周二)
数据特征分析:
- 订单量保持健康增长态势
- 周末效应明显,建议在周末加强促销活动
- 平均订单金额稳定,客户消费习惯较为固定
业务建议:
- 在周末推出限时优惠活动
- 在工作日通过营销活动提升订单量
- 关注客单价提升机会
执行的 SQL 语句:
SELECT
COUNT(order_id) as order_count,
SUM(total_amount) as total_sales,
AVG(total_amount) as avg_order_amount,
MIN(order_date) as earliest_order,
MAX(order_date) as latest_order
FROM orders
WHERE order_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)执行时间: 0.16 秒
---
### 示例 7: 复杂业务分析
**用户查询:**
分析每个产品分类的销售情况,包括销售额、订单数和平均单价
**系统输出:**
查询结果概述: 系统对所有产品分类进行了全面的销售分析,包括销售额、订单数量和平均单价等关键指标。
关键发现:
-
电子产品
- 销售额:¥425,600 (占比 58.5%)
- 订单数:156 笔
- 平均单价:¥2,728
- 表现:⭐⭐⭐⭐⭐ 最佳品类
-
家居用品
- 销售额:¥185,300 (占比 25.5%)
- 订单数:98 笔
- 平均单价:¥1,891
- 表现:⭐⭐⭐⭐ 优秀
-
服装
- 销售额:¥78,900 (占比 10.8%)
- 订单数:125 笔
- 平均单价:¥631
- 表现:⭐⭐⭐ 良好
-
图书
- 销售额:¥28,500 (占比 3.9%)
- 订单数:87 笔
- 平均单价:¥328
- 表现:⭐⭐ 一般
-
运动器材
- 销售额:¥9,200 (占比 1.3%)
- 订单数:12 笔
- 平均单价:¥767
- 表现:⭐ 需要改进
数据特征分析:
- 电子产品是绝对的主力品类,贡献了近60%的销售额
- 家居用品表现稳定,是第二大品类
- 服装类订单数量多但客单价较低
- 图书类订单频繁但金额较小
- 运动器材销售疲软,需要关注
业务建议:
- 电子产品:保持优势,扩大产品线,增加高端产品
- 家居用品:稳步发展,关注新兴智能家居产品
- 服装:提升客单价,推出高端服装系列
- 图书:考虑捆绑销售,提升客单价
- 运动器材:重新评估产品定位,加强营销推广
执行的 SQL 语句:
SELECT
c.category_name,
SUM(oi.quantity * oi.unit_price) as total_sales,
COUNT(DISTINCT o.order_id) as order_count,
AVG(oi.unit_price) as avg_unit_price,
ROUND(SUM(oi.quantity * oi.unit_price) * 100.0 /
(SELECT SUM(quantity * unit_price) FROM order_items), 1) as sales_percentage
FROM categories c
JOIN products p ON c.category_id = p.category_id
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
GROUP BY c.category_id, c.category_name
ORDER BY total_sales DESC执行时间: 0.28 秒
## 配置说明
### 环境变量
| 变量名 | 说明 | 默认值 | 必需 |
|--------|------|--------|------|
| `DB_ENDPOINT` | 数据库地址 | localhost | 是 |
| `DB_PORT` | 数据库端口 | 3306 | 否 |
| `DB_NAME` | 数据库名称 | demodb | 是 |
| `DB_USER` | 数据库用户名 | root | 是 |
| `DB_PASSWORD` | 数据库密码 | - | 是 |
| `AWS_REGION` | AWS 区域 | us-west-2 | 否 |
| `MODEL_ID` | Bedrock 模型 ID | us.anthropic.claude-sonnet-4-20250514-v1:0 | 否 |
| `MEMORY_ID` | Memory 资源 ID | - | 否 |
| `MAX_RETRIES` | 最大重试次数 | 3 | 否 |
| `LOG_LEVEL` | 日志级别 | INFO | 否 |
### 数据库表结构
系统支持以下数据库表:
- **customers**: 客户信息表
- **categories**: 产品分类表
- **products**: 产品信息表
- **orders**: 订单表
- **order_items**: 订单明细表
- **reviews**: 产品评论表
详细表结构请参考 `rds_demo/init_schema.sql`。
## 常见问题
### Q1: 如何处理数据库连接失败?
**A:** 检查以下几点:
1. 数据库地址和端口是否正确
2. 用户名和密码是否正确
3. 数据库是否允许远程连接
4. 防火墙规则是否允许访问
5. 如果使用 RDS,检查安全组配置
### Q2: 如何启用多轮对话功能?
**A:** 需要配置 Memory ID:
```bash
# 1. 创建 Memory 资源
python setup_memory.py
# 2. 设置环境变量
export MEMORY_ID=<your-memory-id>
# 3. 使用相同的 session_id 进行多轮对话
A:
- 本地环境:使用
--verbose参数python local_test.py --verbose "查询内容" - AgentCore 环境:查看 CloudWatch Logs
agentcore logs --follow
A:
- 优化查询,添加 WHERE 条件缩小范围
- 添加 LIMIT 子句限制返回记录数
- 为常用查询字段添加索引
- 增加数据库连接超时时间
A: 修改 nl2sql_agent/analyzer.py 中的 ResultAnalyzer 类,自定义分析逻辑和报告格式。
A:
- 在数据库中创建新表
- 系统会自动获取表结构信息
- 无需修改代码,Agent 会自动识别新表
A: 修改 nl2sql_agent/executor.py 中的 get_schema_info 方法,只返回允许查询的表结构。
A:
# 1. 修改代码
# 2. 重新部署
agentcore launch --update
# 或者完全重新部署
agentcore destroy
agentcore launch-
数据库优化
- 为常用查询字段添加索引
- 使用连接池管理数据库连接
- 定期清理过期数据
-
查询优化
- 自动添加 LIMIT 子句限制返回记录数
- 避免 SELECT *,明确指定需要的字段
- 使用数据库的聚合函数而不是在应用层计算
-
缓存策略
- 缓存数据库 Schema 信息
- 对于相同的查询,缓存结果(可选)
-
并发处理
- 调整数据库连接池大小
- 使用异步处理长时间运行的查询
-
SQL 注入防护
- 系统只允许 SELECT 语句
- 禁止 DROP、DELETE、UPDATE 等危险操作
- 使用参数化查询
-
数据库凭证管理
- 不要将凭证提交到版本控制
- 使用 AWS Secrets Manager 存储凭证
- 使用 IAM 角色进行数据库认证(如果支持)
-
访问控制
- 每个会话使用唯一的 session_id
- Memory Service 按 session_id 隔离数据
- 在 AgentCore 层面实现用户认证
-
数据脱敏
- 对敏感字段(如邮箱、电话)进行脱敏处理
- 在分析报告中避免显示完整的敏感信息
部署到 AgentCore 后,所有日志会自动输出到 CloudWatch Logs。
日志格式(JSON):
{
"timestamp": "2025-11-02T10:30:45Z",
"level": "INFO",
"session_id": "session-12345",
"query_id": "query-67890",
"message": "查询处理成功",
"sql_statement": "SELECT COUNT(*) FROM customers",
"execution_time": 0.15,
"result_count": 1
}- SQL 生成时间
- SQL 执行时间
- 结果分析时间
- 端到端响应时间
- 错误率和重试率
- Memory Service 调用延迟
欢迎贡献代码!请遵循以下步骤:
- Fork 项目
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
[添加许可证信息]
[添加联系方式]
- 初始版本发布
- 支持自然语言转 SQL
- 支持错误自愈
- 支持多轮对话
- 集成 AgentCore Memory Service
- 支持部署到 AgentCore Runtime