from flask import Flask, request, jsonify import pymssql app = Flask(__name__) # 数据库配置 DATABASE_CONFIG = { 'host': 'SBDC1Dxxx.xxxx.local', # 自己的数据库地址 'port': '1433', 'user': 'itexxx', # 自己数据库的账户 'password': 'xxxx#', #自己数据库的密码 'db': 'IportalData', # 自己数据库的库名 'charset': 'utf8' } # 新增根路径路由 @app.route('/') def home(): return "欢迎访问SQL Server接口服务!" # 自定GET义返回内容 @app.route('/query', methods=['POST']) def query_database(): print("接收到请求") # 获取关键字 keyword = request.json.get('keyword') print("keyword为:"+keyword) if not keyword: return jsonify({"error": "Keyword is required"}), 400 # 参数化查询,避免 SQL 注入,修改成自己的库 query = """ SELECT Shop, KPI Area, SAP_TP_name, SAP_Text, Prodtag, Ausfallzeit, SD_Bewertung_Ebene1, SD_Bewertung_Ebene2, SD_Bewertung_Ebene3, SD_Bewertung_Ebene4 FROM Classification WHERE (Prodtag between '2025-03-01' and '2025-03-12') and (SAP_Text LIKE %s) """ params = ('%' + keyword + '%',) try: # 建立数据库连接 connection = pymssql.connect(**DATABASE_CONFIG) with connection.cursor() as cursor: # 执行查询 cursor.execute(query, params) result = cursor.fetchall() connection.commit() connection.close() if not result: return"未查询到有效数据", 400 # 生成 Markdown 表格 markdown_table = generate_markdown_table(result) return markdown_table, 200 except Exception as e: return str(e), 500 def generate_markdown_table(results): """ 生成 Markdown 表格 """ if not results: return"" # 获取列名 columns = results[0].keys() # 表头 table_md = "| " + " | ".join([col for col in columns]) + " |\n" # 分隔线 table_md += "| " + " --- |" * len(columns) + "\n" # 表格内容 for row in results: table_md += "| " + " | ".join([str(cell) for cell in row.values()]) + " |\n" return table_md if __name__ == '__main__': # 注意这里绑定本机的内容ip,省事点,就0.0.0.0即可。不要绑定127.0.0.1,docker内访问不到 app.run(host='0.0.0.0', port=5000) 我在用Dify平台创建工作流,想要连接数据库,用这个里面的数据,他需要先写一个服务,然后再用http请求节点,我这个服务写的对吗?然后整个工作流应该怎么操作?
时间: 2025-03-13 20:19:46 浏览: 83
<think>好的,我需要帮助用户分析他们的Flask服务代码是否正确,并指导如何在Dify平台上构建工作流。首先,我应该先仔细阅读用户提供的代码,看看是否存在潜在的问题。
用户使用Flask创建了一个接口,主要处理POST请求到/query路径,从数据库中查询数据并返回Markdown表格。首先,检查数据库连接部分,用户使用了pymssql,配置信息看起来正确,但需要注意敏感信息如密码的处理,不过用户可能已经自行处理了。接下来查看路由处理函数query_database,这里需要注意SQL注入的问题,用户使用了参数化查询,这是正确的做法,使用%s作为占位符,pymssql应该支持这种方式。不过,在生成查询结果时,用户尝试使用result[0].keys()来获取列名,但pymssql的cursor默认返回的是元组而非字典,所以这里可能有问题,需要确认是否设置了游标的字典模式,比如在连接时指定as_dict=True,否则会引发AttributeError,因为元组没有keys方法。
另外,在错误处理部分,当查询结果为空时,用户返回了400状态码,但根据HTTP标准,404可能更合适,不过400也可以接受,说明客户端请求有问题。生成Markdown表格的函数中,用户假设结果中的每一行是字典,但实际上如果游标没有设置为返回字典,这里会出错。需要建议用户修改游标的配置,或者在代码中将元组转换为字典,例如通过cursor.description获取列名。
关于Dify平台的工作流配置,用户需要创建一个HTTP请求节点,指向部署好的Flask服务的/query端点,使用POST方法,传递keyword参数。需要确保Dify能够访问到该服务,尤其是在Docker环境中运行时,Flask应用必须监听0.0.0.0,用户已经正确设置了这一点。此外,服务返回的是Markdown表格,Dify可能需要进一步处理这个响应,比如解析成结构化数据,用户需要根据需求调整返回格式或添加处理步骤。
最后,需要提醒用户测试服务是否正常运行,使用工具如Postman发送POST请求到/query,检查返回结果是否正确。另外,安全方面,建议用户启用HTTPS,添加身份验证,如API密钥,避免暴露数据库信息在代码中,可以使用环境变量。如果用户遇到具体错误,需要根据错误信息进一步排查,例如数据库连接失败、查询错误等。</think>以下是对您代码的详细分析和Dify工作流操作指南:
一、代码分析及优化建议
1. 代码正确性
√ 基础架构正确:Flask服务结构完整,具备路由、数据库连接、查询处理等功能模块
× 列名获取错误:pymssql默认返回元组类型数据,需添加`as_dict=True`参数转换为字典
需修改数据库连接部分:
```python
connection = pymssql.connect(**DATABASE_CONFIG)
with connection.cursor(as_dict=True) as cursor: # 添加as_dict参数
```
2. SQL注入防护
√ 已使用参数化查询(%s占位符)
! 注意:pymssql要求必须使用tuple传参,即使单个参数也要写成(params,)
3. 错误处理优化建议
• 建议将空数据响应码改为404
• 添加数据库连接异常捕获
• 建议记录日志而非直接print
二、Dify工作流配置步骤
1. 服务部署
(1) 确保服务可访问
• 测试访问:`curl -X POST http://服务器IP:5000/query -H "Content-Type: application/json" -d '{"keyword":"test"}'`
• 防火墙开放5000端口
2. Dify平台配置
```mermaid
graph TD
A[触发节点] --> B[HTTP请求节点]
B --> C[数据处理节点]
C --> D[输出节点]
```
具体操作:
1. 创建工作流
- 新建工作流 → 命名(如"数据库查询")
2. 配置HTTP请求节点
• URL:`http://你的服务器IP:5000/query`
• 方法:POST
• Headers:`Content-Type: application/json`
• Body参数:
```json
{
"keyword": "{{input.keyword}}"
}
```
3. 添加响应处理器(推荐)
建议在后续节点中添加JSON解析:
```python
try:
return json.loads(response.content)
except:
return {"error": "响应解析失败", "raw": response.text}
```
4. 错误处理配置
建议添加异常捕获节点,处理以下情况:
- 连接超时(设置timeout=10)
- 非200响应
- 空数据返回
三、完整调试流程
1. 本地测试
```bash
# 测试服务
curl -X POST http://localhost:5000/query -H "Content-Type: application/json" -d '{"keyword":"汽车"}'
# 预期返回样例
| Shop | KPI Area | SAP_TP_name | ...
|------|----------|-------------|-----
| 上海店 | 华东区 | 发动机部件 | ...
```
2. Dify测试用例
```json
{
"input": {
"keyword": "发动机"
},
"expected_output": {
"contains_table": true,
"min_rows": 1
}
}
```
四、安全增强建议
1. 添加API密钥验证
```python
from flask import abort
@app.before_request
def check_auth():
if request.endpoint == 'query_database':
if request.headers.get('X-API-KEY') != 'YOUR_SECRET_KEY':
abort(403)
```
2. 数据库连接池配置
推荐使用`DBUtils`实现连接池:
```python
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymssql,
**DATABASE_CONFIG,
maxconnections=10
)
# 在视图函数中使用
connection = pool.connection()
```
五、性能优化建议
1. 添加缓存机制
```python
from flask_caching import Cache
cache = Cache(config={'CACHE_TYPE': 'SimpleCache'})
cache.init_app(app)
@app.route('/query', methods=['POST'])
@cache.cached(timeout=300, query_string=True)
def query_database():
#...
```
2. 分页查询支持
可添加分页参数:
```sql
OFFSET %s ROWS FETCH NEXT %s ROWS ONLY
```
遇到具体问题时可进一步优化的方向:
1. 若出现连接超时:检查MSSQL的TCP/IP协议是否启用
2. 若返回中文乱码:确保数据库字符集为UTF-8
3. 若性能低下:添加数据库索引(特别是SAP_Text和Prodtag字段)
阅读全文
相关推荐



















