使用MediaType.TEXT_EVENT_STREAM_VALUE时,应该如何设置其他头

时间: 2025-06-06 16:35:01 AIGC 浏览: 158 评论: 6
<think>好的,用户现在问的是在使用MediaType.TEXT_EVENT_STREAM_VALUE时,应该如何设置其他头。我需要先理解这个问题的背景。 首先,用户之前已经询问过如何在Spring Boot中设置特定的响应头,比如Cache-Control和Connection。现在他们的问题转向了使用Server-Sent Events (SSE)时如何正确配置响应头,特别是与MediaType.TEXT_EVENT_STREAM_VALUE相关的内容。 我需要回忆一下SSE的相关知识。SSE是一种允许服务器向客户端推送事件的技术,通常用于实时更新。正确的响应头设置对于SSE的正常工作至关重要。比如,Content-Type必须设置为text/event-stream,这是SSE的规范要求的。此外,可能需要设置缓存控制,避免中间缓存干扰,比如Cache-Control: no-cache。同时,连接需要保持开启,所以Connection: keep-alive也很重要。另外,有时可能还需要设置X-Accel-Buffering这样的头来禁用某些代理的缓冲。 接下来,我需要考虑用户可能的深层需求。他们可能正在尝试实现一个SSE的接口,但遇到了问题,比如连接被提前关闭或者数据没有实时推送。他们可能没有意识到除了Content-Type之外,其他响应头的重要性。或者他们可能在使用过程中遇到了缓存问题,导致客户端没有及时接收到新的事件。 然后,我需要确定回答的结构。应该提供几种方法,类似于之前的回答,比如手动设置HttpServletResponse、使用ResponseEntity、全局过滤器配置,以及可能的额外注意事项。此外,需要强调每个头的作用,比如为什么需要Cache-Control和Connection的设置,以及可能需要的其他头如X-Accel-Buffering。 还要注意用户可能是在Spring Boot环境下使用@GetMapping或者其他注解,所以需要给出具体的代码示例。例如,使用ResponseEntity设置头,或者在控制器中注入HttpServletResponse来手动添加头。同时,可能需要提醒用户关于Tomcat或Undertow等服务器的特定配置,比如超时设置,以确保连接保持活动状态。 另外,用户可能没有意识到测试SSE的方法,所以提供一些测试建议,比如使用curl或者前端EventSource API的例子也是有帮助的。此外,扩展内容可以提到如何结合Spring的SseEmitter来简化实现,或者如何配置服务器参数来优化连接管理。 需要检查是否有遗漏的头信息,例如,除了必须的Content-Type之外,是否还有其他推荐的头,如Transfer-Encoding: chunked,或者是否需要处理CORS相关的头,比如Access-Control-Allow-Origin,如果SSE的客户端来自不同的域。 最后,总结注意事项,比如避免在SSE连接中使用压缩,或者确保服务器配置允许长连接,这些都可能影响SSE的正常工作。同时,强调正确设置响应头的重要性,以确保SSE的兼容性和稳定性。</think>在 Spring Boot 中使用 `MediaType.TEXT_EVENT_STREAM_VALUE`(即 SSE 协议)时,需特别注意以下响应头配置: --- ### **核心响应头设置** #### 1. **必须设置的响应头** ```java Content-Type: text/event-stream # 标识为 SSE 流 Cache-Control: no-cache # 禁用缓存 Connection: keep-alive # 保持长连接 ``` #### 2. **推荐设置的响应头** ```java X-Accel-Buffering: no # 禁用 Nginx 等代理的缓冲 Transfer-Encoding: chunked # 启用分块传输(HTTP/1.1 默认) ``` --- ### **实现方式** #### **方法 1:使用 ResponseEntity** ```java @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public ResponseEntity<Flux<String>> streamData() { HttpHeaders headers = new HttpHeaders(); headers.setCacheControl("no-cache"); headers.setConnection("keep-alive"); headers.set("X-Accel-Buffering", "no"); Flux<String> dataStream = Flux.interval(Duration.ofSeconds(1)) .map(seq -> "data: Event " + seq + "\n\n"); return new ResponseEntity<>(dataStream, headers, HttpStatus.OK); } ``` #### **方法 2:手动设置 HttpServletResponse** ```java @GetMapping("/sse-endpoint") public void sseStream(HttpServletResponse response) { response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); response.setHeader("X-Accel-Buffering", "no"); // 流式输出逻辑 try (PrintWriter writer = response.getWriter()) { for (int i = 0; i < 10; i++) { writer.write("data: Message " + i + "\n\n"); writer.flush(); Thread.sleep(1000); } } catch (Exception e) { // 处理异常 } } ``` --- ### **关键注意事项** 1. **连接保持** - 配置服务器超时时间(Tomcat 示例): ```yaml server: tomcat: keep-alive-timeout: 30000 # 30秒空闲超时 max-connections: 100 # 最大连接数 ``` 2. **禁止压缩** ```yaml server: compression: enabled: false # 禁用压缩(否则流式数据可能被缓冲) ``` 3. **跨域配置** ```java @CrossOrigin( origins = "*", allowedHeaders = "*", exposedHeaders = {"Cache-Control", "Connection"} // 暴露必要头 ) ``` --- ### **SSE 事件格式规范** 每个事件需遵循格式: ```plaintext event: <event_type> // 可选 data: <payload> // 必须 id: <unique_id> // 可选 retry: <milliseconds>// 重连间隔 \n\n // 双换行符结尾 ``` --- ### **测试方法** #### 使用 curl 验证: ```bash curl -v -N http://localhost:8080/stream ``` #### 前端监听示例: ```javascript const eventSource = new EventSource('/stream'); eventSource.onmessage = (e) => { console.log('Received:', e.data); }; ``` --- ### **扩展:使用 SseEmitter** 更推荐的 Spring 封装方式: ```java @GetMapping("/sse") public SseEmitter streamEvents() { SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时 // 自动设置以下头: // Content-Type: text/event-stream // Cache-Control: no-cache // Transfer-Encoding: chunked Executors.newSingleThreadExecutor().submit(() -> { try { for (int i = 0; i < 10; i++) { emitter.send(SseEmitter.event() .id(String.valueOf(i)) .data("Event " + i)); Thread.sleep(1000); } emitter.complete(); } catch (Exception e) { emitter.completeWithError(e); } }); return emitter; } ```
阅读全文

相关推荐

import tkinter as tk from tkinter import ttk, messagebox, filedialog from PIL import Image, ImageTk, ImageDraw, ImageFont import vlc import os import cv2 import json import time import threading import queue import random import subprocess import sqlite3 from datetime import datetime import pandas as pd import pymysql from threading import Thread, Event OUT_DIR = "./output" class EmployeeClockSystem: def __init__(self, root): self.root = root self.root.title("员工工牌识别打卡系统") self.root.geometry("1200x700") self.root.configure(bg="#f0f0f0") self.idle = Event() # 创建事件对象 self.idle.set() # 初始设置为空闲状态(允许采集) self.capture_thread = None self.cap = None # 设置输出目录 self.OUT_DIR = "./output" os.makedirs(self.OUT_DIR, exist_ok=True) # 创建样式 self.style = ttk.Style() self.style.configure("Title.TLabel", font=("微软雅黑", 18, "bold"), foreground="#2c3e50") self.style.configure("Subtitle.TLabel", font=("微软雅黑", 14), foreground="#34495e") self.style.configure("Info.TLabel", font=("微软雅黑", 12), foreground="#2c3e50") self.style.configure("Card.TFrame", background="#ffffff", borderwidth=1, relief="raised", padding=10) self.style.configure("Control.TFrame", background="#e0e0e0", borderwidth=1, relief="sunken", padding=10) # 主布局框架 - 使用PanedWindow实现可调整的分割 main_paned = tk.PanedWindow(root, orient=tk.HORIZONTAL, sashrelief=tk.RAISED, sashwidth=4) main_paned.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 左侧视频区域 (50%) left_frame = ttk.Frame(main_paned) main_paned.add(left_frame, stretch="always") # 右侧员工信息区域 (50%) right_frame = ttk.Frame(main_paned) main_paned.add(right_frame, stretch="always") # 视频流标题 ttk.Label(left_frame, text="实时视频监控", style="Title.TLabel").pack(pady=(0, 10), anchor=tk.W, padx=10) # 视频显示区域 video_card = ttk.Frame(left_frame, style="Card.TFrame") video_card.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10)) self.video_container = ttk.Frame(video_card) self.video_container.pack(fill=tk.BOTH, expand=True) # 视频控制面板 control_frame = ttk.Frame(left_frame, style="Control.TFrame") control_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) # URL输入框 ttk.Label(control_frame, text="RTSP地址:").pack(side=tk.LEFT, padx=(0, 5)) self.url_entry = ttk.Entry(control_frame, width=40) self.url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10)) self.url_entry.insert(0, "rtsp://192.168.1.101/stream1") # 连接按钮 self.connect_button = ttk.Button(control_frame, text="启动监控", command=self.toggle_stream, width=12) self.connect_button.pack(side=tk.LEFT, padx=(0, 5)) # 截图按钮 self.snapshot_button = ttk.Button(control_frame, text="抓拍", command=self.take_snapshot, width=8, state=tk.DISABLED) self.snapshot_button.pack(side=tk.LEFT) # 员工信息标题 ttk.Label(right_frame, text="员工信息识别", style="Title.TLabel").pack(pady=(0, 10), anchor=tk.W, padx=10) # 员工信息卡片 info_card = ttk.Frame(right_frame, style="Card.TFrame") info_card.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10)) # 员工照片和基本信息 info_frame = ttk.Frame(info_card) info_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 左侧员工照片区域 avatar_frame = ttk.Frame(info_frame, width=180, height=200) avatar_frame.pack(side=tk.LEFT, padx=(0, 20), fill=tk.Y) self.avatar_label = ttk.Label(avatar_frame) self.avatar_label.pack(fill=tk.BOTH, expand=True) # 默认头像 self.update_avatar() # 右侧员工详细信息 detail_frame = ttk.Frame(info_frame) detail_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) ttk.Label(detail_frame, text="员工基本信息", style="Subtitle.TLabel").pack(anchor=tk.W, pady=(0, 10)) # 信息标签 - 使用Grid布局更精确控制 label_frame = ttk.Frame(detail_frame) label_frame.pack(fill=tk.X, pady=5) ttk.Label(label_frame, text="姓名:", width=8, anchor=tk.E, style="Info.TLabel").grid(row=0, column=0, sticky="e", padx=5, pady=5) self.name_value = ttk.Label(label_frame, text="", width=20, anchor=tk.W, style="Info.TLabel") self.name_value.grid(row=0, column=1, sticky="w", padx=5, pady=5) ttk.Label(label_frame, text="工号:", width=8, anchor=tk.E, style="Info.TLabel").grid(row=1, column=0, sticky="e", padx=5, pady=5) self.id_value = ttk.Label(label_frame, text="", width=20, anchor=tk.W, style="Info.TLabel") self.id_value.grid(row=1, column=1, sticky="w", padx=5, pady=5) ttk.Label(label_frame, text="部门:", width=8, anchor=tk.E, style="Info.TLabel").grid(row=2, column=0, sticky="e", padx=5, pady=5) self.dept_value = ttk.Label(label_frame, text="", width=20, anchor=tk.W, style="Info.TLabel") self.dept_value.grid(row=2, column=1, sticky="w", padx=5, pady=5) # ttk.Label(label_frame, text="打卡状态:", width=8, anchor=tk.E, style="Info.TLabel").grid(row=4, column=0, sticky="e", padx=5, pady=5) # self.status_value = ttk.Label(label_frame, text="未识别", width=20, anchor=tk.W, style="Info.TLabel") # self.status_value.grid(row=4, column=1, sticky="w", padx=5, pady=5) # 打卡按钮 # 打卡按钮 button_frame = ttk.Frame(detail_frame) button_frame.pack(fill=tk.X, pady=10) self.clock_button = ttk.Button(button_frame, text="导出打卡EXCEL", command=self.export_attendance_to_excel, width=15) self.clock_button.pack(side=tk.RIGHT, padx=(0, 20)) self.search_button = ttk.Button(button_frame, text="查询", command=self.open_search_window, width=15) self.search_button.pack(side=tk.RIGHT, padx=(0, 30)) # 考勤记录标题 ttk.Label(right_frame, text="今日考勤记录", style="Title.TLabel").pack(pady=(10, 10), anchor=tk.W, padx=10) # 考勤记录表格 record_card = ttk.Frame(right_frame, style="Card.TFrame") record_card.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10)) # 创建表格 columns = ("time", "id", "name", "dept", "status") self.record_tree = ttk.Treeview(record_card, columns=columns, show="headings", height=8) # 设置列标题 self.record_tree.heading("time", text="时间", anchor=tk.W) self.record_tree.heading("id", text="工号", anchor=tk.W) self.record_tree.heading("name", text="姓名", anchor=tk.W) self.record_tree.heading("dept", text="部门", anchor=tk.W) self.record_tree.heading("status", text="状态", anchor=tk.W) # 设置列宽 self.record_tree.column("time", width=150, anchor=tk.W) self.record_tree.column("id", width=100, anchor=tk.W) self.record_tree.column("name", width=100, anchor=tk.W) self.record_tree.column("dept", width=120, anchor=tk.W) self.record_tree.column("status", width=80, anchor=tk.W) # 添加滚动条 scrollbar = ttk.Scrollbar(record_card, orient="vertical", command=self.record_tree.yview) self.record_tree.configure(yscrollcommand=scrollbar.set) # 布局 self.record_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=10, pady=10) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # 状态栏 self.status_var = tk.StringVar(value="系统就绪") status_bar = ttk.Label(root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(side=tk.BOTTOM, fill=tk.X) # 初始化变量 self.stream_active = False self.instance = vlc.Instance("--no-xlib") self.player = self.instance.media_player_new() # 模拟员工数据库 self.employees = { "1001": {"name": "张明", "dept": "技术部", "position": "高级工程师", "avatar": "avatar1.jpg"}, "1002": {"name": "李华", "dept": "市场部", "position": "市场经理", "avatar": "avatar2.jpg"}, "1003": {"name": "王芳", "dept": "财务部", "position": "会计", "avatar": "avatar3.jpg"}, "1004": {"name": "赵刚", "dept": "人力资源", "position": "招聘主管", "avatar": "avatar4.jpg"}, "1005": {"name": "陈晓", "dept": "产品部", "position": "产品经理", "avatar": "avatar5.jpg"}, } # 考勤记录 self.attendance_records = [] self.add_sample_records() # 初始化线程队列 self.gui_queue = queue.Queue() self.root.after(100, self.process_queue) def process_queue(self): """处理队列中的GUI更新任务""" while not self.gui_queue.empty(): try: task = self.gui_queue.get_nowait() if task["type"] == "update_employee_info": self.name_value.config(text=task["name"]) self.id_value.config(text=task["id"]) self.dept_value.config(text=task["dept"]) self.position_value.config(text=task["position"]) self.status_value.config(text=task["status"]) self.show_employee_avatar(task["avatar"]) self.clock_button.config(state=tk.NORMAL) elif task["type"] == "update_status": self.status_var.set(task["message"]) elif task["type"] == "clock_in": self.clock_in_task(task["emp_id"], task["emp_name"], task["emp_dept"]) except queue.Empty: pass self.root.after(100, self.process_queue) def update_avatar(self, image_path=None): if image_path and os.path.exists(image_path): try: # 尝试加载图片 print("opennnnnnnnnnnn") img = Image.open(image_path) print("opennnnnnnnnnnn") # 调整图片大小以适应显示区域 img = self.resize_image(img, target_width=180, target_height=200) # 转换为Tkinter PhotoImage photo = ImageTk.PhotoImage(img) # 更新显示 self.avatar_label.config(image=photo) self.avatar_label.image = photo # 保持引用 print(f"成功加载头像: {image_path}") return True except Exception as e: print(f"加载图片失败: {e}") # 加载失败时显示默认头像 self.show_default_avatar(error=True) return False else: # 没有提供图片路径或路径无效 self.show_default_avatar() return False def show_default_avatar(self): """显示默认头像""" default_img = Image.new('RGB', (180, 200), color='#3498db') draw = ImageDraw.Draw(default_img) try: font = ImageFont.truetype("arial.ttf", 24) except: font = ImageFont.load_default() draw.text((40, 85), "无数据", fill="white", font=font) default_photo = ImageTk.PhotoImage(default_img) self.avatar_label.config(image=default_photo) self.avatar_label.image = default_photo def resize_image(self, img, target_width, target_height): # """调整图片大小,保持宽高比并填充背景""" # 计算缩放比例 width, height = img.size scale = min(target_width / width, target_height / height) # 计算新尺寸 new_width = int(width * scale) new_height = int(height * scale) # 调整图片大小 img = img.resize((new_width, new_height), Image.LANCZOS) # 创建新图片并粘贴调整后的图片到中心 new_img = Image.new('RGB', (target_width, target_height), color='white') position = ( (target_width - new_width) // 2, (target_height - new_height) // 2 ) new_img.paste(img, position) return new_img def open_search_window(self): # """打开查找员工记录的弹窗""" search_window = tk.Toplevel(self.root) search_window.title("查找员工考勤记录") search_window.geometry("600x400") search_window.resizable(False, False) # 输入框 input_frame = ttk.Frame(search_window) input_frame.pack(pady=10, padx=10, fill=tk.X) ttk.Label(input_frame, text="姓名或工号:", width=12, anchor=tk.E).pack(side=tk.LEFT) search_entry = ttk.Entry(input_frame, width=30) search_entry.pack(side=tk.LEFT, padx=5) # 查询按钮 search_btn = ttk.Button(input_frame, text="查询", command=lambda: self.perform_search(search_entry, tree)) search_btn.pack(side=tk.LEFT) # 表格 tree_frame = ttk.Frame(search_window) tree_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True) tree = ttk.Treeview(tree_frame, columns=("工号", "姓名", "部门","时间", "次数"), show="headings", height=15) tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # 设置列标题 for col in tree["columns"]: tree.heading(col, text=col) tree.column(col, width=100, anchor=tk.CENTER) # 滚动条 scrollbar = ttk.Scrollbar(tree_frame, orient="vertical", command=tree.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) tree.configure(yscrollcommand=scrollbar.set) # 提示信息 self.search_status_label = ttk.Label(search_window, text="", foreground="red") self.search_status_label.pack(pady=5) # 保存状态 self.current_search_tree = tree def perform_search(self, entry_widget, tree_widget): keyword = entry_widget.get().strip() if not keyword: print("//////////////") self.search_status_label.config(text="请输入姓名或工号") return try: conn = pymysql.connect( host='localhost', user='root', password='139800', database='employeeinfomation' ) print("**************") cursor = conn.cursor() print("**************") query = """ SELECT * FROM workcard WHERE NAME LIKE %s """ cursor.execute(query, (f"%{keyword}%")) results = cursor.fetchall() print(results) # 清空表格 for item in tree_widget.get_children(): tree_widget.delete(item) # 插入查询结果 for record in results: tree_widget.insert("", tk.END, values=record) if not results: self.search_status_label.config(text=f"未找到包含“{keyword}”的记录") else: self.search_status_label.config(text=f"找到 {len(results)} 条记录") conn.close() except Exception as e: self.search_status_label.config(text=f"查询失败:{str(e)}") def add_sample_records(self): """添加示例考勤记录""" records = [ ("09:00:24", "1001", "张明", "技术部", "正常"), ("09:01:35", "1002", "李华", "市场部", "迟到"), ("09:05:47", "1003", "王芳", "财务部", "正常"), ("12:01:15", "1001", "张明", "技术部", "外出"), ("13:30:08", "1004", "赵刚", "人力资源", "正常"), ] for record in records: self.attendance_records.append(record) self.record_tree.insert("", tk.END, values=record) def add_new_records(self, record): if (record.get('id') and record.get('name') and record.get('dept')): new_record = ( datetime.now().strftime("%H:%M:%S"), # 当前时间 record.get('id'), # 员工ID record.get('name'), # 姓名 record.get('dept'), # 部门 "正常" # 考勤状态 ) self.attendance_records.append(new_record) self.record_tree.insert("", tk.END, values=new_record) else: # 可选:添加错误处理(如日志记录或弹窗提示) print("错误:缺少必要的字段信息,记录未添加") def export_attendance_to_excel(self): # """从本地数据库导出考勤记录到Excel文件""" # 弹出文件保存路径 file_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel 文件", "*.xlsx"), ("所有文件", "*.*")], title="保存考勤记录为Excel" ) if not file_path: return # 用户取消了保存 try: # 1. 连接 MySQL 数据库(根据你的实际配置填写) # conn = mysql.connector.connect( # host="localhost", # 数据库地址 # user="root", # 用户名 # password="yourpass", # 密码 # database="attendance_db" # 数据库名称 # ) conn = pymysql.connect( host='localhost', user='root', password='139800', database='employeeinfomation' ) # 2. 使用 pandas 直接读取 SQL 查询结果 query = "SELECT * FROM workcard" df = pd.read_sql_query(query, conn) # 3. 关闭数据库连接 conn.close() # 4. 如果没有数据 if df.empty: messagebox.showwarning("导出失败", "数据库中没有可导出的考勤记录!") return # 5. 导出为 Excel 文件 df.to_excel(file_path, index=False) messagebox.showinfo("导出成功", f"考勤记录已成功导出到:\n{file_path}") except Exception as e: messagebox.showerror("导出失败", f"导出考勤记录时发生错误:\n{str(e)}") def search_count(self, name): conn = pymysql.connect( host='localhost', user='root', password='139800', database='employeeinfomation' ) # 2. 使用 pandas 直接读取 SQL 查询结果 cursor = conn.cursor() query = "SELECT COUNT FROM workcard WHERE NAME LIKE %s " cursor.execute(query, (f"%{name}%")) count = cursor.fetchall() # 3. 关闭数据库连接 conn.close() return count def toggle_stream(self): """切换视频流状态""" if self.stream_active: self.stop_stream() else: self.start_stream() def start_stream(self): """启动视频流""" url = self.url_entry.get().strip() if not url: messagebox.showerror("错误", "请输入有效的视频流URL") return try: media = self.instance.media_new(url) self.player.set_media(media) win_id = self.video_container.winfo_id() if os.name == 'nt': win_id = int(win_id) self.player.set_hwnd(win_id) else: self.player.set_xwindow(win_id) self.player.play() self.stream_active = True self.connect_button.config(text="停止监控") self.snapshot_button.config(state=tk.NORMAL) self.status_var.set(f"正在播放: {url}") # 启动视频流线程 threading.Thread(target=self.video_thread, daemon=True).start() # 启动识别线程 # threading.Thread(target=self.recognition_thread, daemon=True).start() # 启动采集线程 self.capture_thread = Thread(target=self.capture_loop, daemon=True) self.capture_thread.start() except Exception as e: messagebox.showerror("连接错误", f"无法连接到视频流: {str(e)}") self.status_var.set("连接失败") def video_thread(self): """视频流播放线程""" pass # 视频流由VLC内部处理,无需额外操作 # def recognition_thread(self, out_dir: str = OUT_DIR): # """使用实际工牌识别代码进行识别的线程""" # cap = cv2.VideoCapture(self.url_entry.get().strip()) # if not cap.isOpened(): # print("[错误] 无法打开视频流") # return # # 加载预训练的人脸检测模型 # face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # while self.stream_active: # ret, frame = cap.read() # if not ret: # print("[调试信息] 无法读取视频帧") # continue # # 转为灰度图,提高检测效率 # gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # faces = 0 # # 检测人脸 # faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)) # if len(faces) > 0: # print(f"[调试信息] 检测到 {len(faces)} 张人脸") # # 保存当前帧供后续处理 # temp_image_path = "temp_frame.jpg" # cv2.imwrite(temp_image_path, frame) # print(f"[调试信息] 图像已保存至 {temp_image_path}") # # 调用main.py 进行图像识别 # try: # print(f"[调试信息] 正在调用 main.py 处理 {temp_image_path}") # subprocess.run(["python", "main.py", temp_image_path], check=True) # print("[调试信息] main.py 执行完成") # except subprocess.CalledProcessError as e: # print(f"[错误信息] main.py 执行失败: {e}") # # 读取main.py 输出的JSON文件 # stem = os.path.splitext(os.path.basename(temp_image_path))[0] # final_json = os.path.join(OUT_DIR, f"{stem}_face_result.json") # final_picture = os.path.join(OUT_DIR, f"{stem}_face.jpg") # img = Image.open(final_picture) # print(img) # self.update_avatar(final_picture) # if os.path.exists(final_json): # print(f"[调试信息] JSON文件已找到: {final_json}") # with open(final_json, "r", encoding="utf-8") as f: # result = json.load(f) # ocr_info = result.get("OCR", {}) # emp_id = ocr_info.get("id") # emp_name = ocr_info.get("name") # emp_dept = ocr_info.get("department") # # 打印识别结果 # print(f"[调试信息] 识别结果: ID={emp_id}, 姓名={emp_name}, 部门={emp_dept}") # # 将识别结果发送到GUI队列 # task = { # "type": "update_employee_info", # "name": emp_name, # "id": emp_id, # "dept": emp_dept, # "position": "未知", # "avatar": os.path.join(OUT_DIR, f"{stem}_face.jpg") # } # self.gui_queue.put(task) # # 添加示例记录 # count = self.search_count(emp_name) # if(count == 1): # self.add_new_records(task) # messagebox.showinfo("打卡成功", f"{emp_name} 已成功打卡!") # else: # messagebox.showinfo("Warnning!", f"{emp_name} 重复打卡!") # # time.sleep(10) # 控制识别频率 # cap.release() def capture_loop(self): # """视频采集主循环,使用事件控制采集频率""" self.cap = cv2.VideoCapture(self.url_entry.get().strip()) if not self.cap.isOpened(): print("[错误] 无法打开视频流") return # 加载预训练的人脸检测模型 face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') while self.stream_active: # 等待空闲状态(允许采集) self.idle.wait() # time.sleep(5) # 读取一帧 ret, frame = self.cap.read() if not ret: print("[调试信息] 无法读取视频帧") time.sleep(0.1) continue # 转为灰度图,提高检测效率 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 检测人脸 faces = face_cascade.detectMultiScale( gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30) ) if len(faces) > 0: print(f"[调试信息] 检测到 {len(faces)} 张人脸") # 立即设置为忙碌状态(禁止进一步采集) self.idle.clear() # 启动处理线程 Thread( target=self.process_frame, args=(frame.copy(),), # 复制帧以避免后续修改 daemon=True ).start() else: # 未检测到人脸时短暂休眠,控制抽帧频率 time.sleep(0.05) def process_frame(self, frame): # """处理检测到人脸的帧""" try: # 保存当前帧 temp_image_path = "temp_frame.jpg" cv2.imwrite(temp_image_path, frame) print(f"[调试信息] 图像已保存至 {temp_image_path}") # 调用main.py进行图像识别 print(f"[调试信息] 正在调用 main.py 处理 {temp_image_path}") subprocess.run(["python", "main.py", temp_image_path], check=True) print("[调试信息] main.py 执行完成") # 读取main.py输出的JSON文件 stem = os.path.splitext(os.path.basename(temp_image_path))[0] final_json = os.path.join(OUT_DIR, f"{stem}_face_result.json") final_picture = os.path.join(OUT_DIR, f"{stem}_face.jpg") if os.path.exists(final_json): print(f"[调试信息] JSON文件已找到: {final_json}") with open(final_json, "r", encoding="utf-8") as f: result = json.load(f) ocr_info = result.get("OCR", {}) emp_id = ocr_info.get("id") emp_name = ocr_info.get("name") emp_dept = ocr_info.get("department") # 打印识别结果 print(f"[调试信息] 识别结果: ID={emp_id}, 姓名={emp_name}, 部门={emp_dept}") # 更新GUI img = Image.open(final_picture) self.update_avatar(final_picture) # 将识别结果发送到GUI队列 task = { "type": "update_employee_info", "name": emp_name, "id": emp_id, "dept": emp_dept, "position": "未知", "avatar": final_picture } self.gui_queue.put(task) # 检查打卡记录 count = self.search_count(emp_name) if count == 0: self.add_new_records(task) messagebox.showinfo("打卡成功", f"{emp_name} 已成功打卡!") else: messagebox.showinfo("警告", f"{emp_name} 重复打卡!") else: print(f"[错误] 未找到JSON文件: {final_json}") # 处理失败时也恢复采集 except subprocess.CalledProcessError as e: print(f"[错误] main.py 执行失败: {e}") except Exception as e: print(f"[错误] 处理过程中发生异常: {str(e)}") finally: # 无论处理成功与否,都恢复空闲状态 self.idle.set() def stop(self): # """停止采集和处理""" self.stream_active = False self.idle.set() # 确保线程可以退出 if self.cap and self.cap.isOpened(): self.cap.release() def stop_stream(self): """停止视频流""" if self.player: self.player.stop() self.stream_active = False self.connect_button.config(text="启动监控") self.snapshot_button.config(state=tk.DISABLED) self.clock_button.config(state=tk.DISABLED) self.status_var.set("已停止视频流") # 清空员工信息 self.name_value.config(text="") self.id_value.config(text="") self.dept_value.config(text="") self.position_value.config(text="") self.status_value.config(text="未识别") self.show_default_avatar() def take_snapshot(self): """抓拍当前帧""" if self.stream_active: timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"snapshot_{timestamp}.png" self.player.video_take_snapshot(0, filename, 0, 0) messagebox.showinfo("抓拍成功", f"已保存截图: {filename}") self.status_var.set(f"截图已保存: {filename}") def show_employee_avatar(self, avatar_path): """显示员工头像""" try: colors = ["#3498db", "#2ecc71", "#e74c3c", "#f39c12", "#9b59b6"] color = random.choice(colors) img = Image.new('RGB', (180, 200), color=color) draw = ImageDraw.Draw(img) try: font = ImageFont.truetype("arial.ttf", 20) except: font = ImageFont.load_default() draw.text((40, 85), "员工照片", fill="white", font=font) photo = ImageTk.PhotoImage(img) self.avatar_label.config(image=photo) self.avatar_label.image = photo except Exception as e: print(f"头像加载错误: {e}") def clock_in(self): """员工打卡""" emp_id = self.id_value.cget("text") emp_name = self.name_value.cget("text") emp_dept = self.dept_value.cget("text") task = {"type": "clock_in", "emp_id": emp_id, "emp_name": emp_name, "emp_dept": emp_dept} self.gui_queue.put(task) def clock_in_task(self, emp_id, emp_name, emp_dept): """执行打卡逻辑""" current_time = time.strftime("%H:%M:%S") hour = int(time.strftime("%H")) minute = int(time.strftime("%M")) status = "迟到" if (hour > 9 or (hour == 9 and minute > 0)) else "正常" record = (current_time, emp_id, emp_name, emp_dept, status) self.attendance_records.append(record) self.record_tree.insert("", tk.END, values=record) self.status_value.config(text=f"已打卡 ({status})") self.status_var.set(f"{emp_name} 打卡成功! 时间: {current_time}") self.clock_button.config(state=tk.DISABLED) self.record_tree.see(self.record_tree.get_children()[-1]) if __name__ == "__main__": root = tk.Tk() app = EmployeeClockSystem(root) root.mainloop() 为什么使用了event还是一直截取同一个时刻的照片,而不是隔一段时间再截图

目前我触发了sse错误,可能错误是路径,,这是前端代码<template> <el-table :data="logData" style="width: 100%" height="calc(100vh - 120px)"> <el-table-column prop="sn" label="设备SN" width="180" /> <el-table-column prop="type_label" label="日志类型" width="120" /> <el-table-column prop="label" label="标签" width="180" /> <el-table-column prop="message" label="消息内容" /> <el-table-column prop="timestamp" label="时间" width="180"> <template> {{ new Date().toLocaleTimeString() }} </template> </el-table-column> </el-table> <el-tag v-for="type in logTypes" :key="type.value" :type="activeType === type.value ? 'primary' : 'info'" @click="changeType(type.value)" class="type-tag" > {{ type.label }} </el-tag> </template> <script setup> import { ref, onMounted, onBeforeUnmount } from 'vue' import { useRoute } from 'vue-router' const route = useRoute() const logData = ref([]) const eventSource = ref(null) const activeType = ref(1) // 日志类型配置 const logTypes = ref([ { value: 1, label: '系统日志' }, { value: 2, label: '设备日志' }, { value: 3, label: '操作日志' }, { value: 4, label: '告警日志' } ]) // 初始化时从路由参数获取类型 onMounted(() => { const typeParam = parseInt(route.params.type) if (typeParam && typeParam >= 1 && typeParam <= 4) { activeType.value = typeParam } connectSSE() }) // 切换日志类型 const changeType = (type) => { activeType.value = type // 更新URL但不刷新页面 window.history.pushState({}, '', /logs/${type}) // 重新连接SSE disconnectSSE() connectSSE() } // 连接SSE const connectSSE = () => { disconnectSSE() eventSource.value = new EventSource(http://127.0.0.1:8888/sse/logs/${activeType.value}) eventSource.value.onmessage = (event) => { try { const data = JSON.parse(event.data) logData.value = data } catch (e) { console.error('解析日志数据失败:', e) } } eventSource.value.onerror = (err) => { console.error('SSE连接错误:', err) // 尝试重新连接 setTimeout(connectSSE, 3000) } } // 断开SSE连接 const disconnectSSE = () => { if (eventSource.value) { eventSource.value.close() eventSource.value = null } } // 组件卸载时断开连接 onBeforeUnmount(() => { disconnectSSE() }) </script> <style scoped> .log-container { padding: 20px; height: 100%; } .type-tabs { margin-top: 15px; display: flex; gap: 10px; } .type-tag { cursor: pointer; padding: 10px 15px; font-size: 14px; } </style> 这是后端代码from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from .services import task_event_generator from .services import command_event_generator from fastapi import HTTPException from .services import task_type_event_generator, create_task_type, update_task_type, delete_task_type,log_event_generator # 导入新增的删除函数 router = APIRouter() # 在 api.py 中添加以下路由 from .services import ( command_match_event_generator, create_command_match, update_command_match, delete_command_match ) # 在 api.py 中添加以下路由 @router.get("/sse/logs/{log_type}") async def sse_logs_by_type(request: Request, log_type: int): """根据日志类型获取日志的SSE流""" return StreamingResponse( log_event_generator(request, log_type), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 指令匹配SSE推送 @router.get("/sse/command-match") async def sse_command_match(request: Request): return StreamingResponse( command_match_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 创建新指令匹配 @router.post("/command-match") async def add_command_match(request: Request): try: data = await request.json() await create_command_match(request.app.state.db_pool, data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 更新指令匹配 @router.put("/command-match/{command_id}") async def modify_command_match(command_id: int, request: Request): try: update_data = await request.json() await update_command_match(request.app.state.db_pool, command_id, update_data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 删除指令匹配 @router.delete("/command-match/{command_id}") async def remove_command_match(command_id: int, request: Request): try: success = await delete_command_match(request.app.state.db_pool, command_id) if success: return {"status": "success", "message": "指令匹配已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务类型 @router.get("/sse/task-types") async def sse_task_types(request: Request): return StreamingResponse( task_type_event_generator(request), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) # 添加任务类型 @router.post("/task-types") async def add_task_type(request: Request): try: data = await request.json() if not data.get("sn") or not data.get("label"): raise HTTPException(400, "SN和Label不能为空") await create_task_type(request.app.state.db_pool, data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 修改任务类型 @router.put("/task-types/{task_id}") async def modify_task_type(task_id: int, request: Request): try: update_data = await request.json() if not update_data: raise HTTPException(400, "无有效更新数据") await update_task_type(request.app.state.db_pool, task_id, update_data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 删除任务类型 @router.delete("/task-types/{task_id}") async def remove_task_type(task_id: int, request: Request): try: # 验证ID有效性 if not task_id or task_id <= 0: raise HTTPException(400, "无效的任务类型ID") # 执行删除操作 success = await delete_task_type(request.app.state.db_pool, task_id) if success: return {"status": "success", "message": "任务类型已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务信息 @router.get("/sse/tasks") async def sse_tasks(request: Request): return StreamingResponse( task_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 获取指令信息 @router.get("/sse/commands") async def sse_tasks(request: Request): return StreamingResponse( command_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } )

流式接口访问报错Unable to handle the Spring Security Exception because the response is already committed., 这是我的相关配置@Bean protected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception { httpSecurity // CSRF禁用,因为不使用session .csrf(AbstractHttpConfigurer::disable) // 禁用HTTP响应标头 .headers((headersCustomizer) -> headersCustomizer.cacheControl(HeadersConfigurer.CacheControlConfig::disable).frameOptions(HeadersConfigurer.FrameOptionsConfig::sameOrigin)) // 基于token,所以不需要session .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS)) // 注解标记允许匿名访问的url .authorizeHttpRequests((requests) -> { // 对于登录login 注册register 验证码captchaImage 允许匿名访问 requests.requestMatchers("/login", "/register", "/captchaImage", "/getRegisterCondition").permitAll() // 静态资源,可匿名访问 .requestMatchers(HttpMethod.GET, "/profile/**").permitAll() // 除上面外的所有请求全部需要鉴权认证 .anyRequest().authenticated(); }) // 认证失败处理类 .exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler)) // 添加Logout filter .logout(logout -> logout.logoutUrl("/logout").logoutSuccessHandler(logoutSuccessHandler)) // 添加JWT filter .addFilterBefore(authenticationTokenFilter, UsernamePasswordAuthenticationFilter.class) // 添加CORS filter .addFilterBefore(corsFilter, JwtAuthenticationTokenFilter.class) .addFilterBefore(corsFilter, LogoutFilter.class); return httpSecurity.build(); } 下面的是接口代码 @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> stream(@RequestBody ChatMessage chatMessage) { if (StringUtils.isEmpty(chatMessage.getMessage())) { throw new ServiceException("消息不能为空"); } return chatModel.stream(chatMessage.getMessage()) .doOnSubscribe(s -> System.out.println("连接成功")) .onErrorResume(e -> Flux.just("连接失败")) .doFinally(f -> System.out.println("连接关闭")); }

我目前有两个py文件,帮我增加几个接口,一个接口是使用sse技术读取数据库iiot_command_match表的信息,可以实现前端页面的数据实时显示,iiot_command_match表的数据包括sn,label,type,priority,task_type_sn,execute_json,这是数据库中,这些数据的类型, id SERIAL PRIMARY KEY, sn VARCHAR(100) UNIQUE NOT NULL, label VARCHAR(100) DEFAULT '-', type VARCHAR(100) DEFAULT '-', priority SMALLINT DEFAULT 5, task_type_sn VARCHAR(100) DEFAULT '-', execute_json text DEFAULT '-' 第二个接口功能是新增iiot_command_match表中的数据,第三个接口功能是修改iiot_command_match表中的数据,第四个接口功能是删除iiot_command_match表中的数据,然后写一个前端页,来实现这4个接口,前端用vue3+elenebt,后端接口IP是http://127.0.0.1:8888 ,以下为service.py的代码为import json import asyncio from collections import deque # 添加删除任务类型功能 async def delete_task_type(pool, task_id: int): """删除任务类型""" async with pool.acquire() as conn: # 检查任务类型是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_task_type WHERE id = $1", task_id) if not exists: raise ValueError(f"任务类型ID {task_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_task_type WHERE id = $1", task_id) return True # service.py 新增代码 async def task_type_event_generator(request): """SSE事件生成器,实时推送iiot_task_type表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_task_type表 task_types = await conn.fetch("SELECT id, sn, label FROM iiot_task_type") # 格式化数据 formatted_data = [ {"id": t["id"], "sn": t["sn"], "label": t["label"]} for t in task_types ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_task_type(pool, data: dict): """创建新任务类型""" async with pool.acquire() as conn: await conn.execute( "INSERT INTO iiot_task_type(sn, label) VALUES ($1, $2)", data["sn"], data["label"] ) async def update_task_type(pool, task_id: int, update_data: dict): """更新任务类型信息""" async with pool.acquire() as conn: query = "UPDATE iiot_task_type SET " params = [] updates = [] if "sn" in update_data: updates.append("sn = $" + str(len(params) + 1)) params.append(update_data["sn"]) if "label" in update_data: updates.append("label = $" + str(len(params) + 1)) params.append(update_data["label"]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(task_id) await conn.execute(query, *params) # 新增:全局存储轮询结果 task_polling_results = deque(maxlen=100) # 存储最近100次轮询结果 # 新增:后台轮询任务 async def poll_iiot_task(pool): """后台轮询任务,每秒查询一次iiot_task表""" while True: try: async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" # 查询实际表而非视图 ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "id": t["id"], "sn": t["sn"], "status": t["status"], "create_time": t["create_time"].isoformat(), # 根据实际表结构添加其他字段 }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 存储结果(实际应用中可替换为数据库/缓存存储) task_polling_results.append({ "timestamp": asyncio.get_event_loop().time(), "data": tasks }) print(f"Polled {len(tasks)} tasks at {asyncio.get_event_loop().time()}") print(tasks) except Exception as e: print(f"Polling error: {str(e)}") await asyncio.sleep(1) # 每秒轮询一次 #获取全部任务 async def task_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_type_label": str(t["task_type_label"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" # 获取全部指令 async def command_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_command" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_sn": str(t["task_sn"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" 下面是api.py的代码 from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from .services import task_event_generator from .services import command_event_generator from fastapi import HTTPException from .services import task_type_event_generator, create_task_type, update_task_type, delete_task_type # 导入新增的删除函数 router = APIRouter() # 获取任务类型 @router.get("/sse/task-types") async def sse_task_types(request: Request): return StreamingResponse( task_type_event_generator(request), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) # 添加任务类型 @router.post("/task-types") async def add_task_type(request: Request): try: data = await request.json() if not data.get("sn") or not data.get("label"): raise HTTPException(400, "SN和Label不能为空") await create_task_type(request.app.state.db_pool, data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 修改任务类型 @router.put("/task-types/{task_id}") async def modify_task_type(task_id: int, request: Request): try: update_data = await request.json() if not update_data: raise HTTPException(400, "无有效更新数据") await update_task_type(request.app.state.db_pool, task_id, update_data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 删除任务类型 @router.delete("/task-types/{task_id}") async def remove_task_type(task_id: int, request: Request): try: # 验证ID有效性 if not task_id or task_id <= 0: raise HTTPException(400, "无效的任务类型ID") # 执行删除操作 success = await delete_task_type(request.app.state.db_pool, task_id) if success: return {"status": "success", "message": "任务类型已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务信息 @router.get("/sse/tasks") async def sse_tasks(request: Request): return StreamingResponse( task_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 获取指令信息 @router.get("/sse/commands") async def sse_tasks(request: Request): return StreamingResponse( command_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } )

android 15中报错:// CRASH: com.android.mms (pid 32001) // Short Msg: java.lang.NullPointerException // Long Msg: java.lang.NullPointerException: Attempt to invoke interface method 'int java.util.List.size()' on a null object reference // Build Label: SONIM/X53001/X530:15/X53.0-01-15.0-99.01.11/jenkins:userdebug/test-keys // Build Changelist: jenkins // Build Time: 1754634422000 // java.lang.NullPointerException: Attempt to invoke interface method 'int java.util.List.size()' on a null object reference // at com.android.mms.ui.AttachmentViewActivity.onDataLoadFinished(AttachmentViewActivity.java:281) // at com.android.mms.ui.BaseAttachmentViewActivity$GetDataAsyncTask.onPostExecute(BaseAttachmentViewActivity.java:222) // at com.android.mms.ui.BaseAttachmentViewActivity$GetDataAsyncTask.onPostExecute(BaseAttachmentViewActivity.java:190) // at android.os.AsyncTask.finish(AsyncTask.java:771) // at android.os.AsyncTask.-$$Nest$mfinish(Unknown Source:0) // at android.os.AsyncTask$InternalHandler.handleMessage(AsyncTask.java:788) // at android.os.Handler.dispatchMessage(Handler.java:107) // at android.os.Looper.loopOnce(Looper.java:232) // at android.os.Looper.loop(Looper.java:317) // at android.app.ActivityThread.main(ActivityThread.java:8794) // at java.lang.reflect.Method.invoke(Native Method) // at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:580) // at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:892) ,具体代码如下:AttachmentViewActivity.java:/* * Sonim Technologies, Inc. CONFIDENTIAL * Copyright (c) 2017 All Rights Reserved. * * The source code contained or described herein and all documents * related to the source code ("Material") is property of Sonim * Technologies, Inc. and its affiliates, may be covered by U.S. * or Foreign Patents or patents in process, and is protected by trade * secret or copyright law. No part of the Material may be used, * copied, reproduced, modified, published, uploaded, posted, * transmitted, distributed, or disclosed in any way without * prior written permission by Sonim Technologies, Inc. * * Unless required by applicable law or agreed to in writing, * the Materials are provided "AS IS," and WITHOUT WARRANTIES * OR CONDITIONS OF ANY KIND, either express or implied. No license * under any patent, copyright, trade secret or other * intellectual property right is granted to or conferred upon you * by disclosure or delivery of the Material, either expressly, by * implication, inducement, estoppel or otherwise. Any license * under such intellectual property rights must be express and * approved by Sonim Technologies, Inc. in writing. * */ package com.android.mms.ui; import android.content.Context; import android.content.Intent; import android.os.Bundle; import androidx.recyclerview.widget.DefaultItemAnimator; import androidx.recyclerview.widget.LinearLayoutManager; import androidx.recyclerview.widget.RecyclerView; import android.text.Html; import android.util.DisplayMetrics; import android.util.Log; import android.view.KeyEvent; import android.view.View; import android.view.Menu; import android.widget.LinearLayout; import android.widget.TextView; import android.widget.Toast; import com.android.mms.R; import com.android.mms.model.MediaModel; import com.android.mms.model.MediaWithTextModel; import com.android.mms.ui.multipleviews.BaseViewFragment; import com.android.mms.ui.multipleviews.VideoViewFragment; import com.google.android.mms.ContentType; import java.util.ArrayList; public class AttachmentViewActivity extends BaseAttachmentViewActivity { private static final String TAG = "AttachmentViewActivity"; static final int SAVE_SLIDER_ATTACHMENT_PERMISSION_REQUEST_CODE = 2017; private static final int DEFAULT_VISIBLE_LIST_COUNT = 3; private static final int DEFAULT_RECYCLER_VIEW_HEIGHT = 0; private static final int RECYCLER_VIEW_DEVICE_PORTION_ON_DEVICE_HEIGHT = 4; private static final boolean CONTAINER_FULLVIEW_MODE_VISIBLE = true; private static final boolean CONTAINER_FULLVIEW_MODE_INVISIBLE = false; private static final String VIEW_VCARD = "VIEW_VCARD_FROM_MMS"; private static final String KEY_PACKAGE_NAME = "calling_package_name"; private static final String PACKAGE_NAME = "com.android.mms"; private static final String EXTRA_SINGLE_ITEM = "SingleItemOnly"; private LinearLayout mDotsLayout; private TextView[] mDotsTextview; private SlidesAdapter mSlidesAdapter; private RecyclerView mSlidesRecyclerView; /** * Default recycler view height will be zero. Based on Device height it will be calculate the * recyclerview height dynamically. */ private int mRecyclerViewHeight = 0; @Override public void onCreate(Bundle icicle) { super.onCreate(icicle); if (MessageUtils.checkPermissionsIfNeeded(this)) { return; } setContentView(R.layout.attachment_view_activity); new GetDataAsyncTask(this).execute(); } private void initView() { mSelectedAttachmentContainer = (LinearLayout) findViewById(R.id.frameLayout_container); mDotsLayout = (LinearLayout) findViewById(R.id.dots_layout); mSlidesRecyclerView = (RecyclerView) findViewById(R.id.slides_recycler_view); calculateRecyclerViewHeight(); resetRecyclerViewHeight(mRecyclerViewHeight); RecyclerView.LayoutManager layoutManager = new LinearLayoutManagerWrapper(getApplicationContext(), LinearLayoutManager.HORIZONTAL, false); mSlidesRecyclerView.setLayoutManager(layoutManager); mSlidesRecyclerView.setItemAnimator(new DefaultItemAnimator()); mSlidesAdapter = new SlidesAdapter(mMediaWithTextModelList, this); mSlidesRecyclerView.setAdapter(mSlidesAdapter); // << [DNL-7826] 20210422 JunYiLin >> mSlidesAdapter.setOnDataSetChangedListener(mDataSetChangedListener); // << [DNL-7080] 20210330 JunYiLin << mSlidesRecyclerView.setOnFocusChangeListener(new View.OnFocusChangeListener() { @Override public void onFocusChange(View v, boolean hasFocus) { if (hasFocus) { updateNavBar(getResources().getStringArray(R.array.setNavBar_highlight_attachments_view_in_slideshow)); } } }); // [DNL-7080] >> } /** * prepare display dots layout */ private void displayDotsLayout() { if (mDotsTextview == null) { if (mMediaWithTextModelList.size() % DEFAULT_VISIBLE_LIST_COUNT == 0) { mDotsTextview = new TextView[mMediaWithTextModelList.size() / DEFAULT_VISIBLE_LIST_COUNT]; } else { mDotsTextview = new TextView[ (mMediaWithTextModelList.size() / DEFAULT_VISIBLE_LIST_COUNT) + 1]; } for (int i = 0; i < mDotsTextview.length; i++) { TextView dotTextView = new TextView(this); mDotsTextview[i] = dotTextView; mDotsTextview[i].setText(Html.fromHtml(getResources(). getString(R.string.dots_text_design))); mDotsTextview[i].setTextSize(getResources().getDimension(R.dimen.dot_text_design_size)); mDotsTextview[i].setTextColor(getResources().getColor(R.color.dot_inactive)); Log.v(TAG, "mDotsTextview text: "+mDotsTextview[i].getText()); mDotsLayout.addView(mDotsTextview[i]); } changeDotsColor(0); } Log.d(TAG,"fxj 147"); // Message UABT-1662 Haifei.Liao remove the dot layout BEGIN mDotsLayout.setVisibility(View.GONE); // Message UABT-1662 Haifei.Liao remove the dot layout END } @Override public void onRequestPermissionsResult(final int requestCode, final String permissions[], final int[] grantResults) { if (requestCode == SAVE_SLIDER_ATTACHMENT_PERMISSION_REQUEST_CODE) { if (MessageUtils.hasStoragePermission()) { //TODO: Check for permission enabled or not } else { Toast.makeText(this, R.string.no_permission_save_attachment_to_storage, Toast.LENGTH_SHORT).show(); } } } /** * change dots color whenever focus going to change left to right or right to left * @param currentPage */ private void changeDotsColor(int currentPage) { if (mDotsTextview == null) { return; } int focus = currentPage / DEFAULT_VISIBLE_LIST_COUNT; if (focus > 0) { mDotsTextview[focus - 1].setTextColor(getResources().getColor(R.color.dot_inactive)); } if (focus < mDotsTextview.length - 1) { mDotsTextview[focus + 1].setTextColor(getResources().getColor(R.color.dot_inactive)); } mDotsTextview[focus].setTextColor(getResources().getColor(R.color.dot_active)); } @Override public void onBackPressed() { /** * When click on back button check for is it in full view mode or not. If it is in full view mode * adjust to normal mode without close application. If it is in normal mode close this activity */ if (mSlidesRecyclerView.getLayoutParams() != null && mSlidesRecyclerView.getLayoutParams().height == DEFAULT_RECYCLER_VIEW_HEIGHT) { if (isSingleSlideMode()) { finish(); } else { resetRecyclerViewHeight(CONTAINER_FULLVIEW_MODE_INVISIBLE); } } else { super.onBackPressed(); } } @Override public void onItemClick(int position) { /** * If seleted mediaModel is Vcard or Vcal open there respective actions * else adjust current view as full view. */ MediaModel mediaModel = mMediaWithTextModelList.get(position).getMediaModel(); if (mediaModel != null) { if (mediaModel.isVcard()) { Intent intent = new Intent(Intent.ACTION_VIEW); // we need open the saved part. intent.setDataAndType(mediaModel.getUri(), ContentType.TEXT_VCARD.toLowerCase()); intent.addFlags(Intent.FLAG_GRANT_READ_URI_PERMISSION); // distinguish view vcard from mms or contacts. intent.putExtra(VIEW_VCARD, true); Intent openInChooser = Intent.createChooser(intent, getString(R.string.action_header)); startActivity(openInChooser); } else if (mediaModel.isVcal()) { Intent intent = new Intent(Intent.ACTION_VIEW); intent.addFlags(Intent.FLAG_GRANT_READ_URI_PERMISSION); intent.putExtra(EXTRA_SINGLE_ITEM, true); // So we don't see "surrounding" images in Gallery intent.putExtra(KEY_PACKAGE_NAME, PACKAGE_NAME); String contentType; contentType = mediaModel.getContentType(); intent.setDataAndType(mediaModel.getUri(), contentType); startActivity(intent); } else { resetRecyclerViewHeight(CONTAINER_FULLVIEW_MODE_VISIBLE); } } } @Override public void onLoadContainerView(int position) { super.onLoadContainerView(position); updateCurrentPosition(position); Log.v(TAG, "Attachment no fragment loaded till now onLoadContainerView"); changeDotsColor(position); } /** * Update mediaWithTextModel on mediaModel and text available * @param mediaModel * @param message */ @Override public void onDataUpdate(final MediaModel mediaModel, final String message) { runOnUiThread(new Runnable() { @Override public void run() { if(mActualNumberOfSlides == mNumberOfSlidesLoaded) { onDataInsert(mediaModel, message); } else { MediaWithTextModel mediaWithTextModel = mMediaWithTextModelList.get(mNumberOfSlidesLoaded); mediaWithTextModel.setMediaModel(mediaModel); mediaWithTextModel.setMessageText(message); mMediaWithTextModelList.set(mNumberOfSlidesLoaded, mediaWithTextModel); mSlidesAdapter.notifyItemChanged(mNumberOfSlidesLoaded); mNumberOfSlidesLoaded = mNumberOfSlidesLoaded + 1; } } }); } /** * Method for once data loads done this method will trigger. Handle on Main Thread */ @Override public void onDataLoadFinished() { Log.v(TAG, "Attachment onDataLoadFinished media list size is " + mMediaWithTextModelList.size()); removeEmptySlideObjects(); displayDotsLayout(); } @Override public void setInitialSlideSize(int size) { super.setInitialSlideSize(size); mMediaWithTextModelList = new ArrayList<MediaWithTextModel>(size); for(int i = 0; i < size; i ++) { mMediaWithTextModelList.add(new MediaWithTextModel(null,null)); } runOnUiThread(new Runnable() { @Override public void run() { initView(); } }); } @Override public void onDataInsert(final MediaModel mediaModel, final String message) { runOnUiThread(new Runnable() { @Override public void run() { Log.v(TAG, "Attachment onDataInsert"); MediaWithTextModel mediaWithTextModel = new MediaWithTextModel(message, mediaModel); mMediaWithTextModelList.add(mediaWithTextModel); mSlidesAdapter.notifyItemInserted(mMediaWithTextModelList.size() - 1); } }); } @Override public void onDataRemove(final int position) { mSlidesAdapter.notifyItemRemoved(position); } /** * sometimes slidemodel will return empty mediamodel list */ private void removeEmptySlideObjects() { //check for number of slides reached or not. If not reached then remove dummy viewholder from recyclerview runOnUiThread(new Runnable() { @Override public void run() { if(mActualNumberOfSlides > mNumberOfSlidesLoaded) { mMediaWithTextModelList.remove(mNumberOfSlidesLoaded); mNumberOfSlidesLoaded = mNumberOfSlidesLoaded + 1; onDataRemove(mMediaWithTextModelList.size()); removeEmptySlideObjects(); } } }); } public static class LinearLayoutManagerWrapper extends LinearLayoutManager { public LinearLayoutManagerWrapper(Context context, int orientation, boolean reverseLayout) { super(context, orientation, reverseLayout); } @Override public boolean supportsPredictiveItemAnimations() { return false; } } /** * Calculate recycler view height using the device height * eg: if device height is 400 then recycleViewHeight = 400/4 * (Here 4 is portion of value). finally recyclerViewHeight = 100 */ private void calculateRecyclerViewHeight() { DisplayMetrics displayMetrics = getResources().getDisplayMetrics(); int height = displayMetrics.heightPixels; mRecyclerViewHeight = (int) (height / RECYCLER_VIEW_DEVICE_PORTION_ON_DEVICE_HEIGHT ); } public void resetRecyclerViewHeight(boolean isViewEntered) { Log.d(TAG, "base resetRecyclerViewHeight isEnter = " + isViewEntered); BaseViewFragment baseFragment = (BaseViewFragment) getFragmentManager() .findFragmentById(R.id.selected_attachment_container); if (isViewEntered) { resetRecyclerViewHeight(DEFAULT_RECYCLER_VIEW_HEIGHT); mDotsLayout.setVisibility(View.GONE); if (baseFragment != null) { baseFragment.onPlayMedia(); } } else { resetRecyclerViewHeight(mRecyclerViewHeight); mDotsLayout.setVisibility(View.VISIBLE); if (baseFragment != null) { baseFragment.onStopMedia(); } } mSlidesRecyclerView.requestLayout(); } private void resetRecyclerViewHeight(int height) { mSlidesRecyclerView.getLayoutParams().height = height; mSlidesRecyclerView.requestLayout(); } /** * When click play all menu item readjust view from full view to normal */ @Override public void onPlayAllMenuItemSelect() { if (mSlidesRecyclerView.getLayoutParams().height == DEFAULT_RECYCLER_VIEW_HEIGHT) { resetRecyclerViewHeight(CONTAINER_FULLVIEW_MODE_INVISIBLE); } } @Override public void playMediaInFullView() { resetRecyclerViewHeight(CONTAINER_FULLVIEW_MODE_VISIBLE); } @Override public boolean dispatchKeyEvent(KeyEvent event) { BaseViewFragment baseFragment = (BaseViewFragment) getFragmentManager() .findFragmentById(R.id.selected_attachment_container); if(baseFragment instanceof VideoViewFragment) { VideoViewFragment videoViewFragment = (VideoViewFragment) baseFragment; videoViewFragment.onKeyEvent(); } return super.dispatchKeyEvent(event); } @Override public boolean onPrepareOptionsMenu(Menu menu) { super.onPrepareOptionsMenu(menu); invalidateOptionsMenu(); return true; } // [DNL-7080] 20210330 JunYiLin << @Override public boolean onKeyUp(int keyCode, KeyEvent event) { switch (keyCode) { case KeyEvent.KEYCODE_MENU: saveAllAttachment(); break; case KeyEvent.KEYCODE_MULTIFUNC_CENTER: playAllSlideShow(); break; case KeyEvent.KEYCODE_MULTIFUNC_RIGHT: saveAttachment(); break; } return super.onKeyUp(keyCode, event); } @Override protected void onResume() { super.onResume(); updateNavBar(this.getResources().getStringArray(R.array.setNavBar_highlight_attachments_view_in_slideshow)); } // [DNL-7080] >> },BaseAttachmentViewActivity.java:/* * Sonim Technologies, Inc. CONFIDENTIAL * Copyright (c) 2017 All Rights Reserved. * * The source code contained or described herein and all documents * related to the source code ("Material") is property of Sonim * Technologies, Inc. and its affiliates, may be covered by U.S. * or Foreign Patents or patents in process, and is protected by trade * secret or copyright law. No part of the Material may be used, * copied, reproduced, modified, published, uploaded, posted, * transmitted, distributed, or disclosed in any way without * prior written permission by Sonim Technologies, Inc. * * Unless required by applicable law or agreed to in writing, * the Materials are provided "AS IS," and WITHOUT WARRANTIES * OR CONDITIONS OF ANY KIND, either express or implied. No license * under any patent, copyright, trade secret or other * intellectual property right is granted to or conferred upon you * by disclosure or delivery of the Material, either expressly, by * implication, inducement, estoppel or otherwise. Any license * under such intellectual property rights must be express and * approved by Sonim Technologies, Inc. in writing. * */ package com.android.mms.ui; import android.app.Activity; import android.app.FragmentManager; import android.app.FragmentTransaction; import android.content.ContentValues; import android.content.Intent; import android.net.Uri; import android.os.AsyncTask; import android.os.Bundle; import android.provider.Telephony; import android.util.Log; import android.os.Environment; import android.text.TextUtils; import android.view.Menu; import android.view.MenuItem; // << [DNL-795] 20210113 JunYiLin >> import android.view.KeyEvent; import android.webkit.MimeTypeMap; import android.widget.LinearLayout; import android.widget.Toast; import com.android.mms.MmsApp; import com.android.mms.R; import com.android.mms.drm.DrmUtils; import com.android.mms.model.MediaWithTextModel; import com.android.mms.model.MediaModel; import com.android.mms.model.SlideModel; import com.android.mms.model.SlideshowModel; import com.android.mms.model.SmilHelper; import com.android.mms.transaction.MessagingNotification; import com.android.mms.ui.multipleviews.AudioViewFragment; import com.android.mms.ui.multipleviews.BaseViewFragment; import com.android.mms.ui.multipleviews.IAttachmentSelection; import com.android.mms.ui.multipleviews.ImageViewFragment; import com.android.mms.ui.multipleviews.VcardVcalViewFragment; import com.android.mms.ui.multipleviews.VideoViewFragment; import com.android.mms.util.Utils; import com.google.android.mms.ContentType; import com.google.android.mms.MmsException; import com.google.android.mms.util.SqliteWrapper; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.ref.WeakReference; import java.util.List; public abstract class BaseAttachmentViewActivity extends Activity implements IAttachmentSelection { private static final String TAG = BaseAttachmentViewActivity.class.getSimpleName(); private static final int SINGLE_SLIDE_MODEL_SIZE = 1; private static final String NO_SPACE_ERROR = "ENOSPC"; private static boolean mNoSpace = false; public static final int MENU_PLAY = 1; public static final int MENU_PAUSE = 2; public static final int MENU_RESUME = 3; public static final int MENU_PLAY_ALL = 4; public static final int MENU_STOP = 5; public static final int MENU_SAVE_ATTACHMENT = 6; public static final int MENU_SAVE_ALL_ATTACHMENTS = 7; public LinearLayout mSelectedAttachmentContainer; public List<MediaWithTextModel> mMediaWithTextModelList; private boolean mSingleSlideMode = false; private boolean mPlayAllMode = false; private boolean mEditSlideShowMode = false; private int mCurrentMediaPosition = 0; public int mActualNumberOfSlides, mNumberOfSlidesLoaded; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); } private void markAsReadIfNeed() { Intent intent = getIntent(); boolean unread = intent.getBooleanExtra("unread", false); Uri uri = intent.getData(); if (unread) { ContentValues values = new ContentValues(2); values.put(Telephony.Mms.SEEN, MessageUtils.MESSAGE_SEEN); values.put(Telephony.Mms.READ, MessageUtils.MESSAGE_READ); SqliteWrapper.update(this, getContentResolver(), uri, values, null, null); MessagingNotification.blockingUpdateNewMessageIndicator( this, MessagingNotification.THREAD_NONE, false); } } @Override public void onLoadContainerView(int position) { MediaWithTextModel mediaWithTextModel = mMediaWithTextModelList.get(position); overrideContainer(mediaWithTextModel); } @Override public void onLoadContainerView(MediaWithTextModel customMediaModel) { overrideContainer(customMediaModel); } // When switching between the fragments update mCurrentMediaPosition // with selected media position public void updateCurrentPosition(int currentMediaPosition) { mCurrentMediaPosition = currentMediaPosition; } private void overrideContainer(MediaWithTextModel mediaWithTextModel) { if (!isFinishing() && !isDestroyed()) { FragmentManager fragmentManager = getFragmentManager(); FragmentTransaction fragmentTransaction = fragmentManager.beginTransaction(); MediaModel mediaModel = mediaWithTextModel.getMediaModel(); switch (mediaModel.getTag()) { case SmilHelper.ELEMENT_TAG_TEXT: fragmentTransaction.replace(R.id.selected_attachment_container, ImageViewFragment.newInstance(mediaWithTextModel), null); break; case SmilHelper.ELEMENT_TAG_IMAGE: fragmentTransaction.replace(R.id.selected_attachment_container, ImageViewFragment.newInstance(mediaWithTextModel), null); break; case SmilHelper.ELEMENT_TAG_AUDIO: fragmentTransaction.replace(R.id.selected_attachment_container, AudioViewFragment.newInstance(mediaWithTextModel), null); break; case SmilHelper.ELEMENT_TAG_VIDEO: fragmentTransaction.replace(R.id.selected_attachment_container, VideoViewFragment.newInstance(mediaWithTextModel), null); break; case SmilHelper.ELEMENT_TAG_REF: if (mediaModel.getContentType().toLowerCase().equals(ContentType.TEXT_VCARD. toLowerCase()) || mediaModel.getContentType().toLowerCase(). equals(ContentType.TEXT_VCALENDAR.toLowerCase())) { fragmentTransaction.replace(R.id.selected_attachment_container, VcardVcalViewFragment.newInstance(mediaWithTextModel), null); } break; default: Log.v(TAG,"default attachment type case"); } fragmentTransaction.commit(); } } public boolean isSingleSlideMode() { return mSingleSlideMode; } public void setSingleSlideMode(boolean mSingleSlideMode) { this.mSingleSlideMode = mSingleSlideMode; } static class GetDataAsyncTask extends AsyncTask<Void, Void, Void> { private WeakReference<BaseAttachmentViewActivity> attachmentViewActivityWeakReference; public GetDataAsyncTask(BaseAttachmentViewActivity baseAttachmentViewActivity) { attachmentViewActivityWeakReference = new WeakReference(baseAttachmentViewActivity); } @Override protected Void doInBackground(Void... voids) { Log.v(TAG, "GetDataAsyncTask doInBackground"); BaseAttachmentViewActivity activity = attachmentViewActivityWeakReference.get(); if (activity != null) { try { Intent intent = activity.getIntent(); Uri uri = intent.getData(); SlideshowModel.createFromMessageUri(activity, uri); } catch (MmsException e) { Log.e(TAG, "Cannot present the slide show.", e); } activity.markAsReadIfNeed(); } else { Log.v(TAG, "GetDataAsyncTask Activity is null "); } return null; } @Override protected void onPostExecute(Void aVoid) { super.onPostExecute(aVoid); BaseAttachmentViewActivity activity = attachmentViewActivityWeakReference.get(); if (activity != null) { activity.onDataLoadFinished(); } } } /** * Update mediaWithTextModelList on slideModel available * @param slideModel * @param currentSlide */ @Override public void onDataUpdate(SlideModel slideModel, int currentSlide) { String message = slideModel.getText() != null ? slideModel.getText().getText() : null; if (slideModel.size() == SINGLE_SLIDE_MODEL_SIZE && slideModel.get(0).isText()) { if (mActualNumberOfSlides > mNumberOfSlidesLoaded) { onDataUpdate(slideModel.get(0), message); } else { onDataInsert(slideModel.get(0), message); } } else { for (int j = 0; j < slideModel.size(); j++) { MediaModel mediaModel = slideModel.get(j); Log.d(TAG, "Attachment prepareSlideData MediaModel contentType = " + mediaModel.getContentType()); if (mediaModel != null && !mediaModel.isText()) { Log.d(TAG, "Attachment prepareSlideData adding MediaWithTextModel "); if (mActualNumberOfSlides > mNumberOfSlidesLoaded) { onDataUpdate(mediaModel, message); } else { onDataInsert(mediaModel, message); } } } } } @Override public void setInitialSlideSize(int size) { Log.v(TAG, "Attachment setSlideShowSize size " + size); mActualNumberOfSlides = size; } public IAttachmentSelection getIAttachmentSelection() { return this; } public boolean isPlayAllMode() { return mPlayAllMode; } public void setPlayAllMode(boolean mPlayAllMode) { this.mPlayAllMode = mPlayAllMode; } @Override public void onPlayNextSlide() { } //[DNL-795] 20210220 JunYiLin << /* mark original @Override public boolean onPrepareOptionsMenu(Menu menu) { menu.clear(); BaseViewFragment baseFragment = (BaseViewFragment) getFragmentManager(). findFragmentById(R.id.selected_attachment_container); if(baseFragment != null) { baseFragment.onMenuUpdate(menu); } if(mMediaWithTextModelList != null && mMediaWithTextModelList.size() > 1) { menu.add(0, MENU_PLAY_ALL, 0,R.string.play_all); } menu.add(0, MENU_SAVE_ATTACHMENT, 0,R.string.save_attachment); if(mMediaWithTextModelList != null && mMediaWithTextModelList.size() > 1) { menu.add(0, MENU_SAVE_ALL_ATTACHMENTS, 0, R.string.save_all_attachments); } return true; } */ // [DNL-795] >> @Override public boolean onOptionsItemSelected(MenuItem item) { BaseViewFragment baseFragment = null; switch (item.getItemId()) { case MENU_PLAY_ALL: onPlayAllMenuItemSelect(); Intent intent = new Intent(this, AttachmentPlayAllActivity.class); intent.setData(getIntent().getData()); startActivity(intent); break; case MENU_SAVE_ATTACHMENT: saveAttachment(); break; case MENU_SAVE_ALL_ATTACHMENTS: saveAllAttachment(); break; case MENU_PLAY: playMediaInFullView(); baseFragment = (BaseViewFragment) getFragmentManager() .findFragmentById(R.id.selected_attachment_container); baseFragment.onPlayMedia(); break; case MENU_PAUSE: baseFragment = (BaseViewFragment) getFragmentManager() .findFragmentById(R.id.selected_attachment_container); baseFragment.onPauseMedia(); break; case MENU_RESUME: baseFragment = (BaseViewFragment) getFragmentManager() .findFragmentById(R.id.selected_attachment_container); baseFragment.onResumeMedia(); break; default: return super.onOptionsItemSelected(item); } return true; } // [DNL-7080] 20210330 JunYiLin << public void saveAllAttachment() { /* mark original private void saveAllAttachment(){ */ // [DNL-7080] int resId; boolean isCopied = copyMedia(); if(isCopied){ resId = R.string.save_all_attachment_success; }else{ resId = mNoSpace ? R.string.space_low_message : R.string.save_attachment_fail; } Toast.makeText(BaseAttachmentViewActivity.this, resId, Toast.LENGTH_SHORT).show(); } // [DNL-795] 20210113 JunYiLin << public void saveAttachment() { /* mark original private void saveAttachment(){ */ // [DNL-795] >> int resId; boolean isCopied = copyPart(mMediaWithTextModelList.get(mCurrentMediaPosition)); if(isCopied){ resId = R.string.save_attachment_success; }else{ resId = mNoSpace ? R.string.space_low_message : R.string.save_attachment_fail; } Toast.makeText(BaseAttachmentViewActivity.this, resId, Toast.LENGTH_SHORT).show(); } /** * Copies media from an Mms to the "download" directory on the SD card. If any of the parts * are audio types, drm'd or not, they're copied to the "Ringtones" directory. */ private boolean copyMedia() { boolean result = true; if (mMediaWithTextModelList == null) { return false; } for(int i = 0; i < mMediaWithTextModelList.size(); i++) { // all parts have to be successful for a valid result. result &= copyPart(mMediaWithTextModelList.get(i)); } return result; } private boolean copyPart(MediaWithTextModel customMediaModel) { // [DNL-8241] 20210423 JunYiLin << if (MessageUtils.checkIsPhoneMemoryFull(BaseAttachmentViewActivity.this)) { mNoSpace = true; return false; } // [DNL-8241] >> mNoSpace = false; MediaModel mediaModel = customMediaModel.getMediaModel(); Uri uri = mediaModel.getUri(); String type = new String(mediaModel.getContentType()); boolean isDrm = DrmUtils.isDrmType(type); if (isDrm) { type = MmsApp.getApplication().getDrmManagerClient() .getOriginalMimeType(uri); } if (!ContentType.isImageType(type) && !ContentType.isVideoType(type) && !ContentType.isAudioType(type) && !(ContentType.TEXT_VCARD.toLowerCase().equals(type.toLowerCase())) && !(ContentType.TEXT_VCALENDAR.toLowerCase().equals(type.toLowerCase())) && !(ContentType.AUDIO_OGG.toLowerCase().equals(type.toLowerCase()))) { return true; // we only save pictures, videos, and sounds. Skip the text parts, // the app (smil) parts, and other type that we can't handle. // Return true to pretend that we successfully saved the part so // the whole save process will be counted a success. } InputStream input = null; FileOutputStream fout = null; try { input = getContentResolver().openInputStream(uri); if (input instanceof FileInputStream) { FileInputStream fin = (FileInputStream) input; String fileName = mediaModel.getSrc(); File originalFile = new File(fileName); fileName = originalFile.getName(); // Strip the full path of where the "part" is // stored down to just the leaf filename. // For every media the default saving directory should be 'Download'. String dir = Utils.getStoragePath(this) + "/" + Environment.DIRECTORY_DOWNLOADS + "/"; String extension; int index; if ((index = fileName.lastIndexOf('.')) == -1) { extension = MimeTypeMap.getSingleton().getExtensionFromMimeType(type); } else { extension = fileName.substring(index + 1, fileName.length()); fileName = fileName.substring(0, index); } if (isDrm) { extension += DrmUtils.getConvertExtension(type); } // Remove leading periods. The gallery ignores files starting with a period. if (!TextUtils.isEmpty(fileName)) { fileName = fileName.trim(); fileName = fileName.replaceAll("^\\.", ""); } File file = getUniqueDestination(dir + fileName, extension); // make sure the path is valid and directories created for this file. File parentFile = file.getParentFile(); if (!parentFile.exists() && !parentFile.mkdirs()) { Log.e(TAG, "[MMS] copyPart: mkdirs for " + parentFile.getPath() + " failed!"); return false; } fout = new FileOutputStream(file); byte[] buffer = new byte[8000]; int size = 0; while ((size=fin.read(buffer)) != -1) { fout.write(buffer, 0, size); } // Notify other applications listening to scanner events // that a media file has been added to the sd card sendBroadcast(new Intent(Intent.ACTION_MEDIA_SCANNER_SCAN_FILE, Uri.fromFile(file))); } } catch (IOException e) { // Ignore Log.e(TAG, "IOException caught while opening or reading stream", e); if(e.getMessage().contains(NO_SPACE_ERROR)){ mNoSpace = true; } return false; } finally { if (null != input) { try { input.close(); } catch (IOException e) { // Ignore Log.e(TAG, "IOException caught while closing stream", e); return false; } } if (null != fout) { try { fout.close(); } catch (IOException e) { // Ignore Log.e(TAG, "IOException caught while closing stream", e); return false; } } } return true; } private File getUniqueDestination(String base, String extension) { File file = new File(base + "." + extension); for (int i = 2; file.exists(); i++) { file = new File(base + "_" + i + "." + extension); } return file; } @Override public void onPlayAllMenuItemSelect() { } @Override public void playMediaInFullView() {} public boolean isEditSlideShowMode() { return mEditSlideShowMode; } public void setEditSlideShowMode(boolean editSlideShowMode) { this.mEditSlideShowMode = editSlideShowMode; } // << [DNL-7080] 20210330 JunYiLin << public void updateNavBar(String[] str) { if (str.length > 0) { MessageUtils.setNavigationBarName(this, str); } } public void playAllSlideShow() { Intent intent = new Intent(this, AttachmentPlayAllActivity.class); intent.setData(getIntent().getData()); startActivity(intent); } // [DNL-7080] >> // [DNL-7826] 20210422 JunYiLin << public final SlidesAdapter.OnDataSetChangedListener mDataSetChangedListener = new SlidesAdapter.OnDataSetChangedListener() { @Override public void onHighLightItem(int position) { playAudioMedia(position); } }; public void playAudioMedia(int position) { MediaModel mediaModel = mMediaWithTextModelList.get(position).getMediaModel(); if (mediaModel != null && mediaModel.isAudio()) { BaseViewFragment baseFragment = (BaseViewFragment) getFragmentManager().findFragmentById(R.id.selected_attachment_container); if (baseFragment != null) { baseFragment.onPlayMedia(); } } } // [DNL-7826] >> }以上报错,怎么在上述代码上具体修改

2025-05-29 02:32:31 [XNIO-1 task-3] INFO o.d.c.w.i.PlusWebInvokeTimeInterceptor - [PLUS]开始请求 => URL[POST /platform/chat/streamLLM],参数类型[json],参数:[{"model":"qwen","input":"讲的什么?","filePath":"http://fileeds.finchina.com:12001/wpspreview/mkplatform/2025/05/14/538a2f695f1a4aad99155327b59a43b1.html","fileGuid":"bcf418b1-b45f-46c2-a576-d47ac3e984c4","projectId":3651,"chatType":1}] Consume Time:83 ms 2025-05-29 02:32:31 Execute SQL:SELECT id,guid,file_name,source_path,target_path,original_path,state,public_state,memo,tmstamp,is_del,project_id,session_id,create_dept,create_by,create_time,update_by,update_time FROM mk_files WHERE (guid = 'bcf418b1-b45f-46c2-a576-d47ac3e984c4') Consume Time:17 ms 2025-05-29 02:32:32 Execute SQL:INSERT INTO mk_prompts ( id, guid, project_id, type, content, files_guid, model_type, create_dept, create_by, create_time, update_by, update_time ) VALUES ( 1927915933471264769, '3cd0828a-b265-4221-bbe1-09cc7ad88e7f', 3651, 0, '讲的什么?', 'bcf418b1-b45f-46c2-a576-d47ac3e984c4', 'qwen', 103, 1, '2025-05-29 02:32:32', 1, '2025-05-29 02:32:32' ) Consume Time:78 ms 2025-05-29 02:32:32 Execute SQL:INSERT INTO mk_prompts ( id, guid, project_id, type, files_guid, prompts_guid, model_type, create_dept, create_by, create_time, update_by, update_time ) VALUES ( 1927915933622259713, '9ddef2c6-f774-435e-b901-f4982f9bb0a0', 3651, 1, 'bcf418b1-b45f-46c2-a576-d47ac3e984c4', '3cd0828a-b265-4221-bbe1-09cc7ad88e7f', 'qwen', 103, 1, '2025-05-29 02:32:32', 1, '2025-05-29 02:32:32' ) 2025-05-29 02:33:04 [ForkJoinPool.commonPool-worker-1] WARN o.d.c.m.h.InjectionMetaObjectHandler - 自动注入警告 => 用户未登录 2025-05-29 02:33:04 [ForkJoinPool.commonPool-worker-1] ERROR o.d.p.controller.MkChatController - Error in streamLLM java.lang.RuntimeException: {"status": "ERROR","error_msg": "处理响应时发生错误:org.mybatis.spring.MyBatisSystemException"} at org.dromara.platform.service.impl.MkChatServiceImpl.lambda$requestLLMStream$2(MkChatServiceImpl.java:278) at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:844) at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) 这个是报错,以下是代码 @SaIgnore @ResponseBody @PostMapping(value = "/streamLLM", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamLLM(@RequestBody PromptBo promptBo) throws Exception { return chatService.getAnswerByFlux(promptBo) .onErrorResume(e -> { log.error("Error in streamLLM", e); return Flux.just(e.getMessage()); }); } @Override public Flux<String> getAnswerByFlux(PromptBo promptBo) throws Exception { String requestURL = LLMUrl; if(promptBo != null && promptBo.getModel() != null){ if(promptBo.getModel().equals("deepseek-v3")||promptBo.getModel().equals("deepseek-r1")||promptBo.getModel().equals("deepseek-r1-volc")){ requestURL = LLMUrlDS; } } String content; Long projectId = promptBo.getProjectId(); if (projectId == null){ return Flux.error(new RuntimeException("项目id不能为空")); } String questionGuid = UUID.randomUUID().toString(); MkPromptsBo question = new MkPromptsBo(questionGuid,projectId,0,promptBo.getModel()); //入库问题 MkPromptsBo answer = new MkPromptsBo(UUID.randomUUID().toString(),projectId,1,questionGuid,promptBo.getModel()); // 入库答案 MkPromptsBo think = new MkPromptsBo(UUID.randomUUID().toString(),projectId,2,questionGuid,promptBo.getModel()); // think if (promptBo.getChatType() == 1 || promptBo.getChatType() == 2){ // 全文问答或局部检索问答 MkFiles file = filesService.queryTargetPathByGuid(promptBo.getFileGuid()); String fileType = getFileExtension(file.getSourcePath(),file.getFileName()); R<String> r = getFileContent(fileType, file); if (r.getCode() == R.SUCCESS){ content = r.getData(); } else { return Flux.error(new RuntimeException("{\"status\": \"ERROR\",\"error_msg\": \"" + r.getMsg() + "\"}")); } } else if(promptBo.getChatType() == 0) { // 局部问答 content = promptBo.getContent()==null?"":promptBo.getContent(); } else{ return Flux.error(new RuntimeException("chatType参数有误")); } question.setFilesGuid(promptBo.getFileGuid()); question.setContent(promptBo.getInput()); answer.setFilesGuid(promptBo.getFileGuid()); think.setFilesGuid(promptBo.getFileGuid()); promptsService.insertByBo(question); promptsService.insertByBo(answer); if(promptBo.getModel().equals("deepseek-r1")||promptBo.getModel().equals("deepseek-r1-volc")){ promptsService.insertByBo(think); } // 向大模型提问 String promptStr = "You are a helpful assistant designed to output JSON."; PromptRequest requestModel = new PromptRequest(promptBo.getModel(),content,promptBo.getInput(),promptStr,true); // 当为局部检索问答时需要设置section_qa参数 if (promptBo.getChatType() == 2 ){ MkProjectsVo mkProjectsVo = projectsService.queryById(projectId); ObjectMapper objectMapper = new ObjectMapper(); List<String> keywords = objectMapper.readValue(mkProjectsVo.getKeywords(), new com.fasterxml.jackson.core.type.TypeReference<>() { }); requestModel.setSection_qa(new SectionQa(keywords,mkProjectsVo.getDirection(),mkProjectsVo.getOffsets())); } ObjectMapper mapper = new ObjectMapper(); String requestStr; try { requestStr = mapper.writeValueAsString(requestModel); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return requestLLMStream(requestStr,promptBo,question,answer,think,requestURL); } private Flux<String> requestLLMStream(String requestStr,PromptBo promptBo,MkPromptsBo question,MkPromptsBo answer,MkPromptsBo think,String url){ HttpClient client = HttpClient.newHttpClient(); HttpRequest request; try { request = HttpRequest.newBuilder() .uri(new URI(url)) .header("Content-Type", "application/json") .POST(HttpRequest.BodyPublishers.ofString(requestStr)) .build(); } catch (Exception e) { return Flux.create(sink -> { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"构建大模型请求出现错误!\"}"); sink.complete(); }); } return Flux.create(sink -> { CompletableFuture<HttpResponse<InputStream>> responseFuture = client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()); responseFuture .completeOnTimeout(null, 60, TimeUnit.SECONDS) // 60秒超时 .thenAccept(response -> { if (response == null) { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"请求超时\"}"); sink.complete(); return; } int code = response.statusCode(); if (code != 200) { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"请求出现问题,请联系管理员!状态码:"+code+"\"}"); sink.complete(); return; } String idMessage = String.format("{\"type\":\"ids\",\"questionId\":\"%s\",\"answerId\":\"%s\"}\n\n", question.getId(), answer.getId()); sink.next(idMessage); AtomicBoolean isClientDisconnected = new AtomicBoolean(false); sink.onCancel(() -> { isClientDisconnected.set(true); }); try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.body(), StandardCharsets.UTF_8))) { String line; StringBuilder output = new StringBuilder(); StringBuilder thinkString = new StringBuilder(); ObjectMapper mapper2 = new ObjectMapper(); HashMap<String,String> processedLines = new HashMap<>(); // 用于记录"model" while ((line = reader.readLine()) != null && !isClientDisconnected.get()) { if(!line.isBlank()){ line = line.replaceFirst("data:",""); if ("[DONE]".equals(line.trim())) { break; } processJsonLine(processedLines,line, output, thinkString,mapper2); } sink.next(line); } // 完整答案拼接完成后,一次性入库 String fullAnswer = output.toString(); answer.setContent(fullAnswer); answer.setModelType(processedLines.get("model")); if(processedLines.containsKey("page_range")&&!processedLines.get("page_range").equals("null")){ answer.setPageRange(processedLines.get("page_range")); } if(promptBo.getModel().equals("deepseek-r1")||promptBo.getModel().equals("deepseek-r1-volc")) { think.setContent(thinkString.toString()); think.setModelType(processedLines.get("model")); think.setPageRange(processedLines.get("page_range")); promptsService.updateByBo(think); } question.setModelType(processedLines.get("model")); promptsService.updateByBo(answer); promptsService.updateByBo(question); if (!isClientDisconnected.get()) { // 如果客户端没有断开连接,发送 [DONE] 信号 sink.next("[DONE]"); } sink.complete(); } catch (Exception e) { sink.error(new RuntimeException("{\"status\": \"ERROR\",\"error_msg\": \"处理响应时发生错误:"+e+"\"}")); } }) .exceptionally(e -> { if (e instanceof IOException) { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"模型调用请求失败:"+ e.getMessage()+"\"}"); } else if (e instanceof InterruptedException) { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"请求模型连接被中断\"}"); } else { sink.next("{\"status\": \"ERROR\",\"error_msg\": \"发生未知错误:"+ e.getMessage()+"\"}"); } sink.complete(); return null; }); }); } @Override public Boolean updateByBo(MkPromptsBo bo) { MkPrompts update = MapstructUtils.convert(bo, MkPrompts.class); validEntityBeforeSave(update); return baseMapper.updateById(update) > 0; } -- public.mk_prompts definition -- Drop table -- DROP TABLE public.mk_prompts; CREATE TABLE public.mk_prompts ( id bigserial NOT NULL, -- 流水号 guid varchar(50) NULL, project_id int8 NULL, -- 项目表的id "type" int4 NULL, -- 记录类型 枚举值:0-问题;1-答案 "content" text NULL, -- type=0时为问题内容,type=1时为回答内容 files_guid varchar(50) NULL, -- 对应文件guid prompts_guid varchar(50) NULL, -- 对应问题的guid,type=1时有值 model_type varchar(200) NULL, -- 选择的模型 create_time date DEFAULT CURRENT_DATE NULL, -- 创建时间 update_time date DEFAULT CURRENT_DATE NULL, -- 更新时间 tmstamp varchar DEFAULT CURRENT_TIMESTAMP NULL, -- 时间戳 create_dept int8 NULL, -- 创建部门 create_by int8 NULL, -- 创建用户 update_by int8 NULL, -- 更新用户 page_range varchar(200) NULL, -- 关键词问答返回请求 {关键词: [1,2]} 比如:{"市净率":[0,1,2,3,4,5]} CONSTRAINT mk_prompts_pk PRIMARY KEY (id) ); -- Column comments COMMENT ON COLUMN public.mk_prompts.id IS '流水号'; COMMENT ON COLUMN public.mk_prompts.project_id IS '项目表的id'; COMMENT ON COLUMN public.mk_prompts."type" IS '记录类型 枚举值:0-问题;1-答案'; COMMENT ON COLUMN public.mk_prompts."content" IS 'type=0时为问题内容,type=1时为回答内容'; COMMENT ON COLUMN public.mk_prompts.files_guid IS '对应文件guid'; COMMENT ON COLUMN public.mk_prompts.prompts_guid IS '对应问题的guid,type=1时有值'; COMMENT ON COLUMN public.mk_prompts.model_type IS '选择的模型'; COMMENT ON COLUMN public.mk_prompts.create_time IS '创建时间'; COMMENT ON COLUMN public.mk_prompts.update_time IS '更新时间'; COMMENT ON COLUMN public.mk_prompts.tmstamp IS '时间戳'; COMMENT ON COLUMN public.mk_prompts.create_dept IS '创建部门'; COMMENT ON COLUMN public.mk_prompts.create_by IS '创建用户'; COMMENT ON COLUMN public.mk_prompts.update_by IS '更新用户'; COMMENT ON COLUMN public.mk_prompts.page_range IS '关键词问答返回请求 {关键词: [1,2]} 比如:{"市净率":[0,1,2,3,4,5]}'; 我本地启动时这个接口不会出现这个错误,一旦部署到服务器上就出现了这个错误而且就这一个接口,真的是太奇怪了,帮我看看是什么问题?

我有两段python代码,帮我增加三个接口,第一个接口功能是查询v_iiot_variable表,将表中sn,label,value数据返回到前端使用SSE实时显示,第二个接口功能是修改,第三个接口功能是删除,,用vue3+element写一个前端来实现这三个接口,前端用表格显示,表格中有修改和删除的功能,后端地址http://127.0.0.1:8888,sse卸载的时候记得释放 以下为services.pyd代码import json import asyncio from collections import deque # 在 service.py 中添加以下代码 async def command_match_event_generator(request): """SSE事件生成器,实时推送iiot_command_match表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_command_match表 commands = await conn.fetch( "SELECT id, sn, label, type, priority, task_type_sn, execute_json " "FROM iiot_command_match" ) # 格式化数据 formatted_data = [ { "id": c["id"], "sn": c["sn"], "label": c["label"], "type": c["type"], "priority": c["priority"], "task_type_sn": c["task_type_sn"], "execute_json": json.loads(c["execute_json"]) if c["execute_json"] != '-' else {} } for c in commands ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_command_match(pool, data: dict): """创建新指令匹配""" required_fields = ["sn", "label", "type", "priority", "task_type_sn", "execute_json"] if not all(field in data for field in required_fields): raise ValueError("缺少必要字段") async with pool.acquire() as conn: # 检查SN是否唯一 existing = await conn.fetchval( "SELECT 1 FROM iiot_command_match WHERE sn = $1", data["sn"] ) if existing: raise ValueError(f"SN {data['sn']} 已存在") # 插入新记录 await conn.execute( "INSERT INTO iiot_command_match(sn, label, type, priority, task_type_sn, execute_json) " "VALUES ($1, $2, $3, $4, $5, $6)", data["sn"], data["label"], data["type"], data["priority"], data["task_type_sn"], json.dumps(data["execute_json"]) ) async def update_command_match(pool, command_id: int, update_data: dict): """更新指令匹配信息""" async with pool.acquire() as conn: # 检查记录是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id) if not exists: raise ValueError(f"指令ID {command_id} 不存在") # 构建更新语句 query = "UPDATE iiot_command_match SET " params = [] updates = [] field_mapping = { "sn": "sn", "label": "label", "type": "type", "priority": "priority", "task_type_sn": "task_type_sn", "execute_json": "execute_json" } for field, db_field in field_mapping.items(): if field in update_data: updates.append(f"{db_field} = ${len(params) + 1}") # 特殊处理JSON字段 if field == "execute_json": params.append(json.dumps(update_data[field])) else: params.append(update_data[field]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(command_id) await conn.execute(query, *params) async def delete_command_match(pool, command_id: int): """删除指令匹配""" async with pool.acquire() as conn: # 检查记录是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id) if not exists: raise ValueError(f"指令ID {command_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_command_match WHERE id = $1", command_id) return True # 添加删除任务类型功能 async def delete_task_type(pool, task_id: int): """删除任务类型""" async with pool.acquire() as conn: # 检查任务类型是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_task_type WHERE id = $1", task_id) if not exists: raise ValueError(f"任务类型ID {task_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_task_type WHERE id = $1", task_id) return True # service.py 新增代码 async def task_type_event_generator(request): """SSE事件生成器,实时推送iiot_task_type表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_task_type表 task_types = await conn.fetch("SELECT id, sn, label FROM iiot_task_type") # 格式化数据 formatted_data = [ {"id": t["id"], "sn": t["sn"], "label": t["label"]} for t in task_types ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_task_type(pool, data: dict): """创建新任务类型""" async with pool.acquire() as conn: await conn.execute( "INSERT INTO iiot_task_type(sn, label) VALUES ($1, $2)", data["sn"], data["label"] ) async def update_task_type(pool, task_id: int, update_data: dict): """更新任务类型信息""" async with pool.acquire() as conn: query = "UPDATE iiot_task_type SET " params = [] updates = [] if "sn" in update_data: updates.append("sn = $" + str(len(params) + 1)) params.append(update_data["sn"]) if "label" in update_data: updates.append("label = $" + str(len(params) + 1)) params.append(update_data["label"]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(task_id) await conn.execute(query, *params) # 新增:全局存储轮询结果 task_polling_results = deque(maxlen=100) # 存储最近100次轮询结果 # 新增:后台轮询任务 async def poll_iiot_task(pool): """后台轮询任务,每秒查询一次iiot_task表""" while True: try: async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" # 查询实际表而非视图 ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "id": t["id"], "sn": t["sn"], "status": t["status"], "create_time": t["create_time"].isoformat(), # 根据实际表结构添加其他字段 }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 存储结果(实际应用中可替换为数据库/缓存存储) task_polling_results.append({ "timestamp": asyncio.get_event_loop().time(), "data": tasks }) print(f"Polled {len(tasks)} tasks at {asyncio.get_event_loop().time()}") print(tasks) except Exception as e: print(f"Polling error: {str(e)}") await asyncio.sleep(1) # 每秒轮询一次 #获取全部任务 async def task_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_type_label": str(t["task_type_label"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" # 获取全部指令 async def command_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_command" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_sn": str(t["task_sn"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" 以下为api.py代码from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from .services import task_event_generator from .services import command_event_generator from fastapi import HTTPException from .services import task_type_event_generator, create_task_type, update_task_type, delete_task_type # 导入新增的删除函数 router = APIRouter() # 在 api.py 中添加以下路由 from .services import ( command_match_event_generator, create_command_match, update_command_match, delete_command_match ) # 指令匹配SSE推送 @router.get("/sse/command-match") async def sse_command_match(request: Request): return StreamingResponse( command_match_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 创建新指令匹配 @router.post("/command-match") async def add_command_match(request: Request): try: data = await request.json() await create_command_match(request.app.state.db_pool, data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 更新指令匹配 @router.put("/command-match/{command_id}") async def modify_command_match(command_id: int, request: Request): try: update_data = await request.json() await update_command_match(request.app.state.db_pool, command_id, update_data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 删除指令匹配 @router.delete("/command-match/{command_id}") async def remove_command_match(command_id: int, request: Request): try: success = await delete_command_match(request.app.state.db_pool, command_id) if success: return {"status": "success", "message": "指令匹配已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务类型 @router.get("/sse/task-types") async def sse_task_types(request: Request): return StreamingResponse( task_type_event_generator(request), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) # 添加任务类型 @router.post("/task-types") async def add_task_type(request: Request): try: data = await request.json() if not data.get("sn") or not data.get("label"): raise HTTPException(400, "SN和Label不能为空") await create_task_type(request.app.state.db_pool, data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 修改任务类型 @router.put("/task-types/{task_id}") async def modify_task_type(task_id: int, request: Request): try: update_data = await request.json() if not update_data: raise HTTPException(400, "无有效更新数据") await update_task_type(request.app.state.db_pool, task_id, update_data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 删除任务类型 @router.delete("/task-types/{task_id}") async def remove_task_type(task_id: int, request: Request): try: # 验证ID有效性 if not task_id or task_id <= 0: raise HTTPException(400, "无效的任务类型ID") # 执行删除操作 success = await delete_task_type(request.app.state.db_pool, task_id) if success: return {"status": "success", "message": "任务类型已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务信息 @router.get("/sse/tasks") async def sse_tasks(request: Request): return StreamingResponse( task_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 获取指令信息 @router.get("/sse/commands") async def sse_tasks(request: Request): return StreamingResponse( command_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } )

目前我有个前端代码,现在的功能是显示iiot_command_match表的所有数据,增加一个下拉框,里面显示的iiot_task_type表中的所有sn,iiot_task_type中的sn,和iiot_command_match中的task_type_sn是对应的,我希望选中这个下拉框的值,显示的是跟这个SN相匹配的数据,另外增加数据时,其中有一个,任务类型SN,的位置,现在是手动输入,改成下拉框选,选择的项跟前面下拉框的数据一样,都是iiot_task_type的sn,后台接口目前我有这些功能的接口,不用新写,但我会把代码给你,这样前端你知道调用什么。以下为前端代码<template> <el-button type="primary" @click="handleAdd">添加新记录</el-button> <el-table :data="commandMatches" style="width: 100%" height="500"> <el-table-column prop="sn" label="SN" width="180" /> <el-table-column prop="label" label="标签" width="120" /> <el-table-column prop="type" label="类型" width="100" /> <el-table-column prop="priority" label="优先级" width="80" sortable /> <el-table-column prop="task_type_sn" label="任务类型SN" width="150" /> <el-table-column label="执行JSON" min-width="200"> <template #default="{ row }"> {{ JSON.stringify(row.execute_json, null, 2) }} </template> </el-table-column> <el-table-column label="操作" width="180" fixed="right"> <template #default="{ row }"> <el-button size="small" @click="handleEdit(row)">编辑</el-button> <el-button size="small" type="danger" @click="handleDelete(row.id)" >删除</el-button > </template> </el-table-column> </el-table> <el-dialog v-model="dialogVisible" :title="formTitle"> <el-form :model="form" label-width="120px"> <el-form-item label="SN" required> <el-input v-model="form.sn" /> </el-form-item> <el-form-item label="标签"> <el-input v-model="form.label" /> </el-form-item> <el-form-item label="类型"> <el-input v-model="form.type" /> </el-form-item> <el-form-item label="优先级"> <el-input-number v-model="form.priority" :min="1" :max="10" /> </el-form-item> <el-form-item label="任务类型SN"> <el-input v-model="form.task_type_sn" /> </el-form-item> <el-form-item label="执行JSON"> <el-input v-model="form.execute_json" type="textarea" :autosize="{ minRows: 4, maxRows: 8 }" placeholder='例如: {"command": "start", "params": {"speed": 100}}' /> </el-form-item> </el-form> <template #footer> <el-button @click="dialogVisible = false">取消</el-button> <el-button type="primary" @click="submitForm">确认</el-button> </template> </el-dialog> </template> <script setup> import { ref, onMounted, onBeforeUnmount } from "vue"; import { ElMessage, ElMessageBox } from "element-plus"; // 后端API地址 const API_BASE = "http://127.0.0.1:8888"; // 实时数据 const commandMatches = ref([]); let eventSource = null; // 表单相关 const dialogVisible = ref(false); const form = ref({ id: null, sn: "", label: "", type: "", priority: 5, task_type_sn: "", execute_json: "{}", }); const isEditMode = ref(false); const formTitle = ref("添加新指令匹配"); // 初始化SSE连接 const initSSE = () => { if (eventSource) eventSource.close(); eventSource = new EventSource(${API_BASE}/sse/command-match); eventSource.onmessage = (event) => { try { const data = JSON.parse(event.data); commandMatches.value = data; } catch (e) { console.error("SSE数据解析错误:", e); } }; eventSource.onerror = (err) => { console.error("SSE连接错误:", err); // 尝试重新连接 setTimeout(initSSE, 3000); }; }; // 添加新记录 const handleAdd = () => { form.value = { id: null, sn: "", label: "", type: "", priority: 5, task_type_sn: "", execute_json: "{}", }; isEditMode.value = false; formTitle.value = "添加新指令匹配"; dialogVisible.value = true; }; // 编辑记录 const handleEdit = (row) => { form.value = { id: row.id, sn: row.sn, label: row.label, type: row.type, priority: row.priority, task_type_sn: row.task_type_sn, execute_json: JSON.stringify(row.execute_json, null, 2), }; isEditMode.value = true; formTitle.value = "编辑指令匹配"; dialogVisible.value = true; }; // 删除记录 const handleDelete = async (id) => { try { await ElMessageBox.confirm("确定要删除这条记录吗?", "警告", { confirmButtonText: "确定", cancelButtonText: "取消", type: "warning", }); const response = await fetch(${API_BASE}/command-match/${id}, { method: "DELETE", }); if (response.ok) { ElMessage.success("删除成功"); } else { const error = await response.json(); throw new Error(error.detail || "删除失败"); } } catch (error) { ElMessage.error(error.message || "删除操作出错"); } }; // 提交表单 const submitForm = async () => { try { // 验证JSON格式 let executeJson; try { executeJson = JSON.parse(form.value.execute_json); } catch { throw new Error("执行JSON格式不正确"); } const payload = { ...form.value, execute_json: executeJson, }; const url = isEditMode.value ? ${API_BASE}/command-match/${form.value.id} : ${API_BASE}/command-match; const method = isEditMode.value ? "PUT" : "POST"; const response = await fetch(url, { method, headers: { "Content-Type": "application/json" }, body: JSON.stringify(payload), }); if (!response.ok) { const error = await response.json(); throw new Error(error.detail || "操作失败"); } ElMessage.success(isEditMode.value ? "更新成功" : "添加成功"); dialogVisible.value = false; } catch (error) { ElMessage.error(error.message || "操作出错"); } }; // 生命周期钩子 onMounted(() => { initSSE(); }); onBeforeUnmount(() => { if (eventSource) eventSource.close(); }); </script> <style scoped> .command-match-container { padding: 20px; background-color: #fff; border-radius: 4px; box-shadow: 0 2px 12px 0 rgba(0, 0, 0, 0.1); } </style> 以下为service.py代码 import json import asyncio from collections import deque # 在 service.py 中添加以下代码 async def command_match_event_generator(request): """SSE事件生成器,实时推送iiot_command_match表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_command_match表 commands = await conn.fetch( "SELECT id, sn, label, type, priority, task_type_sn, execute_json " "FROM iiot_command_match" ) # 格式化数据 formatted_data = [ { "id": c["id"], "sn": c["sn"], "label": c["label"], "type": c["type"], "priority": c["priority"], "task_type_sn": c["task_type_sn"], "execute_json": json.loads(c["execute_json"]) if c["execute_json"] != '-' else {} } for c in commands ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_command_match(pool, data: dict): """创建新指令匹配""" required_fields = ["sn", "label", "type", "priority", "task_type_sn", "execute_json"] if not all(field in data for field in required_fields): raise ValueError("缺少必要字段") async with pool.acquire() as conn: # 检查SN是否唯一 existing = await conn.fetchval( "SELECT 1 FROM iiot_command_match WHERE sn = $1", data["sn"] ) if existing: raise ValueError(f"SN {data['sn']} 已存在") # 插入新记录 await conn.execute( "INSERT INTO iiot_command_match(sn, label, type, priority, task_type_sn, execute_json) " "VALUES ($1, $2, $3, $4, $5, $6)", data["sn"], data["label"], data["type"], data["priority"], data["task_type_sn"], json.dumps(data["execute_json"]) ) async def update_command_match(pool, command_id: int, update_data: dict): """更新指令匹配信息""" async with pool.acquire() as conn: # 检查记录是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id) if not exists: raise ValueError(f"指令ID {command_id} 不存在") # 构建更新语句 query = "UPDATE iiot_command_match SET " params = [] updates = [] field_mapping = { "sn": "sn", "label": "label", "type": "type", "priority": "priority", "task_type_sn": "task_type_sn", "execute_json": "execute_json" } for field, db_field in field_mapping.items(): if field in update_data: updates.append(f"{db_field} = ${len(params) + 1}") # 特殊处理JSON字段 if field == "execute_json": params.append(json.dumps(update_data[field])) else: params.append(update_data[field]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(command_id) await conn.execute(query, *params) async def delete_command_match(pool, command_id: int): """删除指令匹配""" async with pool.acquire() as conn: # 检查记录是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_command_match WHERE id = $1", command_id) if not exists: raise ValueError(f"指令ID {command_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_command_match WHERE id = $1", command_id) return True # 添加删除任务类型功能 async def delete_task_type(pool, task_id: int): """删除任务类型""" async with pool.acquire() as conn: # 检查任务类型是否存在 exists = await conn.fetchval("SELECT 1 FROM iiot_task_type WHERE id = $1", task_id) if not exists: raise ValueError(f"任务类型ID {task_id} 不存在") # 执行删除操作 await conn.execute("DELETE FROM iiot_task_type WHERE id = $1", task_id) return True # service.py 新增代码 async def task_type_event_generator(request): """SSE事件生成器,实时推送iiot_task_type表变更""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): async with pool.acquire() as conn: # 查询iiot_task_type表 task_types = await conn.fetch("SELECT id, sn, label FROM iiot_task_type") # 格式化数据 formatted_data = [ {"id": t["id"], "sn": t["sn"], "label": t["label"]} for t in task_types ] # 生成SSE事件 yield f"data: {json.dumps(formatted_data)}\n\n" await asyncio.sleep(1) # 每秒更新 except Exception as e: yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" async def create_task_type(pool, data: dict): """创建新任务类型""" async with pool.acquire() as conn: await conn.execute( "INSERT INTO iiot_task_type(sn, label) VALUES ($1, $2)", data["sn"], data["label"] ) async def update_task_type(pool, task_id: int, update_data: dict): """更新任务类型信息""" async with pool.acquire() as conn: query = "UPDATE iiot_task_type SET " params = [] updates = [] if "sn" in update_data: updates.append("sn = $" + str(len(params) + 1)) params.append(update_data["sn"]) if "label" in update_data: updates.append("label = $" + str(len(params) + 1)) params.append(update_data["label"]) if not updates: return query += ", ".join(updates) + " WHERE id = $" + str(len(params) + 1) params.append(task_id) await conn.execute(query, *params) # 新增:全局存储轮询结果 task_polling_results = deque(maxlen=100) # 存储最近100次轮询结果 # 新增:后台轮询任务 async def poll_iiot_task(pool): """后台轮询任务,每秒查询一次iiot_task表""" while True: try: async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" # 查询实际表而非视图 ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "id": t["id"], "sn": t["sn"], "status": t["status"], "create_time": t["create_time"].isoformat(), # 根据实际表结构添加其他字段 }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 存储结果(实际应用中可替换为数据库/缓存存储) task_polling_results.append({ "timestamp": asyncio.get_event_loop().time(), "data": tasks }) print(f"Polled {len(tasks)} tasks at {asyncio.get_event_loop().time()}") print(tasks) except Exception as e: print(f"Polling error: {str(e)}") await asyncio.sleep(1) # 每秒轮询一次 #获取全部任务 async def task_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_task" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_type_label": str(t["task_type_label"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" # 获取全部指令 async def command_event_generator(request): """合并后的SSE事件生成器,包含数据库查询和数据转换逻辑""" pool = request.app.state.db_pool try: while not await request.is_disconnected(): # 直接执行数据库查询 async with pool.acquire() as conn: db_tasks = await conn.fetch( "SELECT * FROM v_iiot_command" ) # 处理查询结果 tasks = [] for t in db_tasks: try: tasks.append({ "label": str(t["label"]), "sn": str(t["sn"]), "task_sn": str(t["task_sn"]), "priority": int(t["priority"]), "create_time": t["create_time"].isoformat(), "status_label": str(t["status_label"]) }) except Exception as e: print(f"Error processing task {t.get('id', 'unknown')}: {str(e)}") # 生成SSE事件 yield f"data: {json.dumps(tasks)}\n\n" await asyncio.sleep(2) except Exception as e: # 异常处理 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" 以下为api.py代码 from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from .services import task_event_generator from .services import command_event_generator from fastapi import HTTPException from .services import task_type_event_generator, create_task_type, update_task_type, delete_task_type # 导入新增的删除函数 router = APIRouter() # 在 api.py 中添加以下路由 from .services import ( command_match_event_generator, create_command_match, update_command_match, delete_command_match ) # 指令匹配SSE推送 @router.get("/sse/command-match") async def sse_command_match(request: Request): return StreamingResponse( command_match_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 创建新指令匹配 @router.post("/command-match") async def add_command_match(request: Request): try: data = await request.json() await create_command_match(request.app.state.db_pool, data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 更新指令匹配 @router.put("/command-match/{command_id}") async def modify_command_match(command_id: int, request: Request): try: update_data = await request.json() await update_command_match(request.app.state.db_pool, command_id, update_data) return {"status": "success"} except ValueError as ve: raise HTTPException(400, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 删除指令匹配 @router.delete("/command-match/{command_id}") async def remove_command_match(command_id: int, request: Request): try: success = await delete_command_match(request.app.state.db_pool, command_id) if success: return {"status": "success", "message": "指令匹配已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务类型 @router.get("/sse/task-types") async def sse_task_types(request: Request): return StreamingResponse( task_type_event_generator(request), media_type="text/event-stream", headers={"Cache-Control": "no-cache"} ) # 添加任务类型 @router.post("/task-types") async def add_task_type(request: Request): try: data = await request.json() if not data.get("sn") or not data.get("label"): raise HTTPException(400, "SN和Label不能为空") await create_task_type(request.app.state.db_pool, data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 修改任务类型 @router.put("/task-types/{task_id}") async def modify_task_type(task_id: int, request: Request): try: update_data = await request.json() if not update_data: raise HTTPException(400, "无有效更新数据") await update_task_type(request.app.state.db_pool, task_id, update_data) return {"status": "success"} except Exception as e: raise HTTPException(500, str(e)) # 删除任务类型 @router.delete("/task-types/{task_id}") async def remove_task_type(task_id: int, request: Request): try: # 验证ID有效性 if not task_id or task_id <= 0: raise HTTPException(400, "无效的任务类型ID") # 执行删除操作 success = await delete_task_type(request.app.state.db_pool, task_id) if success: return {"status": "success", "message": "任务类型已删除"} else: raise HTTPException(500, "删除操作未成功") except ValueError as ve: raise HTTPException(404, str(ve)) except Exception as e: raise HTTPException(500, str(e)) # 获取任务信息 @router.get("/sse/tasks") async def sse_tasks(request: Request): return StreamingResponse( task_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # 获取指令信息 @router.get("/sse/commands") async def sse_tasks(request: Request): return StreamingResponse( command_event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } )

评论
用户头像
药罐子也有未来
2025.08.28
最后,还介绍了如何使用SseEmitter简化SSE的实现,提高了开发效率。
用户头像
xhmoon
2025.07.26
通过代码示例和扩展内容,文档不仅解决了问题,还提供了深入理解和实践的途径。🍚
用户头像
杏花朵朵
2025.06.29
文档结构清晰,从核心响应头的设置到具体的实现方式,再到关键注意事项和测试方法,一步步引导用户完成配置。🍎
用户头像
Friday永不为奴
2025.06.18
这段文档提供了详细的MediaType.TEXT_EVENT_STREAM_VALUE使用时的头设置方法和注意点,对于实现SSE非常有帮助。
用户头像
鸣泣的海猫
2025.06.11
强调了连接保持和禁止压缩的重要性,这对于SSE的性能和稳定性至关重要。
用户头像
Orca是只鲸
2025.06.08
提供了详细的SSE事件格式规范,确保用户能够正确构造事件消息。👐

最新推荐

recommend-type

【微信小程序源码】小程序官方Demo.zip

资源说明: 1:本资料仅用作交流学习参考,请切勿用于商业用途。 2:一套精品实用微信小程序源码资源,无论是入门练手还是项目复用都超实用,省去重复开发时间,让开发少走弯路! 更多精品资源请访问 https://blog.csdn.net/ashyyyy/article/details/146464041
recommend-type

体育赛事摘要数据集构建与自然语言处理技术应用_基于人工标注的大规模体育赛事评论文本与新闻文本摘要数据集SGSum_提供高质量训练集验证集测试集用于学术研究支持文本摘要模型开发与评估.zip

体育赛事摘要数据集构建与自然语言处理技术应用_基于人工标注的大规模体育赛事评论文本与新闻文本摘要数据集SGSum_提供高质量训练集验证集测试集用于学术研究支持文本摘要模型开发与评估.zip
recommend-type

Docker化部署TS3AudioBot教程与实践

### 标题知识点 #### TS3AudioBot_docker - **Dockerfile的用途与组成**:Dockerfile是一个文本文件,包含了所有构建Docker镜像的命令。开发者可以通过编辑Dockerfile来指定Docker镜像创建时所需的所有指令,包括基础镜像、运行时指令、环境变量、软件安装、文件复制等。TS3AudioBot_docker表明这个Dockerfile与TS3AudioBot项目相关,TS3AudioBot可能是一个用于TeamSpeak 3服务器的音频机器人,用于播放音频或与服务器上的用户进行交互。 - **Docker构建过程**:在描述中,有两种方式来获取TS3AudioBot的Docker镜像。一种是从Dockerhub上直接运行预构建的镜像,另一种是自行构建Docker镜像。自建过程会使用到docker build命令,而从Dockerhub运行则会用到docker run命令。 ### 描述知识点 #### Docker命令的使用 - **docker run**:这个命令用于运行一个Docker容器。其参数说明如下: - `--name tsbot`:为运行的容器指定一个名称,这里命名为tsbot。 - `--restart=always`:设置容器重启策略,这里是总是重启,确保容器在失败后自动重启。 - `-it`:这是一对参数,-i 表示交互式操作,-t 分配一个伪终端。 - `-d`:表示后台运行容器。 - `-v /home/tsBot/data:/data`:将宿主机的/home/tsBot/data目录挂载到容器内的/data目录上,以便持久化存储数据。 - `rofl256/tsaudiobot` 或 `tsaudiobot`:指定Docker镜像名称。前者可能是从DockerHub上获取的带有用户名命名空间的镜像,后者是本地构建或已重命名的镜像。 #### Docker构建流程 - **构建镜像**:使用docker build命令可以将Dockerfile中的指令转化为一个Docker镜像。`docker build . -t tsaudiobot`表示从当前目录中读取Dockerfile,并创建一个名为tsaudiobot的镜像。构建过程中,Docker会按顺序执行Dockerfile中的指令,比如FROM、RUN、COPY等,最终形成一个包含所有依赖和配置的应用镜像。 ### 标签知识点 #### Dockerfile - **Dockerfile的概念**:Dockerfile是一个包含创建Docker镜像所有命令的文本文件。它被Docker程序读取,用于自动构建Docker镜像。Dockerfile中的指令通常包括安装软件、设置环境变量、复制文件等。 - **Dockerfile中的命令**:一些常用的Dockerfile命令包括: - FROM:指定基础镜像。 - RUN:执行命令。 - COPY:将文件或目录复制到镜像中。 - ADD:类似于COPY,但是 ADD 支持从URL下载文件以及解压 tar 文件。 - ENV:设置环境变量。 - EXPOSE:声明端口。 - VOLUME:创建挂载点。 - CMD:容器启动时要运行的命令。 - ENTRYPOINT:配置容器启动时的执行命令。 ### 压缩包子文件的文件名称列表知识点 #### 文件命名 - **TS3AudioBot_docker-main**:此文件名表明了这是一个主要的代码库或Dockerfile的存放位置。在开发中,通常main分支代表当前的主版本或正在积极开发的分支。因此TS3AudioBot_docker-main可能表示这是在Dev分支上开发的Dockerfile的主要代码版本。主分支一般比较稳定,并作为新的特性开发的基础。 ### 综合知识点 - **Docker在DevOps中的角色**:Docker作为一种轻量级的容器化技术,在DevOps领域扮演重要角色。它可以快速部署、一致的运行环境、便于测试和迁移应用。通过Dockerfile的编写和docker build命令,开发者可以构建可移植的容器镜像,通过docker run命令则可以快速启动容器实例。 - **TS3AudioBot与TeamSpeak**:TS3AudioBot很可能是一个针对TeamSpeak 3服务器的自动化解决方案。TeamSpeak是一个语音通讯软件,广泛用于线上游戏团队进行沟通。一个音频机器人可以提供自动化的消息通知、音频流控制等功能,提高游戏社区的交流效率。 - **Docker镜像的版本管理**:镜像的命名通常包括用户名、项目名和标签。如rofl256/tsaudiobot中rofl256可能是一个用户名,tsaudiobot可能是项目名称,这样的命名规则有利于镜像的共享和管理。 在介绍以上知识点后,我们理解了标题、描述、标签及文件名称列表中所含的知识点。这些知识点涵盖了Dockerfile的构建、Docker镜像的使用、TS3AudioBot功能以及版本控制等多个方面,为IT专业人员在使用和开发Docker相关项目时提供了必要的信息。
recommend-type

零售销售数据的探索性分析与DeepAR模型预测

### 零售销售数据的探索性分析与DeepAR模型预测 #### 1. 探索性数据分析 在拥有45家商店的情况下,我们选择了第20号商店,来分析其不同部门在三年间的销售表现。借助DeepAR算法,我们可以了解不同部门商品的销售情况。 在SageMaker中,通过生命周期配置(Lifecycle Configurations),我们可以在笔记本实例启动前自定义安装Python包,避免在执行笔记本前手动跟踪所需的包。为了探索零售销售数据,我们需要安装最新版本(0.9.0)的seaborn库。具体操作步骤如下: 1. 在SageMaker的Notebook下,点击Lifecycle Config
recommend-type

llm agent平台

<think>好的,我现在需要帮用户寻找与LLM代理平台解决方案相关的信息。首先,我得明确用户的需求。用户提到了“LLM agent platform solutions”,也就是大型语言模型代理平台的解决方案。这可能涉及到如何构建、部署或优化基于LLM的代理系统。接下来,我应该回忆之前用户提供的引用内容,看看有没有相关的信息可以利用。 根据引用[1],提到构建LLM应用程序的步骤分解,可能涉及到代理平台的设计。引用[2]讨论了评估LLM的挑战,包括可重复性和开源模型的解决方案,这可能影响代理平台的稳定性和选择。引用[3]则提到大模型相关的岗位和面试题,可能涉及实际应用中的技术问题。 接下
recommend-type

Docker实现OAuth2代理:安全的HTTPS解决方案

### 知识点详细说明: #### Dockerfile基础 Dockerfile是一种文本文件,它包含了用户创建Docker镜像所需的命令和参数。Docker通过读取Dockerfile中的指令自动构建镜像。Dockerfile通常包含了如下载基础镜像、安装软件包、执行脚本等指令。 #### Dockerfile中的常用指令 1. **FROM**: 指定基础镜像,所有的Dockerfile都必须以FROM开始。 2. **RUN**: 在构建过程中执行命令,如安装软件。 3. **CMD**: 设置容器启动时运行的命令,可以被docker run命令后面的参数覆盖。 4. **EXPOSE**: 告诉Docker容器在运行时监听指定的网络端口。 5. **ENV**: 设置环境变量。 6. **ADD**: 将本地文件复制到容器中,如果是tar归档文件会自动解压。 7. **ENTRYPOINT**: 设置容器启动时的默认命令,不会被docker run命令覆盖。 8. **VOLUME**: 创建一个挂载点以挂载外部存储,如磁盘或网络文件系统。 #### OAuth 2.0 Proxy OAuth 2.0 Proxy 是一个轻量级的认证代理,用于在应用程序前提供OAuth认证功能。它主要通过HTTP重定向和回调机制,实现对下游服务的安全访问控制,支持多种身份提供商(IdP),如Google, GitHub等。 #### HTTPS和SSL/TLS HTTPS(HTTP Secure)是HTTP的安全版本,它通过SSL/TLS协议加密客户端和服务器之间的通信。使用HTTPS可以保护数据的机密性和完整性,防止数据在传输过程中被窃取或篡改。SSL(Secure Sockets Layer)和TLS(Transport Layer Security)是用来在互联网上进行通信时加密数据的安全协议。 #### Docker容器与HTTPS 为了在使用Docker容器时启用HTTPS,需要在容器内配置SSL/TLS证书,并确保使用443端口。这通常涉及到配置Nginx或Apache等Web服务器,并将其作为反向代理运行在Docker容器内。 #### 临时分叉(Fork) 在开源领域,“分叉”指的是一种特殊的复制项目的行为,通常是为了对原项目进行修改或增强功能。分叉的项目可以独立于原项目发展,并可选择是否合并回原项目。在本文的语境下,“临时分叉”可能指的是为了实现特定功能(如HTTPS支持)而在现有Docker-oauth2-proxy项目基础上创建的分支版本。 #### 实现步骤 要实现HTTPS支持的docker-oauth2-proxy,可能需要进行以下步骤: 1. **准备SSL/TLS证书**:可以使用Let's Encrypt免费获取证书或自行生成。 2. **配置Nginx/Apache服务器**:在Dockerfile中添加配置,以使用SSL证书和代理设置。 3. **修改OAuth2 Proxy设置**:调整OAuth2 Proxy配置以使用HTTPS连接。 4. **分叉Docker-oauth2-proxy项目**:创建项目的分支副本,以便进行修改。 5. **编辑Dockerfile**:在分叉的项目中编写或修改Dockerfile,包括下载基础镜像、设置环境变量、添加SSL证书、配置Nginx/Apache和OAuth2 Proxy等步骤。 6. **构建和测试新镜像**:使用Docker构建镜像,并在安全环境中进行测试,确保HTTPS配置正确,并且OAuth2 Proxy功能正常工作。 7. **部署到生产环境**:在确认无误后,将配置好的镜像部署到生产环境中。 #### 压缩包子文件的文件名称列表 - **docker-oauth2-proxy-master**: 这可能是指在GitHub等代码托管平台上,docker-oauth2-proxy项目的主分支或主仓库。名称列表中的“master”暗示了该文件夹包含的是主分支的代码。 总结来说,要实现一个支持HTTPS的docker-oauth2-proxy,开发者需要进行一系列的配置和编码工作,包括使用Dockerfile来构建自定义的Docker镜像,配置SSL/TLS证书,分叉并修改现有的开源项目代码。通过这些步骤,可以确保OAuth2 Proxy能够安全地处理HTTPS请求,并为下游服务提供安全认证功能。
recommend-type

利用AmazonSageMaker进行图像分类:从理论到实践

# 利用 Amazon SageMaker 进行图像分类:从理论到实践 ## 1. 主题建模与图像分类概述 在数据科学领域,从大量非结构化数据中提取信息和主题至关重要。像 SageMaker 的神经主题模型(NTM)这类主题建模技术,提供了线性和非线性学习方法,能帮助我们深入挖掘数据中的潜在主题。它通过特定的架构和内部机制工作,还支持分布式训练,将数据集分割成多个块进行并行处理。训练完成后,我们可以将模型部署为端点并进行推理,例如解读安然邮件中的主题。 图像分类在过去五年中一直是热门研究领域,因为它能解决众多行业的实际业务问题,如自动驾驶汽车行业就高度依赖图像分类和目标检测模型的准确性。A
recommend-type

wed前端开发网页设计

<think>好的,用户需要寻找关于Web前端开发和网页设计的相关资源或教程。首先,我需要理解用户的具体需求,他们可能刚入门,想要系统学习,或者有一定基础,需要进阶教程。根据用户的问题,他们提到了“教程”和“资源”,可能希望推荐书籍、在线课程、框架文档以及社区论坛等。 接下来,我需要参考用户提供的引用内容。引用[1]提到了周文洁的《HTML5网页前端设计实战》,这是一本配套的实战项目教程,适合有基础的读者,可能可以作为书籍推荐之一。引用[2]概述了Web前端开发的技术分类,包括客户端和服务器端技术,以及常用框架如Bootstrap、React等。引用[3]是关于Delphi的TMS WEB
recommend-type

eosforce下的scatter API应用实例教程

### eosforce使用分散API #### 知识点一:什么是EOSForce EOSForce是以EOSIO为技术基础,旨在为区块链应用提供高性能的公链解决方案。它类似于EOS,也使用了EOSIO软件套件,开发者可以基于EOSIO构建DAPP应用,同时它可能拥有与EOS不同的社区治理结构和经济模型。对于开发者来说,了解EOSForce的API和功能是非常关键的,因为它直接影响到应用的开发与部署。 #### 知识点二:scatter API的介绍 scatter API 是一个开源的JavaScript库,它的目的是为了简化EOSIO区块链上各类操作,包括账户管理和交易签名等。scatter旨在提供一个更为便捷、安全的用户界面,通过API接口与EOSIO区块链进行交互。用户无需保存私钥即可与区块链进行交互,使得整个过程更加安全,同时开发者也能够利用scatter实现功能更加强大的应用。 #### 知识点三:scatter API在EOSForce上的应用 在EOSForce上使用scatter API可以简化开发者对于区块链交互的工作,无需直接处理复杂的私钥和签名问题。scatter API提供了一整套用于与区块链交互的方法,包括但不限于账户创建、身份验证、签名交易、数据读取等。通过scatter API,开发者可以更加专注于应用逻辑的实现,而不必担心底层的区块链交互细节。 #### 知识点四:安装和运行scatter_demo项目 scatter_demo是基于scatter API的一个示例项目,通过它可以学习如何将scatter集成到应用程序中。根据提供的描述,安装该项目需要使用npm,即Node.js的包管理器。首先需要执行`npm install`来安装依赖,这个过程中npm会下载scatter_demo项目所需的所有JavaScript包。安装完成后,可以通过运行`npm run dev`命令启动项目,该命令通常与项目中的开发环境配置文件(如webpack.config.js)相对应,用于启动本地开发服务器和热重载功能,以便开发者实时观察代码修改带来的效果。 #### 知识点五:配置eosforce到scatter 在scatter_demo项目中,将eosforce配置到scatter需要进入scatter的设置界面。scatter提供了一个可视化的界面,允许用户管理自己的区块链网络配置。在scatter设置中选择“网络”一栏,然后选择“新建”,在此步骤中需要选择“eos”作为区块链类型。之后,将eosforce的节点配置信息填入对应区域,完成网络的设置。这样,scatter就能够连接到eosforce区块链,用户可以通过scatter API与eosforce区块链进行交互。 #### 知识点六:npm包管理器及安装命令 npm是Node.js的包管理器,它在Node.js项目中扮演着至关重要的角色,用来安装、管理和分享项目所需的代码包。`npm install`是npm的常用命令,用于安装项目依赖。在执行此命令时,npm会根据项目根目录中的`package.json`文件自动下载并安装所需的所有依赖包。这个文件描述了项目的依赖关系,包括每个依赖的版本范围。通过这种方式,开发者可以确保项目在不同的环境中都能够有相同的行为表现。 #### 结论 了解和掌握如何在EOSForce中使用scatter API对于区块链应用开发者来说是极其重要的。scatter简化了与EOSIO区块链的交互过程,提高开发效率,降低了实现复杂功能的技术门槛。在实际操作中,开发者需要关注如何正确安装scatter_demo项目,如何配置eosforce节点到scatter中,以及如何利用scatter API开发出功能完善的区块链应用。通过这些知识点的了解,开发者可以更好地把握EOSForce生态中的开发实践,并在项目中实现安全、便捷的区块链交互。
recommend-type

AI模型评估与应用拓展

# AI模型评估与应用拓展 ## 1. 模型性能评估 ### 1.1 实验假设 我们有三个模型,假设第一个基于周一和周二点击数据训练的模型,在预测周四后期应用下载量方面,不如第二和第三个模型。第二个基于周一到周三点击数据训练的模型,性能也不如第三个基于周一到周四大部分时间点击数据训练的模型。 ### 1.2 特征重要性分析 以下代码用于分析每个模型中重要的特征: ```python exp_lst = ['exp1', 'exp2', 'exp3'] for exp in exp_lst: model_file = os.path.join(sm_output_loc, exp,