活动介绍

python serial 串口接收不到数据会卡住

时间: 2023-09-04 07:04:08 浏览: 1123
### 回答1: 这个问题可能是因为串口未正确打开或未正确设置串口参数所导致的。请检查串口是否已正确打开,检查串口的波特率、数据位、停止位、校验位等参数是否与发送端设置的一致。另外,还需检查串口线是否连接正常,是否存在干扰等问题。如果以上问题都不存在,可以尝试更换串口线或更换电脑来测试。 ### 回答2: Python的serial库是用于处理串口通信的库。当串口不能接收到数据时,可能会导致代码执行“卡住”的情况。出现这个问题的原因可以有以下几种情况: 1. 串口未正确打开或关闭: 在使用串口通信前,需要确保串口已经成功打开并正确配置。没有正确打开串口,或者没有在程序结束后关闭串口,可能会导致串口接收数据时的卡顿问题。 2. 波特率设置不匹配: 串口通信中,发送方和接收方的波特率需要保持一致。如果发送方数据的波特率与接收方不符,接收方无法正确解析数据,从而导致卡住的情况。 3. 串口缓冲区溢出: 当数据发送速度大于接收速度时,可能会导致接收缓冲区溢出。当缓冲区满时,接收函数可能会等待缓冲区有足够的空间来接收新数据,从而导致程序卡顿。 为了解决这个问题,可以尝试以下方法: 1. 检查串口的打开和关闭过程,确保正确打开和关闭串口。 2. 检查发送方和接收方的波特率设置,保持一致。 3. 使用串口库提供的方法来检查缓冲区状态,避免缓冲区溢出。可以通过查询串口缓冲区中可读的字节数,或者设置串口的超时时间来避免卡顿问题。 4. 在接收数据的循环中,增加超时机制,当一定时间内没有接收到数据时,可以跳出循环,避免程序一直等待。 总结:如果Python的serial库无法接收到串口数据而卡住,可以先检查串口的打开和关闭过程,确保正确打开和关闭串口。同时要检查波特率设置是否匹配,避免串口解析数据错误。此外,可使用库提供的方法检查缓冲区状态,避免缓冲区溢出的情况发生。如果以上方法无效,可以尝试增加超时机制来避免代码卡住。 ### 回答3: 在Python中,当串口未接收到数据时,程序可能会卡住的原因有多种可能性。下面列出了一些常见的解决方法: 1. 串口设置错误:首先,确保你的串口设置正确。包括串口号、波特率、数据位、校验位、停止位等参数。你可以使用串口调试助手或其他串口工具来验证串口设置是否正确。 2. 读取超时设置:检查你的代码是否设置了正确的读取超时时间。如果读取超时时间设置过短,当串口没有接收到数据时,程序会立即超时,并卡住。你可以尝试延长超时时间,例如将读取超时时间设置为较大的值,或者设置为None表示无限等待。 3. 缓冲区问题:当串口接收数据时,可能需要使用缓冲区来存储接收到的数据,直到你读取并清空缓冲区。如果你没有及时读取缓冲区中的数据,缓冲区可能会满,导致串口卡住。因此,务必在适当的时候读取和清空缓冲区。 4. 读取方式问题:检查你使用的读取方式是否正确。通常,在Python中,可以使用`read()`或`readline()`方法来读取串口数据。确保你使用的方法适合你的数据格式,并及时处理读取到的数据。 5. 其他硬件或驱动问题:如果以上方法都无效,可能存在其他硬件或驱动问题。你可以尝试使用另一台电脑或更换串口线或串口模块,以确认是否与硬件有关。 总而言之,串口接收不到数据导致卡住的问题可能涉及多个方面,从串口设置到代码逻辑等都需要进行仔细检查和排查。
阅读全文

相关推荐

import binascii import serial import time import struct import queue import threading from datetime import datetime CanOBDItemList = [] CanPGNItemList = [[0,0,0,0]] filteredCanOBDItemList = [] Frame_start = b’\xFF’ Frame_end = b’\x55’ Frame_data_style_len = 6 Frame_Data_Len = 0 frame_buffer = bytearray() class CanInfShow_Item: def int(self,CanID,CanFramType,Len,CanDataInf): self.SystemCycle = datetime.strftime(“%Y-%m-%d %H:%M:%S.%f”)[:-3], self.CanID = CanID, self.CanFrame = CanFramType, self.CanDataLen = Len, self.CanData = CanDataInf class CanPGNShow_Item: def int(self, PGNID, CanID, CanData, Signal): self.PGNID = PGNID, self.CanID = CanID, self.CanData = CanData, self.Signal = Signal class SerialPort: def init(self, port, baudrate): 初始化串口参数 self.port = port self.baudrate = baudrate self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE ) 等待串口连接稳定 self.last_data_time = time.time() # 新增:最后接收数据的时间戳 self.cycle_dict = {} # 存储{帧ID: [上次时间戳, 当前周期]} self.last_frame_time = {} # 存储每个ID的最后出现时间 self.data_updated_ids = set() # 存储数据变化的CAN ID self.new_added_ids = set() # 存储新增的CAN ID self.last_data_dict = {} # 存储每个CAN ID的上一次数据 self.changed_bytes_dict = {} # 存储每个CAN ID的变化字节索引 self.last_pgn_data_dict = {} # 存储每个PGN ID的上一次数据 self.changed_pgn_bytes_dict = {} # 存储每个PGN ID的变化字节索引 self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.raw_data_queue = queue.Queue(maxsize=10000) self.data_lock = threading.Lock() self.worker_threads = [] self.allowed_pgn_ids = {0xFEF1, 0xF004, 0xFEC1, 0xFEE5, 0xFEEE, 0xFE56, 0xFEF2, 0xF005} self.filter_cycles = [] # 添加这一行用于存储过滤周期 time.sleep(0.2) if not self.ser.isOpen(): print(“串口打开失败!”) def close(self): # 关闭串口连接 if self.ser.isOpen(): self.ser.close() def send(self, data): # 发送数据 if self.ser.isOpen(): try: self.ser.write(data.encode(‘utf-8’)) except serial.SerialException as e: print(f"发送数据失败: {e}“) def recv(self, chunk_size=1024): if self.ser.isOpen(): # 每次最多读取1024字节 data = self.ser.read(min(self.ser.inWaiting(), chunk_size)) if data: self.last_data_time = time.time() return data return None def del(self): self.close() def SerialIsOpen(self): if self.ser.isOpen(): return 1 else: return 0 def start_reading(self): self.recv_thread = threading.Thread(target=self._recv_worker, daemon=True) self.parse_thread = threading.Thread(target=self._parse_worker, daemon=True) self.recv_thread.start() self.parse_thread.start() self.worker_threads = [self.recv_thread, self.parse_thread] def _recv_worker(self): while self.ser.isOpen(): data = self.recv(chunk_size=4096) # 每次最多读4KB if data: self.raw_data_queue.put(data) # else: # time.sleep(0.001) def _parse_worker(self): while True: try: data = self.raw_data_queue.get(timeout=0.1) print(“十六进制数据:”, data.hex()) for byte in data: self.process_byte(byte) except queue.Empty: continue def process_byte(self, byte): “”” 使用状态机逐字节解析帧结构。 “”" if self.state == 0: # 等待帧头 FF if byte == 0xFF: self.current_frame.append(byte) self.state = 1 else: # 如果不是帧头,忽略该字节 pass elif self.state == 1: # 等待帧头 55 if byte == 0x55: self.current_frame.append(byte) self.state = 2 else: # 如果第二字节不是55,重置 self.reset_state() elif self.state == 2: # 接收总长度低位 (第2字节) self.current_frame.append(byte) self.state = 3 elif self.state == 3: # 接收总长度高位 (第3字节) self.current_frame.append(byte) # 计算总长度(从第2字节开始) length_high = self.current_frame[2] length_low = byte self.expected_length = (length_high << 8) | length_low self.state = 4 elif self.state == 4: # 接收类型字段 (第4字节) self.current_frame.append(byte) self.state = 5 elif self.state == 5: # 接收保留字段 (第5字节) self.current_frame.append(byte) self.state = 6 elif self.state == 6: # 接收 CAN 通道类型 (第6字节) self.current_frame.append(byte) self.state = 7 elif self.state == 7: # 接收 CAN 报文个数 N (第7字节) self.current_frame.append(byte) self.num_messages = byte self.state = 8 self.messages_received = 0 elif self.state == 8: #接收can报文 self.current_frame.append(byte) self.messages_received += 1 if self.messages_received == self.num_messages * 12: self.state = 9 elif self.state == 9: # 接收校验位 self.current_frame.append(byte) self.state = 10 elif self.state == 10: # 完整帧已接收,验证校验和 if self.verify_checksum(): print(“完整帧数据:”, self.current_frame.hex()) self.Frame_analoy_process(bytes(self.current_frame)) else: print(“校验失败,丢弃当前帧”) self.reset_state() def verify_checksum(self): “”" 验证校验和:从第2字节到倒数第二个字节之和 & 0xFF “”" data_to_check = self.current_frame[2:-1] # 从第2字节到最后一个校验位之前 checksum = sum(data_to_check) & 0xFF return checksum == self.current_frame[-1] def reset_state(self): “”" 重置状态机 “”" self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.messages_received = 0 def set_filter_cycles(self, cycles): self.filter_cycles = cycles.copy() if cycles else [] #报文解析 def Frame_analoy_process(self, Framedata): # 检查帧类型 (0x0C 0x98) if len(Framedata) < 8 or Framedata[4] != 0x0C or Framedata[5] != 0x98: return try: FrameNum = int(Framedata[7]) except IndexError: return # 检查是否有足够数据 if len(Framedata) < 12 * FrameNum + 8: return current_time = time.time() # 获取当前精确时间戳 for index in range(0,FrameNum): # 时间戳 Cantime = datetime.now().strftime(“%Y-%m-%d %H:%M:%S.%f”)[:-3] try: id_bytes = [ Framedata[12 * index + 11], # LSB Framedata[12 * index + 10], Framedata[12 * index + 9], Framedata[12 * index + 8] # MSB ] except IndexError: continue # 格式化为8位大写十六进制 CanID = ‘’.join(format(b, ‘02X’) for b in id_bytes) # 提取ID字节 CanFramType = “Cycle” Len = 8 CanDataSpace = ‘’ PGNdata = ‘’ PGNID = int(Framedata[12 * index + 9] ) + int(Framedata[12 * index + 10]* 0x100) PGNSignl = self.Frame_analoy_PGN_Signal(PGNID,Framedata[12 * index + 12:]) # 提取数据部分 PGNdata = ‘’.join( format(Framedata[12 * index + 12 + posindex], ‘02X’) for posindex in range(8) ).upper() try: CanDataSpace = ’ ‘.join( format(Framedata[12 * index + 12 + posindex], ‘02X’) for posindex in range(8) ) except IndexError: continue current_data = CanDataSpace.split() if CanID in self.last_data_dict: last_data = self.last_data_dict[CanID].split() changed_indices = [] for i in range(min(len(last_data), len(current_data))): if last_data[i] != current_data[i]: changed_indices.append(i) self.changed_bytes_dict[CanID] = changed_indices else: self.changed_bytes_dict[CanID] = [] # 新ID无变化 self.last_data_dict[CanID] = CanDataSpace CanItemData = [Cantime, CanID, CanFramType, Len, CanDataSpace] # print(CanDataSpace) # ✅ 只有在白名单内的PGNID才处理PGN信号 # 获取当前PGN数据 current_pgn_data = PGNdata # 使用上面生成的两位格式数据 # 检查数据变化 if PGNID in self.last_pgn_data_dict: last_data = self.last_pgn_data_dict[PGNID] changed_indices = [] for i in range(min(len(last_data), len(current_pgn_data) // 2)): start_idx = i * 2 end_idx = start_idx + 2 if last_data[start_idx:end_idx] != current_pgn_data[start_idx:end_idx]: changed_indices.append(i) self.changed_pgn_bytes_dict[PGNID] = changed_indices else: self.changed_pgn_bytes_dict[PGNID] = [] # 新PGN ID无变化 self.last_pgn_data_dict[PGNID] = current_pgn_data if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] if all(not sublist for sublist in CanPGNItemList) or CanPGNItemList[0][0] == 0: if len(CanPGNItemList): CanPGNItemList.pop(0) CanPGNItemList.insert(0, SignalItemData) else: Listpos = self.find_in_2d_list(CanPGNItemList, CanID) if Listpos is not None: CanPGNItemList[Listpos[0]] = SignalItemData else: CanPGNItemList.append(SignalItemData) if CanID in self.last_frame_time: last_time = self.last_frame_time[CanID] cycle = (current_time - last_time) * 1000 # 转换为毫秒 if cycle > 40: if cycle<self.cycle_dict[CanID] : self.cycle_dict[CanID] = cycle/2 elif self.cycle_dict[CanID] == 0 : self.cycle_dict[CanID] = cycle/2 else: self.cycle_dict[CanID] = 0 # 首次出现,周期设为0 if “18F00F52” in self.cycle_dict: print(self.cycle_dict[“18F00F52”]) self.last_frame_time[CanID] = current_time # 更新列表 filtered_cycles = getattr(self, ‘filter_cycles’, []) is_filtered = False if filtered_cycles: for filter_cycle in filtered_cycles: tolerance = filter_cycle * 0.15 # 允许±15%的容差 if abs(self.cycle_dict[CanID] - filter_cycle) <= tolerance: is_filtered = True break # 根据过滤状态更新相应的列表 if is_filtered: # 更新到filteredCanOBDItemList Listpos = self.find_in_2d_list(filteredCanOBDItemList, CanID) if Listpos is not None: filteredCanOBDItemList[Listpos[0]] = CanItemData self.data_updated_ids.add(CanID) else: filteredCanOBDItemList.append(CanItemData) self.new_added_ids.add(CanID) else: # 更新到CanOBDItemList Listpos = self.find_in_2d_list(CanOBDItemList, CanID) if Listpos is not None: CanOBDItemList[Listpos[0]] = CanItemData self.data_updated_ids.add(CanID) else: CanOBDItemList.append(CanItemData) self.new_added_ids.add(CanID) # self.last_data_time = time.time() # 解析到有效帧时更新时间 def find_in_2d_list(self,matrix, target): for i, row in enumerate(matrix): if any(x == target for x in row): return (i, row.index(target)) # 使用row.index()找到具体列的索引 return None def Frame_analoy_PGN_Signal(self, PGNID, Framedata): # 确保数据是整数列表(0-255) if not all(isinstance(x, int) for x in Framedata): Framedata = [int(x) for x in Framedata] # 根据J1939规范解析 if PGNID == 0xFEF1: # 车速 (CCVS1) # 位置2-3 (索引1-2),大端序,单位1/256 km/h raw_val = (Framedata[1] << 8) | Framedata[2] return raw_val / 256.0 elif PGNID == 0xFE6C: # 车速 (TCO1) - 新增 # 位置7-8 (索引6-7),大端序,单位1/256 km/h raw_val = (Framedata[6] << 8) | Framedata[7] return raw_val / 256.0 elif PGNID == 0xF004: # 发动机转速+负载 # 负载:位置3 (索引2),单位1% engine_load = Framedata[2] & 0x7F # 取7位 # 转速:位置4-5 (索引3-4),大端序,单位0.125 RPM raw_rpm = (Framedata[3] << 8) | Framedata[4] rpm = raw_rpm * 0.125 return f’{engine_load}|{rpm}’ elif PGNID == 0xFEC1: # 里程表 (VDHR) # 位置1-4 (索引0-3),大端序,单位0.125米 raw_val = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) return raw_val * 0.125 # 转换为米 elif PGNID == 0xFEE5: # 发动机小时数 # 位置1-4 (索引0-3),大端序,单位0.05小时 raw_val = (Framedata[0] << 24) | (Framedata[1] << 16) | (Framedata[2] << 8) | Framedata[3] return raw_val * 0.05 elif PGNID == 0xFEF2: # 平均油耗 # 位置1-2 (索引0-1),大端序,单位0.05 L/h raw_val = (Framedata[0] << 8) | Framedata[1] return raw_val * 0.05 elif PGNID == 0xFEEE: # 冷却液温度 # 位置1 (索引0),单位1°C,偏移-40 return Framedata[0] - 40 elif PGNID == 0xFE56: # 燃油液位 # 位置1 (索引0),单位0.4% return Framedata[0] * 0.4 elif PGNID == 0xF005: # 档位 # 位置4 (索引3),直接返回值 return Framedata[3] return None # def start_reading(self): # self.read_thread = threading.Thread(target=self.Com_read_frame, daemon=True) # self.read_thread.start() 这是这段程序的部分print输出:十六进制数据: ff 十六进制数据: 55 十六进制数据: 1000 十六进制数据: 0c 十六进制数据: 98 十六进制数据: 00 十六进制数据: 01 十六进制数据: 0b 十六进制数据: 00 十六进制数据: 00 十六进制数据: 0c 十六进制数据: fc 十六进制数据: ff 十六进制数据: fa 十六进制数据: fa 十六进制数据: ff 十六进制数据: ff 十六进制数据: ff 十六进制数据: 26 十六进制数据: de 十六进制数据: ff 完整帧数据: ff5510000c9800010b00000cfcfffafaffffff26de 十六进制数据: 5510000c 十六进制数据: 98 十六进制数据: 00010b 十六进制数据: 00000c 十六进制数据: fcff 十六进制数据: fafa 十六进制数据: ff 十六进制数据: ffff 十六进制数据: 37ef 十六进制数据: ff 十六进制数据: 55 十六进制数据: 1000 十六进制数据: 0c 十六进制数据: 98 十六进制数据: 00 十六进制数据: 010b 十六进制数据: 00 十六进制数据: 00 十六进制数据: 0c 十六进制数据: fc 十六进制数据: ff 十六进制数据: fa 十六进制数据: fa 十六进制数据: ff 十六进制数据: ff 十六进制数据: ff30 十六进制数据: e8 十六进制数据: ff 完整帧数据: ff5510000c9800010b00000cfcfffafaffffff30e8 在十六进制数据中,FF 55是帧头可以看到发送了3帧的数据,但是完整帧数据显示只接收了2帧,找到其中的问题并解决

import binascii import serial import time import struct import queue import threading from datetime import datetime CanOBDItemList = [] CanPGNItemList = [[0,0,0,0]] filteredCanOBDItemList = [] Frame_start = b'\xFF' Frame_end = b'\x55' Frame_data_style_len = 6 Frame_Data_Len = 0 frame_buffer = bytearray() class CanInfShow_Item: def __int__(self,CanID,CanFramType,Len,CanDataInf): self.SystemCycle = datetime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], self.CanID = CanID, self.CanFrame = CanFramType, self.CanDataLen = Len, self.CanData = CanDataInf class CanPGNShow_Item: def __int__(self, PGNID, CanID, CanData, Signal): self.PGNID = PGNID, self.CanID = CanID, self.CanData = CanData, self.Signal = Signal class SerialPort: def __init__(self, port, baudrate): # 初始化串口参数 self.port = port self.baudrate = baudrate self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE ) # 等待串口连接稳定 self.last_data_time = time.time() # 新增:最后接收数据的时间戳 self.cycle_dict = {} # 存储{帧ID: [上次时间戳, 当前周期]} self.last_frame_time = {} # 存储每个ID的最后出现时间 self.data_updated_ids = set() # 存储数据变化的CAN ID self.new_added_ids = set() # 存储新增的CAN ID self.last_data_dict = {} # 存储每个CAN ID的上一次数据 self.changed_bytes_dict = {} # 存储每个CAN ID的变化字节索引 self.last_pgn_data_dict = {} # 存储每个PGN ID的上一次数据 self.changed_pgn_bytes_dict = {} # 存储每个PGN ID的变化字节索引 self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.raw_data_queue = queue.Queue(maxsize=10000) self.data_lock = threading.Lock() self.worker_threads = [] self.allowed_pgn_ids = {0xFEF1, 0xF004, 0xFEC1, 0xFEE5, 0xFEEE, 0xFE56, 0xFEF2, 0xF005} self.filter_cycles = [] # 添加这一行用于存储过滤周期 time.sleep(0.2) if not self.ser.isOpen(): print("串口打开失败!") def close(self): # 关闭串口连接 if self.ser.isOpen(): self.ser.close() def send(self, data): # 发送数据 if self.ser.isOpen(): try: self.ser.write(data.encode('utf-8')) except serial.SerialException as e: print(f"发送数据失败: {e}") # def recv(self): # # 接收数据 # if self.ser.isOpen(): # data = self.ser.read(self.ser.inWaiting()) # if data: # 有数据时更新时间戳 # self.last_data_time = time.time() # return data def recv(self, chunk_size=1024): if self.ser.isOpen(): # 每次最多读取1024字节 data = self.ser.read(min(self.ser.inWaiting(), chunk_size)) if data: self.last_data_time = time.time() return data return None def __del__(self): self.close() def SerialIsOpen(self): if self.ser.isOpen(): return 1 else: return 0 def start_reading(self): self.recv_thread = threading.Thread(target=self._recv_worker, daemon=True) self.parse_thread = threading.Thread(target=self._parse_worker, daemon=True) self.recv_thread.start() self.parse_thread.start() self.worker_threads = [self.recv_thread, self.parse_thread] def _recv_worker(self): while self.ser.isOpen(): data = self.recv(chunk_size=4096) # 每次最多读4KB if data: self.raw_data_queue.put(data) # else: # time.sleep(0.001) def _parse_worker(self): while True: try: data = self.raw_data_queue.get(timeout=0.1) print("十六进制数据:", data.hex()) for byte in data: self.process_byte(byte) except queue.Empty: continue def process_byte(self, byte): """ 使用状态机逐字节解析帧结构。 """ if self.state == 0: # 等待帧头 FF if byte == 0xFF: self.current_frame.append(byte) self.state = 1 else: # 如果不是帧头,忽略该字节 pass elif self.state == 1: # 等待帧头 55 if byte == 0x55: self.current_frame.append(byte) self.state = 2 else: # 如果第二字节不是55,重置 self.reset_state() elif self.state == 2: # 接收总长度低位 (第2字节) self.current_frame.append(byte) self.state = 3 elif self.state == 3: # 接收总长度高位 (第3字节) self.current_frame.append(byte) # 计算总长度(从第2字节开始) length_high = self.current_frame[2] length_low = byte self.expected_length = (length_high << 8) | length_low self.state = 4 elif self.state == 4: # 接收类型字段 (第4字节) self.current_frame.append(byte) self.state = 5 elif self.state == 5: # 接收保留字段 (第5字节) self.current_frame.append(byte) self.state = 6 elif self.state == 6: # 接收 CAN 通道类型 (第6字节) self.current_frame.append(byte) self.state = 7 elif self.state == 7: # 接收 CAN 报文个数 N (第7字节) self.current_frame.append(byte) self.num_messages = byte self.state = 8 self.messages_received = 0 elif self.state == 8: #接收can报文 self.current_frame.append(byte) self.messages_received += 1 if self.messages_received == self.num_messages * 12: self.state = 9 elif self.state == 9: # 接收校验位 self.current_frame.append(byte) self.state = 10 elif self.state == 10: # 完整帧已接收,验证校验和 if self.verify_checksum(): self.Frame_analoy_process(bytes(self.current_frame)) else: print("校验失败,丢弃当前帧") self.reset_state() def verify_checksum(self): """ 验证校验和:从第2字节到倒数第二个字节之和 & 0xFF """ data_to_check = self.current_frame[2:-1] # 从第2字节到最后一个校验位之前 checksum = sum(data_to_check) & 0xFF return checksum == self.current_frame[-1] def reset_state(self): """ 重置状态机 """ self.state = 0 self.current_frame = bytearray() self.expected_length = 0 self.messages_received = 0 def set_filter_cycles(self, cycles): self.filter_cycles = cycles.copy() if cycles else [] #报文解析 def Frame_analoy_process(self, Framedata): # 检查帧类型 (0x0C 0x98) if len(Framedata) < 8 or Framedata[4] != 0x0C or Framedata[5] != 0x98: return try: FrameNum = int(Framedata[7]) except IndexError: return # 检查是否有足够数据 if len(Framedata) < 12 * FrameNum + 8: return current_time = time.time() # 获取当前精确时间戳 for index in range(0,FrameNum): # 时间戳 Cantime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] try: id_bytes = [ Framedata[12 * index + 11], # LSB Framedata[12 * index + 10], Framedata[12 * index + 9], Framedata[12 * index + 8] # MSB ] except IndexError: continue # 格式化为8位大写十六进制 CanID = ''.join(format(b, '02X') for b in id_bytes) # 提取ID字节 CanFramType = "Cycle" Len = 8 CanDataSpace = '' PGNdata = '' PGNID = int(Framedata[12 * index + 9] ) + int(Framedata[12 * index + 10]* 0x100) PGNSignl = self.Frame_analoy_PGN_Signal(PGNID,Framedata[12 * index + 12:]) # 提取数据部分 PGNdata = ''.join( format(Framedata[12 * index + 12 + posindex], '02X') for posindex in range(8) ).upper() try: CanDataSpace = ' '.join( format(Framedata[12 * index + 12 + posindex], '02X') for posindex in range(8) ) except IndexError: continue current_data = CanDataSpace.split() if CanID in self.last_data_dict: last_data = self.last_data_dict[CanID].split() changed_indices = [] for i in range(min(len(last_data), len(current_data))): if last_data[i] != current_data[i]: changed_indices.append(i) self.changed_bytes_dict[CanID] = changed_indices else: self.changed_bytes_dict[CanID] = [] # 新ID无变化 self.last_data_dict[CanID] = CanDataSpace CanItemData = [Cantime, CanID, CanFramType, Len, CanDataSpace] # print(CanDataSpace) # ✅ 只有在白名单内的PGNID才处理PGN信号 # 获取当前PGN数据 current_pgn_data = PGNdata # 使用上面生成的两位格式数据 # 检查数据变化 if PGNID in self.last_pgn_data_dict: last_data = self.last_pgn_data_dict[PGNID] changed_indices = [] for i in range(min(len(last_data), len(current_pgn_data) // 2)): start_idx = i * 2 end_idx = start_idx + 2 if last_data[start_idx:end_idx] != current_pgn_data[start_idx:end_idx]: changed_indices.append(i) self.changed_pgn_bytes_dict[PGNID] = changed_indices else: self.changed_pgn_bytes_dict[PGNID] = [] # 新PGN ID无变化 self.last_pgn_data_dict[PGNID] = current_pgn_data if PGNID in self.allowed_pgn_ids: PGNSignl = self.Frame_analoy_PGN_Signal(PGNID, Framedata[12 * index + 12:]) SignalItemData = [hex(PGNID)[2:].upper(), CanID, PGNdata, PGNSignl] if all(not sublist for sublist in CanPGNItemList) or CanPGNItemList[0][0] == 0: if len(CanPGNItemList): CanPGNItemList.pop(0) CanPGNItemList.insert(0, SignalItemData) else: Listpos = self.find_in_2d_list(CanPGNItemList, CanID) if Listpos is not None: CanPGNItemList[Listpos[0]] = SignalItemData else: CanPGNItemList.append(SignalItemData) if CanID in self.last_frame_time: last_time = self.last_frame_time[CanID] cycle = (current_time - last_time) * 1000 # 转换为毫秒 if cycle > 40: if cycle<self.cycle_dict[CanID] : self.cycle_dict[CanID] = cycle/2 elif self.cycle_dict[CanID] == 0 : self.cycle_dict[CanID] = cycle/2 else: self.cycle_dict[CanID] = 0 # 首次出现,周期设为0 if "18F00F52" in self.cycle_dict: print(self.cycle_dict["18F00F52"]) self.last_frame_time[CanID] = current_time # 更新列表 filtered_cycles = getattr(self, 'filter_cycles', []) is_filtered = False if filtered_cycles: for filter_cycle in filtered_cycles: tolerance = filter_cycle * 0.15 # 允许±15%的容差 if abs(self.cycle_dict[CanID] - filter_cycle) <= tolerance: is_filtered = True break # 根据过滤状态更新相应的列表 if is_filtered: # 更新到filteredCanOBDItemList Listpos = self.find_in_2d_list(filteredCanOBDItemList, CanID) if Listpos is not None: filteredCanOBDItemList[Listpos[0]] = CanItemData self.data_updated_ids.add(CanID) else: filteredCanOBDItemList.append(CanItemData) self.new_added_ids.add(CanID) else: # 更新到CanOBDItemList Listpos = self.find_in_2d_list(CanOBDItemList, CanID) if Listpos is not None: CanOBDItemList[Listpos[0]] = CanItemData self.data_updated_ids.add(CanID) else: CanOBDItemList.append(CanItemData) self.new_added_ids.add(CanID) # self.last_data_time = time.time() # 解析到有效帧时更新时间 def find_in_2d_list(self,matrix, target): for i, row in enumerate(matrix): if any(x == target for x in row): return (i, row.index(target)) # 使用row.index()找到具体列的索引 return None def Frame_analoy_PGN_Signal(self, PGNID, Framedata): # 确保数据是整数列表(0-255) if not all(isinstance(x, int) for x in Framedata): Framedata = [int(x) for x in Framedata] # 根据J1939规范解析 if PGNID == 0xFEF1: # 车速 (CCVS1) # 位置2-3 (索引1-2),大端序,单位1/256 km/h raw_val = (Framedata[1] << 8) | Framedata[2] return raw_val / 256.0 elif PGNID == 0xFE6C: # 车速 (TCO1) - 新增 # 位置7-8 (索引6-7),大端序,单位1/256 km/h raw_val = (Framedata[6] << 8) | Framedata[7] return raw_val / 256.0 elif PGNID == 0xF004: # 发动机转速+负载 # 负载:位置3 (索引2),单位1% engine_load = Framedata[2] & 0x7F # 取7位 # 转速:位置4-5 (索引3-4),大端序,单位0.125 RPM raw_rpm = (Framedata[3] << 8) | Framedata[4] rpm = raw_rpm * 0.125 return f'{engine_load}|{rpm}' elif PGNID == 0xFEC1: # 里程表 (VDHR) # 位置1-4 (索引0-3),大端序,单位0.125米 raw_val = int(Framedata[3] * 0x1000000) + int(Framedata[2] * 0x10000) + int(Framedata[1] * 0x100) + int(Framedata[0]) return raw_val * 0.125 # 转换为米 elif PGNID == 0xFEE5: # 发动机小时数 # 位置1-4 (索引0-3),大端序,单位0.05小时 raw_val = (Framedata[0] << 24) | (Framedata[1] << 16) | (Framedata[2] << 8) | Framedata[3] return raw_val * 0.05 elif PGNID == 0xFEF2: # 平均油耗 # 位置1-2 (索引0-1),大端序,单位0.05 L/h raw_val = (Framedata[0] << 8) | Framedata[1] return raw_val * 0.05 elif PGNID == 0xFEEE: # 冷却液温度 # 位置1 (索引0),单位1°C,偏移-40 return Framedata[0] - 40 elif PGNID == 0xFE56: # 燃油液位 # 位置1 (索引0),单位0.4% return Framedata[0] * 0.4 elif PGNID == 0xF005: # 档位 # 位置4 (索引3),直接返回值 return Framedata[3] return None # def start_reading(self): # self.read_thread = threading.Thread(target=self.Com_read_frame, daemon=True) # self.read_thread.start() 这段程序在接收can数据时,部分接收到的数据会混乱,存在接收完一整帧数据后,后续数据不对应的情况,这是什么问题?

最新推荐

recommend-type

电力电子领域Boost单闭环仿真模型对电压阶跃变化与负载突变的稳定控制研究 电力电子 最新版

Boost单闭环仿真模型在电力电子领域的应用,特别是在目标输出电压阶跃变化和负载突变这两种极端工况下的稳定闭环控制效果。首先简述了Boost单闭环仿真模型的基本构造及其工作原理,接着深入探讨了在不同条件下(如电压阶跃变化和负载突变)如何利用闭环控制系统实现快速响应和平稳过渡。文中还提出了几种提升系统稳定性的方法,包括优化控制系统设计、引入误差调节和补偿机制、合理配置参数以及增强抗干扰能力。最后强调了该模型的重要性和潜在的应用前景。 适合人群:从事电力电子相关工作的工程师和技术人员,尤其是关注电源转换效率和稳定性的专业人士。 使用场景及目标:适用于需要评估或改进现有电源管理系统稳定性的场合,旨在帮助技术人员理解和掌握Boost单闭环仿真模型的工作机理,从而更好地应对实际工程中的挑战。 其他说明:随着电力电子技术的进步,Boost单闭环仿真模型有望在未来发挥更大的作用,推动工业生产和技术革新。
recommend-type

破解dex2jar: Android应用反编译与分析指南

标题中的“dex2jar”指的是一个用于将Android应用程序中的DEX文件(Dalvik可执行文件)转换成Java JAR文件的工具。这个过程被称为“DEX转JAR”,是一个逆向工程的过程,它允许开发者查看和分析Android应用程序的原始Java代码,这通常用于学习、测试和安全分析目的。破解一词在此上下文中可能用于描述不正当手段获取程序的源代码以进行修改或绕过安全机制等行为,但请注意,任何未经授权的修改和使用都可能违反法律和版权。 描述部分提供了使用dex2jar工具的基本步骤。dex2jar通常是一个批处理文件(dex2jar.bat),用于在Windows环境下执行操作。它将DEX文件(classes.dex)作为输入,并生成对应的JAR文件。这个过程需要用户已经下载并解压了dex2jar的压缩包,并将其中的dex2jar.bat文件放在一个可以访问的目录中。然后,用户需要将目标Android应用程序中的classes.dex文件复制到该目录下,并在命令行界面中运行以下命令: dex2jar.bat classes.dex 执行完毕后,会在同一目录下生成名为classes.dex.dex2jar.jar的文件。这个JAR文件实质上是将DEX文件中的类转换成了Java的.class文件,并打包成了JAR格式,供后续的分析或修改使用。 【标签】中的“Android 破解”可能被误解为破解Android应用程序的安全机制,实际上,这个标签更准确的意义是分析和理解Android应用程序的工作原理。而“jar dex”则是指JAR文件与DEX文件之间的转换关系。 【压缩包子文件的文件名称列表】中列举了几个文件名,其中“使用说明.txt”很可能是该工具的官方使用指南,提供更详细的使用说明、安装步骤和常见问题的解答。“dex2jar最新版下载dex2jar下载 2.0官方版_ - pc6下载站.url”则是一个指向PC6下载站上dex2jar工具下载页面的网页链接。“dex2jar-2.0”可能是指特定版本号的dex2jar工具压缩包。 总结以上信息,dex2jar是一个专门用于Android应用安全分析和逆向工程的工具,它将DEX文件转换成Java的JAR文件以便进行代码的查看和分析。尽管这个过程在安全研究和教育目的下是合法且有价值的,但任何使用这一工具的非法破解、侵犯版权或进行恶意行为都是不可接受的。
recommend-type

共享内存与共识算法详解

### 共享内存与共识算法详解 在分布式系统中,共享内存和共识算法是两个核心的概念。共享内存涉及到多个进程对同一数据的读写操作,而共识算法则帮助多个进程在可能存在故障的情况下达成一致的决策。下面将详细介绍这两个方面的相关知识。 #### 共享内存 共享内存的寄存器规范由Lamport在多篇论文中引入,同时定义了安全、规则和原子三种语义。 - **安全寄存器**:在三种寄存器语义中,安全寄存器提供的保证最弱。当读操作与写操作并发时,安全寄存器可能返回任意值,甚至可能是从未被写入或尝试写入的值。在多处理器机器中,进程可能并发访问实现寄存器抽象的底层硬件位置,从而导致返回任意值。在任意故障模型中
recommend-type

计算机专业本科生和研究生就业薪资待遇

<think>好的,我需要回答用户关于计算机专业本科和研究生就业薪资对比的问题。首先,我得先看看用户提供的引用资料,看看里面有没有相关的数据。 引用[4]提到,2019届计算机类本科毕业生的平均月收入是6858元,而高职是4883元。这应该可以作为本科生的参考数据。至于研究生,引用[1]指出重庆大学的计算机和软件硕士就业情况良好,薪资高于行业平均水平,但没有具体数字。不过引用[3]提到,前20名的高校多为985/211,尤其是理工类院校的毕业生薪资更高。这里可能需要结合其他信息来推断研究生的薪资水平。 另外,引用[2]提到计算机专业毕业生薪资一般在万元以上,但不确定这是否特指研究生还是包括
recommend-type

eWebEditor 10.3最新版特性与安全升级指南

从提供的信息来看,我们需要深入了解和探讨的内容主要集中在“eWebEditor最新版”这一主题上。eWebEditor是一款流行的在线HTML编辑器,它支持ASP和ASP.NET环境,并广泛用于Web内容管理。通过给出的标题和描述,以及标签和文件名称列表,我们可以推导出一系列相关的知识点。 ### 标题知识点解析 #### eWebEditor的定义与功能 “eWebEditor最新版”中提到的“eWebEditor”指的是在线HTML编辑器产品,它被广泛应用于需要在线编辑和发布网页内容的场合。编辑器通常包含许多功能,比如文本格式化、图像插入、链接管理等,提供用户友好和接近桌面程序的编辑体验。eWebEditor产品以ASP和ASP.NET作为其主要的技术平台。 #### “最新版”更新内容 “最新版”表明我们正在讨论的是eWebEditor的最新版本更新,该版本很可能是为了增加新功能、提升性能、修复已知问题或改善安全性能。一般来说,软件的更新也可能会引入对新操作系统或浏览器的兼容性,以及对现有API或开发环境的新支持。 ### 描述知识点解析 #### “亲测可用”的含义 从“亲测 可用”的描述中我们可以推断出,发布者可能已经对“eWebEditor最新版”进行了测试,并验证了其在实际使用中的性能和稳定性。该短语传递出一个积极的信号,即该版本值得信赖,用户可以期待它将正常工作,无需担心兼容性或功能缺失的问题。 ### 标签知识点解析 #### eWebEditor的版本标识 “eWebEditor ASPX 10.3 最新版”中的标签指出我们讨论的版本号为10.3,这是一个具体的产品版本,意味着它可能包含了一些特定的更新或新增特性。通过版本号,我们可以推断产品已经经过了多次迭代和改进。 #### ASPX技术框架 在标签中提到的“ASPX”,这表明eWebEditor最新版支持ASP.NET Web Forms技术,ASPX是ASP.NET网页的标准文件扩展名。这一信息指出编辑器适合使用.NET框架的网站开发环境。 ### 文件名称列表知识点解析 #### “升级说明.txt”文件 “升级说明.txt”是一个文本文件,它可能包含了eWebEditor从上一版本升级到最新版本时的变化说明,例如新增功能、改进的地方以及需要注意的变更。开发者或维护人员在升级时应该仔细阅读这些说明,以便于平滑过渡到新版本,并最大化地利用新功能。 #### “安全说明.txt”文件 “安全说明.txt”文件通常提供了关于软件安全性的相关信息,这可能包括了针对最新版的安全补丁、修复的安全漏洞列表以及安全最佳实践的建议。特别是对于在线编辑器这类直接参与网页内容生成的工具,安全尤为重要,因此,安全说明文件对于确保编辑器和整个网站的安全运行至关重要。 #### “ewebeditor”文件夹或组件 “ewebeditor”可能是实际包含eWebEditor编辑器文件的文件夹名称。通常,这类文件夹内会包含用于前端的JavaScript文件、用于后端处理的服务器端代码(ASP.NET或ASP代码),以及相关的样式文件和资源文件。对于开发者来说,了解这些文件和组件的组织结构对于集成和配置编辑器至关重要。 综合以上信息,我们可以了解到eWebEditor的最新版本更新了很多内容,可能包含性能和安全性的提升,并可能对特定的技术平台如ASP.NET提供了更好的支持。用户应该参考升级和安全说明文件,以便正确理解和应用这些更新。对于开发者而言,掌握如何在项目中部署和配置eWebEditor编辑器也是一个重要的技能点。
recommend-type

分布式系统中的时间抽象与故障处理

### 分布式系统中的时间抽象与故障处理 #### 1. 故障检测概述 在分布式系统中,存在三种不同的系统假设:异步系统假设、同步系统假设和部分同步系统假设。异步系统不包含任何时间假设,我们的进程和链路抽象直接体现了这一点。然而,这些抽象不足以定义同步和部分同步系统。 为了添加时间假设,一种方法是用时间保证来扩展进程和链路抽象,但这会导致规范过于复杂。因此,我们引入了故障检测器的抽象概念,它能提供关于哪些进程崩溃、哪些进程正常的信息,不过这些信息不一定准确。 故障检测器抽象相较于直接对进程和链路做时间假设具有以下两个优势: - 减轻了用时间假设扩展进程和链路抽象的需求,保留了这些抽象的简
recommend-type

我发一份参考课程设计目录

<think>好的,用户之前问过关于高校实验室安排系统的数据库课程设计,现在他需要一份参考的课程设计目录。首先,我需要回顾之前的对话,确保目录结构和他之前的问题相关,并且覆盖他提到的核心需求、ER模型、表结构、业务逻辑等内容。 用户现在要目录,可能是在进行课程设计的结构规划,需要明确的章节安排。我需要考虑课程设计通常包含的部分,比如需求分析、系统设计、数据库设计、实现、测试等。同时,结合之前给出的内容,比如ER图、表结构、冲突检测算法等,确保这些部分在目录中有体现。 还要注意用户可能希望目录足够详细,包含各个关键部分,比如摘要、需求分析、数据库设计(包括概念结构、逻辑结构、物理结构)、系统
recommend-type

惠普AMTBrand深度解析与技术特点

标题和描述中提到的“惠普AMTBrand”,可能是指惠普(Hewlett-Packard,简称HP)公司旗下的AMTBrand相关产品或技术。然而,由于给出的信息非常有限,我们可以先从惠普公司以及AMT(Active Management Technology,主动管理技术)两个方面进行展开。惠普是全球知名的IT企业,提供多种计算机硬件、软件、云服务和解决方案,而AMT是英特尔(Intel)研发的一种硬件级别的远程管理技术。 首先,我们来了解惠普公司: 惠普(Hewlett-Packard Enterprise,简称HPE),是全球领先的信息技术解决方案提供商。成立于1939年,由Bill Hewlett和David Packard在一间车库里创立,如今已经成为全球范围内提供广泛IT产品与服务的企业。惠普的产品和服务包括但不限于个人计算机(PC)、打印设备、工作站、服务器、网络设备、存储解决方案以及软件和服务。 惠普在IT服务管理、云计算、大数据和分析等领域均有涉猎,并为各种规模的企业提供从基础架构到应用管理的全方位解决方案。随着数字化转型的不断深入,惠普也在不断地通过研发新技术和收购相关企业来拓展其产品和服务的范围。 接着,我们探索AMT技术: AMT是英特尔推出的一种基于硬件的管理解决方案,它允许IT部门远程管理企业中的个人计算机和其他设备。AMT是一种集成在商用英特尔处理器中的技术,能够在个人电脑关机或者操作系统失效的情况下,提供网络访问以及硬件级别的远程管理功能。这项技术最初由英特尔在2006年发布,历经数代更新,为IT运维人员提供了众多功能,如远程开机、远程维护、软件部署、系统监控等。 AMT的优势主要体现在以下几个方面: 1. 远程访问:即使目标设备没有开机或操作系统出现故障,AMT也可以实现远程访问和管理。 2. 安全性:它提供比传统软件解决方案更为深入的硬件级别安全机制,确保数据传输的安全。 3. 节约成本:通过减少现场支持的需要,AMT能够帮助公司节约维护成本。 4. 提高效率:快速远程解决问题,从而提高IT部门的工作效率和响应速度。 然而,由于AMT技术基于特定的硬件和软件,这意味着用户需要购买带有AMT功能的英特尔处理器和相应的支持软件,同时可能需要来自惠普或其他OEM厂商的相应硬件。 最后,提到“压缩包子文件的文件名称列表”中的“AMTBrand”,这可能意味着提供或关联惠普AMTBrand技术的文件压缩包。考虑到文件名称的单一性和没有其他文件描述提供进一步信息,不能确定这个压缩包内具体包含哪些内容。不过,可以推测该压缩包内可能包含了介绍惠普AMTBrand技术的文档、演示、驱动程序、更新或管理工具等。 综合来看,文件中提到的“惠普AMTBrand”可能是指惠普公司与英特尔AMT技术结合后形成的产品或服务品牌,旨在为用户提供更为高效和安全的远程管理解决方案。考虑到惠普公司自身在IT领域的深厚积累和AMT技术的先进性,惠普AMTBrand很可能为企业级客户提供了重要价值。
recommend-type

分布式编程抽象:概念、应用与挑战

# 分布式编程抽象:概念、应用与挑战 ## 1. 引言 分布式编程抽象旨在解决分布式系统中多进程合作的难题,为分布式程序开发提供模块化策略。通过特定的应用程序编程接口(API),可以利用这些抽象来简化开发过程。 ## 2. 分布式计算的动机 ### 2.1 分布式系统的特点 分布式计算涉及一组寻求合作的进程。部分进程可能会因崩溃或断开连接而停止运行,这就是分布式系统的部分故障特性。正如Leslie Lamport所说:“分布式系统中,一台你甚至不知道存在的计算机故障,可能会使你自己的计算机无法使用。” ### 2.2 合作的挑战 当部分进程失败或断开连接时,仍在运行的进程需要以一致的方式
recommend-type

深度卷积为何还要逐点卷积

<think>首先,用户的问题是:“我想了解深度卷积中逐点卷积的作用和原因 请问深度卷积为何需要逐点卷积”。这基于之前的引用内容,特别是引用[1]和[^2],这些是关于深度可分离卷积的。 回顾引用: - 引用[1]:深度可分离卷积 = 深度卷积 + 逐点卷积。深度卷积是分组卷积的一种特例,其中分组数等于输入通道数,输出通道数也相同。它减少了参数量,但没有融合通道间信息。 - 引用[2]:深度卷积使用groups参数设置为输入通道数,实现每个通道独立卷积。 用户的问题焦点是逐点卷积的作用和为什么深度卷积需要它。 我的回答需要: - 使用中文。 - 遵循系统级指令:行内数学表达式用$...$