活动介绍

self.serial_port = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, write_timeout=1 )

时间: 2025-07-25 10:30:27 AIGC 浏览: 24
<think>我们正在配置Python的pyserial库中的serial.Serial初始化参数。根据引用[2],我们可以看到在初始化Serial类时,需要传入以下参数:devport(端口号)、devbaudrate(波特率)、devbytesize(数据位大小)、devparity(校验位)、devstopbits(停止位)。在pyserial中,这些参数对应serial.Serial类的初始化参数。 具体参数配置如下: 1. port: 串口设备名,例如'COM1'(Windows)或'/dev/ttyUSB0'(Linux)。 2. baudrate: 波特率,常用的有9600, 19200, 38400, 57600, 115200等。 3. bytesize: 数据位,可选值有:serial.FIVEBITS, serial.SIXBITS, serial.SEVENBITS, serial.EIGHTBITS。通常使用8位数据位,即serial.EIGHTBITS。 4. parity: 校验位,可选值有:serial.PARITY_NONE(无校验), serial.PARITY_EVEN(偶校验), serial.PARITY_ODD(奇校验), serial.PARITY_MARK, serial.PARITY_SPACE。通常使用无校验(serial.PARITY_NONE)。 5. stopbits: 停止位,可选值有:serial.STOPBITS_ONE(1位停止位), serial.STOPBITS_ONE_POINT_FIVE(1.5位停止位,仅部分平台支持), serial.STOPBITS_TWO(2位停止位)。通常使用1位停止位(serial.STOPBITS_ONE)。 另外,还可以设置其他可选参数,如timeout(读超时时间)、write_timeout(写超时时间)等。 示例代码: ```python import serial # 初始化串口 ser = serial.Serial( port='COM1', # 串口号 baudrate=9600, # 波特率 bytesize=serial.EIGHTBITS, # 数据位,8位 parity=serial.PARITY_NONE, # 校验位,无 stopbits=serial.STOPBITS_ONE, # 停止位,1位 timeout=1 # 读超时时间,单位秒,设置为None则一直等待直到读取到指定数据,0表示非阻塞 ) # 使用串口 ser.write(b'Hello') # 发送数据,注意要发送bytes类型 data = ser.readline() # 读取一行 print(data) # 关闭串口 ser.close() ``` 注意事项: - 在发送数据时,需要将字符串编码为二进制数据,例如使用`'Hello'.encode()`或直接写为字节串`b'Hello'`。 - 读取数据时,返回的是二进制数据,需要解码为字符串,例如`data.decode('utf-8')`。 如果你需要更高级的封装,可以参考引用[1]中的`cushy-serial`库,它提供了基于事件回调的串口通信方式。 §§相关问题§§ 1. 如何解决pyserial打开串口时提示“端口不存在”或“拒绝访问”的问题? 2. 如何设置pyserial的超时机制? 3. 在pyserial中如何处理二进制数据与字符串的转换?</think>### Python pyserial 串口初始化配置指南 在 Python 中使用 pyserial 库初始化串口通信时,需要正确配置 `serial.Serial()` 类的参数。以下是关键参数说明和配置方法: #### 核心初始化参数 ```python import serial ser = serial.Serial( port='COM1', # 串口号 (Windows: COM1, Linux: /dev/ttyUSB0) baudrate=9600, # 波特率 (常用值: 9600, 19200, 115200) bytesize=serial.EIGHTBITS, # 数据位 (可选: FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS) parity=serial.PARITY_NONE, # 校验位 (可选: PARITY_NONE, PARITY_EVEN, PARITY_ODD) stopbits=serial.STOPBITS_ONE, # 停止位 (可选: STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO) timeout=1, # 读超时(秒),None=阻塞模式,0=非阻塞 write_timeout=1 # 写超时(秒) ) ``` #### 参数详解 1. **端口号 (port)** - Windows: `'COM1'`, `'COM3'` - Linux: `'/dev/ttyUSB0'`, `'/dev/ttyACM0'` - 自动检测示例: ```python import serial.tools.list_ports ports = [p.device for p in serial.tools.list_ports.comports()] ``` 2. **波特率 (baudrate)** 常用值:$9600$, $19200$, $38400$, $57600$, $115200$ 需与设备设置一致,否则会出现乱码[^1]。 3. **数据位 (bytesize)** - $8$ 位最常用:`serial.EIGHTBITS` - 老设备可能使用 $7$ 位:`serial.SEVENBITS` 4. **校验位 (parity)** ```python serial.PARITY_NONE # 无校验 (常用) serial.PARITY_EVEN # 偶校验 serial.PARITY_ODD # 奇校验 ``` 5. **停止位 (stopbits)** ```python serial.STOPBITS_ONE # 1位停止位 (常用) serial.STOPBITS_TWO # 2位停止位 ``` 6. **超时设置** ```python timeout=None # 阻塞模式,等待数据直到读取完成 timeout=0 # 非阻塞模式,立即返回可用数据 timeout=0.5 # 等待0.5秒后返回 ``` #### 完整示例 ```python import serial try: ser = serial.Serial( port='COM3', baudrate=115200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1 ) print(f"串口已打开: {ser.name}") # 发送数据 (需编码为bytes) ser.write("AT指令\r\n".encode('utf-8')) # 接收数据 response = ser.readline().decode('utf-8', errors='replace') print(f"收到响应: {response}") except serial.SerialException as e: print(f"串口错误: {str(e)}") finally: if 'ser' in locals() and ser.is_open: ser.close() ``` #### 常见问题解决 1. **权限问题 (Linux)** 使用 `sudo` 或添加用户到 `dialout` 组: ```bash sudo usermod -aG dialout $USER ``` 2. **端口占用错误** 关闭其他串口监控软件,或使用 `ser.close()` 后重新打开。 3. **数据乱码** 检查波特率、数据位、停止位是否与设备匹配,解码时使用正确编码: ```python data = ser.read(10).decode('ascii') # ASCII设备 data = ser.read(10).decode('utf-8') # UTF-8设备 ```
阅读全文

相关推荐

当前代码如下: import sys import serial import serial.tools.list_ports import numpy as np from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QComboBox, QLabel, QGroupBox, QStatusBar, QTextEdit, QSplitter, QCheckBox) from PyQt5.QtCore import QTimer, Qt, QThread, pyqtSignal from pyqtgraph import PlotWidget, mkPen, mkBrush import pyqtgraph as pg from collections import deque import time import binascii class LidarWorker(QThread): """雷达数据处理线程""" new_points = pyqtSignal(list) # 信号:发射新的点云数据 raw_data_signal = pyqtSignal(str) # 信号:发射原始数据 def __init__(self, port, baudrate=153600): super().__init__() self.port = port self.baudrate = baudrate self.serial_conn = None self.running = False self._last_valid_dist = 1.0 # 上次有效距离值(用于突变检测) self.buffer = bytearray() # 确保缓冲区初始化 # 根据实际数据确认的配置 self.HEADER = b'\xb4\x4b' # 正确的包头 self.HEADER_LENGTH = 2 self.POINT_SIZE = 5 # 角度2字节 + 距离2字节 + 质量1字节 # 雷达参数 self.max_distance = 8.0 # 最大测距距离(米) self.min_distance = 0.12 # 最小测距距离(米) # 调试 self.debug_counter = 0 def run(self): """主线程函数""" if not self._connect_serial(): self.new_points.emit([]) return while self.running: if self.serial_conn.in_waiting > 0: data = self.serial_conn.read(self.serial_conn.in_waiting) self.raw_data_signal.emit(binascii.hexlify(data).decode('ascii')) self._process_buffer(data) time.sleep(0.001) self._disconnect_serial() def _connect_serial(self): """连接串口""" try: self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.1 ) self.running = True return True except serial.SerialException as e: print(f"无法连接到串口: {e}") return False def _disconnect_serial(self): """断开串口连接""" if self.serial_conn and self.serial_conn.is_open: self.serial_conn.close() def _process_buffer(self, new_data): """最终版数据解析方法""" self.buffer.extend(new_data) points = [] while len(self.buffer) >= 7: # 最小完整包长度 # 增强版包头检测(允许部分数据干扰) header_pos = -1 for i in range(min(len(self.buffer) - 1, 100)): # 最多检查前100字节 if self.buffer[i] == 0xb4 and self.buffer[i + 1] == 0x4b: header_pos = i break if header_pos == -1: # 保留最后1字节可能的部分包头 self.buffer = self.buffer[-1:] if len(self.buffer) > 0 else bytearray() break # 移除包头前的无效数据 if header_pos > 0: self.buffer = self.buffer[header_pos:] if len(self.buffer) < 7: break # 提取完整数据包 packet = self.buffer[:7] self.buffer = self.buffer[7:] try: # 改进的字节顺序检测 angle_be = packet[2] * 256 + packet[3] angle_le = packet[3] * 256 + packet[2] dist_be = packet[4] * 256 + packet[5] dist_le = packet[5] * 256 + packet[4] # 测试所有可能的组合 for angle, dist in [(angle_be, dist_be), (angle_le, dist_le), (angle_be, dist_le), (angle_le, dist_be)]: actual_angle = angle * 0.01 actual_dist = dist * 0.001 if (0 <= actual_angle < 360 and 0.1 <= actual_dist <= 12.0 and abs(actual_dist - self._last_valid_dist) < 2.0): # 距离突变检查 points.append((actual_angle, actual_dist)) self._last_valid_dist = actual_dist break except Exception as e: print(f"数据包解析异常: {e}\n原始数据: {binascii.hexlify(packet)}") if points: self.new_points.emit(points) elif len(self.buffer) > 20: # 缓冲区积压过多 print(f"警告: 缓冲区积压 {len(self.buffer)}字节,执行清空") self.buffer.clear() def stop(self): """停止线程""" self.running = False self.wait() class LidarApp(QMainWindow): """主应用程序窗口""" def __init__(self): super().__init__() self.setWindowTitle("YDLIDAR X3Pro 平面地图") self.setGeometry(100, 100, 1200, 800) # 初始化UI self.init_ui() # 雷达工作线程 self.lidar_worker = None # 点云数据 self.point_cloud = [] self.map_points = [] # 用于地图显示的点 # 更新可用串口 self.update_serial_ports() # 设置定时器定期更新图形 self.plot_timer = QTimer() self.plot_timer.timeout.connect(self.update_map_display) self.plot_timer.start(50) # 20Hz刷新率 def init_ui(self): """初始化用户界面""" # 主窗口布局 main_widget = QWidget() self.setCentralWidget(main_widget) main_layout = QHBoxLayout(main_widget) main_layout.setContentsMargins(5, 5, 5, 5) # 使用分割器实现可调整布局 splitter = QSplitter(Qt.Horizontal) # 左侧控制面板 control_panel = QWidget() control_panel.setMaximumWidth(300) control_layout = QVBoxLayout() control_layout.setContentsMargins(5, 5, 5, 5) # 串口设置组 port_group = QGroupBox("串口设置") port_layout = QVBoxLayout() self.port_combo = QComboBox() self.refresh_btn = QPushButton("刷新串口列表") self.refresh_btn.clicked.connect(self.update_serial_ports) port_layout.addWidget(QLabel("选择串口:")) port_layout.addWidget(self.port_combo) port_layout.addWidget(self.refresh_btn) port_group.setLayout(port_layout) # 雷达控制组 control_group = QGroupBox("雷达控制") control_btn_layout = QVBoxLayout() self.start_btn = QPushButton("开始扫描") self.start_btn.clicked.connect(self.start_lidar) self.stop_btn = QPushButton("停止扫描") self.stop_btn.clicked.connect(self.stop_lidar) self.stop_btn.setEnabled(False) # 地图显示选项 self.accumulate_check = QCheckBox("累积模式") self.accumulate_check.setChecked(True) self.clear_map_btn = QPushButton("清空地图") self.clear_map_btn.clicked.connect(self.clear_map) control_btn_layout.addWidget(self.start_btn) control_btn_layout.addWidget(self.stop_btn) control_btn_layout.addWidget(self.accumulate_check) control_btn_layout.addWidget(self.clear_map_btn) control_group.setLayout(control_btn_layout) # 状态信息组 status_group = QGroupBox("状态信息") status_layout = QVBoxLayout() self.status_label = QLabel("状态: 未连接") self.point_count_label = QLabel("点数: 0") self.fps_label = QLabel("FPS: 0") status_layout.addWidget(self.status_label) status_layout.addWidget(self.point_count_label) status_layout.addWidget(self.fps_label) status_layout.addStretch() status_group.setLayout(status_layout) # 添加到控制面板 control_layout.addWidget(port_group) control_layout.addWidget(control_group) control_layout.addWidget(status_group) control_layout.addStretch() control_panel.setLayout(control_layout) # 右侧主显示区 right_panel = QWidget() right_layout = QVBoxLayout() # 平面地图显示 self.map_widget = pg.PlotWidget() self.map_widget.setBackground('w') self.map_widget.setTitle("平面地图 - YDLIDAR X3Pro", color='k') self.map_widget.setLabel('left', 'Y (m)') self.map_widget.setLabel('bottom', 'X (m)') self.map_widget.showGrid(x=True, y=True) self.map_widget.setXRange(-8, 8) self.map_widget.setYRange(-8, 8) self.map_widget.setAspectLocked(True) # 地图散点图 self.map_plot = pg.ScatterPlotItem( size=5, pen=mkPen(color='b', width=1), brush=mkBrush('b') ) self.map_widget.addItem(self.map_plot) # 原始数据展示 self.raw_data_display = QTextEdit() self.raw_data_display.setReadOnly(True) self.raw_data_display.setMaximumHeight(150) self.raw_data_display.setStyleSheet("font-family: monospace;") right_layout.addWidget(self.map_widget) right_layout.addWidget(QLabel("原始数据(十六进制):")) right_layout.addWidget(self.raw_data_display) right_panel.setLayout(right_layout) # 添加分割器 splitter.addWidget(control_panel) splitter.addWidget(right_panel) splitter.setStretchFactor(1, 3) # 右侧区域占用更多空间 main_layout.addWidget(splitter) # 状态栏 self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("就绪") # FPS计算 self.last_update_time = time.time() self.frame_count = 0 self.current_fps = 0 def update_serial_ports(self): """更新可用串口列表""" self.port_combo.clear() ports = serial.tools.list_ports.comports() for port in ports: self.port_combo.addItem(f"{port.device}", port.device) if not ports: self.port_combo.addItem("未找到串口", None) self.start_btn.setEnabled(False) else: self.start_btn.setEnabled(True) def start_lidar(self): """启动雷达扫描""" port = self.port_combo.currentData() if not port: self.status_bar.showMessage("错误: 没有可用的串口") return # 禁用按钮 self.start_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.port_combo.setEnabled(False) self.refresh_btn.setEnabled(False) # 清空数据和显示 self.point_cloud = [] if not self.accumulate_check.isChecked(): self.map_points = [] self.raw_data_display.clear() # 启动工作线程 self.lidar_worker = LidarWorker(port) self.lidar_worker.new_points.connect(self.add_new_points) self.lidar_worker.raw_data_signal.connect(self.display_raw_data) self.lidar_worker.start() self.status_label.setText("状态: 扫描中...") self.status_bar.showMessage(f"已连接到 {port},开始扫描...") def stop_lidar(self): """停止雷达扫描""" if self.lidar_worker: self.lidar_worker.stop() self.lidar_worker = None # 恢复按钮状态 self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.port_combo.setEnabled(True) self.refresh_btn.setEnabled(True) self.status_label.setText("状态: 已停止") self.status_bar.showMessage("扫描已停止") def clear_map(self): """清空地图数据""" self.map_points = [] self.update_map_display() def add_new_points(self, new_points): """添加新的点云数据""" if new_points: self.point_cloud = new_points # 更新当前帧数据 # 累积模式处理 if self.accumulate_check.isChecked(): self.map_points.extend(new_points) # 限制累积点数 if len(self.map_points) > 5000: self.map_points = self.map_points[-5000:] self.point_count_label.setText(f"点数: {len(self.point_cloud)}") # 计算FPS self.frame_count += 1 current_time = time.time() if current_time - self.last_update_time >= 1.0: self.current_fps = self.frame_count / (current_time - self.last_update_time) self.fps_label.setText(f"FPS: {self.current_fps:.1f}") self.last_update_time = current_time self.frame_count = 0 def display_raw_data(self, hex_data): """显示原始数据""" # 限制显示的行数,避免内存占用过高 max_lines = 50 current_text = self.raw_data_display.toPlainText() lines = current_text.split('\n') if len(lines) > max_lines: lines = lines[-max_lines:] # 添加新数据 lines.append(hex_data) self.raw_data_display.setPlainText('\n'.join(lines)) # 自动滚动到底部 scrollbar = self.raw_data_display.verticalScrollBar() scrollbar.setValue(scrollbar.maximum()) def update_map_display(self): """增强版地图显示""" if not hasattr(self, 'scatter'): self.scatter = pg.ScatterPlotItem(size=5) self.map_widget.addItem(self.scatter) self.map_widget.setXRange(-5, 5) self.map_widget.setYRange(-5, 5) display_points = self.map_points if self.accumulate_check.isChecked() else self.point_cloud if not display_points: self.scatter.setData([], []) return # 坐标转换(添加异常处理) try: angles = np.radians([p[0] for p in display_points]) distances = np.clip([p[1] for p in display_points], 0.1, 8.0) x = distances * np.cos(angles) y = distances * np.sin(angles) # 自动调整视野(保留10%边距) if len(x) > 10: # 至少有10个点才调整 x_margin = (max(x) - min(x)) * 0.1 y_margin = (max(y) - min(y)) * 0.1 self.map_widget.setXRange(min(x) - x_margin, max(x) + x_margin) self.map_widget.setYRange(min(y) - y_margin, max(y) + y_margin) # 动态颜色映射 norm_dist = (distances - 0.1) / (8.0 - 0.1) colors = [ (int(255 * (1 - v)), 0, int(255 * v), 180) for v in norm_dist ] self.scatter.setData(x=x, y=y, brush=colors, size=5) except Exception as e: print(f"地图渲染错误: {e}") def closeEvent(self, event): """窗口关闭事件""" self.stop_lidar() self.plot_timer.stop() super().closeEvent(event) if __name__ == "__main__": app = QApplication(sys.argv) # 设置全局样式 app.setStyle('Fusion') window = LidarApp() window.show() sys.exit(app.exec_()) 现在能显示点云,但这不是laser线条地图,我需要生成平面二维地图。请结合数据手册利用激光雷达进行路径规划,借助SLAM(Simultaneous Localization and Mapping)技术实现了地图的生成与更新,从而能够在未知环境中高效行进,帮我修改代码。直接给出完整代码。

import serial import time import struct import queue import threading from datetime import datetime CanOBDItemList = [[0,0,0,0,0]] CanPGNItemList = [[0,0,0,0]] Frame_start = b’\xFF’ Frame_end = b’\x55’ Frame_data_style_len = 6 Frame_Data_Len = 0 frame_buffer = bytearray() class CanInfShow_Item: def int(self,CanID,CanFramType,Len,CanDataInf): self.SystemCycle = datetime.strftime(“%Y-%m-%d %H:%M:%S.%f”)[:-3], self.CanID = CanID, self.CanFrame = CanFramType, self.CanDataLen = Len, self.CanData = CanDataInf class CanPGNShow_Item: def int(self, PGNID, CanID, CanData, Signal): self.PGNID = PGNID, self.CanID = CanID, self.CanData = CanData, self.Signal = Signal class SerialPort: def init(self, port, baudrate): # 初始化串口参数 self.port = port self.baudrate = baudrate self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE ) # 等待串口连接稳定 self.last_data_time = time.time() # 新增:最后接收数据的时间戳 self.cycle_dict = {} # 存储{帧ID: [上次时间戳, 当前周期]} self.last_frame_time = {} # 存储每个ID的最后出现时间 self.data_updated_ids = set() # 存储数据变化的CAN ID self.new_added_ids = set() # 存储新增的CAN ID self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.raw_data_queue = queue.Queue(maxsize=10000) self.parsed_data_queue = queue.Queue(maxsize=1000) self.data_lock = threading.Lock() self.worker_threads = [] time.sleep(0.2) if not self.ser.isOpen(): print("串口打开失败!") def close(self): # 关闭串口连接 if self.ser.isOpen(): self.ser.close() def send(self, data): # 发送数据 if self.ser.isOpen(): try: self.ser.write(data.encode('utf-8')) except serial.SerialException as e: print(f"发送数据失败: {e}") # def recv(self): # # 接收数据 # if self.ser.isOpen(): # data = self.ser.read(self.ser.inWaiting()) # if data: # 有数据时更新时间戳 # self.last_data_time = time.time() # return data def recv(self, chunk_size=1024): if self.ser.isOpen(): # 每次最多读取1024字节 data = self.ser.read(min(self.ser.inWaiting(), chunk_size)) if data: self.last_data_time = time.time() return data return None def __del__(self): self.close() def SerialIsOpen(self): if self.ser.isOpen(): return 1 else: return 0 def start_reading(self): self.recv_thread = threading.Thread(target=self._recv_worker, daemon=True) self.parse_thread = threading.Thread(target=self._parse_worker, daemon=True) self.recv_thread.start() self.parse_thread.start() self.worker_threads = [self.recv_thread, self.parse_thread] def _recv_worker(self): while self.ser.isOpen(): data = self.recv(chunk_size=1024) # 每次最多读1KB if data: self.raw_data_queue.put(data) else: time.sleep(0.001) def _parse_worker(self): while True: try: data = self.raw_data_queue.get(timeout=0.1) for byte in data: self.process_byte(byte) except queue.Empty: continue def process_byte(self, byte): """ 使用状态机逐字节解析帧结构。 """ if self.state == 0: # 等待帧头 FF if byte == 0xFF: self.current_frame.append(byte) self.state = 1 else: # 如果不是帧头,忽略该字节 pass elif self.state == 1: # 等待帧头 55 if byte == 0x55: self.current_frame.append(byte) self.state = 2 else: # 如果第二字节不是55,重置 self.reset_state() elif self.state == 2: # 接收总长度高位 (第2字节) self.current_frame.append(byte) self.state = 3 elif self.state == 3: # 接收总长度低位 (第3字节) self.current_frame.append(byte) # 计算总长度(从第2字节开始) length_high = self.current_frame[2] length_low = byte self.expected_length = (length_high << 8) | length_low self.state = 4 elif self.state == 4: # 接收类型字段 (第4字节) self.current_frame.append(byte) self.state = 5 elif self.state == 5: # 接收保留字段 (第5字节) self.current_frame.append(byte) self.state = 6 elif self.state == 6: # 接收 CAN 通道类型 (第6字节) self.current_frame.append(byte) self.state = 7 elif self.state == 7: # 接收 CAN 报文个数 N (第7字节) self.current_frame.append(byte) self.num_messages = byte self.state = 8 self.messages_received = 0 elif self.state == 8: # 接收 CAN 报文数据 (每条12字节) self.current_frame.append(byte) self.messages_received += 1 if self.messages_received >= self.num_messages * 12: # 所有报文接收完成,准备校验位 self.state = 9 elif self.state == 9: # 接收校验位 self.current_frame.append(byte) self.state = 10 elif self.state == 10: # 完整帧已接收,验证校验和 if self.verify_checksum(): # 校验成功,放入队列供主线程处理 self.parsed_data_queue.put(bytes(self.current_frame)) else: print("校验失败,丢弃当前帧") self.reset_state() def verify_checksum(self): """ 验证校验和:从第2字节到倒数第二个字节之和 & 0xFF """ data_to_check = self.current_frame[2:-1] # 从第2字节到最后一个校验位之前 checksum = sum(data_to_check) & 0xFF return checksum == self.current_frame[-1] def reset_state(self): """ 重置状态机 """ self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.messages_received = 0 def process_parsed_frames(self): """ 在主线程中处理解析完成的数据帧 """ while not self.serial.parsed_data_queue.empty(): try: frame = self.serial.parsed_data_queue.get_nowait() self.Frame_analoy_process(frame) except queue.Empty: break #报文解析 def Frame_analoy_process(self, Framedata): # 检查帧类型 (0x0C 0x98) if len(Framedata) < 8 or Framedata[4] != 0x0C or Framedata[5] != 0x98: return try: FrameNum = int(Framedata[7]) except IndexError: return # 检查是否有足够数据 if len(Framedata) < 12 * FrameNum + 8: return current_time = time.time() # 获取当前精确时间戳 for index in range(0,FrameNum): # 时间戳 Cantime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] try: id_bytes = [ Framedata[12 * index + 11], # LSB Framedata[12 * index + 10], Framedata[12 * index + 9], Framedata[12 * index + 8] # MSB ] except IndexError: continue # 格式化为8位大写十六进制 CanID = ''.join(format(b, '02X') for b in id_bytes) # 提取ID字节 CanFramType = "Cycle" Len = 8 CanDataSpace = '' PGNdata = '' PGNID = int(Framedata[12 * index + 9] ) + int(Framedata[12 * index + 10]* 0x100) PGNSignl = self.Frame_analoy_PGN_Signal(PGNID,Framedata[12 * index + 12:]) # 提取数据部分 for posindex in range(0,8): PGNdata += str(hex(Framedata[12 * index + 12 + posindex])[2:]) try: CanDataSpace = ' '.join( format(Framedata[12 * index + 12 + posindex], '02X') for posindex in range(8) ) except IndexError: continue CanItemData = [Cantime, CanID, CanFramType, Len, CanDataSpace] if PGNSignl != None: SignalItemData = [hex(PGNID)[2:], CanID, PGNdata, PGNSignl] if all(not sublist for sublist in CanPGNItemList) or CanPGNItemList[0][0] == 0: if len(CanPGNItemList): CanPGNItemList.pop(0) CanPGNItemList.insert(0, SignalItemData) else: Listpos = self.find_in_2d_list(CanPGNItemList, CanID) if None != Listpos: CanPGNItemList[Listpos[0]] = SignalItemData else: CanPGNItemList.append(SignalItemData) if CanID in self.last_frame_time: last_time = self.last_frame_time[CanID] cycle = (current_time - last_time) * 1000 # 转换为毫秒 # 平滑处理:使用加权平均减少抖动 if CanID in self.cycle_dict: old_cycle = self.cycle_dict[CanID] self.cycle_dict[CanID] = 0.7 * old_cycle + 0.3 * cycle else: self.cycle_dict[CanID] = cycle else: self.cycle_dict[CanID] = 0 # 首次出现,周期设为0 self.last_frame_time[CanID] = current_time # 更新列表 if not CanOBDItemList or CanOBDItemList[0][0] == 0: if CanOBDItemList and CanOBDItemList[0][0] == 0: CanOBDItemList.pop(0) CanOBDItemList.insert(0, CanItemData) self.new_added_ids.add(CanID) # 新增标志 else: Listpos = self.find_in_2d_list(CanOBDItemList, CanID) if Listpos is not None: CanOBDItemList[Listpos[0]] = CanItemData self.data_updated_ids.add(CanID) # 更新标志 else: CanOBDItemList.append(CanItemData) self.new_added_ids.add(CanID) # 新增标志 self.last_data_time = time.time() # 解析到有效帧时更新时间 def find_in_2d_list(self,matrix, target): for i, row in enumerate(matrix): if any(x == target for x in row): return (i, row.index(target)) # 使用row.index()找到具体列的索引 return None def Frame_analoy_PGN_Signal(self,PGNID, Framedata): #车速 SignalVal = 0 if PGNID == 0xFEF1: SignalVal = float(int(Framedata[2] * 0x100) + int(Framedata[3]) / 256.0) elif PGNID == 0xF004: SignalVal = str(int(Framedata[2] & 0x7F)) + '|'+ str(float(int(Framedata[4] * 0x100) + int(Framedata[3]) / 125.0)) elif PGNID == 0xFCE1: SignalVal = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) elif PGNID == 0xFEE5: SignalVal = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) elif PGNID == 0xFEE5: SignalVal = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) elif PGNID == 0xFEEE: SignalVal = int(Framedata[0] - 40) elif PGNID == 0xFE56: SignalVal = float(Framedata[0] * 0.4) elif PGNID == 0xFEF2: SignalVal = float(int(Framedata[1] * 0x100 + Framedata[0] * 0.05)) elif PGNID == 0xF005: SignalVal = Framedata[3] return SignalVal # def start_reading(self): # self.read_thread = threading.Thread(target=self.Com_read_frame, daemon=True) # self.read_thread.start() 怎么把process_parsed_frames合并到_parse_worker中

import binascii import serial import time import struct import queue import threading from datetime import datetime CanOBDItemList = [] CanPGNItemList = [[0,0,0,0]] filteredCanOBDItemList = [] Frame_start = b'\xFF' Frame_end = b'\x55' Frame_data_style_len = 6 Frame_Data_Len = 0 frame_buffer = bytearray() class CanInfShow_Item: def __int__(self,CanID,CanFramType,Len,CanDataInf): self.SystemCycle = datetime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], self.CanID = CanID, self.CanFrame = CanFramType, self.CanDataLen = Len, self.CanData = CanDataInf class CanPGNShow_Item: def __int__(self, PGNID, CanID, CanData, Signal): self.PGNID = PGNID, self.CanID = CanID, self.CanData = CanData, self.Signal = Signal class SerialPort: def __init__(self, port, baudrate): # 初始化串口参数 self.port = port self.baudrate = baudrate self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE ) # 等待串口连接稳定 self.last_data_time = time.time() # 新增:最后接收数据的时间戳 self.cycle_dict = {} # 存储{帧ID: [上次时间戳, 当前周期]} self.last_frame_time = {} # 存储每个ID的最后出现时间 self.data_updated_ids = set() # 存储数据变化的CAN ID self.new_added_ids = set() # 存储新增的CAN ID self.last_data_dict = {} # 存储每个CAN ID的上一次数据 self.changed_bytes_dict = {} # 存储每个CAN ID的变化字节索引 self.last_pgn_data_dict = {} # 存储每个PGN ID的上一次数据 self.changed_pgn_bytes_dict = {} # 存储每个PGN ID的变化字节索引 self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.raw_data_queue = queue.Queue(maxsize=10000) self.data_lock = threading.Lock() self.worker_threads = [] self.allowed_pgn_ids = {0xFEF1, 0xF004, 0xFEC1, 0xFEE5, 0xFEEE, 0xFE56, 0xFEF2, 0xF005} self.filter_cycles = [] # 添加这一行用于存储过滤周期 #优化二维查找方式的映射字典 self.can_obd_index_map = {} self.filtered_index_map = {} self.can_pgn_index_map = {} time.sleep(0.2) if not self.ser.isOpen(): print("串口打开失败!") def close(self): # 关闭串口连接 if self.ser.isOpen(): self.ser.close() def send(self, data): # 发送数据 if self.ser.isOpen(): try: self.ser.write(data.encode('utf-8')) except serial.SerialException as e: print(f"发送数据失败: {e}") # def recv(self): # # 接收数据 # if self.ser.isOpen(): # data = self.ser.read(self.ser.inWaiting()) # if data: # 有数据时更新时间戳 # self.last_data_time = time.time() # return data def recv(self, chunk_size=1024): if self.ser.isOpen(): # 每次最多读取1024字节 data = self.ser.read(min(self.ser.inWaiting(), chunk_size)) if data: self.last_data_time = time.time() return data return None def __del__(self): self.close() def SerialIsOpen(self): if self.ser.isOpen(): return 1 else: return 0 def start_reading(self): self.recv_thread = threading.Thread(target=self._recv_worker, daemon=True) self.parse_thread = threading.Thread(target=self._parse_worker, daemon=True) self.recv_thread.start() self.parse_thread.start() self.worker_threads = [self.recv_thread, self.parse_thread] def _recv_worker(self): while self.ser.isOpen(): data = self.recv(chunk_size=4096) # 每次最多读4KB if data: self.raw_data_queue.put(data) # else: # time.sleep(0.001) def _parse_worker(self): while True: try: data = self.raw_data_queue.get(timeout=0.1) for byte in data: self.process_byte(byte) except queue.Empty: continue def process_byte(self, byte): """ 使用状态机逐字节解析帧结构。 """ if self.state == 0: # 等待帧头 FF if byte == 0xFF: self.current_frame.append(byte) self.state = 1 else: # 如果不是帧头,忽略该字节 pass elif self.state == 1: # 等待帧头 55 if byte == 0x55: self.current_frame.append(byte) self.state = 2 else: # 如果第二字节不是55,重置 self.reset_state() elif self.state == 2: # 接收总长度低位 (第2字节) self.current_frame.append(byte) self.state = 3 elif self.state == 3: # 接收总长度高位 (第3字节) self.current_frame.append(byte) # 计算总长度(从第2字节开始) length_high = self.current_frame[2] length_low = byte self.expected_length = (length_high << 8) | length_low self.state = 4 elif self.state == 4: # 接收类型字段 (第4字节) self.current_frame.append(byte) self.state = 5 elif self.state == 5: # 接收保留字段 (第5字节) self.current_frame.append(byte) self.state = 6 elif self.state == 6: # 接收 CAN 通道类型 (第6字节) self.current_frame.append(byte) self.state = 7 elif self.state == 7: # 接收 CAN 报文个数 N (第7字节) self.current_frame.append(byte) self.num_messages = byte self.state = 8 self.messages_received = 0 elif self.state == 8: #接收can报文 self.current_frame.append(byte) self.messages_received += 1 if self.messages_received == self.num_messages * 12: self.state = 9 elif self.state == 9: # 接收校验位 self.current_frame.append(byte) self.state = 10 elif self.state == 10: # 完整帧已接收,验证校验和 if self.verify_checksum(): self.Frame_analoy_process(bytes(self.current_frame)) else: print("校验失败,丢弃当前帧") self.reset_state() def verify_checksum(self): """ 验证校验和:从第2字节到倒数第二个字节之和 & 0xFF """ data_to_check = self.current_frame[2:-1] # 从第2字节到最后一个校验位之前 checksum = sum(data_to_check) & 0xFF return checksum == self.current_frame[-1] def reset_state(self): """ 重置状态机 """ self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.messages_received = 0 def set_filter_cycles(self, cycles): self.filter_cycles = cycles.copy() if cycles else [] #报文解析 def Frame_analoy_process(self, Framedata): # 检查帧类型 (0x0C 0x98) if len(Framedata) < 8 or Framedata[4] != 0x0C or Framedata[5] != 0x98: return try: FrameNum = int(Framedata[7]) except IndexError: return # 检查是否有足够数据 if len(Framedata) < 12 * FrameNum + 8: return current_time = time.time() # 获取当前精确时间戳 for index in range(0,FrameNum): # 时间戳 Cantime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] try: id_bytes = [ Framedata[12 * index + 11], # LSB Framedata[12 * index + 10], Framedata[12 * index + 9], Framedata[12 * index + 8] # MSB ] except IndexError: continue # 格式化为8位大写十六进制 CanID = ''.join(format(b, '02X') for b in id_bytes) # 提取ID字节 CanFramType = "Cycle" Len = 8 CanDataSpace = '' PGNdata = '' PGNID = int(Framedata[12 * index + 9] ) + int(Framedata[12 * index + 10]* 0x100) PGNSignl = self.Frame_analoy_PGN_Signal(PGNID,Framedata[12 * index + 12:]) # 提取数据部分 PGNdata = ''.join( format(Framedata[12 * index + 12 + posindex], '02X') for posindex in range(8) ).upper() try: CanDataSpace = ' '.join( format(Framedata[12 * index + 12 + posindex], '02X') for posindex in range(8) ) except IndexError: continue current_data = CanDataSpace.split() if CanID in self.last_data_dict: last_data = self.last_data_dict[CanID].split() changed_indices = [] for i in range(min(len(last_data), len(current_data))): if last_data[i] != current_data[i]: changed_indices.append(i) self.changed_bytes_dict[CanID] = changed_indices else: self.changed_bytes_dict[CanID] = [] # 新ID无变化 self.last_data_dict[CanID] = CanDataSpace CanItemData = [Cantime, CanID, CanFramType, Len, CanDataSpace] # print(CanDataSpace) # ✅ 只有在白名单内的PGNID才处理PGN信号 # 获取当前PGN数据 current_pgn_data = PGNdata # 使用上面生成的两位格式数据 # 检查数据变化 if PGNID in self.last_pgn_data_dict: last_data = self.last_pgn_data_dict[PGNID] changed_indices = [] for i in range(min(len(last_data), len(current_pgn_data) // 2)): start_idx = i * 2 end_idx = start_idx + 2 if last_data[start_idx:end_idx] != current_pgn_data[start_idx:end_idx]: changed_indices.append(i) self.changed_pgn_bytes_dict[PGNID] = changed_indices else: self.changed_pgn_bytes_dict[PGNID] = [] # 新PGN ID无变化 self.last_pgn_data_dict[PGNID] = current_pgn_data if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] if all(not sublist for sublist in CanPGNItemList) or CanPGNItemList[0][0] == 0: if CanPGNItemList: removed_id = CanPGNItemList[0][1] self.remove_from_index_maps("CanPGNItemList", removed_id) CanPGNItemList.pop(0) if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] # 使用快速查找 index_in_list = self.find_index("CanPGNItemList", CanID) if index_in_list is not None: CanPGNItemList[index_in_list] = SignalItemData else: CanPGNItemList.append(SignalItemData) new_index = len(CanPGNItemList) - 1 self.update_index_maps("CanPGNItemList", new_index, CanID) if CanID in self.last_frame_time: last_time = self.last_frame_time[CanID] cycle = (current_time - last_time) * 1000 # 转换为毫秒 # if CanID == "18F00F52" : # print(cycle) if cycle > 10: if cycle<self.cycle_dict[CanID] : self.cycle_dict[CanID] = cycle elif self.cycle_dict[CanID] == 0 : self.cycle_dict[CanID] = cycle else: self.cycle_dict[CanID] = 0 # 首次出现,周期设为0 # if "18F00F52" in self.cycle_dict: # print(self.cycle_dict["18F00F52"]) self.last_frame_time[CanID] = current_time # 更新列表 filtered_cycles = getattr(self, 'filter_cycles', []) is_filtered = False if filtered_cycles: for filter_cycle in filtered_cycles: tolerance = filter_cycle * 0.15 # 允许±15%的容差 if abs(self.cycle_dict[CanID] - filter_cycle) <= tolerance: is_filtered = True break index_in_obd = self.find_index("CanOBDItemList", CanID) index_in_filtered = self.find_index("filteredCanOBDItemList", CanID) # 根据过滤状态更新相应的列表 if is_filtered: if index_in_filtered is not None: filteredCanOBDItemList[index_in_filtered] = CanItemData self.data_updated_ids.add(CanID) else: # 如果存在于主列表,先移除 if index_in_obd is not None: self.remove_from_index_maps("CanOBDItemList", CanID) CanOBDItemList.pop(index_in_obd) self.data_updated_ids.add(CanID) filteredCanOBDItemList.append(CanItemData) new_index = len(filteredCanOBDItemList) - 1 self.update_index_maps("filteredCanOBDItemList", new_index, CanID) self.new_added_ids.add(CanID) else: index_in_list = self.find_index("CanOBDItemList", CanID) if index_in_obd is not None: CanOBDItemList[index_in_obd] = CanItemData self.data_updated_ids.add(CanID) else: # 如果存在于过滤列表,先移除 if index_in_filtered is not None: self.remove_from_index_maps("filteredCanOBDItemList", CanID) filteredCanOBDItemList.pop(index_in_filtered) CanOBDItemList.append(CanItemData) new_index = len(CanOBDItemList) - 1 self.update_index_maps("CanOBDItemList", new_index, CanID) self.new_added_ids.add(CanID) if CanID == "18F00F52": print(FrameNum) def find_index(self, list_name, can_id): """快速查找索引的O(1)方法""" if list_name == "CanOBDItemList": return self.can_obd_index_map.get(can_id) elif list_name == "filteredCanOBDItemList": return self.filtered_index_map.get(can_id) elif list_name == "CanPGNItemList": return self.can_pgn_index_map.get(can_id) return None def update_index_maps(self, list_name, index, can_id): """更新索引映射""" if list_name == "CanOBDItemList": self.can_obd_index_map[can_id] = index elif list_name == "filteredCanOBDItemList": self.filtered_index_map[can_id] = index elif list_name == "CanPGNItemList": self.can_pgn_index_map[can_id] = index def remove_from_index_maps(self, list_name, can_id): """从索引映射中移除条目""" if list_name == "CanOBDItemList": self.can_obd_index_map.pop(can_id, None) elif list_name == "filteredCanOBDItemList": self.filtered_index_map.pop(can_id, None) elif list_name == "CanPGNItemList": self.can_pgn_index_map.pop(can_id, None) def Frame_analoy_PGN_Signal(self, PGNID, Framedata): # 确保数据是整数列表(0-255) if not all(isinstance(x, int) for x in Framedata): Framedata = [int(x) for x in Framedata] # 根据J1939规范解析 if PGNID == 0xFEF1: # 车速 (CCVS1) # 位置2-3 (索引1-2),大端序,单位1/256 km/h raw_val = (Framedata[1] << 8) | Framedata[2] return raw_val / 256.0 elif PGNID == 0xFE6C: # 车速 (TCO1) - 新增 # 位置7-8 (索引6-7),大端序,单位1/256 km/h raw_val = (Framedata[6] << 8) | Framedata[7] return raw_val / 256.0 elif PGNID == 0xF004: # 发动机转速+负载 # 负载:位置3 (索引2),单位1% engine_load = Framedata[2] & 0x7F # 取7位 # 转速:位置4-5 (索引3-4),大端序,单位0.125 RPM raw_rpm = (Framedata[3] << 8) | Framedata[4] rpm = raw_rpm * 0.125 return f'{engine_load}|{rpm}' elif PGNID == 0xFEC1: # 里程表 (VDHR) # 位置1-4 (索引0-3),大端序,单位0.125米 raw_val = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) return raw_val * 0.125 # 转换为米 elif PGNID == 0xFEE5: # 发动机小时数 # 位置1-4 (索引0-3),大端序,单位0.05小时 raw_val = (Framedata[0] << 24) | (Framedata[1] << 16) | (Framedata[2] << 8) | Framedata[3] return raw_val * 0.05 elif PGNID == 0xFEF2: # 平均油耗 # 位置1-2 (索引0-1),大端序,单位0.05 L/h raw_val = (Framedata[0] << 8) | Framedata[1] return raw_val * 0.05 elif PGNID == 0xFEEE: # 冷却液温度 # 位置1 (索引0),单位1°C,偏移-40 return Framedata[0] - 40 elif PGNID == 0xFE56: # 燃油液位 # 位置1 (索引0),单位0.4% return Framedata[0] * 0.4 elif PGNID == 0xF005: # 档位 # 位置4 (索引3),直接返回值 return Framedata[3] return None # def start_reading(self): # self.read_thread = threading.Thread(target=self.Com_read_frame, daemon=True) # self.read_thread.start() 现在这段代码运算CANID的周期的逻辑有一些问题,因为程序是按照包来发送,一个包当中有多个报文,程序中却是根据每个报文的处理间隔来计算,应该怎么修改代码?

import binascii import serial import time import struct import queue import threading from datetime import datetime CanOBDItemList = [] CanPGNItemList = [[0,0,0,0]] filteredCanOBDItemList = [] Frame_start = b’\xFF’ Frame_end = b’\x55’ Frame_data_style_len = 6 Frame_Data_Len = 0 frame_buffer = bytearray() class CanInfShow_Item: def int(self,CanID,CanFramType,Len,CanDataInf): self.SystemCycle = datetime.strftime(“%Y-%m-%d %H:%M:%S.%f”)[:-3], self.CanID = CanID, self.CanFrame = CanFramType, self.CanDataLen = Len, self.CanData = CanDataInf class CanPGNShow_Item: def int(self, PGNID, CanID, CanData, Signal): self.PGNID = PGNID, self.CanID = CanID, self.CanData = CanData, self.Signal = Signal class SerialPort: def init(self, port, baudrate): 初始化串口参数 self.port = port self.baudrate = baudrate self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE ) 等待串口连接稳定 self.last_data_time = time.time() # 新增:最后接收数据的时间戳 self.cycle_dict = {} # 存储{帧ID: [上次时间戳, 当前周期]} self.last_frame_time = {} # 存储每个ID的最后出现时间 self.data_updated_ids = set() # 存储数据变化的CAN ID self.new_added_ids = set() # 存储新增的CAN ID self.last_data_dict = {} # 存储每个CAN ID的上一次数据 self.changed_bytes_dict = {} # 存储每个CAN ID的变化字节索引 self.last_pgn_data_dict = {} # 存储每个PGN ID的上一次数据 self.changed_pgn_bytes_dict = {} # 存储每个PGN ID的变化字节索引 self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.raw_data_queue = queue.Queue(maxsize=10000) self.data_lock = threading.Lock() self.worker_threads = [] self.allowed_pgn_ids = {0xFEF1, 0xF004, 0xFEC1, 0xFEE5, 0xFEEE, 0xFE56, 0xFEF2, 0xF005} self.filter_cycles = [] # 添加这一行用于存储过滤周期 #优化二维查找方式的映射字典 self.can_obd_index_map = {} self.filtered_index_map = {} self.can_pgn_index_map = {} time.sleep(0.2) if not self.ser.isOpen(): print(“串口打开失败!”) def close(self): # 关闭串口连接 if self.ser.isOpen(): self.ser.close() def send(self, data): # 发送数据 if self.ser.isOpen(): try: self.ser.write(data.encode(‘utf-8’)) except serial.SerialException as e: print(f"发送数据失败: {e}“) # def recv(self): # # 接收数据 # if self.ser.isOpen(): # data = self.ser.read(self.ser.inWaiting()) # if data: # 有数据时更新时间戳 # self.last_data_time = time.time() # return data def recv(self, chunk_size=1024): if self.ser.isOpen(): # 每次最多读取1024字节 data = self.ser.read(min(self.ser.inWaiting(), chunk_size)) if data: self.last_data_time = time.time() return data return None def del(self): self.close() def SerialIsOpen(self): if self.ser.isOpen(): return 1 else: return 0 def start_reading(self): self.recv_thread = threading.Thread(target=self._recv_worker, daemon=True) self.parse_thread = threading.Thread(target=self._parse_worker, daemon=True) self.recv_thread.start() self.parse_thread.start() self.worker_threads = [self.recv_thread, self.parse_thread] def _recv_worker(self): while self.ser.isOpen(): data = self.recv(chunk_size=4096) # 每次最多读4KB if data: self.raw_data_queue.put(data) # else: # time.sleep(0.001) def _parse_worker(self): while True: try: data = self.raw_data_queue.get(timeout=0.1) for byte in data: self.process_byte(byte) except queue.Empty: continue def process_byte(self, byte): “”” 使用状态机逐字节解析帧结构。 “”" if self.state == 0: # 等待帧头 FF if byte == 0xFF: self.current_frame.append(byte) self.state = 1 else: # 如果不是帧头,忽略该字节 pass elif self.state == 1: # 等待帧头 55 if byte == 0x55: self.current_frame.append(byte) self.state = 2 else: # 如果第二字节不是55,重置 self.reset_state() elif self.state == 2: # 接收总长度低位 (第2字节) self.current_frame.append(byte) self.state = 3 elif self.state == 3: # 接收总长度高位 (第3字节) self.current_frame.append(byte) # 计算总长度(从第2字节开始) length_high = self.current_frame[2] length_low = byte self.expected_length = (length_high << 8) | length_low self.state = 4 elif self.state == 4: # 接收类型字段 (第4字节) self.current_frame.append(byte) self.state = 5 elif self.state == 5: # 接收保留字段 (第5字节) self.current_frame.append(byte) self.state = 6 elif self.state == 6: # 接收 CAN 通道类型 (第6字节) self.current_frame.append(byte) self.state = 7 elif self.state == 7: # 接收 CAN 报文个数 N (第7字节) self.current_frame.append(byte) self.num_messages = byte self.state = 8 self.messages_received = 0 elif self.state == 8: #接收can报文 self.current_frame.append(byte) self.messages_received += 1 if self.messages_received == self.num_messages * 12: self.state = 9 elif self.state == 9: # 接收校验位 self.current_frame.append(byte) self.state = 10 elif self.state == 10: # 完整帧已接收,验证校验和 if self.verify_checksum(): self.Frame_analoy_process(bytes(self.current_frame)) else: print(“校验失败,丢弃当前帧”) self.reset_state() def verify_checksum(self): “”" 验证校验和:从第2字节到倒数第二个字节之和 & 0xFF “”" data_to_check = self.current_frame[2:-1] # 从第2字节到最后一个校验位之前 checksum = sum(data_to_check) & 0xFF return checksum == self.current_frame[-1] def reset_state(self): “”" 重置状态机 “”" self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.messages_received = 0 def set_filter_cycles(self, cycles): self.filter_cycles = cycles.copy() if cycles else [] #报文解析 def Frame_analoy_process(self, Framedata): # 检查帧类型 (0x0C 0x98) if len(Framedata) < 8 or Framedata[4] != 0x0C or Framedata[5] != 0x98: return try: FrameNum = int(Framedata[7]) except IndexError: return # 检查是否有足够数据 if len(Framedata) < 12 * FrameNum + 8: return current_time = time.time() # 获取当前精确时间戳 for index in range(0,FrameNum): # 时间戳 Cantime = datetime.now().strftime(“%Y-%m-%d %H:%M:%S.%f”)[:-3] try: id_bytes = [ Framedata[12 * index + 11], # LSB Framedata[12 * index + 10], Framedata[12 * index + 9], Framedata[12 * index + 8] # MSB ] except IndexError: continue # 格式化为8位大写十六进制 CanID = ‘’.join(format(b, ‘02X’) for b in id_bytes) # 提取ID字节 CanFramType = “Cycle” Len = 8 CanDataSpace = ‘’ PGNdata = ‘’ PGNID = int(Framedata[12 * index + 9] ) + int(Framedata[12 * index + 10]* 0x100) PGNSignl = self.Frame_analoy_PGN_Signal(PGNID,Framedata[12 * index + 12:]) # 提取数据部分 PGNdata = ‘’.join( format(Framedata[12 * index + 12 + posindex], ‘02X’) for posindex in range(8) ).upper() try: CanDataSpace = ’ ‘.join( format(Framedata[12 * index + 12 + posindex], ‘02X’) for posindex in range(8) ) except IndexError: continue current_data = CanDataSpace.split() if CanID in self.last_data_dict: last_data = self.last_data_dict[CanID].split() changed_indices = [] for i in range(min(len(last_data), len(current_data))): if last_data[i] != current_data[i]: changed_indices.append(i) self.changed_bytes_dict[CanID] = changed_indices else: self.changed_bytes_dict[CanID] = [] # 新ID无变化 self.last_data_dict[CanID] = CanDataSpace CanItemData = [Cantime, CanID, CanFramType, Len, CanDataSpace] # print(CanDataSpace) # ✅ 只有在白名单内的PGNID才处理PGN信号 # 获取当前PGN数据 current_pgn_data = PGNdata # 使用上面生成的两位格式数据 # 检查数据变化 if PGNID in self.last_pgn_data_dict: last_data = self.last_pgn_data_dict[PGNID] changed_indices = [] for i in range(min(len(last_data), len(current_pgn_data) // 2)): start_idx = i * 2 end_idx = start_idx + 2 if last_data[start_idx:end_idx] != current_pgn_data[start_idx:end_idx]: changed_indices.append(i) self.changed_pgn_bytes_dict[PGNID] = changed_indices else: self.changed_pgn_bytes_dict[PGNID] = [] # 新PGN ID无变化 self.last_pgn_data_dict[PGNID] = current_pgn_data if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] if all(not sublist for sublist in CanPGNItemList) or CanPGNItemList[0][0] == 0: if CanPGNItemList: removed_id = CanPGNItemList[0][1] self.remove_from_index_maps(“CanPGNItemList”, removed_id) CanPGNItemList.pop(0) if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] # 使用快速查找 index_in_list = self.find_index(“CanPGNItemList”, CanID) if index_in_list is not None: CanPGNItemList[index_in_list] = SignalItemData else: CanPGNItemList.append(SignalItemData) new_index = len(CanPGNItemList) - 1 self.update_index_maps(“CanPGNItemList”, new_index, CanID) if CanID in self.last_frame_time: last_time = self.last_frame_time[CanID] cycle = (current_time - last_time) * 1000 # 转换为毫秒 # if CanID == “18F00F52” : # print(cycle) if cycle > 10: if cycle<self.cycle_dict[CanID] : self.cycle_dict[CanID] = cycle elif self.cycle_dict[CanID] == 0 : self.cycle_dict[CanID] = cycle else: self.cycle_dict[CanID] = 0 # 首次出现,周期设为0 # if “18F00F52” in self.cycle_dict: # print(self.cycle_dict[“18F00F52”]) self.last_frame_time[CanID] = current_time # 更新列表 filtered_cycles = getattr(self, ‘filter_cycles’, []) is_filtered = False if filtered_cycles: for filter_cycle in filtered_cycles: tolerance = filter_cycle * 0.15 # 允许±15%的容差 if abs(self.cycle_dict[CanID] - filter_cycle) <= tolerance: is_filtered = True break index_in_obd = self.find_index(“CanOBDItemList”, CanID) index_in_filtered = self.find_index(“filteredCanOBDItemList”, CanID) # 根据过滤状态更新相应的列表 if is_filtered: if index_in_filtered is not None: filteredCanOBDItemList[index_in_filtered] = CanItemData self.data_updated_ids.add(CanID) else: # 如果存在于主列表,先移除 if index_in_obd is not None: self.remove_from_index_maps(“CanOBDItemList”, CanID) CanOBDItemList.pop(index_in_obd) self.data_updated_ids.add(CanID) filteredCanOBDItemList.append(CanItemData) new_index = len(filteredCanOBDItemList) - 1 self.update_index_maps(“filteredCanOBDItemList”, new_index, CanID) self.new_added_ids.add(CanID) else: index_in_list = self.find_index(“CanOBDItemList”, CanID) if index_in_obd is not None: CanOBDItemList[index_in_obd] = CanItemData self.data_updated_ids.add(CanID) else: # 如果存在于过滤列表,先移除 if index_in_filtered is not None: self.remove_from_index_maps(“filteredCanOBDItemList”, CanID) filteredCanOBDItemList.pop(index_in_filtered) CanOBDItemList.append(CanItemData) new_index = len(CanOBDItemList) - 1 self.update_index_maps(“CanOBDItemList”, new_index, CanID) self.new_added_ids.add(CanID) if CanID == “18F00F52”: print(FrameNum) def find_index(self, list_name, can_id): “”“快速查找索引的O(1)方法”“” if list_name == “CanOBDItemList”: return self.can_obd_index_map.get(can_id) elif list_name == “filteredCanOBDItemList”: return self.filtered_index_map.get(can_id) elif list_name == “CanPGNItemList”: return self.can_pgn_index_map.get(can_id) return None def update_index_maps(self, list_name, index, can_id): “”“更新索引映射”“” if list_name == “CanOBDItemList”: self.can_obd_index_map[can_id] = index elif list_name == “filteredCanOBDItemList”: self.filtered_index_map[can_id] = index elif list_name == “CanPGNItemList”: self.can_pgn_index_map[can_id] = index def remove_from_index_maps(self, list_name, can_id): “”“从索引映射中移除条目”“” if list_name == “CanOBDItemList”: self.can_obd_index_map.pop(can_id, None) elif list_name == “filteredCanOBDItemList”: self.filtered_index_map.pop(can_id, None) elif list_name == “CanPGNItemList”: self.can_pgn_index_map.pop(can_id, None) def Frame_analoy_PGN_Signal(self, PGNID, Framedata): # 确保数据是整数列表(0-255) if not all(isinstance(x, int) for x in Framedata): Framedata = [int(x) for x in Framedata] # 根据J1939规范解析 if PGNID == 0xFEF1: # 车速 (CCVS1) # 位置2-3 (索引1-2),大端序,单位1/256 km/h raw_val = (Framedata[1] << 8) | Framedata[2] return raw_val / 256.0 elif PGNID == 0xFE6C: # 车速 (TCO1) - 新增 # 位置7-8 (索引6-7),大端序,单位1/256 km/h raw_val = (Framedata[6] << 8) | Framedata[7] return raw_val / 256.0 elif PGNID == 0xF004: # 发动机转速+负载 # 负载:位置3 (索引2),单位1% engine_load = Framedata[2] & 0x7F # 取7位 # 转速:位置4-5 (索引3-4),大端序,单位0.125 RPM raw_rpm = (Framedata[3] << 8) | Framedata[4] rpm = raw_rpm * 0.125 return f’{engine_load}|{rpm}’ elif PGNID == 0xFEC1: # 里程表 (VDHR) # 位置1-4 (索引0-3),大端序,单位0.125米 raw_val = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) return raw_val * 0.125 # 转换为米 elif PGNID == 0xFEE5: # 发动机小时数 # 位置1-4 (索引0-3),大端序,单位0.05小时 raw_val = (Framedata[0] << 24) | (Framedata[1] << 16) | (Framedata[2] << 8) | Framedata[3] return raw_val * 0.05 elif PGNID == 0xFEF2: # 平均油耗 # 位置1-2 (索引0-1),大端序,单位0.05 L/h raw_val = (Framedata[0] << 8) | Framedata[1] return raw_val * 0.05 elif PGNID == 0xFEEE: # 冷却液温度 # 位置1 (索引0),单位1°C,偏移-40 return Framedata[0] - 40 elif PGNID == 0xFE56: # 燃油液位 # 位置1 (索引0),单位0.4% return Framedata[0] * 0.4 elif PGNID == 0xF005: # 档位 # 位置4 (索引3),直接返回值 return Framedata[3] return None # def start_reading(self): # self.read_thread = threading.Thread(target=self.Com_read_frame, daemon=True) # self.read_thread.start() 现在这段代码运算CANID的周期的逻辑有一些问题,因为程序是按照帧来发送,一个帧当中只会有一种canid。一个帧当中有多个报文,程序中却是根据每个报文的处理间隔来计算。每个帧发送的报文数量不一样,需要根据报文数量来计算周期,比如第一次收到4个,第二次收到5个,周期就应该等于第二次收到第一个报文的时间减去第一次收到第一个报文的时间除以4。应该怎么修改代码?

最新推荐

recommend-type

前端开发基于jQuery的选择器与DOM操作技术:网页元素精准定位及动态交互功能实现

内容概要:本文系统介绍了jQuery的基础知识,涵盖其概念、优势、开发环境搭建、核心语法与选择器、DOM遍历与操作方法,以及事件处理机制。文章强调jQuery作为轻量级JavaScript库在简化DOM操作、跨浏览器兼容性及提升开发效率方面的突出作用,并通过大量代码示例详细讲解了选择器(如标签、类、ID、属性、自定义及表单选择器)、DOM遍历方法(如filter、next、siblings等)、元素访问方式(.get()和索引访问)以及事件绑定与委托(如on、off、hover、ready等),帮助读者掌握jQuery的核心使用技巧。; 适合人群:具备HTML、CSS和JavaScript基础,初入前端领域的开发者或希望巩固jQuery基础的1-3年经验研发人员。; 使用场景及目标:①快速实现DOM元素选取与操作,提升页面交互开发效率;②理解jQuery事件机制与DOM遍历逻辑,用于传统项目维护或兼容性开发;③为学习现代前端框架前打下扎实的JavaScript操作基础。; 阅读建议:建议结合文中示例动手实践,重点理解选择器的使用场景与事件委托机制,注意区分jQuery对象与原生DOM对象的操作差异,并在实际项目中逐步应用所学内容以加深理解。
recommend-type

DeepSeek类AI在图书馆智慧服务中的应用与技术考量.docx

DeepSeek类AI在图书馆智慧服务中的应用与技术考量.docx
recommend-type

jsfuck自动解码工具.zip

jsfuck自动解码工具
recommend-type

COMSOL仿真模型展现手性结构

利用COMSOL软件进行手性结构电磁仿真的具体步骤和技术细节。主要内容涵盖如何构建手性结构的通用模型,以及如何精确计算不同极化状态(如左旋圆极化、右旋圆极化)下的透射率(Tlr, Trl, Trr, Tll)和反射率(Rlr, Rrl, Rrr, Rll)分量。文中特别强调了极化分量计算的关键点,包括正确应用MATLAB脚本进行电场分量的矢量合成与投影运算,确保极化方向的一致性和准确性。此外,还提供了能量守恒检查的方法,用于验证模型的可靠性。 适合人群:从事电磁仿真、光学器件设计的研究人员和工程师,尤其是那些需要深入了解手性材料电磁特性的专业人士。 使用场景及目标:适用于希望掌握COMSOL多物理场仿真工具在手性结构电磁特性研究中的应用,旨在提高仿真精度和效率,避免常见错误,确保仿真结果的可靠性和一致性。 其他说明:文中提供的MATLAB代码片段有助于读者更好地理解和实施具体的计算过程,同时提醒了一些常见的陷阱和注意事项,帮助用户规避潜在的问题。
recommend-type

VMD-MFRFNN_VMD-MFRFNN.zip

VMD-MFRFNN_VMD-MFRFNN.zip
recommend-type

Info2007v1.0更新至v2.0:优化管理与前台功能

根据提供的文件信息,可以挖掘出以下知识点: ### 标题知识点: 1. **免费时代WEB程序INFO2007 V1.0:** - 该标题表明存在一个名为INFO2007的WEB程序版本1.0,该版本是在免费时代推出的,可能意味着该程序是开源的或者提供免费下载。 ### 描述知识点: 1. **软件缺陷说明:** - 开发者提到程序存在BUG(程序缺陷),并提供了一个更新和反馈的渠道,说明软件仍在开发中,且有后续版本计划。 2. **联系方式:** - 开发者提供了QQ和邮箱作为联系方式,用于反馈问题或询问更新情况。 3. **Info2007v2.0更新内容:** - 提及了升级后的版本INFO2007v2.0新增功能,包括数据库结构变化(添加会员和公告表)、后台管理功能的增加与优化、前台功能的增加与优化等。 4. **安装要求:** - 软件需要特定的服务器环境支持,比如FSO(文件系统对象)、数据采集功能和JMAIL(邮件发送组件)。 5. **配置与安装细节:** - 对config.asp下的目录配置和pageurlsa变量做了说明,这些通常涉及程序的运行环境和安全设置。 6. **默认登录信息:** - 提供了默认的管理员用户名和密码,以及后台管理的默认目录,这对于安装和测试程序很重要。 7. **使用前的必要步骤:** - 强调了解压后生成静态页面的重要性,这可能是确保网站内容可被正确浏览的前置操作。 ### 标签知识点: 1. **ASP源码其他类别:** - 这表明该程序使用ASP(Active Server Pages)作为后端编程语言,并且归类于其他类别,可能意味着它不局限于某一特定功能或领域。 ### 压缩包文件名称列表知识点: 1. **www.codejia.com:** - 这个文件名可能指示了程序被托管或下载的来源网站,也暗示了可能含有与网站域名相关的程序文件。 ### 综合知识点: 1. **软件开发与维护:** - 从描述中可以看出开发者在推动软件的持续改进,并鼓励用户参与软件的测试和反馈过程。 2. **软件环境配置:** - 软件对运行环境有所要求,特别是服务器端的支持,需要了解FSO、数据采集、JMAIL等组件的使用和配置。 3. **后台管理系统:** - 更新内容中提及的后台管理功能,如会员管理、公告管理、文章管理等,显示了该程序提供了一套用于网站内容和用户管理的后台解决方案。 4. **前台展示优化:** - 对前台页面的优化和增加功能,如会员注册、文章页、下载页和分类栏目的改进,说明了对用户体验的重视。 5. **安全与权限控制:** - 默认用户名和密码的提供,以及后台目录的默认设置,强调了安装过程中应立即更改编译以提高安全性。 6. **静态页面生成:** - 生成静态页面作为必要步骤可能涉及到网站的性能优化和安全措施。 7. **开源与社区支持:** - 由于提及了更新的可能和用户反馈渠道,这表明软件具有一定的开源特性或至少鼓励社区参与。 综上所述,这些知识点涵盖了软件开发的常见方面,包括软件生命周期的维护、功能更新、环境配置、安全实践以及优化用户体验。了解和掌握这些知识点可以帮助开发者和用户更好地利用和改进免费时代WEB程序INFO2007 V1.0。
recommend-type

Rust测试实战:错误处理、环境变量与模拟服务器

### Rust 测试实战:错误处理、环境变量与模拟服务器 在 Rust 开发中,测试是确保代码质量和稳定性的重要环节。本文将深入探讨 Rust 中的测试技巧,包括错误处理、使用环境变量测试 Config 模块以及使用模拟服务器测试 profanity 模块。 #### 1. 错误处理与比较 在 Rust 中,我们可以为自定义错误类型实现 `std::fmt::Display` 特征,以便将错误转换为字符串。以下是一个示例: ```rust impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::For
recommend-type

请分析下面代码:<tbody> <#if (paginationSupport.items)?has_content> <#list paginationSupport.items?sort_by('caseNo') as s> <tr class="b"> <td><a href="../user/viewRequestForm.action?requestFormId=${s.id}">${s.caseNo?default("Not Assigned")?if_exists}</a></td> <td>${s.lotId?if_exists}</td> <td><@m.directoryLink s.applicant?if_exists /></td> <td>${s.prodId?if_exists}</td> <td>${s.lotStageId?if_exists}</td> <td>${s.status?if_exists}</td> <td> <#if s.status!="Waiting For STR"> <#if s.workflowId?exists> <#list s.queuedOwners as owner> <#if owner?matches("[MmAaEeTt]\\d{3}.*")> <@m.directoryLink owner/> <#elseif owner?matches("[K-Z][A-Z][A-Z].*[_].*")> <#list adminService.loadGroupMembersByName(owner) as member> <@m.directoryLink member/><!--by Group--> </#list> <#else> <@m.orgManager owner?if_exists/><!--by Org--> </#if> </#list> </#if> <#else> <#if s.ercCategory=="Input the STR Number"||s.ercCategory=="STR"||s.ercCategory=="MSTR"> STR NO:<a href="${ercService.strLink?if_exists}${s.strEnNo?if_exists}" target="_blank">${s.strEnNo?if_exists}</a> is not effective <#else> EN NO:<a href="${ercService.enLink?if_exists}${s.strEnNo?if_exists}" target="_blank">${s.strEnNo?if_exists}</a> is not effective </#if> </#if> </td> <td><#if s.submitDate?exists>${s.submitDate?datetime?if_exists}</#if></td> <td>${s.purpose?default('')}</td> </tr> </#list> </#if> </tbody>

这段代码是使用 FreeMarker 模板引擎生成 HTML 表格内容的一部分,主要功能是根据分页数据展示请求表单的信息。以下是代码的详细分析: --- ### 1. **条件判断与循环** ```html <#if (paginationSupport.items)?has_content> ``` 判断 `paginationSupport.items` 是否存在且非空,只有存在数据时才执行表格渲染。 ```html <#list paginationSupport.items?sort_by('caseNo') as s> ``` 对 `paginationSupport.item
recommend-type

op-drone:监控未平仓头寸的市场利器

标题中提到的“op-drone”很可能是指一个特定的监控系统或服务的名称,它被用来“持续监控市场中的未平仓头寸”。未平仓头寸是指在金融市场上尚未完成交易结算的买卖双方的持仓数量。监控未平仓头寸对于市场参与者来说至关重要,因为它可以提供有关市场流动性和投资者情绪的重要信息。 在深入探讨之前,需要了解几个基础概念: 1. 未平仓头寸:指交易者在期货、期权、外汇或其他衍生品市场上建立的但尚未平仓的头寸。这些头寸在到期前仍然具有价值,而且市场上的价格变动会对它们的总体价值产生影响。 2. 持续监控:这通常是指使用软件工具或服务不断跟踪和分析市场数据的过程。持续监控可帮助交易者或市场分析师及时捕捉市场的动态变化,并根据最新情况做出交易决策。 3. 市场监控系统:这类系统通常具备收集实时数据、分析市场趋势、识别异常交易行为等多种功能。它们对于投资者了解市场状况、进行风险管理以及制定交易策略至关重要。 从描述中可以推断出,op-drone是一个专门用于持续监控未平仓头寸的系统或服务。这种系统需要具备以下功能: 1. 数据收集:系统需要有能力实时收集金融市场中的数据,包括但不限于期货、期权、股票、债券等金融产品的交易信息。 2. 数据分析:通过算法或机器学习技术分析收集到的数据,识别市场趋势、投资者行为模式以及潜在风险。 3. 异常检测:能够识别出市场中的异常交易活动,比如未平仓头寸的急剧变化,这可能是市场重大变动的前兆。 4. 风险预警:系统应能向用户发出风险预警,告知用户潜在的市场风险,帮助他们进行风险管理。 5. 报告与可视化:提供详细的数据报告和可视化图表,帮助用户更直观地理解市场状况和未平仓头寸变化。 此外,虽然文件中未提供标签和具体的文件名称列表,但可以推测“op-drone-main”可能是系统中的一个核心组件或主程序的名称。这个组件可能是整个op-drone系统运行的基础,负责处理大部分的监控和分析功能。 综合以上信息,我们可以得出一个结论:op-drone是一个专门设计用于监控金融市场上未平仓头寸的系统或服务。它可能具备实时数据收集和分析、异常行为识别、风险预警以及报告和可视化展示等多种功能,从而帮助用户在复杂的市场环境中做出更加明智的决策。对于需要精确把握市场动态、进行风险管理的金融分析师、交易员、投资组合经理等专业人士来说,这样的系统是不可或缺的工具。
recommend-type

RustWeb服务部署与环境变量配置全解析

### Rust Web 服务部署与环境变量配置全解析 在 Rust 开发中,完成业务逻辑编写后,将应用部署到生产环境是关键的下一步。本文将深入探讨 Rust 应用部署过程中的重要环节,包括环境变量的配置、二进制文件的优化以及跨平台编译等内容。 #### 1. 认证与授权相关未覆盖内容 在认证和授权方面,有一些内容未详细涉及。例如,设置会话数据库来存储令牌,以便在数据泄露或用户有其他需求时使令牌失效。可以通过设置 Redis 实例,在生成每个令牌时,不仅将结果返回给客户端,还将其存储在 Redis 键值存储中。 另外,重置用户密码和创建刷新令牌也是重要的话题。重置用户密码可以通过创建一个