
For PyCharm Training: An Analysis of the Pycharm_training Tutorial Repository

ZIP archive

The title "Pycharm_training" indicates that this is a repository built around PyCharm training. PyCharm is an integrated development environment (IDE) developed by JetBrains specifically for Python; it provides code editing, debugging, and testing features aimed at improving developer productivity. Since the title and the description are identical, it is reasonable to infer that the repository's main purpose is to support PyCharm training sessions. The key topics are as follows:

1. PyCharm overview: PyCharm is an IDE designed for Python that supports a wide range of development tasks, including, but not limited to, writing code, debugging, testing, and version control. It offers code completion, syntax highlighting, error checking with quick fixes, and integrated unit testing. PyCharm ships in two editions, Community and Professional; the Professional edition adds advanced capabilities such as web development, data analysis, and scientific computing.

2. Professional-edition features: For Professional users, PyCharm adds advanced support for web frameworks such as Django and Flask, Jupyter Notebook integration, database tools, and remote development, giving Python developers a more complete development experience.

3. Installation and configuration: An important part of any PyCharm training is how to install PyCharm on different operating systems, how to configure environment variables, and how to set up the Python interpreter. These basics ensure that the training runs smoothly and that participants can start working in PyCharm at all.

4. Project management: A major strength of PyCharm is its project management, which helps users organize project files and dependencies. Training should cover creating a new project, importing an existing one, and using PyCharm's virtual-environment tooling to keep each project's dependencies isolated (a minimal sketch of creating such an environment follows this overview).

5. Debugging tools: Debugging is a central part of development, and PyCharm's debugger helps developers locate errors quickly. Training should cover setting breakpoints, stepping through code, and inspecting variable values.

6. Code-quality inspection: PyCharm can automatically detect potential problems in code and suggest fixes. This topic should cover static code analysis, code formatting, and refactoring techniques.

7. Version-control integration: Modern software development depends on version control, and PyCharm integrates well with systems such as Git and Mercurial. Training should include initializing a repository from within PyCharm, committing changes, and resolving merge conflicts.

8. Unit testing: Unit testing is an important safeguard of code quality. Training should explain how to write, run, and manage unit tests in PyCharm and how to use its tooling to check code coverage (a sample test file also follows this overview).

Since no file list is provided, the repository's contents can only be inferred from the title, description, and tags. It is safe to assume, however, that a repository dedicated to PyCharm training would include tutorials, sample code, exercise projects, and reference material for the topics above, helping participants master Python development with PyCharm.
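To make topic 4 concrete, here is a minimal sketch of creating a per-project virtual environment with the standard-library venv module; this is the scripted equivalent of what PyCharm does when you add a new Virtualenv interpreter in its Project Interpreter settings. The pycharm_training_demo path is an illustrative assumption, not a file from the repository.

import venv
from pathlib import Path

# Hypothetical project location; PyCharm would normally create this for you.
project_dir = Path("pycharm_training_demo")
project_dir.mkdir(exist_ok=True)

# Create an isolated environment with pip available, mirroring PyCharm's
# "New environment using Virtualenv" option.
venv.EnvBuilder(with_pip=True).create(project_dir / ".venv")

# On Linux/macOS the interpreter lands in .venv/bin/python;
# on Windows it is .venv\Scripts\python.exe.
print(project_dir / ".venv" / "bin" / "python")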

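And for topic 8, a self-contained test file of the kind a training exercise might use; the add function and the file name test_calculator.py are illustrative assumptions. PyCharm discovers unittest.TestCase subclasses automatically, the same file runs from a terminal with python -m unittest, and running it with PyCharm's "Run with Coverage" action additionally reports which lines the tests exercise.

# test_calculator.py (hypothetical training exercise)
import unittest

def add(a, b):
    """Toy function under test; a real exercise would import production code."""
    return a + b

class TestAdd(unittest.TestCase):
    def test_integers(self):
        self.assertEqual(add(2, 3), 5)

    def test_negatives(self):
        self.assertEqual(add(-2, 2), 0)

    def test_floats(self):
        self.assertAlmostEqual(add(0.1, 0.2), 0.3, places=9)

if __name__ == "__main__":
    unittest.main()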
Related recommendations

- DGCNN training in PyTorch aborts with "RuntimeError: running_mean should contain 8064 elements not 5", raised from a BatchNorm1d layer in model.py
- A multi-agent D3QN flexible job-shop scheduler (Dueling DQN agents over the Mk01 benchmark) fails with "new(): data must be a sequence (got NoneType)"
- Code review request for a YOLOv4 training and validation GUI built with Tkinter and OpenCV's DNN module, including VOC dataset splitting
- An LSTM model for tropospheric-delay prediction from IGS .TRO files stops with "'TemporalDataset' object has no attribute 'feature_names'"
- A TensorFlow Chinese handwriting recognizer crashes with "ValueError: invalid literal for int() with base 10: '去_9.png'" while parsing labels from file paths
- An InsightFace-based dormitory face-recognition system reports "'FaceAnalysis' object has no attribute 'load_model'" and produces no recognition output
- An RDKit plus PyTorch Geometric pipeline that converts a .smi file into molecular graphs and trains a GCN regressor, with fixes requested for the training loop
- A MobileNetV2 transfer-learning script (TensorFlow/Keras) that needs its plots to display Chinese class names
