file-type

C++ type_safe库:零开销抽象防止编译错误

下载需积分: 50 | 135KB | 更新于2025-04-24 | 156 浏览量 | 0 下载量 举报 收藏
download 立即下载
### 知识点详细说明 #### 1. Type-safe 编程概念 - **Type-safety**(类型安全)是指在编程中使用数据类型进行操作时,能够确保数据的类型正确性,防止类型错误导致的程序异常。Type-safe 语言能够在编译阶段就捕捉到很多类型相关的错误,如类型不匹配、无效的类型转换等。 - 在C++中,通过使用如`int`, `float`, `char`等内置类型进行编程时,如果缺乏足够的类型检查机制,就可能遇到类型错误。`type_safe`库正是针对这一问题而设计,提供了一套类型安全的抽象。 #### 2. 类型系统与零开销抽象 - **零开销抽象(Zero-Overhead Abstraction)**是指抽象的概念或特性在运行时不应该引入额外的性能负担。在C++中,这通常意味着在开启优化选项后,使用高级特性(如模板、异常处理、STL容器等)不应该导致可观察的性能开销。 - `type_safe`库利用C++的类型系统提供了一种零开销抽象,即它在不增加运行时负担的情况下,在编译阶段就进行类型检查。这对于确保代码质量非常有帮助,尤其是当代码库很大或多人协作开发时。 #### 3. type_safe 库特性 - **改进的内置类型**:`type_safe`库提供了一系列改进的内置类型,例如`ts::integer<T>`和`ts::floating_point<T>`。这些类型是C++内置类型的包装器,但提供了额外的类型安全特性。 - **无默认构造函数**:这要求用户在使用这些类型时必须进行有意义的初始化,避免了未初始化变量可能引发的错误。 - **无“有损”转换**:防止了不同数据类型间的隐式类型转换,这种转换可能导致数据丢失或计算错误。 - **严格算术/比较规则**:禁止混合不同类型的算术运算和比较操作,以防止不精确或不一致的计算结果。 - **上溢/下溢的定义行为**:在非调试模式下,对于整数的上溢/下溢行为是未定义的,这意味着编译器可以根据具体情况优化这些行为,从而提高程序效率。在调试模式下,可能会有一些性能折衷以提供错误检测。 #### 4. 示例使用场景 - 使用`type_safe`库可以有效防止在进行整数运算时产生的类型不匹配错误,避免因整数溢出导致的程序崩溃问题。例如,在金融系统、航天控制系统等对安全性要求极高的场合,确保数值计算的准确性和安全性至关重要。 - 在设计需要高度类型安全的库或应用时,可以利用`type_safe`库提供的包装器类型来保证类型的安全性,减少运行时的错误检查开销。 #### 5. C++ 类型安全相关的知识 - 在C++中,类型安全不仅可以通过使用库如`type_safe`来实现,还可以通过其他方式,例如使用模板元编程、利用编译器特性(如`static_assert`)、以及遵循良好的编程实践(如使用`const`关键字、引用限定符等)来提高类型安全性。 - 类型安全是编译时检查的一部分,它有助于提高代码的可维护性和可读性,同时减少运行时错误的发生。 #### 6. 关于C++编程 - C++是一种静态类型、编译式、通用的编程语言,广泛应用于软件开发领域。其特性包括多范式编程(支持面向对象、泛型和过程式编程)、内存管理功能和对低级操作的精细控制等。 - `type_safe`库的使用表明C++在不断发展,它不仅保持了性能优势,还在提高代码安全性和易用性方面不断进步。 #### 7. 关于开源和赞助 - 开源软件允许用户免费使用和修改源代码。`type_safe`库的开源性质使其可以被广泛地审视和改进,从而不断提高其可靠性和功能性。 - 社区赞助或捐赠是许多开源项目得以持续发展的重要支持方式。对于像`type_safe`这样的库,赞助可以鼓励维护者继续其开发工作,并可能促进更多优秀特性或改进的加入。 #### 8. C++标签说明 - **type-safety C++**:这个标签说明了文档讨论的核心是关于C++语言中的类型安全特性,强调了类型安全在C++编程实践中的重要性。 #### 9. 压缩包子文件文件名称说明 - **type_safe-main**:这个文件可能是`type_safe`库源代码的主文件或示例文件,用于展示库的主要功能或用于构建和运行库提供的示例程序。 通过以上知识点的详细说明,我们能够理解`type_safe`库在C++编程中的重要性,其为C++提供了类型安全的保证,减少运行时错误,并且无运行时性能损耗。同时,也让我们了解到类型安全对于C++程序的开发和维护是多么关键。

相关推荐

filetype

检查代码是否有误,是否可执行,是否合理,是否可以不牺牲精度的情况下压缩内存和提升速度: import os import sys import re import json import gc import time import tempfile import concurrent.futures import difflib import threading import traceback import numpy as np import librosa import torch import psutil import requests import hashlib import shutil from typing import List, Dict, Tuple, Optional, Set from threading import Lock, Semaphore, RLock from datetime import datetime from pydub import AudioSegment from pydub.silence import split_on_silence from modelscope.pipelines import pipeline from modelscope.utils.constant import Tasks from transformers import AutoModelForSequenceClassification, AutoTokenizer from torch.utils.data import TensorDataset, DataLoader from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QLineEdit, QTextEdit, QFileDialog, QProgressBar, QGroupBox, QMessageBox, QListWidget, QSplitter, QTabWidget, QTableWidget, QTableWidgetItem, QHeaderView, QAction, QMenu, QToolBar, QCheckBox, QComboBox, QSpinBox, QDialog, QDialogButtonBox, QStatusBar) from PyQt5.QtCore import QThread, pyqtSignal, Qt, QTimer, QSize from PyQt5.QtGui import QFont, QTextCursor, QColor, QIcon # ====================== 资源监控器 ====================== class ResourceMonitor: """统一资源监控器(增强版)""" def __init__(self): self.gpu_available = torch.cuda.is_available() def memory_percent(self) -> Dict[str, float]: """获取内存使用百分比,同时返回CPU和GPU信息""" try: result = { "cpu": psutil.virtual_memory().percent } if self.gpu_available: allocated = torch.cuda.memory_allocated() / (1024 ** 3) total = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3) result["gpu"] = (allocated / total) * 100 if total > 0 else 0 return result except Exception as e: print(f"获取内存使用百分比失败: {str(e)}") return {"cpu": 0, "gpu": 0} # ====================== 方言配置中心(优化版) ====================== class DialectConfig: """集中管理方言配置,便于维护和扩展(带缓存)""" # 标准关键词 STANDARD_KEYWORDS = { "opening": ["您好", "很高兴为您服务", "请问有什么可以帮您"], "closing": ["感谢来电", "祝您生活愉快", "再见"], "forbidden": ["不知道", "没办法", "你投诉吧", "随便你"] } # 贵州方言关键词 GUIZHOU_KEYWORDS = { "opening": ["麻烦您喽", "请问搞哪样", "有咋个可以帮您", "多谢喽"], "closing": ["搞归一喽", "麻烦您喽", "再见喽", "慢走喽"], "forbidden": ["搞不成", "没得法", "随便你喽", "你投诉吧喽"] } # 方言到标准表达的映射(扩展更多贵州方言) DIALECT_MAPPING = { "恼火得很": "非常生气", "鬼火戳": "很愤怒", "搞不成": "无法完成", "没得": "没有", "搞哪样嘛": "做什么呢", "归一喽": "完成了", "咋个": "怎么", "克哪点": "去哪里", "麻烦您喽": "麻烦您了", "多谢喽": "多谢了", "憨包": "傻瓜", "归一": "结束", "板扎": "很好", "鬼火冒": "非常生气", "背时": "倒霉", "吃豁皮": "占便宜" } # 类属性缓存 _combined_keywords = None _compiled_opening = None _compiled_closing = None _hotwords = None _dialect_pattern = None @classmethod def get_combined_keywords(cls) -> Dict[str, List[str]]: """获取合并后的关键词集(带缓存)""" if cls._combined_keywords is None: cls._combined_keywords = { "opening": cls.STANDARD_KEYWORDS["opening"] + cls.GUIZHOU_KEYWORDS["opening"], "closing": cls.STANDARD_KEYWORDS["closing"] + cls.GUIZHOU_KEYWORDS["closing"], "forbidden": cls.STANDARD_KEYWORDS["forbidden"] + cls.GUIZHOU_KEYWORDS["forbidden"] } return cls._combined_keywords @classmethod def get_compiled_opening(cls) -> List[re.Pattern]: """获取预编译的开场关键词正则表达式(带缓存)""" if cls._compiled_opening is None: keywords = cls.get_combined_keywords()["opening"] cls._compiled_opening = [re.compile(re.escape(kw)) for kw in keywords] return cls._compiled_opening @classmethod def get_compiled_closing(cls) -> List[re.Pattern]: """获取预编译的结束关键词正则表达式(带缓存)""" if cls._compiled_closing is None: keywords = cls.get_combined_keywords()["closing"] cls._compiled_closing = [re.compile(re.escape(kw)) for kw in keywords] return cls._compiled_closing @classmethod def get_asr_hotwords(cls) -> List[str]: """获取ASR热词列表(带缓存)""" if cls._hotwords is None: combined = cls.get_combined_keywords() cls._hotwords = sorted(set( combined["opening"] + combined["closing"] )) return cls._hotwords @classmethod def preprocess_text(cls, texts: List[str]) -> List[str]: """将方言文本转换为标准表达(使用一次性替换)""" if cls._dialect_pattern is None: # 创建方言替换的正则表达式(一次性) # 修复:按长度降序排序,确保长词优先匹配 keys = sorted(cls.DIALECT_MAPPING.keys(), key=len, reverse=True) pattern_str = "|".join(re.escape(key) for key in keys) cls._dialect_pattern = re.compile(pattern_str) def replace_match(match): return cls.DIALECT_MAPPING[match.group(0)] return [cls._dialect_pattern.sub(replace_match, text) for text in texts] # ====================== 系统配置管理器 ====================== class ConfigManager: """管理应用程序配置""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_config() return cls._instance def _init_config(self): """初始化默认配置""" self.config = { "model_paths": { "asr": "./models/iic-speech_paraformer-large-vad-punc-spk_asr_nat-zh-cn", "sentiment": "./models/IDEA-CCNL-Erlangshen-Roberta-110M-Sentiment" }, "sample_rate": 16000, "silence_thresh": -40, "min_silence_len": 1000, "max_concurrent": 1, "dialect_config": "guizhou", "max_audio_duration": 3600 # 最大音频时长(秒) } self.load_config() def load_config(self): """从文件加载配置""" try: if os.path.exists("config.json"): with open("config.json", "r") as f: self.config.update(json.load(f)) except: pass def save_config(self): """保存配置到文件""" try: with open("config.json", "w") as f: json.dump(self.config, f, indent=2) except: pass def get(self, key: str, default=None): """获取配置值""" return self.config.get(key, default) def set(self, key: str, value): """设置配置值""" self.config[key] = value self.save_config() # ====================== 音频处理工具(优化版) ====================== class AudioProcessor: """处理音频转换和特征提取(避免重复加载)""" SUPPORTED_FORMATS = ('.mp3', '.wav', '.amr', '.m4a') @staticmethod def convert_to_wav(input_path: str, temp_dir: str) -> Optional[List[str]]: """将音频转换为WAV格式(在静音处分割)""" try: os.makedirs(temp_dir, exist_ok=True) # 检查文件格式 if not any(input_path.lower().endswith(ext) for ext in AudioProcessor.SUPPORTED_FORMATS): raise ValueError(f"不支持的音频格式: {os.path.splitext(input_path)[1]}") if input_path.lower().endswith('.wav'): return [input_path] # 已经是WAV格式 # 检查ffmpeg是否可用 try: AudioSegment.converter = "ffmpeg" # 显式指定ffmpeg audio = AudioSegment.from_file(input_path) except FileNotFoundError: print("错误: 未找到ffmpeg,请安装并添加到环境变量") return None # 检查音频时长是否超过限制 max_duration = ConfigManager().get("max_audio_duration", 3600) * 1000 # 毫秒 if len(audio) > max_duration: return AudioProcessor._split_long_audio(audio, input_path, temp_dir) else: return AudioProcessor._convert_single_audio(audio, input_path, temp_dir) except Exception as e: print(f"格式转换失败: {str(e)}") return None @staticmethod def _split_long_audio(audio: AudioSegment, input_path: str, temp_dir: str) -> List[str]: """分割长音频文件""" wav_paths = [] # 在静音处分割音频 chunks = split_on_silence( audio, min_silence_len=ConfigManager().get("min_silence_len", 1000), silence_thresh=ConfigManager().get("silence_thresh", -40), keep_silence=500 ) # 合并小片段,避免分段过多 merged_chunks = [] current_chunk = AudioSegment.empty() for chunk in chunks: if len(current_chunk) + len(chunk) < 5 * 60 * 1000: # 5分钟 current_chunk += chunk else: if len(current_chunk) > 0: merged_chunks.append(current_chunk) current_chunk = chunk if len(current_chunk) > 0: merged_chunks.append(current_chunk) # 导出分段音频 sample_rate = ConfigManager().get("sample_rate", 16000) for i, chunk in enumerate(merged_chunks): chunk = chunk.set_frame_rate(sample_rate).set_channels(1) chunk_path = os.path.join( temp_dir, f"{os.path.splitext(os.path.basename(input_path))[0]}_part{i + 1}.wav" ) chunk.export(chunk_path, format="wav") wav_paths.append(chunk_path) return wav_paths @staticmethod def _convert_single_audio(audio: AudioSegment, input_path: str, temp_dir: str) -> List[str]: """转换单个短音频文件""" sample_rate = ConfigManager().get("sample_rate", 16000) audio = audio.set_frame_rate(sample_rate).set_channels(1) wav_path = os.path.join(temp_dir, os.path.splitext(os.path.basename(input_path))[0] + ".wav") audio.export(wav_path, format="wav") return [wav_path] @staticmethod def extract_features_from_audio(y: np.ndarray, sr: int) -> Dict[str, float]: """从已加载的音频数据中提取特征(避免重复加载)""" try: duration = librosa.get_duration(y=y, sr=sr) segment_length = 60 # 60秒分段 total_segments = max(1, int(np.ceil(duration / segment_length))) syllable_rates = [] volume_stabilities = [] for i in range(total_segments): start = i * segment_length end = min((i + 1) * segment_length, duration) y_segment = y[int(start * sr):int(end * sr)] if len(y_segment) == 0: continue # 语速计算 intervals = librosa.effects.split(y_segment, top_db=20) speech_duration = sum(end - start for start, end in intervals) / sr if speech_duration > 0.1: # 避免极短语音导致的异常 syllable_rate = len(intervals) / speech_duration else: syllable_rate = 0 syllable_rates.append(syllable_rate) # 音量稳定性 rms = librosa.feature.rms(y=y_segment)[0] if len(rms) > 0 and np.mean(rms) > 0: volume_stability = np.std(rms) / np.mean(rms) volume_stabilities.append(volume_stability) return { "duration": duration, "syllable_rate": round(np.mean(syllable_rates) if syllable_rates else 0, 2), "volume_stability": round(np.mean(volume_stabilities) if volume_stabilities else 0, 4) } except: return {"duration": 0, "syllable_rate": 0, "volume_stability": 0} # ====================== 模型加载器(优化版) ====================== class ModelLoader: """加载和管理AI模型(使用RLock)""" asr_pipeline = None sentiment_model = None sentiment_tokenizer = None model_lock = RLock() # 使用RLock代替Lock models_loaded = False # 添加模型加载状态标志 @classmethod def load_models(cls): """加载所有模型""" config = ConfigManager() # 加载ASR模型 if not cls.asr_pipeline: with cls.model_lock: if not cls.asr_pipeline: # 双重检查锁定 cls.load_asr_model(config.get("model_paths")["asr"]) # 加载情感分析模型 if not cls.sentiment_model: with cls.model_lock: if not cls.sentiment_model: # 双重检查锁定 cls.load_sentiment_model(config.get("model_paths")["sentiment"]) cls.models_loaded = True @classmethod def reload_models(cls): """重新加载模型(配置变更后)""" with cls.model_lock: cls.asr_pipeline = None cls.sentiment_model = None cls.sentiment_tokenizer = None gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() cls.load_models() @classmethod def load_asr_model(cls, model_path: str): """加载语音识别模型""" try: if not os.path.exists(model_path): raise FileNotFoundError(f"ASR模型路径不存在: {model_path}") asr_kwargs = {} if hasattr(torch, 'quantization'): asr_kwargs['quantize'] = 'int8' print("启用ASR模型量化") cls.asr_pipeline = pipeline( task=Tasks.auto_speech_recognition, model=model_path, device='cuda' if torch.cuda.is_available() else 'cpu', **asr_kwargs ) print("ASR模型加载完成") except Exception as e: print(f"加载ASR模型失败: {str(e)}") raise @classmethod def load_sentiment_model(cls, model_path: str): """加载情感分析模型""" try: if not os.path.exists(model_path): raise FileNotFoundError(f"情感分析模型路径不存在: {model_path}") cls.sentiment_model = AutoModelForSequenceClassification.from_pretrained(model_path) cls.sentiment_tokenizer = AutoTokenizer.from_pretrained(model_path) if torch.cuda.is_available(): cls.sentiment_model = cls.sentiment_model.cuda() print("情感分析模型加载完成") except Exception as e: print(f"加载情感分析模型失败: {str(e)}") raise # ====================== 核心分析线程(优化版) ====================== class AnalysisThread(QThread): progress_updated = pyqtSignal(int, str, str) result_ready = pyqtSignal(dict) finished_all = pyqtSignal() error_occurred = pyqtSignal(str, str) memory_warning = pyqtSignal() resource_cleanup = pyqtSignal() def __init__(self, audio_paths: List[str], temp_dir: str = "temp_wav"): super().__init__() self.audio_paths = audio_paths self.temp_dir = temp_dir self.is_running = True self.current_file = "" self.max_concurrent = min( ConfigManager().get("max_concurrent", 1), self.get_max_concurrent_tasks() ) self.resource_monitor = ResourceMonitor() self.semaphore = Semaphore(self.max_concurrent) os.makedirs(temp_dir, exist_ok=True) def run(self): try: if not ModelLoader.models_loaded: self.error_occurred.emit("模型未加载", "请等待模型加载完成后再开始分析") return self.progress_updated.emit(0, f"最大并行任务数: {self.max_concurrent}", "") # 使用线程池并行处理 with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: # 创建任务 future_to_path = {} for path in self.audio_paths: if not self.is_running: break # 使用信号量控制并发 self.semaphore.acquire() batch_size = self.get_available_batch_size() future = executor.submit(self.analyze_audio, path, batch_size) future_to_path[future] = path future.add_done_callback(lambda f: self.semaphore.release()) # 处理完成的任务 for i, future in enumerate(concurrent.futures.as_completed(future_to_path)): if not self.is_running: break path = future_to_path[future] self.current_file = os.path.basename(path) # 内存检查 if self.check_memory_usage(): self.memory_warning.emit() self.is_running = False break try: result = future.result() if result: self.result_ready.emit(result) # 更新进度 progress = int((i + 1) / len(self.audio_paths) * 100) self.progress_updated.emit( progress, f"完成: {self.current_file} ({i + 1}/{len(self.audio_paths)})", self.current_file ) except Exception as e: result = { "file_name": self.current_file, "status": "error", "error": f"分析失败: {str(e)}" } self.result_ready.emit(result) # 分析完成后 if self.is_running: self.finished_all.emit() except Exception as e: self.error_occurred.emit("系统错误", str(e)) traceback.print_exc() finally: # 确保资源清理 self.resource_cleanup.emit() self.cleanup_resources() def analyze_audio(self, audio_path: str, batch_size: int) -> Dict: """分析单个音频文件(整合所有优化)""" result = { "file_name": os.path.basename(audio_path), "status": "processing" } wav_paths = [] try: # 1. 音频格式转换 wav_paths = AudioProcessor.convert_to_wav(audio_path, self.temp_dir) if not wav_paths: result["error"] = "格式转换失败(请检查ffmpeg是否安装)" result["status"] = "error" return result # 2. 提取音频特征(合并所有分段) audio_features = self._extract_audio_features(wav_paths) result.update(audio_features) result["duration_str"] = self._format_duration(audio_features["duration"]) # 3. 语音识别与处理 all_segments, full_text = self._process_asr_segments(wav_paths) # 4. 说话人区分(使用优化后的方法) agent_segments, customer_segments = self.identify_speakers(all_segments) # 5. 生成带说话人标签的文本 labeled_text = self._generate_labeled_text(all_segments, agent_segments, customer_segments) result["asr_text"] = labeled_text.strip() # 6. 文本分析(包含方言预处理) text_analysis = self._analyze_text(agent_segments, customer_segments, batch_size) result.update(text_analysis) # 7. 服务规范检查(使用方言适配的关键词) service_check = self._check_service_rules(agent_segments) result.update(service_check) # 8. 问题解决率(上下文关联) result["issue_resolved"] = self._check_issue_resolution(customer_segments, agent_segments) result["status"] = "success" except Exception as e: result["error"] = f"分析失败: {str(e)}" result["status"] = "error" finally: # 清理临时文件 self._cleanup_temp_files(wav_paths) # 显式内存清理 self.cleanup_resources() return result def identify_speakers(self, segments: List[Dict]) -> Tuple[List[Dict], List[Dict]]: """区分客服与客户(优化版:子串匹配+提前终止)""" if not segments: return [], [] # 获取预编译的正则表达式 opening_patterns = DialectConfig.get_compiled_opening() closing_patterns = DialectConfig.get_compiled_closing() agent_id = None found_by_opening = False found_by_closing = False # 策略1:在前3段中查找开场白关键词(提前终止) for seg in segments[:3]: text = seg["text"] # 检查是否包含任意开场关键词 for pattern in opening_patterns: if pattern.search(text): agent_id = seg["spk_id"] found_by_opening = True break # 找到即终止内层循环 if found_by_opening: break # 找到即终止外层循环 # 策略2:在后3段中查找结束语关键词(提前终止) if not found_by_opening: # 逆序遍历最后3段 for seg in reversed(segments[-3:] if len(segments) >= 3 else segments): text = seg["text"] # 检查是否包含任意结束关键词 for pattern in closing_patterns: if pattern.search(text): agent_id = seg["spk_id"] found_by_closing = True break # 找到即终止内层循环 if found_by_closing: break # 找到即终止外层循环 # 策略3:如果前两种策略未找到,使用说话频率最高的作为客服 if agent_id is None: spk_counts = {} for seg in segments: spk_id = seg["spk_id"] spk_counts[spk_id] = spk_counts.get(spk_id, 0) + 1 if spk_counts: agent_id = max(spk_counts, key=spk_counts.get) else: return [], [] # 如果没有有效的agent_id,返回空列表 # 使用集合存储agent的spk_id,提高查询效率 agent_spk_ids = {agent_id} return ( [seg for seg in segments if seg["spk_id"] in agent_spk_ids], [seg for seg in segments if seg["spk_id"] not in agent_spk_ids] ) def _analyze_text(self, agent_segments: List[Dict], customer_segments: List[Dict], batch_size: int) -> Dict: """文本情感分析(优化版:向量化批处理)""" def analyze_speaker(segments: List[Dict], speaker_type: str) -> Dict: if not segments: return { f"{speaker_type}_negative": 0.0, f"{speaker_type}_neutral": 1.0, f"{speaker_type}_positive": 0.0, f"{speaker_type}_emotions": "无" } # 方言预处理 - 使用优化的一次性替换 texts = [seg["text"] for seg in segments] processed_texts = DialectConfig.preprocess_text(texts) # 使用DataLoader进行批处理 with ModelLoader.model_lock: inputs = ModelLoader.sentiment_tokenizer( processed_texts, padding=True, truncation=True, max_length=128, return_tensors="pt" ) # 创建TensorDataset和DataLoader dataset = TensorDataset(inputs['input_ids'], inputs['attention_mask']) dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False) device = "cuda" if torch.cuda.is_available() else "cpu" sentiment_dist = [] emotions = [] # 批量处理 for batch in dataloader: input_ids, attention_mask = batch inputs = { 'input_ids': input_ids.to(device), 'attention_mask': attention_mask.to(device) } with torch.no_grad(): outputs = ModelLoader.sentiment_model(**inputs) batch_probs = torch.nn.functional.softmax(outputs.logits, dim=-1) sentiment_dist.append(batch_probs.cpu()) # 情绪识别(批量) emotion_keywords = ["愤怒", "生气", "鬼火", "不耐烦", "搞哪样嘛", "恼火", "背时"] for text in processed_texts: if any(kw in text for kw in emotion_keywords): if any(kw in text for kw in ["愤怒", "生气", "鬼火", "恼火"]): emotions.append("愤怒") elif any(kw in text for kw in ["不耐烦", "搞哪样嘛"]): emotions.append("不耐烦") elif "背时" in text: emotions.append("沮丧") # 合并结果 if sentiment_dist: all_probs = torch.cat(sentiment_dist, dim=0) avg_sentiment = torch.mean(all_probs, dim=0).tolist() else: avg_sentiment = [0.0, 1.0, 0.0] # 默认值 return { f"{speaker_type}_negative": round(avg_sentiment[0], 4), f"{speaker_type}_neutral": round(avg_sentiment[1], 4), f"{speaker_type}_positive": round(avg_sentiment[2], 4), f"{speaker_type}_emotions": ",".join(set(emotions)) if emotions else "无" } return { **analyze_speaker(agent_segments, "agent"), **analyze_speaker(customer_segments, "customer") } # ====================== 辅助方法 ====================== def get_available_batch_size(self) -> int: """根据GPU内存动态调整batch size(考虑并行)""" if not torch.cuda.is_available(): return 4 # CPU默认批次 total_mem = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3) # GB per_task_mem = total_mem / self.max_concurrent # 修正批次大小逻辑:显存越少,批次越小 if per_task_mem < 2: return 2 elif per_task_mem < 4: return 4 else: return 8 def get_max_concurrent_tasks(self) -> int: """根据系统资源计算最大并行任务数""" if torch.cuda.is_available(): total_mem = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3) if total_mem < 6: return 1 elif total_mem < 12: return 2 else: return 3 else: # CPU模式下根据核心数设置 return max(1, os.cpu_count() // 2) def check_memory_usage(self) -> bool: try: mem_percent = self.resource_monitor.memory_percent() return mem_percent.get("cpu", 0) > 85 or mem_percent.get("gpu", 0) > 85 except: return False def _extract_audio_features(self, wav_paths: List[str]) -> Dict[str, float]: """提取音频特征(合并所有分段)""" combined_y = np.array([], dtype=np.float32) sr = ConfigManager().get("sample_rate", 16000) for path in wav_paths: y, _ = librosa.load(path, sr=sr) combined_y = np.concatenate((combined_y, y)) return AudioProcessor.extract_features_from_audio(combined_y, sr) def _process_asr_segments(self, wav_paths: List[str]) -> Tuple[List[Dict], str]: """处理ASR分段""" segments = [] full_text = "" for path in wav_paths: result = ModelLoader.asr_pipeline( path, hotwords=DialectConfig.get_asr_hotwords(), output_dir=None ) for seg in result[0]["sentences"]: segments.append({ "start": seg["start"], "end": seg["end"], "text": seg["text"], "spk_id": seg.get("spk_id", "0") }) full_text += seg["text"] + " " return segments, full_text.strip() def _generate_labeled_text(self, all_segments: List[Dict], agent_segments: List[Dict], customer_segments: List[Dict]) -> str: """生成带说话人标签的文本""" agent_spk_id = agent_segments[0]["spk_id"] if agent_segments else None customer_spk_id = customer_segments[0]["spk_id"] if customer_segments else None labeled_text = [] for seg in all_segments: if seg["spk_id"] == agent_spk_id: speaker = "客服" elif seg["spk_id"] == customer_spk_id: speaker = "客户" else: speaker = f"说话人{seg['spk_id']}" labeled_text.append(f"[{speaker}]: {seg['text']}") return "\n".join(labeled_text) def _check_service_rules(self, agent_segments: List[Dict]) -> Dict: """检查服务规范""" forbidden_keywords = DialectConfig.get_combined_keywords()["forbidden"] found_forbidden = [] found_opening = False found_closing = False # 检查开场白(前3段) for seg in agent_segments[:3]: text = seg["text"] if any(kw in text for kw in DialectConfig.get_combined_keywords()["opening"]): found_opening = True break # 检查结束语(后3段) for seg in reversed(agent_segments[-3:] if len(agent_segments) >= 3 else agent_segments): text = seg["text"] if any(kw in text for kw in DialectConfig.get_combined_keywords()["closing"]): found_closing = True break # 检查禁用词 for seg in agent_segments: text = seg["text"] for kw in forbidden_keywords: if kw in text: found_forbidden.append(kw) break return { "opening_found": found_opening, "closing_found": found_closing, "forbidden_words": ", ".join(set(found_forbidden)) if found_forbidden else "无" } def _check_issue_resolution(self, customer_segments: List[Dict], agent_segments: List[Dict]) -> bool: """检查问题是否解决(增强版)""" if not customer_segments or not agent_segments: return False # 提取所有文本 customer_texts = [seg["text"] for seg in customer_segments] agent_texts = [seg["text"] for seg in agent_segments] full_conversation = " ".join(customer_texts + agent_texts) # 问题解决关键词 resolution_keywords = ["解决", "处理", "完成", "已", "好了", "可以了", "没问题"] thank_keywords = ["谢谢", "感谢", "多谢"] negative_keywords = ["没解决", "不行", "不对", "还是", "仍然", "再"] # 检查是否有负面词汇 has_negative = any(kw in full_conversation for kw in negative_keywords) if has_negative: return False # 检查客户最后是否表达感谢 last_customer_text = customer_segments[-1]["text"] if any(kw in last_customer_text for kw in thank_keywords): return True # 检查是否有解决关键词 if any(kw in full_conversation for kw in resolution_keywords): return True # 检查客服是否确认解决 for agent_text in reversed(agent_texts[-3:]): # 检查最后3段 if any(kw in agent_text for kw in resolution_keywords): return True return False def _cleanup_temp_files(self, paths: List[str]): """清理临时文件(增强兼容性)""" for path in paths: try: if os.path.exists(path): # Windows系统可能需要多次尝试 for _ in range(3): try: os.remove(path) break except PermissionError: time.sleep(0.1) except: pass def _format_duration(self, seconds: float) -> str: """将秒转换为时分秒格式""" minutes, seconds = divmod(int(seconds), 60) hours, minutes = divmod(minutes, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def cleanup_resources(self): """显式清理资源""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() def stop(self): """停止分析""" self.is_running = False # ====================== 模型加载线程 ====================== class ModelLoadThread(QThread): progress_updated = pyqtSignal(int, str) finished = pyqtSignal(bool, str) def run(self): try: # 检查模型路径 config = ConfigManager().get("model_paths") if not os.path.exists(config["asr"]): self.finished.emit(False, "ASR模型路径不存在") return if not os.path.exists(config["sentiment"]): self.finished.emit(False, "情感分析模型路径不存在") return self.progress_updated.emit(20, "加载语音识别模型...") ModelLoader.load_asr_model(config["asr"]) self.progress_updated.emit(60, "加载情感分析模型...") ModelLoader.load_sentiment_model(config["sentiment"]) self.progress_updated.emit(100, "模型加载完成") self.finished.emit(True, "模型加载成功。建议:可通过设置界面修改模型路径") except Exception as e: self.finished.emit(False, f"模型加载失败: {str(e)}。建议:检查模型路径是否正确,或重新下载模型文件") # ====================== GUI主界面 ====================== class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("贵州方言客服质检系统") self.setGeometry(100, 100, 1200, 800) self.setup_ui() self.setup_menu() self.analysis_thread = None self.model_load_thread = None self.temp_dir = "temp_wav" os.makedirs(self.temp_dir, exist_ok=True) self.model_loaded = False def setup_ui(self): """设置用户界面""" # 主布局 main_widget = QWidget() main_layout = QVBoxLayout() main_widget.setLayout(main_layout) self.setCentralWidget(main_widget) # 工具栏 toolbar = QToolBar("主工具栏") toolbar.setIconSize(QSize(24, 24)) self.addToolBar(toolbar) # 添加文件按钮 add_file_action = QAction(QIcon("icons/add.png"), "添加文件", self) add_file_action.triggered.connect(self.add_files) toolbar.addAction(add_file_action) # 开始分析按钮 analyze_action = QAction(QIcon("icons/start.png"), "开始分析", self) analyze_action.triggered.connect(self.start_analysis) toolbar.addAction(analyze_action) # 停止按钮 stop_action = QAction(QIcon("icons/stop.png"), "停止分析", self) stop_action.triggered.connect(self.stop_analysis) toolbar.addAction(stop_action) # 设置按钮 settings_action = QAction(QIcon("icons/settings.png"), "设置", self) settings_action.triggered.connect(self.open_settings) toolbar.addAction(settings_action) # 分割布局 splitter = QSplitter(Qt.Horizontal) main_layout.addWidget(splitter) # 左侧文件列表 left_widget = QWidget() left_layout = QVBoxLayout() left_widget.setLayout(left_layout) file_list_label = QLabel("待分析文件列表") file_list_label.setFont(QFont("Arial", 12, QFont.Bold)) left_layout.addWidget(file_list_label) self.file_list = QListWidget() self.file_list.setSelectionMode(QListWidget.ExtendedSelection) left_layout.addWidget(self.file_list) # 右侧结果区域 right_widget = QWidget() right_layout = QVBoxLayout() right_widget.setLayout(right_layout) # 进度条 progress_label = QLabel("分析进度") progress_label.setFont(QFont("Arial", 12, QFont.Bold)) right_layout.addWidget(progress_label) self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(True) right_layout.addWidget(self.progress_bar) # 当前文件标签 self.current_file_label = QLabel("当前文件: 无") right_layout.addWidget(self.current_file_label) # 结果标签页 self.tab_widget = QTabWidget() right_layout.addWidget(self.tab_widget, 1) # 文本结果标签页 text_tab = QWidget() text_layout = QVBoxLayout() text_tab.setLayout(text_layout) self.text_result = QTextEdit() self.text_result.setReadOnly(True) text_layout.addWidget(self.text_result) self.tab_widget.addTab(text_tab, "文本结果") # 详细结果标签页 detail_tab = QWidget() detail_layout = QVBoxLayout() detail_tab.setLayout(detail_layout) self.result_table = QTableWidget() self.result_table.setColumnCount(10) self.result_table.setHorizontalHeaderLabels([ "文件名", "时长", "语速", "音量稳定性", "客服情感", "客户情感", "开场白", "结束语", "禁用词", "问题解决" ]) self.result_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) detail_layout.addWidget(self.result_table) self.tab_widget.addTab(detail_tab, "详细结果") # 添加左右部件到分割器 splitter.addWidget(left_widget) splitter.addWidget(right_widget) splitter.setSizes([300, 900]) def setup_menu(self): """设置菜单栏""" menu_bar = self.menuBar() # 文件菜单 file_menu = menu_bar.addMenu("文件") add_file_action = QAction("添加文件", self) add_file_action.triggered.connect(self.add_files) file_menu.addAction(add_file_action) export_action = QAction("导出结果", self) export_action.triggered.connect(self.export_results) file_menu.addAction(export_action) exit_action = QAction("退出", self) exit_action.triggered.connect(self.close) file_menu.addAction(exit_action) # 分析菜单 analysis_menu = menu_bar.addMenu("分析") start_action = QAction("开始分析", self) start_action.triggered.connect(self.start_analysis) analysis_menu.addAction(start_action) stop_action = QAction("停止分析", self) stop_action.triggered.connect(self.stop_analysis) analysis_menu.addAction(stop_action) # 设置菜单 settings_menu = menu_bar.addMenu("设置") config_action = QAction("系统配置", self) config_action.triggered.connect(self.open_settings) settings_menu.addAction(config_action) model_action = QAction("加载模型", self) model_action.triggered.connect(self.load_models) settings_menu.addAction(model_action) def add_files(self): """添加文件到分析列表""" files, _ = QFileDialog.getOpenFileNames( self, "选择音频文件", "", "音频文件 (*.mp3 *.wav *.amr *.m4a)" ) if files: for file in files: self.file_list.addItem(file) def start_analysis(self): """开始分析""" if self.file_list.count() == 0: QMessageBox.warning(self, "警告", "请先添加要分析的音频文件") return if not self.model_loaded: QMessageBox.warning(self, "警告", "模型未加载,请先加载模型") return # 获取文件路径 audio_paths = [self.file_list.item(i).text() for i in range(self.file_list.count())] # 清空结果 self.text_result.clear() self.result_table.setRowCount(0) # 创建分析线程 self.analysis_thread = AnalysisThread(audio_paths, self.temp_dir) # 连接信号 self.analysis_thread.progress_updated.connect(self.update_progress) self.analysis_thread.result_ready.connect(self.handle_result) self.analysis_thread.finished_all.connect(self.analysis_finished) self.analysis_thread.error_occurred.connect(self.show_error) self.analysis_thread.memory_warning.connect(self.handle_memory_warning) self.analysis_thread.resource_cleanup.connect(self.cleanup_resources) # 启动线程 self.analysis_thread.start() def stop_analysis(self): """停止分析""" if self.analysis_thread and self.analysis_thread.isRunning(): self.analysis_thread.stop() self.analysis_thread.wait() QMessageBox.information(self, "信息", "分析已停止") def load_models(self): """加载模型""" if self.model_load_thread and self.model_load_thread.isRunning(): return self.model_load_thread = ModelLoadThread() self.model_load_thread.progress_updated.connect( lambda value, msg: self.progress_bar.setValue(value) ) self.model_load_thread.finished.connect(self.handle_model_load_result) self.model_load_thread.start() def update_progress(self, progress: int, message: str, current_file: str): """更新进度""" self.progress_bar.setValue(progress) self.current_file_label.setText(f"当前文件: {current_file}") def handle_result(self, result: Dict): """处理分析结果""" # 添加到文本结果 self.text_result.append(f"文件: {result['file_name']}") self.text_result.append(f"状态: {result['status']}") if result["status"] == "success": self.text_result.append(f"时长: {result['duration_str']}") self.text_result.append(f"语速: {result['syllable_rate']} 音节/秒") self.text_result.append(f"音量稳定性: {result['volume_stability']}") self.text_result.append(f"客服情感: 负面({result['agent_negative']:.2%}) " f"中性({result['agent_neutral']:.2%}) " f"正面({result['agent_positive']:.2%})") self.text_result.append(f"客服情绪: {result['agent_emotions']}") self.text_result.append(f"客户情感: 负面({result['customer_negative']:.2%}) " f"中性({result['customer_neutral']:.2%}) " f"正面({result['customer_positive']:.2%})") self.text_result.append(f"客户情绪: {result['customer_emotions']}") self.text_result.append(f"开场白: {'有' if result['opening_found'] else '无'}") self.text_result.append(f"结束语: {'有' if result['closing_found'] else '无'}") self.text_result.append(f"禁用词: {result['forbidden_words']}") self.text_result.append(f"问题解决: {'是' if result['issue_resolved'] else '否'}") self.text_result.append("\n=== 对话文本 ===\n") self.text_result.append(result["asr_text"]) self.text_result.append("\n" + "=" * 50 + "\n") # 添加到结果表格 row = self.result_table.rowCount() self.result_table.insertRow(row) self.result_table.setItem(row, 0, QTableWidgetItem(result["file_name"])) self.result_table.setItem(row, 1, QTableWidgetItem(result["duration_str"])) self.result_table.setItem(row, 2, QTableWidgetItem(str(result["syllable_rate"]))) self.result_table.setItem(row, 3, QTableWidgetItem(str(result["volume_stability"]))) self.result_table.setItem(row, 4, QTableWidgetItem( f"负:{result['agent_negative']:.2f} 中:{result['agent_neutral']:.2f} 正:{result['agent_positive']:.2f}" )) self.result_table.setItem(row, 5, QTableWidgetItem( f"负:{result['customer_negative']:.2f} 中:{result['customer_neutral']:.2f} 正:{result['customer_positive']:.2f}" )) self.result_table.setItem(row, 6, QTableWidgetItem("是" if result["opening_found"] else "否")) self.result_table.setItem(row, 7, QTableWidgetItem("是" if result["closing_found"] else "否")) self.result_table.setItem(row, 8, QTableWidgetItem(result["forbidden_words"])) self.result_table.setItem(row, 9, QTableWidgetItem("是" if result["issue_resolved"] else "否")) # 根据结果着色 if not result["opening_found"]: self.result_table.item(row, 6).setBackground(QColor(255, 200, 200)) if not result["closing_found"]: self.result_table.item(row, 7).setBackground(QColor(255, 200, 200)) if result["forbidden_words"] != "无": self.result_table.item(row, 8).setBackground(QColor(255, 200, 200)) if not result["issue_resolved"]: self.result_table.item(row, 9).setBackground(QColor(255, 200, 200)) def analysis_finished(self): """分析完成""" QMessageBox.information(self, "完成", "所有音频分析完成") self.progress_bar.setValue(100) def show_error(self, title: str, message: str): """显示错误信息""" QMessageBox.critical(self, title, message) def handle_memory_warning(self): """处理内存警告""" QMessageBox.warning(self, "内存警告", "内存使用过高,分析已停止。请关闭其他应用程序后重试") def cleanup_resources(self): """清理资源""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() def handle_model_load_result(self, success: bool, message: str): """处理模型加载结果""" if success: self.model_loaded = True QMessageBox.information(self, "成功", message) else: QMessageBox.critical(self, "错误", message) def open_settings(self): """打开设置对话框""" settings_dialog = QDialog(self) settings_dialog.setWindowTitle("系统设置") settings_dialog.setFixedSize(500, 400) layout = QVBoxLayout() # ASR模型路径 asr_layout = QHBoxLayout() asr_label = QLabel("ASR模型路径:") asr_line = QLineEdit(ConfigManager().get("model_paths")["asr"]) asr_browse = QPushButton("浏览...") def browse_asr(): path = QFileDialog.getExistingDirectory(self, "选择ASR模型目录") if path: asr_line.setText(path) asr_browse.clicked.connect(browse_asr) asr_layout.addWidget(asr_label) asr_layout.addWidget(asr_line) asr_layout.addWidget(asr_browse) layout.addLayout(asr_layout) # 情感分析模型路径 sentiment_layout = QHBoxLayout() sentiment_label = QLabel("情感模型路径:") sentiment_line = QLineEdit(ConfigManager().get("model_paths")["sentiment"]) sentiment_browse = QPushButton("浏览...") def browse_sentiment(): path = QFileDialog.getExistingDirectory(self, "选择情感模型目录") if path: sentiment_line.setText(path) sentiment_browse.clicked.connect(browse_sentiment) sentiment_layout.addWidget(sentiment_label) sentiment_layout.addWidget(sentiment_line) sentiment_layout.addWidget(sentiment_browse) layout.addLayout(sentiment_layout) # 并发设置 concurrent_layout = QHBoxLayout() concurrent_label = QLabel("最大并发任务:") concurrent_spin = QSpinBox() concurrent_spin.setRange(1, 8) concurrent_spin.setValue(ConfigManager().get("max_concurrent", 1)) concurrent_layout.addWidget(concurrent_label) concurrent_layout.addWidget(concurrent_spin) layout.addLayout(concurrent_layout) # 方言设置 dialect_layout = QHBoxLayout() dialect_label = QLabel("方言设置:") dialect_combo = QComboBox() dialect_combo.addItems(["标准普通话", "贵州方言"]) dialect_combo.setCurrentIndex(1 if ConfigManager().get("dialect_config") == "guizhou" else 0) dialect_layout.addWidget(dialect_label) dialect_layout.addWidget(dialect_combo) layout.addLayout(dialect_layout) # 音频时长限制 duration_layout = QHBoxLayout() duration_label = QLabel("最大音频时长(秒):") duration_spin = QSpinBox() duration_spin.setRange(60, 86400) # 1分钟到24小时 duration_spin.setValue(ConfigManager().get("max_audio_duration", 3600)) duration_layout.addWidget(duration_label) duration_layout.addWidget(duration_spin) layout.addLayout(duration_layout) # 按钮 button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) button_box.accepted.connect(settings_dialog.accept) button_box.rejected.connect(settings_dialog.reject) layout.addWidget(button_box) settings_dialog.setLayout(layout) if settings_dialog.exec_() == QDialog.Accepted: # 保存设置 ConfigManager().set("model_paths", { "asr": asr_line.text(), "sentiment": sentiment_line.text() }) ConfigManager().set("max_concurrent", concurrent_spin.value()) ConfigManager().set("dialect_config", "guizhou" if dialect_combo.currentIndex() == 1 else "standard") ConfigManager().set("max_audio_duration", duration_spin.value()) # 重新加载模型 ModelLoader.reload_models() def export_results(self): """导出结果""" if self.result_table.rowCount() == 0: QMessageBox.warning(self, "警告", "没有可导出的结果") return path, _ = QFileDialog.getSaveFileName( self, "保存结果", "", "CSV文件 (*.csv)" ) if path: try: with open(path, "w", encoding="utf-8") as f: # 写入表头 headers = [] for col in range(self.result_table.columnCount()): headers.append(self.result_table.horizontalHeaderItem(col).text()) f.write(",".join(headers) + "\n") # 写入数据 for row in range(self.result_table.rowCount()): row_data = [] for col in range(self.result_table.columnCount()): item = self.result_table.item(row, col) row_data.append(item.text() if item else "") f.write(",".join(row_data) + "\n") QMessageBox.information(self, "成功", f"结果已导出到: {path}") except Exception as e: QMessageBox.critical(self, "错误", f"导出失败: {str(e)}") def closeEvent(self, event): """关闭事件处理""" if self.analysis_thread and self.analysis_thread.isRunning(): self.analysis_thread.stop() self.analysis_thread.wait() # 清理临时目录(增强兼容性) try: for file in os.listdir(self.temp_dir): file_path = os.path.join(self.temp_dir, file) if os.path.isfile(file_path): # Windows系统可能需要多次尝试 for _ in range(3): try: os.remove(file_path) break except PermissionError: time.sleep(0.1) os.rmdir(self.temp_dir) except: pass event.accept() # ====================== 程序入口 ====================== if __name__ == "__main__": torch.set_num_threads(4) # 限制CPU线程数 app = QApplication(sys.argv) # 设置应用样式 app.setStyle('Fusion') window = MainWindow() window.show() sys.exit(app.exec_())

filetype

import os # 定义绝对路径(根据你的实际情况修改) log_dir = r"D:\jupyter notebook\logs" # 2. 确保目录存在且可写(修复核心问题[^1]) os.makedirs(log_dir, exist_ok=True) # 自动创建目录 # 3. 验证路径属性(调试用) print(f"路径类型: {'文件' if os.path.isfile(log_dir) else '目录'}") print(f"路径存在: {os.path.exists(log_dir)}") tensorboard_cb = tf.keras.callbacks.TensorBoard( log_dir=log_dir, # 使用绝对路径 histogram_freq=1, write_graph=True, write_images=True ) print("\n训练BERT模型...") import tensorflow as tf from transformers import BertTokenizer, TFBertForSequenceClassification from transformers import InputExample, InputFeatures # 确保使用匹配的分词器和模型[^1][^4] model_path = r"D:\jupyter notebook\models" tokenizer = BertTokenizer.from_pretrained(model_path, unk_token="[UNK]") # 显式处理未登录词 # 验证词表一致性 print(f"分词器词表大小: {tokenizer.vocab_size}") # 准备BERT输入(添加安全验证) def convert_examples_to_tfdataset(texts, labels, max_length=100): input_examples = [ InputExample(guid=i, text_a=text, label=label) for i, (text, label) in enumerate(zip(texts, labels)) ] features = [] for ex in input_examples: input_dict = tokenizer.encode_plus( ex.text_a, add_special_tokens=True, max_length=max_length, padding='max_length', truncation=True, return_attention_mask=True, return_token_type_ids=True ) features.append(InputFeatures( input_ids=input_dict['input_ids'], attention_mask=input_dict['attention_mask'], token_type_ids=input_dict['token_type_ids'], label=ex.label )) def gen(): for f in features: # 关键修复:确保token ID在词表范围内[^1][^4] safe_input_ids = [min(token_id, tokenizer.vocab_size - 1) for token_id in f.input_ids] yield ( { 'input_ids': safe_input_ids, 'attention_mask': f.attention_mask, 'token_type_ids': f.token_type_ids }, f.label ) return tf.data.Dataset.from_generator( gen, output_types=( { 'input_ids': tf.int32, 'attention_mask': tf.int32, 'token_type_ids': tf.int32 }, tf.int32 ), output_shapes=( { 'input_ids': tf.TensorShape([max_length]), 'attention_mask': tf.TensorShape([max_length]), 'token_type_ids': tf.TensorShape([max_length]) }, tf.TensorShape([]) ) ) # 创建数据集 train_ds = convert_examples_to_tfdataset(X_train.tolist(), y_train.tolist()) test_ds = convert_examples_to_tfdataset(X_test.tolist(), y_test.tolist()) # 批处理 batch_size = 32 train_ds = train_ds.shuffle(100).batch(batch_size).prefetch(tf.data.AUTOTUNE) test_ds = test_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE) # 构建BERT模型(确保配置一致) bert_model = TFBertForSequenceClassification.from_pretrained( model_path, num_labels=1, from_pt=True # 显式声明使用PyTorch权重[^1] ) # 验证模型词表大小 print(f"模型词表大小: {bert_model.config.vocab_size}") assert tokenizer.vocab_size == bert_model.config.vocab_size, \ "词表大小不匹配!请检查模型文件完整性" # 修复配置不一致 if bert_model.config.vocab_size != tokenizer.vocab_size: bert_model.config.vocab_size = tokenizer.vocab_size print(f"已手动修正模型词表大小为: {tokenizer.vocab_size}") bert_model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=3e-5), loss=tf.keras.losses.BinaryCrossentropy(from_logits=True), metrics=['accuracy'] ) # 安全训练(添加调试回调) print("\n开始训练BERT模型...") bert_history = bert_model.fit( train_ds, validation_data=test_ds, epochs=3, verbose=1, callbacks=[ tensorboard_cb, tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=1) ] ) # 评估BERT模型 bert_results = bert_model.evaluate(test_ds) print(f"BERT测试准确率: {bert_results[1]:.4f}") 这个代码运行特别久,怎么办

filetype

import sys import json import csv import os import time import re import requests import concurrent.futures from urllib.parse import urlparse, urljoin from requests.exceptions import RequestException from PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget, QLabel, QLineEdit, QPushButton, QTextEdit, QTableWidget, QTableWidgetItem, QTreeWidget, QTreeWidgetItem, QHeaderView, QFileDialog, QMessageBox, QSplitter, QSpinBox, QAction, QDialog, QFormLayout, QDialogButtonBox, QProgressBar, QGroupBox ) from PyQt5.QtCore import Qt, QThread, pyqtSignal from PyQt5.QtGui import QIcon # 忽略SSL证书验证的警告信息 requests.packages.urllib3.disable_warnings() class FingerprintManager: def __init__(self): self.fingerprints = [] self.default_file_path = os.path.join(os.path.expanduser("~"), "cms_fingerprints.json") if not self.load_from_default_file(): self.load_default_fingerprints() def load_default_fingerprints(self): # 优化默认指纹库,确保正则表达式正确 self.fingerprints = [ { "cms": "WordPress", "version": "", "confidence": 0, "http_headers": [ {"header": "X-Powered-By", "pattern": "PHP/.*", "score": 10, "type": "general"} ], "html_content": [ {"pattern": "<meta name=\"generator\" content=\"WordPress ([\\d.]+)\"", "score": 150, "type": "core", "version_group": 1}, {"pattern": "wp-content/themes/([^/]+)", "score": 80, "type": "specific"}, {"pattern": "wp-includes/js/wp-util.js", "score": 90, "type": "specific"} ], "url_paths": [ {"path": "/wp-admin", "score": 80, "type": "specific"}, {"path": "/wp-login.php", "score": 100, "type": "core"} ] }, { "cms": "示例站点", "version": "", "confidence": 0, "html_content": [ {"pattern": "恭喜, 站点创建成功!", "score": 120, "type": "core"}, {"pattern": "

这是默认index.html,本页面由系统自动生成

", "score": 100, "type": "core"} ], "url_paths": [] }, { "cms": "Nginx", "version": "", "confidence": 0, "http_headers": [ {"header": "Server", "pattern": "nginx/([\\d.]+)", "score": 90, "type": "core", "version_group": 1} ], "html_content": [ {"pattern": "If you see this page, the nginx web server is successfully installed", "score": 120, "type": "core"} ] }, { "cms": "Drupal", "version": "", "html_content": [ {"pattern": "<meta name=\"generator\" content=\"Drupal ([\\d.]+)\"", "score": 150, "type": "core", "version_group": 1}, {"pattern": "sites/default/files", "score": 70, "type": "specific"} ], "url_paths": [ {"path": "/sites/all", "score": 80, "type": "specific"} ] }, { "cms": "ThinkPHP", "version": "", "html_content": [ {"pattern": "think\\\\Exception", "score": 100, "type": "core"}, {"pattern": "app\\\\controller", "score": 80, "type": "specific"} ] }, { "cms": "Yii", "version": "", "html_content": [ {"pattern": "yii\\\\base\\\\Exception", "score": 100, "type": "core"}, {"pattern": "yii\\\\web\\\\HttpException", "score": 90, "type": "specific"} ] }, { "cms": "Phalcon", "version": "", "html_content": [ {"pattern": "Phalcon\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "FuelPHP", "version": "", "html_content": [ {"pattern": "Fuel\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "Habari", "version": "", "html_content": [ {"pattern": "Habari\\\\Core\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "帝国CMS", "version": "", "html_content": [ {"pattern": "ecmsinfo\\(", "score": 100, "type": "core"} ] } ] self.save_to_default_file() def load_from_default_file(self): try: if os.path.exists(self.default_file_path): with open(self.default_file_path, 'r', encoding='utf-8') as f: loaded_data = json.load(f) valid_fingerprints = [] for fp in loaded_data: if self._is_valid_fingerprint(fp): cleaned_fp = self._clean_fingerprint(fp) valid_fingerprints.append(cleaned_fp) else: print(f"跳过无效指纹: {fp}") self.fingerprints = valid_fingerprints return True return False except Exception as e: print(f"从默认文件加载指纹失败: {e}") return False def _clean_fingerprint(self, fp): """清理指纹中的正则表达式,修复常见错误""" for header in fp.get('http_headers', []): if 'pattern' in header: header['pattern'] = self._fix_regex_pattern(header['pattern']) for html in fp.get('html_content', []): if 'pattern' in html: html['pattern'] = self._fix_regex_pattern(html['pattern']) for url in fp.get('url_paths', []): if 'pattern' in url: url['pattern'] = self._fix_regex_pattern(url['pattern']) return fp def _fix_regex_pattern(self, pattern): """修复常见的正则表达式错误""" if not pattern: return "" # 修复未转义的反斜杠 fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) # 修复未闭合的括号 open_count = fixed.count('(') close_count = fixed.count(')') if open_count > close_count: fixed += ')' * (open_count - close_count) # 修复不完整的字符类 if '[' in fixed and ']' not in fixed: fixed += ']' return fixed def _is_valid_fingerprint(self, fp): required_fields = ["cms"] for field in required_fields: if field not in fp: return False if not fp["cms"].strip(): return False for key in ["http_headers", "html_content", "url_paths"]: if key not in fp: fp[key] = [] return True def save_to_default_file(self): try: dir_path = os.path.dirname(self.default_file_path) if not os.path.exists(dir_path): os.makedirs(dir_path) with open(self.default_file_path, 'w', encoding='utf-8') as f: json.dump(self.fingerprints, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"保存指纹到默认文件失败: {e}") return False def add_fingerprint(self, fingerprint): if self._is_valid_fingerprint(fingerprint): cleaned = self._clean_fingerprint(fingerprint) self.fingerprints.append(cleaned) self.save_to_default_file() return True print(f"无法添加无效指纹: {fingerprint}") return False def remove_fingerprint(self, index): if 0 <= index < len(self.fingerprints): self.fingerprints.pop(index) self.save_to_default_file() def update_fingerprint(self, index, fingerprint): if 0 <= index < len(self.fingerprints) and self._is_valid_fingerprint(fingerprint): cleaned = self._clean_fingerprint(fingerprint) self.fingerprints[index] = cleaned self.save_to_default_file() return True return False def clear_fingerprints(self): self.fingerprints = [] self.save_to_default_file() return True def restore_default_fingerprints(self): self.load_default_fingerprints() return True def get_fingerprints(self): return self.fingerprints def export_fingerprints(self, filename): try: with open(filename, 'w', encoding='utf-8') as f: json.dump(self.fingerprints, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"导出失败: {e}") return False def import_fingerprints(self, filename): try: with open(filename, 'r', encoding='utf-8') as f: imported_data = json.load(f) valid_fingerprints = [] for fp in imported_data: if self._is_valid_fingerprint(fp): cleaned = self._clean_fingerprint(fp) valid_fingerprints.append(cleaned) else: print(f"导入时跳过无效指纹: {fp}") if valid_fingerprints: self.fingerprints = valid_fingerprints self.save_to_default_file() return True print("导入的指纹全部无效") return False except Exception as e: print(f"导入失败: {e}") return False class DetectionWorker(QThread): progress_signal = pyqtSignal(int, int, str) result_signal = pyqtSignal(dict) log_signal = pyqtSignal(str) finished_signal = pyqtSignal() def __init__(self, urls, fingerprints, max_threads=10, retry_count=2): super().__init__() self.urls = urls self.fingerprints = fingerprints self.max_threads = max_threads self.running = True self.retry_count = retry_count self.timeout = 15 # 超时时间(秒) # 缓存响应以提高性能 self.response_cache = {} def run(self): self.log_signal.emit("开始检测...") total = len(self.urls) for i, url in enumerate(self.urls): if not self.running: break self.progress_signal.emit(i+1, total, url) result = self.detect_cms(url) self.result_signal.emit(result) self.log_signal.emit("检测完成!") self.finished_signal.emit() def stop(self): self.running = False def preprocess_html(self, html): """优化HTML预处理:保留标签结构,不过度压缩""" processed = re.sub(r'\n\s+', '\n', html) processed = re.sub(r'>\s+<', '><', processed) return processed.strip() def escape_special_chars(self, pattern): """安全转义正则特殊字符""" if not pattern: return "" safe_pattern = re.sub(r'\\(?![\\.*+?^${}()|[\]sSdDwWtnbfvr])', r'\\\\', pattern) return safe_pattern def validate_regex(self, pattern): """验证正则表达式是否有效""" if not pattern: return True, pattern try: re.compile(pattern) return True, pattern except re.error as e: fixed = pattern if "bad escape" in str(e): fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) elif "unterminated subpattern" in str(e): open_count = pattern.count('(') close_count = pattern.count(')') if open_count > close_count: fixed = pattern + ')' * (open_count - close_count) try: re.compile(fixed) self.log_signal.emit(f"自动修复正则表达式: {pattern} -> {fixed}") return True, fixed except re.error: return False, pattern def extract_version(self, content, pattern, group_idx): """从匹配结果中提取版本号""" if not pattern or group_idx is None: return "" try: match = re.search(pattern, content, re.IGNORECASE) if match and len(match.groups()) >= group_idx: return match.group(group_idx).strip() except re.error as e: self.log_signal.emit(f"版本提取正则错误 {pattern}: {str(e)}") return "" def fetch_url_content(self, url): """带重试机制的URL内容获取""" # 检查缓存 if url in self.response_cache: return self.response_cache[url] headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9' } for attempt in range(self.retry_count + 1): try: response = requests.get( url, headers=headers, allow_redirects=True, verify=False, timeout=self.timeout ) response.encoding = response.apparent_encoding # 缓存响应 self.response_cache[url] = response return response except RequestException as e: self.log_signal.emit(f"请求尝试 {attempt+1} 失败: {str(e)}") if attempt >= self.retry_count: return None time.sleep(1) return None def build_full_url(self, base_url, path): """构建完整的URL""" if not path.startswith('/'): path = '/' + path parsed = urlparse(base_url) return f"{parsed.scheme}://{parsed.netloc}{path}" def check_url_path(self, base_url, path, pattern, item_score, weight): """检查URL路径特征 - 主动访问并验证""" full_url = self.build_full_url(base_url, path) feature_desc = f"URL路径: {full_url}" # 尝试获取响应 response = self.fetch_url_content(full_url) if response and response.status_code == 200: # 如果有正则模式,检查内容 if pattern: is_valid, fixed_pattern = self.validate_regex(pattern) if is_valid: try: if re.search(fixed_pattern, response.text, re.IGNORECASE): return True, feature_desc, item_score * weight except re.error as e: self.log_signal.emit(f"URL路径正则错误: {str(e)}") # 如果没有正则模式,只要状态200就算匹配 else: return True, feature_desc, item_score * weight return False, feature_desc, 0 def detect_cms(self, url): original_url = url if not url.startswith(('http://', 'https://')): urls_to_try = [f'http://{url}', f'https://{url}'] else: urls_to_try = [url] response = None for test_url in urls_to_try: response = self.fetch_url_content(test_url) if response: url = test_url break if not response: return { "url": original_url, "status": -1, "results": [{"cms": "无法访问", "version": "", "confidence": 0, "judgment_basis": ["无法建立连接"]}], "primary": {"cms": "无法访问", "version": "", "confidence": 0} } status_code = response.status_code headers = response.headers html_content = response.text final_url = response.url processed_html = self.preprocess_html(html_content) self.log_signal.emit(f"获取内容: {final_url} (状态码: {status_code})") cms_matches = [] min_score_threshold = 50 for cms in self.fingerprints: total_score = 0 version = "" # 记录详细的判断依据 judgment_basis = [] matched_features = [] unmatched_features = [] # 1. 匹配HTTP头特征 for header_item in cms.get('http_headers', []): header_name = header_item.get('header', '').lower() pattern = header_item.get('pattern', '') item_score = header_item.get('score', 0) feature_type = header_item.get('type', 'general') if not header_name or not pattern: continue is_valid, fixed_pattern = self.validate_regex(pattern) if not is_valid: self.log_signal.emit(f"跳过无效HTTP头正则: {pattern}") continue weight = 2 if feature_type == 'core' else 1 adjusted_score = item_score * weight feature_desc = f"HTTP头[{header_name}]匹配模式[{fixed_pattern}]" if header_name in headers: header_value = str(headers[header_name]) try: if re.search(fixed_pattern, header_value, re.IGNORECASE): total_score += adjusted_score matched_features.append(f"{feature_desc} (+{adjusted_score})") judgment_basis.append(f"✓ {feature_desc},匹配成功,加{adjusted_score}分") if 'version_group' in header_item: version = self.extract_version( header_value, fixed_pattern, header_item['version_group'] ) or version else: unmatched_features.append(f"{feature_desc} (未匹配)") judgment_basis.append(f"✗ {feature_desc},未匹配") except re.error as e: self.log_signal.emit(f"HTTP头正则执行错误 {fixed_pattern}: {str(e)}") else: unmatched_features.append(f"{feature_desc} (Header不存在)") judgment_basis.append(f"✗ {feature_desc},Header不存在") # 2. 匹配HTML内容特征 for html_item in cms.get('html_content', []): pattern = html_item.get('pattern', '').strip() item_score = html_item.get('score', 0) feature_type = html_item.get('type', 'general') if not pattern: continue is_valid, fixed_pattern = self.validate_regex(pattern) if not is_valid: self.log_signal.emit(f"跳过无效HTML正则: {pattern}") continue weight = 2.5 if feature_type == 'core' else (1.5 if feature_type == 'specific' else 1) adjusted_score = int(item_score * weight) feature_desc = f"HTML内容匹配模式[{fixed_pattern[:50]}{'...' if len(fixed_pattern)>50 else ''}]" try: if '<' in fixed_pattern and '>' in fixed_pattern: escaped_pattern = self.escape_special_chars(fixed_pattern) flexible_pattern = re.sub(r'\s+', r'\\s+', escaped_pattern) match_found = re.search(flexible_pattern, processed_html, re.IGNORECASE | re.DOTALL) else: match_found = re.search(fixed_pattern, processed_html, re.IGNORECASE | re.DOTALL) if match_found: total_score += adjusted_score matched_features.append(f"{feature_desc} (+{adjusted_score})") judgment_basis.append(f"✓ {feature_desc},匹配成功,加{adjusted_score}分") if 'version_group' in html_item: version = self.extract_version( processed_html, fixed_pattern, html_item['version_group'] ) or version else: unmatched_features.append(f"{feature_desc} (未匹配)") judgment_basis.append(f"✗ {feature_desc},未匹配") except re.error as e: self.log_signal.emit(f"HTML正则执行错误 {fixed_pattern}: {str(e)}") # 3. 匹配URL路径特征 - 使用线程池并发处理 url_path_tasks = [] with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, self.max_threads)) as executor: for url_item in cms.get('url_paths', []): path = url_item.get('path', '') pattern = url_item.get('pattern', '') item_score = url_item.get('score', 0) feature_type = url_item.get('type', 'general') if not path: continue weight = 2 if feature_type == 'core' else 1 adjusted_score = item_score * weight # 提交任务到线程池 task = executor.submit( self.check_url_path, final_url, path, pattern, item_score, weight ) url_path_tasks.append((task, adjusted_score, path)) # 处理URL路径特征结果 for task, adjusted_score, path in url_path_tasks: try: matched, desc, score = task.result() if matched: total_score += score matched_features.append(f"{desc} (+{score})") judgment_basis.append(f"✓ {desc},访问成功,加{score}分") else: unmatched_features.append(f"{desc} (访问失败或未匹配)") judgment_basis.append(f"✗ {desc},访问失败或未匹配") except Exception as e: self.log_signal.emit(f"URL路径检查出错: {str(e)}") # 计算置信度 max_possible = sum( (h.get('score', 0) * (2 if h.get('type') == 'core' else 1)) for h in cms.get('http_headers', []) ) + sum( (h.get('score', 0) * (2.5 if h.get('type') == 'core' else 1)) for h in cms.get('html_content', []) ) + sum( (u.get('score', 0) * (2 if u.get('type') == 'core' else 1)) for u in cms.get('url_paths', []) ) confidence = min(100, int((total_score / max_possible) * 100)) if max_possible > 0 else 0 # 汇总判断依据 if matched_features: judgment_basis.insert(0, f"匹配到{len(matched_features)}个特征,总分{total_score}") else: judgment_basis.insert(0, f"未匹配到任何特征,总分0") if total_score >= min_score_threshold: cms_matches.append({ "cms": cms['cms'], "version": version or cms.get('version', ''), "score": total_score, "confidence": confidence, "judgment_basis": judgment_basis, # 存储详细判断依据 "features": matched_features }) cms_matches.sort(key=lambda x: (-x['confidence'], -x['score'])) filtered_results = [] if cms_matches: max_score = cms_matches[0]['score'] for match in cms_matches: if match['score'] >= max_score * 0.8 or match['confidence'] >= 70: filtered_results.append(match) # 如果没有匹配到任何结果,添加一个默认结果并说明原因 if not filtered_results: filtered_results.append({ "cms": "未知", "version": "", "confidence": 0, "judgment_basis": ["未匹配到任何已知CMS的特征", "请检查指纹库是否完整或添加新指纹"] }) primary_result = filtered_results[0] if filtered_results else { "cms": "未知", "version": "", "confidence": 0 } return { "url": final_url, "status": status_code, "results": filtered_results, "primary": primary_result } class AddFingerprintDialog(QDialog): def __init__(self, parent=None, fingerprint=None): super().__init__(parent) self.fingerprint = fingerprint self.setWindowTitle("编辑指纹" if fingerprint else "添加指纹") self.setGeometry(300, 300, 600, 500) self.init_ui() def init_ui(self): layout = QVBoxLayout() form_layout = QFormLayout() self.cms_input = QLineEdit() self.version_input = QLineEdit() form_layout.addRow("CMS名称*:", self.cms_input) form_layout.addRow("默认版本:", self.version_input) regex_help = QLabel("正则表达式提示: 反斜杠需要输入两次(\\\\),特殊字符(如. * + ?)需要转义") regex_help.setStyleSheet("color: #2980b9; font-size: 12px;") form_layout.addRow(regex_help) type_note = QLabel("特征类型说明: core(核心特征,权重高) > specific(特定特征) > general(通用特征)") type_note.setStyleSheet("color: #666; font-size: 12px;") form_layout.addRow(type_note) layout.addLayout(form_layout) # HTTP头特征表格 http_group = QWidget() http_layout = QVBoxLayout(http_group) http_layout.addWidget(QLabel("HTTP头特征:")) self.http_table = QTableWidget(0, 4) self.http_table.setHorizontalHeaderLabels(["Header", "Pattern", "Score", "Type(core/general)"]) self.http_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) http_btn_layout = QHBoxLayout() add_http_btn = QPushButton("添加") add_http_btn.clicked.connect(lambda: self.add_row(self.http_table, ["", "", "50", "general"])) remove_http_btn = QPushButton("移除") remove_http_btn.clicked.connect(lambda: self.remove_row(self.http_table)) http_btn_layout.addWidget(add_http_btn) http_btn_layout.addWidget(remove_http_btn) http_layout.addWidget(self.http_table) http_layout.addLayout(http_btn_layout) layout.addWidget(http_group) # HTML内容特征表格 html_group = QWidget() html_layout = QVBoxLayout(html_group) html_layout.addWidget(QLabel("HTML内容特征:")) self.html_table = QTableWidget(0, 4) self.html_table.setHorizontalHeaderLabels(["Pattern", "Score", "Type(core/specific)", "版本提取组(可选)"]) self.html_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) html_btn_layout = QHBoxLayout() add_html_btn = QPushButton("添加") add_html_btn.clicked.connect(lambda: self.add_row(self.html_table, ["", "80", "specific", ""])) remove_html_btn = QPushButton("移除") remove_html_btn.clicked.connect(lambda: self.remove_row(self.html_table)) html_btn_layout.addWidget(add_html_btn) html_btn_layout.addWidget(remove_html_btn) html_layout.addWidget(self.html_table) html_layout.addLayout(html_btn_layout) layout.addWidget(html_group) # URL路径特征表格 url_group = QWidget() url_layout = QVBoxLayout(url_group) url_layout.addWidget(QLabel("URL路径特征 (将主动访问这些路径):")) self.url_table = QTableWidget(0, 4) self.url_table.setHorizontalHeaderLabels(["Path", "Pattern(可选)", "Score", "Type(core/specific)"]) self.url_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) url_btn_layout = QHBoxLayout() add_url_btn = QPushButton("添加") add_url_btn.clicked.connect(lambda: self.add_row(self.url_table, ["", "", "60", "specific"])) remove_url_btn = QPushButton("移除") remove_url_btn.clicked.connect(lambda: self.remove_row(self.url_table)) url_btn_layout.addWidget(add_url_btn) url_btn_layout.addWidget(remove_url_btn) url_layout.addWidget(self.url_table) url_layout.addLayout(url_btn_layout) layout.addWidget(url_group) # 测试正则按钮 test_btn = QPushButton("测试选中的正则表达式") test_btn.clicked.connect(self.test_selected_regex) layout.addWidget(test_btn) # 确认按钮 btn_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) btn_box.accepted.connect(self.accept) btn_box.rejected.connect(self.reject) layout.addWidget(btn_box) self.setLayout(layout) self.load_fingerprint_data() def test_selected_regex(self): current_table = None pattern = "" if self.http_table.currentRow() >= 0: current_table = self.http_table item = self.http_table.item(self.http_table.currentRow(), 1) if item: pattern = item.text() elif self.html_table.currentRow() >= 0: current_table = self.html_table item = self.html_table.item(self.html_table.currentRow(), 0) if item: pattern = item.text() if not pattern: QMessageBox.information(self, "测试结果", "请选择一个正则表达式进行测试") return try: re.compile(pattern) QMessageBox.information(self, "测试结果", f"正则表达式有效:\n{pattern}") except re.error as e: fixed = pattern if "bad escape" in str(e): fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) elif "unterminated subpattern" in str(e): open_count = pattern.count('(') close_count = pattern.count(')') if open_count > close_count: fixed = pattern + ')' * (open_count - close_count) try: re.compile(fixed) QMessageBox.information( self, "修复成功", f"原表达式无效: {str(e)}\n修复后表达式: {fixed}" ) if current_table == self.http_table: self.http_table.item(self.http_table.currentRow(), 1).setText(fixed) else: self.html_table.item(self.html_table.currentRow(), 0).setText(fixed) except re.error as e2: QMessageBox.warning( self, "测试失败", f"正则表达式无效: {str(e2)}\n表达式: {pattern}" ) def add_row(self, table, default_values): row = table.rowCount() table.insertRow(row) for col, val in enumerate(default_values): table.setItem(row, col, QTableWidgetItem(val)) def remove_row(self, table): row = table.currentRow() if row >= 0: table.removeRow(row) def load_fingerprint_data(self): if not self.fingerprint: return self.cms_input.setText(self.fingerprint.get("cms", "")) self.version_input.setText(self.fingerprint.get("version", "")) for header in self.fingerprint.get("http_headers", []): self.add_row(self.http_table, [ header.get("header", ""), header.get("pattern", ""), str(header.get("score", 50)), header.get("type", "general") ]) for html in self.fingerprint.get("html_content", []): self.add_row(self.html_table, [ html.get("pattern", ""), str(html.get("score", 80)), html.get("type", "specific"), str(html.get("version_group", "")) if "version_group" in html else "" ]) for path in self.fingerprint.get("url_paths", []): self.add_row(self.url_table, [ path.get("path", ""), path.get("pattern", ""), str(path.get("score", 60)), path.get("type", "specific") ]) def validate_regex(self, pattern): try: if pattern: re.compile(pattern) return True except re.error as e: QMessageBox.warning(self, "正则错误", f"模式 '{pattern}' 无效: {str(e)}\n请使用测试按钮修复") return False def get_fingerprint(self): cms_name = self.cms_input.text().strip() if not cms_name: QMessageBox.warning(self, "输入错误", "CMS名称不能为空") return None for row in range(self.html_table.rowCount()): pattern_item = self.html_table.item(row, 0) if pattern_item and not self.validate_regex(pattern_item.text().strip()): return None fingerprint = { "cms": cms_name, "version": self.version_input.text().strip(), "confidence": 0, "http_headers": [], "html_content": [], "url_paths": [] } for row in range(self.http_table.rowCount()): header = self.http_table.item(row, 0).text().strip() if self.http_table.item(row, 0) else "" pattern = self.http_table.item(row, 1).text().strip() if self.http_table.item(row, 1) else "" score = int(self.http_table.item(row, 2).text() or 50) f_type = self.http_table.item(row, 3).text().strip() or "general" if header and pattern: fingerprint["http_headers"].append({ "header": header, "pattern": pattern, "score": score, "type": f_type }) for row in range(self.html_table.rowCount()): pattern = self.html_table.item(row, 0).text().strip() if self.html_table.item(row, 0) else "" score = int(self.html_table.item(row, 1).text() or 80) f_type = self.html_table.item(row, 2).text().strip() or "specific" version_group = self.html_table.item(row, 3).text().strip() if pattern: item = { "pattern": pattern, "score": score, "type": f_type } if version_group and version_group.isdigit(): item["version_group"] = int(version_group) fingerprint["html_content"].append(item) for row in range(self.url_table.rowCount()): path = self.url_table.item(row, 0).text().strip() if self.url_table.item(row, 0) else "" pattern = self.url_table.item(row, 1).text().strip() if self.url_table.item(row, 1) else "" score = int(self.url_table.item(row, 2).text() or 60) f_type = self.url_table.item(row, 3).text().strip() or "specific" if path: fingerprint["url_paths"].append({ "path": path, "pattern": pattern, "score": score, "type": f_type }) return fingerprint class JudgmentBasisDialog(QDialog): """判断依据展示对话框""" def __init__(self, parent=None, result=None): super().__init__(parent) self.result = result self.setWindowTitle(f"识别依据 - {result['url']}") self.setGeometry(400, 200, 800, 600) self.init_ui() def init_ui(self): layout = QVBoxLayout() # 基本信息 basic_info = QLabel(f"""

URL: {self.result['url']}

状态码: {self.result['status']}

""") layout.addWidget(basic_info) # 识别结果 results_group = QGroupBox("识别结果汇总") results_layout = QVBoxLayout() for i, res in enumerate(self.result['results']): is_primary = (i == 0) # 第一个结果是主要结果 result_label = QLabel(f"""

{'★ ' if is_primary else ''}{res['cms']} v{res['version']} 置信度: {res['confidence']}%

""") results_layout.addWidget(result_label) results_group.setLayout(results_layout) layout.addWidget(results_group) # 详细判断依据 basis_group = QTabWidget() for res in self.result['results']: text_edit = QTextEdit() text_edit.setReadOnly(True) # 显示所有判断依据 text_edit.setText("\n".join(res['judgment_basis'])) basis_group.addTab(text_edit, f"{res['cms']} (置信度{res['confidence']}%)") layout.addWidget(basis_group) # 关闭按钮 btn_box = QDialogButtonBox(QDialogButtonBox.Ok) btn_box.accepted.connect(self.accept) layout.addWidget(btn_box) self.setLayout(layout) class CMSDetectorApp(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("多CMS识别工具 (带判断依据)") self.setGeometry(100, 100, 1200, 800) self.fingerprint_manager = FingerprintManager() self.results = [] self.create_menu() self.init_ui() self.apply_styles() def create_menu(self): menubar = self.menuBar() file_menu = menubar.addMenu("文件") import_action = QAction("导入网站列表", self) import_action.triggered.connect(self.import_urls) file_menu.addAction(import_action) export_action = QAction("导出结果", self) export_action.triggered.connect(self.export_results) file_menu.addAction(export_action) file_menu.addSeparator() exit_action = QAction("退出", self) exit_action.setShortcut("Ctrl+Q") exit_action.triggered.connect(self.close) file_menu.addAction(exit_action) fingerprint_menu = menubar.addMenu("指纹库") add_fingerprint_action = QAction("添加指纹", self) add_fingerprint_action.triggered.connect(self.add_fingerprint) fingerprint_menu.addAction(add_fingerprint_action) import_fingerprint_action = QAction("导入指纹库", self) import_fingerprint_action.triggered.connect(self.import_fingerprints) fingerprint_menu.addAction(import_fingerprint_action) export_fingerprint_action = QAction("导出指纹库", self) export_fingerprint_action.triggered.connect(self.export_fingerprints) fingerprint_menu.addAction(export_fingerprint_action) clear_fingerprint_action = QAction("清空指纹库", self) clear_fingerprint_action.triggered.connect(self.clear_fingerprints) fingerprint_menu.addAction(clear_fingerprint_action) restore_default_action = QAction("恢复默认指纹库", self) restore_default_action.triggered.connect(self.restore_default_fingerprints) fingerprint_menu.addAction(restore_default_action) help_menu = menubar.addMenu("帮助") about_action = QAction("关于", self) about_action.triggered.connect(self.show_about) help_menu.addAction(about_action) def init_ui(self): main_widget = QWidget() main_layout = QVBoxLayout() self.tabs = QTabWidget() self.detection_tab = self.create_detection_tab() self.fingerprint_tab = self.create_fingerprint_tab() self.tabs.addTab(self.detection_tab, "网站检测") self.tabs.addTab(self.fingerprint_tab, "指纹库管理") main_layout.addWidget(self.tabs) main_widget.setLayout(main_layout) self.setCentralWidget(main_widget) self.status_bar = self.statusBar() self.status_label = QLabel("就绪") self.status_bar.addWidget(self.status_label) self.detection_thread = None def apply_styles(self): self.setStyleSheet(""" QMainWindow { background-color: #f0f0f0; } QTabWidget::pane { border: 1px solid #cccccc; background: white; } QTableWidget { background-color: white; alternate-background-color: #f8f8f8; gridline-color: #e0e0e0; } QHeaderView::section { background-color: #e0e0e0; padding: 4px; border: 1px solid #d0d0d0; } QPushButton { background-color: #4a86e8; color: white; border: none; padding: 5px 10px; border-radius: 4px; } QPushButton:hover { background-color: #3a76d8; } QPushButton:pressed { background-color: #2a66c8; } QPushButton:disabled { background-color: #a0a0a0; } QPushButton#clearBtn { background-color: #e74c3c; } QPushButton#clearBtn:hover { background-color: #c0392b; } QPushButton#restoreBtn { background-color: #27ae60; } QPushButton#restoreBtn:hover { background-color: #219653; } """) def create_detection_tab(self): tab = QWidget() layout = QVBoxLayout() # URL输入区域 control_layout = QHBoxLayout() self.url_input = QLineEdit() self.url_input.setPlaceholderText("输入网站URL (例如: example.com 或 http://example.com)") add_url_btn = QPushButton("添加URL") add_url_btn.clicked.connect(self.add_single_url) import_btn = QPushButton("导入URL列表") import_btn.clicked.connect(self.import_urls) clear_btn = QPushButton("清空列表") clear_btn.clicked.connect(self.clear_urls) control_layout.addWidget(self.url_input, 4) control_layout.addWidget(add_url_btn, 1) control_layout.addWidget(import_btn, 1) control_layout.addWidget(clear_btn, 1) layout.addLayout(control_layout) # URL列表区域 url_list_layout = QVBoxLayout() url_list_layout.addWidget(QLabel("待检测网站列表:")) self.url_list = QTextEdit() self.url_list.setPlaceholderText("每行一个URL") self.url_list.setMinimumHeight(80) url_list_layout.addWidget(self.url_list) layout.addLayout(url_list_layout) # 检测控制区域 detection_control_layout = QHBoxLayout() self.thread_spin = QSpinBox() self.thread_spin.setRange(1, 20) self.thread_spin.setValue(5) self.thread_spin.setPrefix("线程数: ") self.retry_spin = QSpinBox() self.retry_spin.setRange(0, 3) self.retry_spin.setValue(1) self.retry_spin.setPrefix("重试次数: ") self.timeout_spin = QSpinBox() self.timeout_spin.setRange(5, 60) self.timeout_spin.setValue(15) self.timeout_spin.setPrefix("超时时间(秒): ") self.detect_btn = QPushButton("开始检测") self.detect_btn.clicked.connect(self.start_detection) self.stop_btn = QPushButton("停止检测") self.stop_btn.clicked.connect(self.stop_detection) self.stop_btn.setEnabled(False) detection_control_layout.addWidget(self.thread_spin) detection_control_layout.addWidget(self.retry_spin) detection_control_layout.addWidget(self.timeout_spin) detection_control_layout.addStretch() detection_control_layout.addWidget(self.detect_btn) detection_control_layout.addWidget(self.stop_btn) layout.addLayout(detection_control_layout) # 进度条 self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(True) layout.addWidget(self.progress_bar) # 结果展示区域 splitter = QSplitter(Qt.Vertical) self.result_table = QTableWidget(0, 6) # 增加一列显示操作 self.result_table.setHorizontalHeaderLabels(["URL", "状态", "CMS类型", "版本", "置信度(%)", "操作"]) self.result_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch) self.result_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch) self.result_table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents) self.result_table.setAlternatingRowColors(True) self.log_area = QTextEdit() self.log_area.setReadOnly(True) self.log_area.setMinimumHeight(150) splitter.addWidget(self.result_table) splitter.addWidget(self.log_area) splitter.setSizes([400, 150]) layout.addWidget(splitter, 1) tab.setLayout(layout) return tab def create_fingerprint_tab(self): tab = QWidget() layout = QVBoxLayout() btn_layout = QHBoxLayout() add_btn = QPushButton("添加指纹") add_btn.clicked.connect(self.add_fingerprint) edit_btn = QPushButton("编辑指纹") edit_btn.clicked.connect(self.edit_fingerprint) remove_btn = QPushButton("删除指纹") remove_btn.clicked.connect(self.remove_fingerprint) clear_btn = QPushButton("清空指纹库") clear_btn.setObjectName("clearBtn") clear_btn.clicked.connect(self.clear_fingerprints) restore_btn = QPushButton("恢复默认") restore_btn.setObjectName("restoreBtn") restore_btn.clicked.connect(self.restore_default_fingerprints) import_btn = QPushButton("导入指纹库") import_btn.clicked.connect(self.import_fingerprints) export_btn = QPushButton("导出指纹库") export_btn.clicked.connect(self.export_fingerprints) btn_layout.addWidget(add_btn) btn_layout.addWidget(edit_btn) btn_layout.addWidget(remove_btn) btn_layout.addWidget(clear_btn) btn_layout.addWidget(restore_btn) btn_layout.addStretch() btn_layout.addWidget(import_btn) btn_layout.addWidget(export_btn) layout.addLayout(btn_layout) self.fingerprint_tree = QTreeWidget() self.fingerprint_tree.setHeaderLabels(["CMS名称", "版本", "核心特征数", "总特征数"]) self.fingerprint_tree.setColumnWidth(0, 200) self.fingerprint_tree.setSortingEnabled(True) self.populate_fingerprint_tree() layout.addWidget(self.fingerprint_tree, 1) tab.setLayout(layout) return tab def populate_fingerprint_tree(self): self.fingerprint_tree.clear() fingerprints = self.fingerprint_manager.get_fingerprints() for i, fp in enumerate(fingerprints): try: cms_name = fp["cms"] version = fp.get("version", "") core_features = 0 total_features = 0 for h in fp.get("http_headers", []): total_features += 1 if h.get("type") == "core": core_features += 1 for h in fp.get("html_content", []): total_features += 1 if h.get("type") == "core": core_features += 1 for u in fp.get("url_paths", []): total_features += 1 if u.get("type") == "core": core_features += 1 item = QTreeWidgetItem([ cms_name, version, str(core_features), str(total_features) ]) item.setData(0, Qt.UserRole, i) self.fingerprint_tree.addTopLevelItem(item) except Exception as e: self.log(f"处理指纹时出错: {e},已跳过") def add_single_url(self): url = self.url_input.text().strip() if url: current_text = self.url_list.toPlainText() new_text = current_text + (("\n" + url) if current_text else url) self.url_list.setPlainText(new_text) self.url_input.clear() def import_urls(self): file_path, _ = QFileDialog.getOpenFileName( self, "导入URL列表", "", "文本文件 (*.txt);;所有文件 (*)" ) if file_path: try: with open(file_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] self.url_list.setPlainText("\n".join(urls)) self.log(f"成功导入 {len(urls)} 个URL") except Exception as e: QMessageBox.critical(self, "导入错误", f"导入失败: {str(e)}") def clear_urls(self): self.url_list.clear() def start_detection(self): urls_text = self.url_list.toPlainText().strip() if not urls_text: QMessageBox.warning(self, "警告", "请先添加要检测的URL") return urls = [url.strip() for url in urls_text.splitlines() if url.strip()] if not urls: QMessageBox.warning(self, "警告", "没有有效的URL") return self.result_table.setRowCount(0) self.results = [] max_threads = self.thread_spin.value() retry_count = self.retry_spin.value() timeout = self.timeout_spin.value() self.detection_thread = DetectionWorker( urls, self.fingerprint_manager.get_fingerprints(), max_threads, retry_count ) self.detection_thread.timeout = timeout self.detection_thread.progress_signal.connect(self.update_progress) self.detection_thread.result_signal.connect(self.add_result) self.detection_thread.log_signal.connect(self.log) self.detection_thread.finished_signal.connect(self.detection_finished) self.detect_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.progress_bar.setRange(0, len(urls)) self.progress_bar.setValue(0) self.detection_thread.start() def stop_detection(self): if self.detection_thread and self.detection_thread.isRunning(): self.detection_thread.stop() self.log("检测已停止") self.detection_finished() def detection_finished(self): self.detect_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.status_label.setText("检测完成") def update_progress(self, current, total, url): self.progress_bar.setMaximum(total) self.progress_bar.setValue(current) self.status_label.setText(f"正在检测: {url} ({current}/{total})") def show_judgment_basis(self, result): """显示判断依据对话框""" dialog = JudgmentBasisDialog(self, result) dialog.exec_() def add_result(self, result): self.results.append(result) row = self.result_table.rowCount() self.result_table.insertRow(row) # URL url_item = QTableWidgetItem(result["url"]) url_item.setFlags(url_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 0, url_item) # 状态码 status = result["status"] status_item = QTableWidgetItem(str(status)) status_item.setFlags(status_item.flags() ^ Qt.ItemIsEditable) if status == 200: status_item.setForeground(Qt.darkGreen) elif 400 <= status < 500: status_item.setForeground(Qt.darkRed) elif status >= 500: status_item.setForeground(Qt.darkMagenta) self.result_table.setItem(row, 1, status_item) # CMS类型(主结果) primary = result["primary"] cms_item = QTableWidgetItem(primary["cms"]) cms_item.setFlags(cms_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 2, cms_item) # 版本 version_item = QTableWidgetItem(primary["version"]) version_item.setFlags(version_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 3, version_item) # 置信度 confidence = primary["confidence"] confidence_item = QTableWidgetItem(f"{confidence}%") confidence_item.setFlags(confidence_item.flags() ^ Qt.ItemIsEditable) if confidence >= 90: confidence_item.setForeground(Qt.darkGreen) elif confidence >= 70: confidence_item.setForeground(Qt.darkBlue) elif confidence >= 50: confidence_item.setForeground(Qt.darkOrange) else: confidence_item.setForeground(Qt.darkGray) self.result_table.setItem(row, 4, confidence_item) # 查看依据按钮 view_btn = QPushButton("查看依据") # 使用lambda表达式传递当前result view_btn.clicked.connect(lambda checked, res=result: self.show_judgment_basis(res)) self.result_table.setCellWidget(row, 5, view_btn) def add_fingerprint(self): dialog = AddFingerprintDialog(self) if dialog.exec_() == QDialog.Accepted: fingerprint = dialog.get_fingerprint() if fingerprint and self.fingerprint_manager.add_fingerprint(fingerprint): self.populate_fingerprint_tree() self.log(f"已添加指纹: {fingerprint['cms']}") def edit_fingerprint(self): selected_items = self.fingerprint_tree.selectedItems() if not selected_items: QMessageBox.warning(self, "警告", "请选择一个指纹进行编辑") return item = selected_items[0] index = item.data(0, Qt.UserRole) fingerprints = self.fingerprint_manager.get_fingerprints() if index is None or not (0 <= index < len(fingerprints)): QMessageBox.warning(self, "错误", "无效的指纹索引") return fingerprint = fingerprints[index] dialog = AddFingerprintDialog(self, fingerprint) if dialog.exec_() == QDialog.Accepted: updated = dialog.get_fingerprint() if updated and self.fingerprint_manager.update_fingerprint(index, updated): self.populate_fingerprint_tree() self.log(f"已更新指纹: {updated['cms']}") def remove_fingerprint(self): selected_items = self.fingerprint_tree.selectedItems() if not selected_items: QMessageBox.warning(self, "警告", "请选择要删除的指纹") return item = selected_items[0] cms_name = item.text(0) index = item.data(0, Qt.UserRole) reply = QMessageBox.question( self, "确认删除", f"确定要删除 '{cms_name}' 的指纹吗?", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.remove_fingerprint(index) self.populate_fingerprint_tree() self.log(f"已删除指纹: {cms_name}") def clear_fingerprints(self): if not self.fingerprint_manager.get_fingerprints(): QMessageBox.information(self, "提示", "指纹库已为空") return reply = QMessageBox.question( self, "确认清空", "确定要清空所有指纹吗?此操作不可恢复!", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.clear_fingerprints() self.populate_fingerprint_tree() self.log("已清空所有指纹") def restore_default_fingerprints(self): reply = QMessageBox.question( self, "确认恢复", "确定要恢复默认指纹库吗?当前指纹将被替换!", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.restore_default_fingerprints() self.populate_fingerprint_tree() self.log("已恢复默认指纹库") def import_fingerprints(self): file_path, _ = QFileDialog.getOpenFileName( self, "导入指纹库", "", "JSON文件 (*.json);;所有文件 (*)" ) if file_path and self.fingerprint_manager.import_fingerprints(file_path): self.populate_fingerprint_tree() self.log(f"成功导入指纹库: {file_path}") def export_fingerprints(self): file_path, _ = QFileDialog.getSaveFileName( self, "导出指纹库", "cms_fingerprints.json", "JSON文件 (*.json)" ) if file_path and self.fingerprint_manager.export_fingerprints(file_path): self.log(f"成功导出指纹库: {file_path}") def export_results(self): if not self.results: QMessageBox.warning(self, "警告", "没有结果可导出") return file_path, _ = QFileDialog.getSaveFileName( self, "导出结果", "", "CSV文件 (*.csv);;JSON文件 (*.json)" ) if not file_path: return try: if file_path.endswith(".csv"): with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(["URL", "状态", "CMS类型", "版本", "置信度(%)"]) for result in self.results: primary = result["primary"] writer.writerow([ result["url"], result["status"], primary["cms"], primary["version"], primary["confidence"] ]) elif file_path.endswith(".json"): # 导出完整结果,包括判断依据 with open(file_path, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=4, ensure_ascii=False) self.log(f"结果已导出到: {file_path}") except Exception as e: QMessageBox.critical(self, "导出错误", f"导出失败: {str(e)}") def log(self, message): timestamp = time.strftime("%H:%M:%S") self.log_area.append(f"[{timestamp}] {message}") def show_about(self): about_text = """

多CMS识别工具 (带判断依据)

版本: 2.3.0

功能特点:

  • 显示详细的识别判断依据
  • URL路径特征主动访问验证
  • 并发检测提高效率
  • 核心特征加权识别,准确率高
  • 支持正则表达式测试和验证
  • 可自定义超时时间和重试次数

使用说明: 点击结果中的"查看依据"按钮可查看详细的识别依据

""" QMessageBox.about(self, "关于", about_text) def closeEvent(self, event): if self.detection_thread and self.detection_thread.isRunning(): reply = QMessageBox.question( self, "检测中", "检测仍在进行中,确定要退出吗?", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.detection_thread.stop() event.accept() else: event.ignore() else: event.accept() if __name__ == "__main__": if hasattr(Qt, 'AA_EnableHighDpiScaling'): QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) if hasattr(Qt, 'AA_UseHighDpiPixmaps'): QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) app = QApplication(sys.argv) app.setStyle("Fusion") window = CMSDetectorApp() window.show() sys.exit(app.exec_()) 修改代码提高验证效率 其他无需修改 完整输出