我目前有两个py文件,帮我增加几个接口,一个接口是使用sse技术读取数据库iiot_command_match表的信息,可以实现前端页面的数据实时显示,iiot_command_match表的数据包括sn,label,type,priority,task_type_sn,execute_json,这是数据库中,这些数据的类型, id SERIAL PRIMARY KEY, sn VARCHAR(100) UNIQUE NOT NULL, label VARCHAR(100) DEFAULT '-', type VARCHAR(100) DEFAULT '-', priority SMALLINT DEFAULT 5, task_type_sn VARCHAR(100) DEFAULT '-', execute_json text DEFAULT '-' 第二个接口功能是新增iiot_command_match表中的数据,第三个接口功能是修改iiot_command_match表中的数据,第四个接口功能是删除iiot_command_match表中的数据,然后写一个前端页,来实现这4个接口,前端用vue3+elenebt,后端接口IP是http://127.0.0.1:8888 ,以下为service.py的代码为import json import asyncio from collections import deque # 添加删除任务类型功能 async def delete_task_type(pool, task_id: int): """删除任务类型""" async with pool.acquire() as conn: # 检查任务类型是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_task_type WHERE id = $1", task_id) if not exists: raise ValueError(f"任务类型ID {task_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_task_type WHERE id = $1", task_id) return True # service.py 新增代码 async def task_type_event_generator(request): """SSE事件生成器,实时推送iiot_task_type表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_task_type表 task_types = await conn.fetch("SELECT id, sn, label FROM iiot_task_type") # 格式化数据 formatted_data = [ {"id": t["id"], "sn": t["sn"], "label": t["label"]} for t in task_types ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_task_type(pool, data: dict): """创建新任务类型""" async with pool.acquire() as conn: await conn.execute( "INSERT INTO iiot_task_type(sn, label) VALUES ($1, $2)", data["sn"], data["label"] ) async def update_task_type(pool, task_id: int, update_data: dict): """更新任务类型信息""" async with pool.acquire() as conn: query = "UPDATE iiot_task_type SET " params = [] updates = [] if "sn" in update_data: updates.append("sn = $" + str(len(params) + 1)) params.append(update_data["sn"]) if "label" in update_data: updates.append("label = $" + str(len(params) + 1)) params.append(update_data["label"]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(task_id) await conn.execute(query, *params) # 新增:全局存储轮询结果 task_polling_results = deque(maxlen=100) # 存储最近100次轮询结果 # 新增:后台轮询任务 async def poll_iiot_task(pool): """后台轮询任务,每秒查询一次iiot_task表""" while True: try: async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" # 查询实际表而非视图 ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "id": t["id"], "sn": t["sn"], "status": t["status"], "create_time": t["create_time"].isoformat(), # 根据实际表结构添加其他字段 }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 存储结果(实际应用中可替换为数据库/缓存存储) task_polling_results.append({ "timestamp": asyncio.get_event_loop().time(), "data": tasks }) print(f"Polled {len(tasks)} tasks at {asyncio.get_event_loop().time()}") print(tasks) except Exception as e: print(f"Polling error: {str(e)}") await asyncio.sleep(1) # 每秒轮询一次 #获取全部任务 async def task_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_type_label": str(t["task_type_label"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" # 获取全部指令 async def command_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_command" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_sn": str(t["task_sn"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" 下面是api.py的代码 from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from .services import task_event_generator from .services import command_event_generator from fastapi import HTTPException from .services import task_type_event_generator, create_task_type, update_task_type, delete_task_type # 导入新增的删除函数 router = APIRouter() # 获取任务类型 @router.get("/sse/task-types") async def sse_task_types(request: Request): return StreamingResponse( task_type_event_generator(request), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) # 添加任务类型 @router.post("/task-types") async def add_task_type(request: Request): try: data = await request.json() if not data.get("sn") or not data.get("label"): raise HTTPException(400, "SN和Label不能为空") await create_task_type(request.app.state.db_pool, data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 修改任务类型 @router.put("/task-types/{task_id}") async def modify_task_type(task_id: int, request: Request): try: update_data = await request.json() if not update_data: raise HTTPException(400, "无有效更新数据") await update_task_type(request.app.state.db_pool, task_id, update_data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 删除任务类型 @router.delete("/task-types/{task_id}") async def remove_task_type(task_id: int, request: Request): try: # 验证ID有效性 if not task_id or task_id <= 0: raise HTTPException(400, "无效的任务类型ID") # 执行删除操作 success = await delete_task_type(request.app.state.db_pool, task_id) if success: return {"status": "success", "message": "任务类型已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务信息 @router.get("/sse/tasks") async def sse_tasks(request: Request): return StreamingResponse( task_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 获取指令信息 @router.get("/sse/commands") async def sse_tasks(request: Request): return StreamingResponse( command_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } )
时间: 2025-08-06 13:01:48 AIGC 浏览: 21
<think>我们有两个文件:service.py和api.py。现在需要增加四个接口,分别对应iiot_command_match表的增删改查和SSE实时推送。
根据要求,我们需要:
1. 使用SSE技术读取iiot_command_match表的信息(实时显示)
2. 新增iiot_command_match表中的数据
3. 修改iiot_command_match表中的数据
4. 删除iiot_command_match表中的数据
首先,我们在service.py中编写四个函数:
a. 一个SSE事件生成器,用于实时推送iiot_command_match表的数据。
b. 一个创建新记录的函数。
c. 一个更新记录的函数。
d. 一个删除记录的函数。
然后,在api.py中为这四个功能添加路由。
另外,还需要一个前端页面,用Vue3+Element Plus实现,但这里我们主要完成后端接口。
注意:数据库表iiot_command_match的字段包括:id, sn, label, type, priority, task_type_sn, execute_json。
步骤:
1. 在service.py中添加以下函数:
- `command_match_event_generator(request)`: 用于SSE实时推送。
- `create_command_match(pool, data: dict)`: 创建新记录。
- `update_command_match(pool, command_id: int, update_data: dict)`: 更新记录。
- `delete_command_match(pool, command_id: int)`: 删除记录。
2. 在api.py中添加以下路由:
- GET /sse/command-match -> 对应SSE推送
- POST /command-match -> 创建新记录
- PUT /command-match/{command_id} -> 更新记录
- DELETE /command-match/{command_id} -> 删除记录
3. 注意:在api.py中导入service.py中新增的四个函数。
4. 前端部分:由于问题要求写一个前端页,但这里主要关注后端接口,前端部分可以简要描述,或者给出一个简单的示例。
考虑到代码长度,我将分步骤在service.py和api.py中补充代码。
首先,修改service.py,添加四个新函数:
注意:由于我们已经在service.py中有了类似的结构,我们可以模仿`task_type_event_generator`等函数来写。
具体实现:
1. `command_match_event_generator`:
每秒钟查询一次iiot_command_match表,并将结果以SSE格式推送。
2. `create_command_match`:
插入新记录,注意字段:sn, label, type, priority, task_type_sn, execute_json。
3. `update_command_match`:
根据command_id更新记录,可以更新部分字段。
4. `delete_command_match`:
根据command_id删除记录。
下面在service.py中添加代码:</think>根据你的需求,我将为`iiot_command_match`表实现增删改查和SSE实时推送接口,并提供前端页面实现方案。以下是完整的解决方案:
### 1. 修改 service.py
```python
# 在 service.py 中添加以下代码
async def command_match_event_generator(request):
"""SSE事件生成器,实时推送iiot_command_match表变更"""
pool = request.app.state.db_pool
try:
while not await request.is_disconnected():
async with pool.acquire() as conn:
# 查询iiot_command_match表
commands = await conn.fetch(
"SELECT id, sn, label, type, priority, task_type_sn, execute_json "
"FROM iiot_command_match"
)
# 格式化数据
formatted_data = [
{
"id": c["id"],
"sn": c["sn"],
"label": c["label"],
"type": c["type"],
"priority": c["priority"],
"task_type_sn": c["task_type_sn"],
"execute_json": json.loads(c["execute_json"]) if c["execute_json"] != '-' else {}
}
for c in commands
]
# 生成SSE事件
yield f"data: {json.dumps(formatted_data)}\n\n"
await asyncio.sleep(1) # 每秒更新
except Exception as e:
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
async def create_command_match(pool, data: dict):
"""创建新指令匹配"""
required_fields = ["sn", "label", "type", "priority", "task_type_sn", "execute_json"]
if not all(field in data for field in required_fields):
raise ValueError("缺少必要字段")
async with pool.acquire() as conn:
# 检查SN是否唯一
existing = await conn.fetchval(
"SELECT 1 FROM iiot_command_match WHERE sn = $1",
data["sn"]
)
if existing:
raise ValueError(f"SN {data['sn']} 已存在")
# 插入新记录
await conn.execute(
"INSERT INTO iiot_command_match(sn, label, type, priority, task_type_sn, execute_json) "
"VALUES ($1, $2, $3, $4, $5, $6)",
data["sn"],
data["label"],
data["type"],
data["priority"],
data["task_type_sn"],
json.dumps(data["execute_json"])
)
async def update_command_match(pool, command_id: int, update_data: dict):
"""更新指令匹配信息"""
async with pool.acquire() as conn:
# 检查记录是否存在
exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id)
if not exists:
raise ValueError(f"指令ID {command_id} 不存在")
# 构建更新语句
query = "UPDATE iiot_command_match SET "
params = []
updates = []
field_mapping = {
"sn": "sn",
"label": "label",
"type": "type",
"priority": "priority",
"task_type_sn": "task_type_sn",
"execute_json": "execute_json"
}
for field, db_field in field_mapping.items():
if field in update_data:
updates.append(f"{db_field} = ${len(params) + 1}")
# 特殊处理JSON字段
if field == "execute_json":
params.append(json.dumps(update_data[field]))
else:
params.append(update_data[field])
if not updates:
return
query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1)
params.append(command_id)
await conn.execute(query, *params)
async def delete_command_match(pool, command_id: int):
"""删除指令匹配"""
async with pool.acquire() as conn:
# 检查记录是否存在
exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id)
if not exists:
raise ValueError(f"指令ID {command_id} 不存在")
# 执行删除操作
await conn.execute("DELETE FROM iiot_command_match WHERE id = $1", command_id)
return True
```
### 2. 修改 api.py
```python
# 在 api.py 中添加以下路由
from .services import (
command_match_event_generator,
create_command_match,
update_command_match,
delete_command_match
)
# 指令匹配SSE推送
@router.get("/sse/command-match")
async def sse_command_match(request: Request):
return StreamingResponse(
command_match_event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
# 创建新指令匹配
@router.post("/command-match")
async def add_command_match(request: Request):
try:
data = await request.json()
await create_command_match(request.app.state.db_pool, data)
return {"status": "success"}
except ValueError as ve:
raise HTTPException(400, str(ve))
except Exception as e:
raise HTTPException(500, str(e))
# 更新指令匹配
@router.put("/command-match/{command_id}")
async def modify_command_match(command_id: int, request: Request):
try:
update_data = await request.json()
await update_command_match(request.app.state.db_pool, command_id, update_data)
return {"status": "success"}
except ValueError as ve:
raise HTTPException(400, str(ve))
except Exception as e:
raise HTTPException(500, str(e))
# 删除指令匹配
@router.delete("/command-match/{command_id}")
async def remove_command_match(command_id: int, request: Request):
try:
success = await delete_command_match(request.app.state.db_pool, command_id)
if success:
return {"status": "success", "message": "指令匹配已删除"}
else:
raise HTTPException(500, "删除操作未成功")
except ValueError as ve:
raise HTTPException(404, str(ve))
except Exception as e:
raise HTTPException(500, str(e))
```
### 3. 前端页面实现 (Vue3 + Element Plus)
```vue
<template>
<div class="command-match-container">
<!-- 实时数据表格 -->
<el-table :data="commandMatches" style="width: 100%" height="500">
<el-table-column prop="sn" label="SN" width="180" />
<el-table-column prop="label" label="标签" width="120" />
<el-table-column prop="type" label="类型" width="100" />
<el-table-column prop="priority" label="优先级" width="80" sortable />
<el-table-column prop="task_type_sn" label="任务类型SN" width="150" />
<el-table-column label="执行JSON" min-width="200">
<template #default="{row}">
<pre style="max-height: 100px; overflow: auto">{{ JSON.stringify(row.execute_json, null, 2) }}</pre>
</template>
</el-table-column>
<el-table-column label="操作" width="180" fixed="right">
<template #default="{row}">
<el-button size="small" @click="handleEdit(row)">编辑</el-button>
<el-button size="small" type="danger" @click="handleDelete(row.id)">删除</el-button>
</template>
</el-table-column>
</el-table>
<!-- 添加/编辑对话框 -->
<el-dialog v-model="dialogVisible" :title="formTitle">
<el-form :model="form" label-width="120px">
<el-form-item label="SN" required>
<el-input v-model="form.sn" />
</el-form-item>
<el-form-item label="标签">
<el-input v-model="form.label" />
</el-form-item>
<el-form-item label="类型">
<el-input v-model="form.type" />
</el-form-item>
<el-form-item label="优先级">
<el-input-number v-model="form.priority" :min="1" :max="10" />
</el-form-item>
<el-form-item label="任务类型SN">
<el-input v-model="form.task_type_sn" />
</el-form-item>
<el-form-item label="执行JSON">
<el-input
v-model="form.execute_json"
type="textarea"
:autosize="{ minRows: 4, maxRows: 8 }"
placeholder='例如: {"command": "start", "params": {"speed": 100}}'
/>
</el-form-item>
</el-form>
<template #footer>
<el-button @click="dialogVisible = false">取消</el-button>
<el-button type="primary" @click="submitForm">确认</el-button>
</template>
</el-dialog>
</div>
</template>
<script setup>
import { ref, onMounted, onBeforeUnmount } from 'vue'
import { ElMessage, ElMessageBox } from 'element-plus'
// 后端API地址
const API_BASE = 'http://127.0.0.1:8888'
// 实时数据
const commandMatches = ref([])
let eventSource = null
// 表单相关
const dialogVisible = ref(false)
const form = ref({
id: null,
sn: '',
label: '',
type: '',
priority: 5,
task_type_sn: '',
execute_json: '{}'
})
const isEditMode = ref(false)
const formTitle = ref('添加新指令匹配')
// 初始化SSE连接
const initSSE = () => {
if (eventSource) eventSource.close()
eventSource = new EventSource(`${API_BASE}/sse/command-match`)
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
commandMatches.value = data
} catch (e) {
console.error('SSE数据解析错误:', e)
}
}
eventSource.onerror = (err) => {
console.error('SSE连接错误:', err)
// 尝试重新连接
setTimeout(initSSE, 3000)
}
}
// 添加新记录
const handleAdd = () => {
form.value = {
id: null,
sn: '',
label: '',
type: '',
priority: 5,
task_type_sn: '',
execute_json: '{}'
}
isEditMode.value = false
formTitle.value = '添加新指令匹配'
dialogVisible.value = true
}
// 编辑记录
const handleEdit = (row) => {
form.value = {
id: row.id,
sn: row.sn,
label: row.label,
type: row.type,
priority: row.priority,
task_type_sn: row.task_type_sn,
execute_json: JSON.stringify(row.execute_json, null, 2)
}
isEditMode.value = true
formTitle.value = '编辑指令匹配'
dialogVisible.value = true
}
// 删除记录
const handleDelete = async (id) => {
try {
await ElMessageBox.confirm('确定要删除这条记录吗?', '警告', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning'
})
const response = await fetch(`${API_BASE}/command-match/${id}`, {
method: 'DELETE'
})
if (response.ok) {
ElMessage.success('删除成功')
} else {
const error = await response.json()
throw new Error(error.detail || '删除失败')
}
} catch (error) {
ElMessage.error(error.message || '删除操作出错')
}
}
// 提交表单
const submitForm = async () => {
try {
// 验证JSON格式
let executeJson
try {
executeJson = JSON.parse(form.value.execute_json)
} catch {
throw new Error('执行JSON格式不正确')
}
const payload = {
...form.value,
execute_json: executeJson
}
const url = isEditMode.value
? `${API_BASE}/command-match/${form.value.id}`
: `${API_BASE}/command-match`
const method = isEditMode.value ? 'PUT' : 'POST'
const response = await fetch(url, {
method,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
})
if (!response.ok) {
const error = await response.json()
throw new Error(error.detail || '操作失败')
}
ElMessage.success(isEditMode.value ? '更新成功' : '添加成功')
dialogVisible.value = false
} catch (error) {
ElMessage.error(error.message || '操作出错')
}
}
// 生命周期钩子
onMounted(() => {
initSSE()
})
onBeforeUnmount(() => {
if (eventSource) eventSource.close()
})
</script>
<style scoped>
.command-match-container {
padding: 20px;
background-color: #fff;
border-radius: 4px;
box-shadow: 0 2px 12px 0 rgba(0, 0, 0, 0.1);
}
</style>
```
### 接口说明
1. **SSE实时数据推送**
- 方法:GET
- 路径:`/sse/command-match`
- 功能:实时推送iiot_command_match表数据变化
2. **创建新记录**
- 方法:POST
- 路径:`/command-match`
- 请求体:
```json
{
"sn": "CMD001",
"label": "启动指令",
"type": "start",
"priority": 3,
"task_type_sn": "TASK01",
"execute_json": {"command": "start", "params": {"speed": 100}}
}
```
3. **更新记录**
- 方法:PUT
- 路径:`/command-match/{id}`
- 请求体:需要更新的字段(部分更新)
4. **删除记录**
- 方法:DELETE
- 路径:`/command-match/{id}`
### 注意事项
1. SSE连接会自动重连,网络中断后会自动恢复
2. 前端页面包含完整的增删改查功能
3. 执行JSON字段在前端使用textarea编辑,后端会自动转换为JSON对象
4. SN字段具有唯一性约束,重复创建会返回错误
5. 优先级字段限制为1-10的整数值
阅读全文
相关推荐



















