
    import cv2
    import numpy as np
    import random

    # Simulated implementation of the F5 steganography method
    def f5_hide(image_data, secret_message):
        # Quantization matrix (JPEG luminance table)
        QUANTIZATION_MATRIX = np.array([[16, 11, 10, 16, 24, 40, 51, 61],
                                        [12, 12, 14, 19, 26, 58, 60, 55],
                                        [14, 13, 16, 24, 40, 57, 69, 56],
                                        [14, 17, 22, 29, 51, 87, 80, 62],
                                        [18, 22, 37, 56, 68, 109, 103, 77],
                                        [24, 35, 55, 64, 81, 104, 113, 92],
                                        [49, 64, 78, 87, 103, 121, 120, 101],
                                        [72, 92, 95, 98, 112, 100, 103, 99]])

        # Convert the secret message into a binary string
        secret_bits = ''.join(format(ord(char), '08b') for char in secret_message)

        # Convert to the YUV (YCrCb) color space
        yuv_image = cv2.cvtColor(image_data, cv2.COLOR_BGR2YCrCb)
        y_channel, u_channel, v_channel = cv2.split(yuv_image)

        # Block-wise DCT on the Y channel
        h, w = y_channel.shape
        block_size = 8
        bit_idx = 0

        # Shuffle the processing order of the DCT blocks for added security
        blocks = [(i, j) for i in range(0, h, block_size) for j in range(0, w, block_size)]
        random.shuffle(blocks)

        for (i, j) in blocks:
            block = y_channel[i:i+block_size, j:j+block_size]
            if block.shape == (block_size, block_size):
                dct_block = cv2.dct(block.astype(np.float32))
                dct_block = np.round(dct_block / QUANTIZATION_MATRIX)

                # Embed the secret message
                for x in range(block_size):
                    for y in range(block_size):
                        # Skip the DC coefficient (dct_block[0, 0])
                        if x == 0 and y == 0:
                            continue
                        if bit_idx < len(secret_bits) and dct_block[x, y] != 0:
                            # Embed one bit by adjusting the coefficient's parity
                            if (int(dct_block[x, y]) % 2) != int(secret_bits[bit_idx]):
                                if dct_block[x, y] > 0:
                                    dct_block[x, y] -= 1
                                else:
                                    dct_block[x, y] += 1
                            bit_idx += 1
                    if bit_idx >= len(secret_bits):
                        break

                # Dequantize and apply the inverse DCT
                dct_block = dct_block * QUANTIZATION_MATRIX
                block = cv2.idct(dct_block)
                y_channel[i:i+block_size, j:j+block_size] = block

        # Merge the Y, U, and V channels
        yuv_image = cv2.merge([y_channel, u_channel, v_channel])
        encoded_image_data = cv2.cvtColor(yuv_image, cv2.COLOR_YCrCb2BGR)
        return encoded_image_data

Convert this into C++ code with the signature `void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message)`.

Posted: 2025-07-12 19:45:35
<think>I need to convert the user's Python F5 steganography code into a C++ function with the signature `void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message)`. First I should work through each step of the Python code, then think about how to reproduce it with OpenCV's C++ API, which differs from the Python bindings in a few places.

The main steps of the Python code are:
1. Define the quantization matrix.
2. Convert the secret message into a binary string.
3. Convert the image to the YUV color space (YCrCb in practice).
4. Apply a block-wise DCT to the Y channel.
5. Shuffle the processing order of the blocks.
6. Embed the secret bits in each block.
7. Dequantize and apply the inverse DCT.
8. Merge the channels and convert back to BGR.

Going through these one by one, with the C++-specific issues:

The quantization matrix is a numpy array in Python; in C++ it can be stored in a cv::Mat. It should be an 8x8 matrix of type CV_32F (or similar), since floating-point arithmetic follows.

Converting the secret message to a bit string: Python uses format() in a generator expression; in C++ I need to iterate over the characters and append each one's 8-bit binary representation, taking care of the character value and the leading zeros. std::bitset<8> handles this cleanly.

Color space conversion: Python calls cv2.cvtColor; C++ uses cv::cvtColor with cv::COLOR_BGR2YCrCb, then cv::split to obtain the Y, U (Cr) and V (Cb) channels as separate Mats.

Block-wise processing of the Y channel: the Python code loops over every 8x8 block and shuffles the order with random.shuffle. In C++ I can build a list of block coordinates and randomize it with std::shuffle.

For each block, Python uses cv2.dct; cv::dct in C++ requires floating-point input and output, so the block must be converted to float before the transform. Quantization divides by the quantization matrix and rounds, which can be done with cv::divide (or element by element) followed by rounding.

When embedding, I iterate over the DCT coefficients, skip the DC term, and adjust the parity of each coefficient to match the current secret bit. In C++, Mat elements are accessed with .at<float>(i, j), and positive and negative values have to be adjusted carefully: if the parity does not match the secret bit, subtract or add 1, moving the value toward zero without flipping its sign. The Python code skips coefficients that are 0, so the C++ code needs the same check.

For dequantization, multiply by the quantization matrix, run the inverse DCT (cv::idct), and write the result back into the corresponding block of the Y channel.

Finally, merge the channels, convert back to BGR, and write the result into the image parameter, which is passed by reference.

Details that need attention:
1. Random number generation: shuffling the block order needs a properly seeded std::shuffle, most likely with an std::mt19937 engine.
2. Mat types: cv::dct and cv::idct require floating-point data such as CV_32F, so every block must be converted before the transform.
3. Block boundaries: if the image dimensions are not multiples of 8, the last blocks are smaller than 8x8. The Python code checks that the block shape is (8, 8); the C++ code needs an equivalent check or has to handle it in the loop bounds.
4. Bit index bookkeeping: a counter has to track how many bits have been embedded so the loops can exit early once the whole message is in.
5. cv::split and cv::merge are used slightly differently than in Python, so the per-channel Mats must be handled correctly.

Likely mistakes to avoid:
- Not seeding the random engine, so the shuffle order is the same on every run, which weakens security.
- Type-conversion errors, e.g. forgetting to convert a block to float before cv::dct.
- Mishandling the sign when adjusting DCT coefficients.
- Working on a copy of the Y channel instead of a reference, so the modified data is never saved.

Now implementing it step by step.

Creating the quantization matrix:

    cv::Mat QUANTIZATION_MATRIX = (cv::Mat_<float>(8, 8) <<
        16, 11, 10, 16, 24, 40, 51, 61,
        12, 12, 14, 19, 26, 58, 60, 55,
        14, 13, 16, 24, 40, 57, 69, 56,
        14, 17, 22, 29, 51, 87, 80, 62,
        18, 22, 37, 56, 68, 109, 103, 77,
        24, 35, 55, 64, 81, 104, 113, 92,
        49, 64, 78, 87, 103, 121, 120, 101,
        72, 92, 95, 98, 112, 100, 103, 99);

Converting the secret message to a bit string:

    std::string secret_bits;
    for (char c : secret_message) {
        secret_bits += std::bitset<8>(c).to_string();
    }

Color space conversion:

    cv::Mat yuv_image;
    cv::cvtColor(image, yuv_image, cv::COLOR_BGR2YCrCb);
    std::vector<cv::Mat> channels;
    cv::split(yuv_image, channels);
    cv::Mat y_channel = channels[0];
    cv::Mat u_channel = channels[1];
    cv::Mat v_channel = channels[2];

Building the block list and shuffling it:

    int block_size = 8;
    int h = y_channel.rows;
    int w = y_channel.cols;
    std::vector<cv::Point> blocks;
    for (int i = 0; i < h; i += block_size) {
        for (int j = 0; j < w; j += block_size) {
            blocks.emplace_back(j, i);  // cv::Point is (x, y): x is the column, y is the row
        }
    }
    // Shuffle
    std::random_device rd;
    std::mt19937 rng(rd());
    std::shuffle(blocks.begin(), blocks.end(), rng);

Processing each block:

    size_t bit_idx = 0;
    for (const auto& block_pos : blocks) {
        int x = block_pos.y;  // row index (i), since Point.x holds the column (j)
        int y = block_pos.x;  // column index (j)
        if (x + block_size > h || y + block_size > w) {
            continue;  // skip blocks smaller than 8x8
        }
        cv::Mat block = y_channel(cv::Rect(y, x, block_size, block_size)).clone();
        block.convertTo(block, CV_32F);  // convert to float for the DCT
        cv::Mat dct_block;
        cv::dct(block, dct_block);
        // Quantization
        cv::divide(dct_block, QUANTIZATION_MATRIX, dct_block);
        // Casting via cv::Mat_<int> would truncate, so rounding probably
        // has to be done element by element:
        for (int i = 0; i < dct_block.rows; ++i) {
            for (int j = 0; j < dct_block.cols; ++j) {
                dct_block.at<float>(i, j) = std::round(dct_block.at<float>(i, j));
            }
        }
        // Embed the secret bits
        for (int i = 0; i < block_size; ++i) {
            for (int j = 0; j < block_size; ++j) {
                if (i == 0 && j == 0) continue;  // skip the DC coefficient
                if (bit_idx >= secret_bits.size()) break;
                float coeff = dct_block.at<float>(i, j);  // whole numbers stored as floats
                if (coeff == 0) continue;
                int int_coeff = static_cast<int>(coeff);
                char target_bit = secret_bits[bit_idx];
                int current_bit = int_coeff % 2;
                if (current_bit != (target_bit - '0')) {
                    if (int_coeff > 0) {
                        dct_block.at<float>(i, j) -= 1.0f;
                    } else {
                        dct_block.at<float>(i, j) += 1.0f;
                    }
                }
                ++bit_idx;
            }
            if (bit_idx >= secret_bits.size()) break;
        }
        // Dequantize
        cv::Mat dequantized = dct_block.mul(QUANTIZATION_MATRIX);
        cv::Mat idct_block;
        cv::idct(dequantized, idct_block);
        // Copy the result back into the Y channel
        idct_block.convertTo(idct_block, y_channel.type());
        idct_block.copyTo(y_channel(cv::Rect(y, x, block_size, block_size)));
        if (bit_idx >= secret_bits.size()) break;
    }

Merging the channels and converting back to BGR:

    cv::merge(std::vector<cv::Mat>{y_channel, u_channel, v_channel}, yuv_image);
    cv::cvtColor(yuv_image, image, cv::COLOR_YCrCb2BGR);

A few open questions in this draft:

1. Quantization: cv::divide produces floating-point values that then need to be rounded. Casting straight to int truncates, so the element-wise std::round loop above is the safer way to round in C++.

2. What type should dct_block have? In Python the quantized dct_block holds integer values; in C++ I could convert it to CV_32S after rounding, but some OpenCV operations do not support integer Mats, so the safer approach is to keep it as CV_32F and simply store whole numbers (3.0, 4.0, ...). When embedding, the parity is then taken from the value cast to int, as in the loop above. One thing to watch: C++'s % operator keeps the sign of the dividend (so -3 % 2 is -1), whereas Python's % with a positive divisor is never negative, so the parity of negative coefficients should be taken from the absolute value (a quick check of this follows below).

Dequantization itself is straightforward: dct_block has already been modified, so multiplying it element-wise by QUANTIZATION_MATRIX is enough. The inverse DCT result then has to be converted back to the original data type: the Y channel is normally uchar (CV_8U), so after cv::idct the values must be brought back into the 0-255 range. cv::Mat::convertTo to CV_8U does this with a saturating cast, which is what we want; per-block min-max normalization (cv::normalize with NORM_MINMAX) would also clamp the range, but it rescales each block's intensities and would visibly distort the image, so the saturating convertTo is preferable. In the Python code the assignment back into the Y channel handles this implicitly; in C++ it has to be done explicitly.

In short, the whole pipeline comes down to getting the type conversions, rounding, overflow handling and block indexing right.

Summary of the likely error points:
1. Block coordinates: cv::Rect takes (x, y, width, height), while the loop indices i and j are row and column, so they must be mapped correctly.
2. Embedding: forgetting to skip zero coefficients, or turning a coefficient into 0 while adjusting it, which affects subsequent processing.
3. Shuffling: an incorrectly set up shuffle gives the wrong block processing order.
4. Dequantization: a wrong multiplication order or type gives incorrect results.
5. Data-type conversion after the inverse DCT: done incorrectly, it corrupts the image.
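As a quick standalone check of the modulo point above (this is only an illustration, not part of the conversion itself):

```cpp
#include <cstdio>
#include <cstdlib>

int main() {
    int coeff = -3;
    // C++: the remainder takes the sign of the dividend, so -3 % 2 == -1.
    // Python: -3 % 2 == 1, because the result follows the (positive) divisor.
    std::printf("raw remainder:  %d\n", coeff % 2);            // prints -1
    std::printf("parity via abs: %d\n", std::abs(coeff) % 2);  // prints 1, matching Python
    return 0;
}
```

So when porting the parity test, something like `static_cast<int>(std::abs(quantized)) % 2` reproduces the behaviour of the Python code for negative coefficients.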
Putting all of this together, the C++ code can now be written.</think>

Here is the C++ implementation of the F5 steganography algorithm:

```cpp
#include <opencv2/opencv.hpp>
#include <algorithm>
#include <bitset>
#include <cmath>
#include <random>
#include <string>
#include <vector>

void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message) {
    // Quantization matrix (standard JPEG luminance table)
    cv::Mat QUANTIZATION_MATRIX = (cv::Mat_<float>(8, 8) <<
        16, 11, 10, 16, 24, 40, 51, 61,
        12, 12, 14, 19, 26, 58, 60, 55,
        14, 13, 16, 24, 40, 57, 69, 56,
        14, 17, 22, 29, 51, 87, 80, 62,
        18, 22, 37, 56, 68, 109, 103, 77,
        24, 35, 55, 64, 81, 104, 113, 92,
        49, 64, 78, 87, 103, 121, 120, 101,
        72, 92, 95, 98, 112, 100, 103, 99);

    // Convert the secret message into a sequence of bits
    std::string secret_bits;
    for (char c : secret_message) {
        secret_bits += std::bitset<8>(c).to_string();
    }

    // Convert to the YCrCb color space
    cv::Mat yuv_image;
    cv::cvtColor(image, yuv_image, cv::COLOR_BGR2YCrCb);
    std::vector<cv::Mat> channels(3);
    cv::split(yuv_image, channels);
    cv::Mat& y_channel = channels[0];

    const int block_size = 8;
    const int rows = y_channel.rows;
    const int cols = y_channel.cols;

    // Build the list of block coordinates and shuffle it
    std::vector<cv::Point> blocks;
    for (int y = 0; y < rows; y += block_size) {
        for (int x = 0; x < cols; x += block_size) {
            blocks.emplace_back(x, y);
        }
    }
    std::random_device rd;
    std::shuffle(blocks.begin(), blocks.end(), std::mt19937(rd()));

    size_t bit_idx = 0;
    for (const auto& block : blocks) {
        if (bit_idx >= secret_bits.size()) break;

        cv::Rect roi(block.x, block.y, block_size, block_size);
        if (roi.x + roi.width > cols || roi.y + roi.height > rows) continue;

        cv::Mat block_data = y_channel(roi).clone();
        block_data.convertTo(block_data, CV_32F);

        // DCT
        cv::Mat dct_coeffs;
        cv::dct(block_data, dct_coeffs);

        // Quantize and embed
        for (int i = 0; i < block_size; ++i) {
            for (int j = 0; j < block_size; ++j) {
                if (i == 0 && j == 0) continue;  // skip the DC coefficient

                // Quantize
                float q_val = QUANTIZATION_MATRIX.at<float>(i, j);
                float quantized = std::round(dct_coeffs.at<float>(i, j) / q_val);
                if (quantized == 0) continue;
                if (bit_idx >= secret_bits.size()) break;

                // Embed one bit: take the parity from the absolute value so
                // negative coefficients behave like the Python % operator
                int target_bit = secret_bits[bit_idx] - '0';
                int current_bit = static_cast<int>(std::abs(quantized)) % 2;
                if (current_bit != target_bit) {
                    quantized += (quantized > 0) ? -1.0f : 1.0f;
                }

                // Dequantize and write the coefficient back
                dct_coeffs.at<float>(i, j) = quantized * q_val;
                ++bit_idx;
            }
        }

        // Inverse DCT
        cv::Mat idct_block;
        cv::idct(dct_coeffs, idct_block);
        idct_block.convertTo(idct_block, y_channel.type());
        idct_block.copyTo(y_channel(roi));
    }

    // Merge the channels and convert back to BGR
    cv::merge(channels, yuv_image);
    cv::cvtColor(yuv_image, image, cv::COLOR_YCrCb2BGR);
}
```

Implementation notes:
1. Color space: OpenCV's YCrCb conversion (cv::COLOR_BGR2YCrCb) is used as the YUV equivalent.
2. Block order: the processing order is randomized by shuffling to improve security.
3. Coefficient handling: the DC coefficient is skipped and only non-zero AC coefficients are modified.
4. Data types: the computation stays in floating point throughout and is converted back to the image's original type only at the end.
5. Overflow: cv::Mat::convertTo performs a saturating conversion automatically.
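For completeness, here is a minimal, hypothetical driver sketch showing one way the function could be called; the file names `input.png` and `stego.png` are placeholders and not part of the original question:

```cpp
#include <opencv2/opencv.hpp>
#include <iostream>
#include <string>

// The embedding routine defined above.
void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message);

int main() {
    // Placeholder paths; replace with real files.
    cv::Mat image = cv::imread("input.png", cv::IMREAD_COLOR);
    if (image.empty()) {
        std::cerr << "Failed to load input.png" << std::endl;
        return 1;
    }

    f5EmbedColorImage(image, "secret");

    // Save to a lossless format; re-encoding as JPEG would likely corrupt the embedded bits.
    cv::imwrite("stego.png", image);
    return 0;
}
```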

相关推荐

import tkinter as tk from tkinter import filedialog, messagebox from PIL import Image, ImageTk import cv2 import numpy as np from scipy.fftpack import dct, idct import threading class DCTCompressorApp: def __init__(self, root): self.root = root self.root.title("DCT图像压缩工具") self.root.geometry("1100x850") # 图像变量 self.original_image = None self.compressed_image = None self.residual_image = None self.tk_images = [] # 防止图像被垃圾回收 self.filename = None self.compression_ratio = 1.0 # DCT参数 self.quality = tk.IntVar(value=50) # 压缩质量 (1-100) self.block_size = tk.IntVar(value=8) # DCT块大小 self.show_dct = tk.BooleanVar(value=False) # 是否显示DCT系数 # 创建界面 self.create_widgets() def create_widgets(self): # 菜单栏 menubar = tk.Menu(self.root) filemenu = tk.Menu(menubar, tearoff=0) filemenu.add_command(label="打开", command=self.open_image) filemenu.add_command(label="保存压缩图", command=self.save_compressed) filemenu.add_command(label="保存残差图", command=self.save_residual) filemenu.add_separator() filemenu.add_command(label="退出", command=self.root.quit) menubar.add_cascade(label="文件", menu=filemenu) self.root.config(menu=menubar) # 控制面板 control_frame = tk.Frame(self.root) control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10) # 压缩参数 param_frame = tk.LabelFrame(control_frame, text="DCT压缩参数", padx=5, pady=5) param_frame.pack(fill=tk.X, padx=5, pady=5) tk.Label(param_frame, text="压缩质量:").pack(anchor=tk.W) tk.Scale(param_frame, from_=1, to=100, orient=tk.HORIZONTAL, variable=self.quality).pack(fill=tk.X) tk.Label(param_frame, text="块大小:").pack(anchor=tk.W) tk.Scale(param_frame, from_=4, to=32, resolution=4, orient=tk.HORIZONTAL, variable=self.block_size).pack(fill=tk.X) tk.Checkbutton(param_frame, text="显示DCT系数", variable=self.show_dct).pack(anchor=tk.W) # 操作按钮 btn_frame = tk.Frame(control_frame) btn_frame.pack(fill=tk.X, padx=5, pady=10) tk.Button(btn_frame, text="执行DCT压缩", command=self.compress_image).pack(side=tk.TOP, fill=tk.X, pady=5) tk.Button(btn_frame, text="重置图像", command=self.reset_image).pack(side=tk.TOP, fill=tk.X, pady=5) # 信息显示 info_frame = tk.LabelFrame(control_frame, text="压缩信息", padx=5, pady=5) info_frame.pack(fill=tk.X, padx=5, pady=5) self.info_label = tk.Label(info_frame, text="压缩比: -\nPSNR: - dB", justify=tk.LEFT) self.info_label.pack() # 图像显示区域 display_frame = tk.Frame(self.root) display_frame.pack(side=tk.RIGHT, expand=True, fill=tk.BOTH, padx=10, pady=10) # 原始图像 orig_frame = tk.LabelFrame(display_frame, text="原始图像", padx=5, pady=5) orig_frame.pack(side=tk.LEFT, expand=True, fill=tk.BOTH) self.orig_canvas = tk.Canvas(orig_frame, bg="#f0f0f0", bd=0, highlightthickness=0) self.orig_canvas.pack(expand=True, fill=tk.BOTH) # 压缩图像 comp_frame = tk.LabelFrame(display_frame, text="压缩图像", padx=5, pady=5) comp_frame.pack(side=tk.LEFT, expand=True, fill=tk.BOTH) self.comp_canvas = tk.Canvas(comp_frame, bg="#f0f0f0", bd=0, highlightthickness=0) self.comp_canvas.pack(expand=True, fill=tk.BOTH) # 残差图像 res_frame = tk.LabelFrame(display_frame, text="残差图像", padx=5, pady=5) res_frame.pack(side=tk.LEFT, expand=True, fill=tk.BOTH) self.res_canvas = tk.Canvas(res_frame, bg="#f0f0f0", bd=0, highlightthickness=0) self.res_canvas.pack(expand=True, fill=tk.BOTH) def open_image(self): filetypes = [("图像文件", "*.bmp;*.png;*.jpg;*.jpeg"), ("所有文件", "*.*")] filename = filedialog.askopenfilename(filetypes=filetypes) if filename: try: self.filename = filename self.original_image = Image.open(filename).convert("RGB") self.compressed_image = None self.residual_image = None self.display_images() self.update_info() except Exception as 
e: messagebox.showerror("错误", f"无法打开图像:\n{str(e)}") def save_compressed(self): if self.compressed_image: filetypes = [("PNG", "*.png"), ("JPEG", "*.jpg"), ("BMP", "*.bmp")] filename = filedialog.asksaveasfilename(defaultextension=".png", filetypes=filetypes) if filename: try: self.compressed_image.save(filename) messagebox.showinfo("保存成功", "压缩图像已保存") except Exception as e: messagebox.showerror("错误", f"保存失败:\n{str(e)}") def save_residual(self): if self.residual_image is not None: filetypes = [("PNG", "*.png"), ("BMP", "*.bmp")] filename = filedialog.asksaveasfilename(defaultextension=".png", filetypes=filetypes) if filename: try: # 将残差图像转换为可视化的灰度图 residual = cv2.normalize(self.residual_image, None, 0, 255, cv2.NORM_MINMAX) Image.fromarray(residual.astype(np.uint8)).save(filename) messagebox.showinfo("保存成功", "残差图像已保存") except Exception as e: messagebox.showerror("错误", f"保存失败:\n{str(e)}") def compress_image(self): if not self.original_image: return def process(): try: # 转换为OpenCV格式 (BGR) img_cv = cv2.cvtColor(np.array(self.original_image), cv2.COLOR_RGB2BGR) height, width = img_cv.shape[:2] # 转换为YCrCb色彩空间 (类似JPEG) img_yuv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2YCrCb) # 获取块大小 block_size = self.block_size.get() # 创建量化矩阵 (基于JPEG标准调整) Q = self.create_quantization_matrix(block_size, self.quality.get()) # 对每个通道进行处理 compressed_channels = [] dct_blocks = [] # 声明在外部循环,以便后续使用 for ch in range(3): channel = img_yuv[:, :, ch].astype(np.float32) - 128 # 中心化 # 分块DCT ch_dct_blocks = [] for i in range(0, height, block_size): for j in range(0, width, block_size): block = channel[i:i + block_size, j:j + block_size] if block.shape[0] < block_size or block.shape[1] < block_size: # 边缘块填充 padded = np.zeros((block_size, block_size), dtype=np.float32) padded[:block.shape[0], :block.shape[1]] = block block = padded # DCT变换 dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho') ch_dct_blocks.append((i, j, dct_block)) # 存储当前通道的DCT块 dct_blocks.append(ch_dct_blocks) # 量化和反量化 for ch in range(3): channel = img_yuv[:, :, ch].astype(np.float32) - 128 rec_channel = np.zeros_like(channel) ch_dct_blocks = dct_blocks[ch] # 获取当前通道的DCT块 for i, j, dct_block in ch_dct_blocks: # 量化 quantized = np.round(dct_block / Q) # 反量化 dequantized = quantized * Q # 逆DCT idct_block = idct(idct(dequantized.T, norm='ortho').T, norm='ortho') # 存储重建块 block_h = min(block_size, height - i) block_w = min(block_size, width - j) rec_channel[i:i + block_h, j:j + block_w] = idct_block[:block_h, :block_w] + 128 compressed_channels.append(rec_channel) # 合并通道并转换回RGB compressed_yuv = np.stack(compressed_channels, axis=-1) compressed_yuv = np.clip(compressed_yuv, 0, 255).astype(np.uint8) compressed_bgr = cv2.cvtColor(compressed_yuv, cv2.COLOR_YCrCb2BGR) self.compressed_image = Image.fromarray(cv2.cvtColor(compressed_bgr, cv2.COLOR_BGR2RGB)) # 计算残差 original_array = np.array(self.original_image).astype(np.float32) compressed_array = np.array(self.compressed_image).astype(np.float32) # 确保尺寸完全一致 min_h = min(original_array.shape[0], compressed_array.shape[0]) min_w = min(original_array.shape[1], compressed_array.shape[1]) original_array = original_array[:min_h, :min_w, :] compressed_array = compressed_array[:min_h, :min_w, :] # 计算残差 self.residual_image = np.abs(original_array - compressed_array).mean(axis=2) # 转灰度残差 # 确保残差图像尺寸有效 if self.residual_image.size == 0 or self.residual_image.shape[0] <= 0 or self.residual_image.shape[1] <= 0: messagebox.showerror("残差计算错误", "无法生成有效的残差图像,请尝试其他图像或参数") return # 计算压缩比 original_size = width * height * 24 # 24 bits per pixel (RGB) 
nonzero_count = sum(np.count_nonzero(np.round(dct_block / Q)) for ch in range(3) for _, _, dct_block in dct_blocks[ch]) compressed_size = nonzero_count * (8 + 8) # 假设每个非零系数用8位存值+8位存位置 self.compression_ratio = original_size / max(1, compressed_size) # 更新显示 self.display_images() self.update_info() except Exception as e: messagebox.showerror("处理错误", f"DCT压缩失败:\n{str(e)}") threading.Thread(target=process).start() def create_quantization_matrix(self, size, quality): """创建量化矩阵 (基于JPEG标准调整)""" # 基础矩阵 (JPEG亮度量化表扩展) base_Q = np.array([ [16, 11, 10, 16, 24, 40, 51, 61], [12, 12, 14, 19, 26, 58, 60, 55], [14, 13, 16, 24, 40, 57, 69, 56], [14, 17, 22, 29, 51, 87, 80, 62], [18, 22, 37, 56, 68, 109, 103, 77], [24, 35, 55, 64, 81, 104, 113, 92], [49, 64, 78, 87, 103, 121, 120, 101], [72, 92, 95, 98, 112, 100, 103, 99] ], dtype=np.float32) # 调整到指定大小 Q = cv2.resize(base_Q, (size, size), interpolation=cv2.INTER_LINEAR) # 根据质量调整量化步长 scale = 1.0 if quality < 50: scale = 5000 / quality else: scale = 200 - 2 * quality Q = np.floor((Q * scale + 50) / 100) Q = np.clip(Q, 1, 255) return Q def calculate_psnr(self, original, compressed): """计算峰值信噪比 (PSNR)""" mse = np.mean((original - compressed) ** 2) if mse == 0: return float('inf') max_pixel = 255.0 return 20 * np.log10(max_pixel / np.sqrt(mse)) def display_images(self): """显示所有图像""" self.tk_images = [] # 清除旧引用 # 显示原始图像 if self.original_image: self.display_image(self.original_image, self.orig_canvas, "原始图像") # 显示压缩图像 if self.compressed_image: self.display_image(self.compressed_image, self.comp_canvas, "压缩图像") # 显示残差图像 if self.residual_image is not None: # 确保残差图像尺寸有效 if self.residual_image.size == 0 or self.residual_image.shape[0] <= 0 or self.residual_image.shape[1] <= 0: messagebox.showwarning("残差警告", "残差图像尺寸无效,跳过显示") return # 将残差归一化到0-255范围 residual = cv2.normalize(self.residual_image, None, 0, 255, cv2.NORM_MINMAX) residual_img = Image.fromarray(residual.astype(np.uint8)) self.display_image(residual_img, self.res_canvas, "残差图像") def display_image(self, image, canvas, title): """在指定画布上显示图像""" try: # 检查图像宽高是否有效 img_width, img_height = image.size if img_width <= 0 or img_height <= 0: raise ValueError(f"{title} 宽高无效: {img_width}x{img_height}") canvas_width = canvas.winfo_width() canvas_height = canvas.winfo_height() if canvas_width <= 1 or canvas_height <= 1: return # 计算保持比例的缩放 ratio = min( (canvas_width - 20) / img_width, (canvas_height - 20) / img_height ) new_size = (int(img_width * ratio), int(img_height * ratio)) # 高质量缩放 resized_img = image.resize(new_size, Image.LANCZOS) tk_image = ImageTk.PhotoImage(resized_img) # 存储引用防止被回收 self.tk_images.append(tk_image) # 更新画布 canvas.delete("all") canvas.create_image( canvas_width // 2, canvas_height // 2, anchor=tk.CENTER, image=tk_image ) except Exception as e: messagebox.showerror("显示错误", f"显示{title}失败:\n{str(e)}") def update_info(self): """更新压缩信息""" if self.compressed_image and self.original_image: # 计算PSNR original = np.array(self.original_image).astype(np.float32) compressed = np.array(self.compressed_image).astype(np.float32) min_h = min(original.shape[0], compressed.shape[0]) min_w = min(original.shape[1], compressed.shape[1]) original = original[:min_h, :min_w, :] compressed = compressed[:min_h, :min_w, :] psnr = self.calculate_psnr(original, compressed) info_text = (f"压缩比: {self.compression_ratio:.2f}:1\n" f"PSNR: {psnr:.2f} dB\n" f"块大小: {self.block_size.get()}x{self.block_size.get()}\n" f"质量: {self.quality.get()}/100") self.info_label.config(text=info_text) else: self.info_label.config(text="压缩比: -\nPSNR: - dB") 
def reset_image(self): """重置为原始图像""" if self.original_image: self.compressed_image = None self.residual_image = None self.display_images() self.update_info() if __name__ == "__main__": root = tk.Tk() app = DCTCompressorApp(root) root.mainloop() "height and width must be > 0" 错误

import tensorflow as tf import numpy as np import cv2 import os import json from tqdm import tqdm class ObjectRecognitionDeployer: def __init__(self, model_path, class_labels): """ 初始化部署器 :param model_path: 模型文件路径 (Keras或TFLite) :param class_labels: 类别标签列表 """ self.class_labels = class_labels self.model_path = model_path self.interpreter = None self.input_details = None self.output_details = None # 根据模型类型加载 if model_path.endswith('.tflite'): self.load_tflite_model(model_path) else: self.model = tf.keras.models.load_model(model_path) self.input_shape = self.model.input_shape[1:3] def load_tflite_model(self, model_path): """加载并配置TFLite模型""" # 加载模型 self.interpreter = tf.lite.Interpreter(model_path=model_path) self.interpreter.allocate_tensors() # 获取输入输出详细信息 self.input_details = self.interpreter.get_input_details() self.output_details = self.interpreter.get_output_details() # 保存输入形状 self.input_shape = tuple(self.input_details[0]['shape'][1:3]) # 安全地打印模型元数据 self.print_model_metadata(model_path) def print_model_metadata(self, model_path): """安全地打印TFLite模型元数据""" try: from tflite_support import metadata displayer = metadata.MetadataDisplayer.with_model_file(model_path) print("--- 模型元数据 ---") print(displayer.get_metadata_json()) print("--- 关联文件 ---") print(displayer.get_packed_associated_file_list()) except (ImportError, ValueError) as e: print(f"警告: 无法获取模型元数据 - {str(e)}") print("使用输入/输出详细信息代替:") print(f"输入: {self.input_details}") print(f"输出: {self.output_details}") def preprocess_image(self, image, input_size, input_dtype=np.float32): """ 预处理图像 :param image: 输入图像 (numpy数组或文件路径) :param input_size: 模型输入尺寸 (height, width) :param input_dtype: 期望的输入数据类型 :return: 预处理后的图像张量 """ if isinstance(image, str): if not os.path.exists(image): raise FileNotFoundError(f"图像文件不存在: {image}") img = cv2.imread(image) if img is None: raise ValueError(f"无法读取图像: {image}") else: img = image # 调整尺寸和颜色空间 img = cv2.resize(img, (input_size[1], input_size[0])) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # 根据数据类型进行归一化 if input_dtype == np.uint8: img = img.astype(np.uint8) # 量化模型使用uint8 else: # 浮点模型使用float32 img = img.astype(np.float32) / 255.0 # 添加批次维度 img = np.expand_dims(img, axis=0) return img def predict(self, image): """ 执行预测 :param image: 输入图像 (numpy数组或文件路径) :return: 预测结果 (类别名称, 置信度) """ if self.interpreter is not None: # TFLite模型推理 return self.predict_tflite(image) else: # Keras模型推理 return self.predict_keras(image) def predict_keras(self, image): """使用Keras模型预测""" # 预处理 img = self.preprocess_image(image, self.input_shape, np.float32) # 预测 predictions = self.model.predict(img, verbose=0)[0] class_idx = np.argmax(predictions) confidence = predictions[class_idx] class_name = self.class_labels[class_idx] return class_name, confidence def predict_tflite(self, image): """使用TFLite模型预测""" # 获取输入数据类型 input_dtype = self.input_details[0]['dtype'] # 预处理 img = self.preprocess_image(image, self.input_shape, input_dtype) # 设置输入张量 self.interpreter.set_tensor(self.input_details[0]['index'], img) # 执行推理 self.interpreter.invoke() # 获取输出 output_data = self.interpreter.get_tensor(self.output_details[0]['index']) predictions = output_data[0] # 解析结果 class_idx = np.argmax(predictions) confidence = predictions[class_idx] # 如果输出是量化数据,需要反量化 if self.output_details[0]['dtype'] == np.uint8: # 反量化输出 scale, zero_point = self.output_details[0]['quantization'] confidence = scale * (confidence - zero_point) class_name = self.class_labels[class_idx] return class_name, confidence def benchmark(self, image, runs=100): """ 模型性能基准测试 :param image: 测试图像 :param 
runs: 运行次数 :return: 平均推理时间(ms), 内存占用(MB) """ # 预热运行 self.predict(image) # 计时测试 start_time = tf.timestamp() for _ in range(runs): self.predict(image) end_time = tf.timestamp() avg_time_ms = (end_time - start_time).numpy() * 1000 / runs # 内存占用 if self.interpreter: # 计算输入张量内存占用 input_size = self.input_details[0]['shape'] dtype_size = np.dtype(self.input_details[0]['dtype']).itemsize mem_usage = np.prod(input_size) * dtype_size / (1024 * 1024) else: # 估算Keras模型内存 mem_usage = self.model.count_params() * 4 / (1024 * 1024) # 假设32位浮点数 return avg_time_ms, mem_usage def create_metadata(self, output_path): """ 创建并保存模型元数据文件 :param output_path: 元数据文件输出路径 """ metadata = { "model_type": "tflite" if self.model_path.endswith('.tflite') else "keras", "class_labels": self.class_labels, "input_size": self.input_shape, "input_dtype": str(self.input_details[0]['dtype']) if self.interpreter else "float32", "quantization": None } if self.interpreter and self.input_details[0]['dtype'] == np.uint8: metadata["quantization"] = { "input_scale": float(self.input_details[0]['quantization'][0]), "input_zero_point": int(self.input_details[0]['quantization'][1]), "output_scale": float(self.output_details[0]['quantization'][0]), "output_zero_point": int(self.output_details[0]['quantization'][1]) } with open(output_path, 'w') as f: json.dump(metadata, f, indent=4) return metadata def convert_to_tflite_with_metadata(self, output_path, quantize=False, representative_data_dir=None): """ 将Keras模型转换为TFLite格式并添加元数据 :param output_path: 输出TFLite文件路径 :param quantize: 是否进行量化 :param representative_data_dir: 代表性数据集目录 """ if not self.model_path.endswith(('.keras', '.h5')): raise ValueError("需要Keras模型格式进行转换") # 加载Keras模型 keras_model = tf.keras.models.load_model(self.model_path) # 创建转换器 converter = tf.lite.TFLiteConverter.from_keras_model(keras_model) if quantize: # 量化配置 converter.optimizations = [tf.lite.Optimize.DEFAULT] # 设置代表性数据集生成器 converter.representative_dataset = lambda: self.representative_dataset( representative_data_dir, input_size=self.input_shape ) # 设置输入输出类型 converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8] converter.inference_input_type = tf.uint8 converter.inference_output_type = tf.uint8 # 转换模型 tflite_model = converter.convert() # 保存模型 with open(output_path, 'wb') as f: f.write(tflite_model) print(f"TFLite模型已保存到: {output_path}") # 添加元数据 self.add_tflite_metadata(output_path) return output_path def representative_dataset(self, data_dir=None, input_size=(224, 224), num_samples=100): """ 生成代表性数据集用于量化 :param data_dir: 真实数据目录 :param input_size: 输入尺寸 (height, width) :param num_samples: 样本数量 """ # 优先使用真实数据 if data_dir and os.path.exists(data_dir): image_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] # 限制样本数量 image_files = image_files[:min(len(image_files), num_samples)] print(f"使用 {len(image_files)} 张真实图像进行量化校准") for img_path in tqdm(image_files, desc="量化校准"): try: # 读取并预处理图像 img = cv2.imread(img_path) if img is None: continue img = cv2.resize(img, (input_size[1], input_size[0])) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype(np.float32) / 255.0 # 转换为float32并归一化 img = np.expand_dims(img, axis=0) yield [img] except Exception as e: print(f"处理图像 {img_path} 时出错: {str(e)}") else: # 使用随机数据作为备选 print(f"使用随机数据生成 {num_samples} 个样本进行量化校准") for _ in range(num_samples): # 生成随机图像,归一化到[0,1]范围,使用float32类型 data = np.random.rand(1, input_size[0], input_size[1], 3).astype(np.float32) yield [data] def add_tflite_metadata(self, model_path): 
"""为TFLite模型添加元数据""" # 创建标签文件 labels_path = os.path.join(os.path.dirname(model_path), "labels.txt") with open(labels_path, 'w') as f: for label in self.class_labels: f.write(f"{label}\n") # 创建元数据 metadata_path = os.path.join(os.path.dirname(model_path), "metadata.json") self.create_metadata(metadata_path) print(f"元数据已创建: {metadata_path}") print(f"标签文件已创建: {labels_path}") # 使用示例 if __name__ == "__main__": # 类别标签 CLASS_LABELS = ['book', 'cup', 'glasses', 'phone', 'shoe'] # 初始化部署器 deployer = ObjectRecognitionDeployer( model_path='optimized_model.keras', class_labels=CLASS_LABELS ) # 转换为带元数据的TFLite格式 tflite_path = 'model_quantized.tflite' # 使用真实数据目录进行量化校准 REPRESENTATIVE_DATA_DIR = 'path/to/representative_dataset' # 替换为实际路径 deployer.convert_to_tflite_with_metadata( tflite_path, quantize=True, representative_data_dir=REPRESENTATIVE_DATA_DIR ) # 重新加载带元数据的模型 tflite_deployer = ObjectRecognitionDeployer( model_path=tflite_path, class_labels=CLASS_LABELS ) # 测试预测 test_image = 'test_image.jpg' class_name, confidence = tflite_deployer.predict(test_image) print(f"预测结果: {class_name}, 置信度: {confidence:.2f}") # 性能测试 avg_time, mem_usage = tflite_deployer.benchmark(test_image) print(f"平均推理时间: {avg_time:.2f} ms") print(f"内存占用: {mem_usage:.2f} MB") # 创建元数据文件 metadata = deployer.create_metadata('model_metadata.json') print("模型元数据:", json.dumps(metadata, indent=4)) import tensorflow as tf import numpy as np import cv2 import os import json from tqdm import tqdm class ObjectRecognitionDeployer: def __init__(self, model_path, class_labels): """ 初始化部署器 :param model_path: 模型文件路径 (Keras或TFLite) :param class_labels: 类别标签列表 """ self.class_labels = class_labels self.model_path = model_path self.interpreter = None self.input_details = None self.output_details = None # 根据模型类型加载 if model_path.endswith('.tflite'): self.load_tflite_model(model_path) else: self.model = tf.keras.models.load_model(model_path) self.input_shape = self.model.input_shape[1:3] def load_tflite_model(self, model_path): """加载并配置TFLite模型""" # 加载模型 self.interpreter = tf.lite.Interpreter(model_path=model_path) self.interpreter.allocate_tensors() # 获取输入输出详细信息 self.input_details = self.interpreter.get_input_details() self.output_details = self.interpreter.get_output_details() # 保存输入形状 self.input_shape = tuple(self.input_details[0]['shape'][1:3]) # 安全地打印模型元数据 self.print_model_metadata(model_path) def print_model_metadata(self, model_path): """安全地打印TFLite模型元数据""" try: from tflite_support import metadata displayer = metadata.MetadataDisplayer.with_model_file(model_path) print("--- 模型元数据 ---") print(displayer.get_metadata_json()) print("--- 关联文件 ---") print(displayer.get_packed_associated_file_list()) except (ImportError, ValueError) as e: print(f"警告: 无法获取模型元数据 - {str(e)}") print("使用输入/输出详细信息代替:") print(f"输入: {self.input_details}") print(f"输出: {self.output_details}") def preprocess_image(self, image, input_size, input_dtype=np.float32): """ 预处理图像 :param image: 输入图像 (numpy数组或文件路径) :param input_size: 模型输入尺寸 (height, width) :param input_dtype: 期望的输入数据类型 :return: 预处理后的图像张量 """ if isinstance(image, str): if not os.path.exists(image): raise FileNotFoundError(f"图像文件不存在: {image}") img = cv2.imread(image) if img is None: raise ValueError(f"无法读取图像: {image}") else: img = image # 调整尺寸和颜色空间 img = cv2.resize(img, (input_size[1], input_size[0])) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # 根据数据类型进行归一化 if input_dtype == np.uint8: img = img.astype(np.uint8) # 量化模型使用uint8 else: # 浮点模型使用float32 img = img.astype(np.float32) / 255.0 # 添加批次维度 img = np.expand_dims(img, axis=0) return 
img def predict(self, image): """ 执行预测 :param image: 输入图像 (numpy数组或文件路径) :return: 预测结果 (类别名称, 置信度) """ if self.interpreter is not None: # TFLite模型推理 return self.predict_tflite(image) else: # Keras模型推理 return self.predict_keras(image) def predict_keras(self, image): """使用Keras模型预测""" # 预处理 img = self.preprocess_image(image, self.input_shape, np.float32) # 预测 predictions = self.model.predict(img, verbose=0)[0] class_idx = np.argmax(predictions) confidence = predictions[class_idx] class_name = self.class_labels[class_idx] return class_name, confidence def predict_tflite(self, image): """使用TFLite模型预测""" # 获取输入数据类型 input_dtype = self.input_details[0]['dtype'] # 预处理 img = self.preprocess_image(image, self.input_shape, input_dtype) # 设置输入张量 self.interpreter.set_tensor(self.input_details[0]['index'], img) # 执行推理 self.interpreter.invoke() # 获取输出 output_data = self.interpreter.get_tensor(self.output_details[0]['index']) predictions = output_data[0] # 解析结果 class_idx = np.argmax(predictions) confidence = predictions[class_idx] # 如果输出是量化数据,需要反量化 if self.output_details[0]['dtype'] == np.uint8: # 反量化输出 scale, zero_point = self.output_details[0]['quantization'] confidence = scale * (confidence - zero_point) class_name = self.class_labels[class_idx] return class_name, confidence def benchmark(self, image, runs=100): """ 模型性能基准测试 :param image: 测试图像 :param runs: 运行次数 :return: 平均推理时间(ms), 内存占用(MB) """ # 预热运行 self.predict(image) # 计时测试 start_time = tf.timestamp() for _ in range(runs): self.predict(image) end_time = tf.timestamp() avg_time_ms = (end_time - start_time).numpy() * 1000 / runs # 内存占用 if self.interpreter: # 计算输入张量内存占用 input_size = self.input_details[0]['shape'] dtype_size = np.dtype(self.input_details[0]['dtype']).itemsize mem_usage = np.prod(input_size) * dtype_size / (1024 * 1024) else: # 估算Keras模型内存 mem_usage = self.model.count_params() * 4 / (1024 * 1024) # 假设32位浮点数 return avg_time_ms, mem_usage def create_metadata(self, output_path): """ 创建并保存模型元数据文件 :param output_path: 元数据文件输出路径 """ metadata = { "model_type": "tflite" if self.model_path.endswith('.tflite') else "keras", "class_labels": self.class_labels, "input_size": self.input_shape, "input_dtype": str(self.input_details[0]['dtype']) if self.interpreter else "float32", "quantization": None } if self.interpreter and self.input_details[0]['dtype'] == np.uint8: metadata["quantization"] = { "input_scale": float(self.input_details[0]['quantization'][0]), "input_zero_point": int(self.input_details[0]['quantization'][1]), "output_scale": float(self.output_details[0]['quantization'][0]), "output_zero_point": int(self.output_details[0]['quantization'][1]) } with open(output_path, 'w') as f: json.dump(metadata, f, indent=4) return metadata def convert_to_tflite_with_metadata(self, output_path, quantize=False, representative_data_dir=None): """ 将Keras模型转换为TFLite格式并添加元数据 :param output_path: 输出TFLite文件路径 :param quantize: 是否进行量化 :param representative_data_dir: 代表性数据集目录 """ if not self.model_path.endswith(('.keras', '.h5')): raise ValueError("需要Keras模型格式进行转换") # 加载Keras模型 keras_model = tf.keras.models.load_model(self.model_path) # 创建转换器 converter = tf.lite.TFLiteConverter.from_keras_model(keras_model) if quantize: # 量化配置 converter.optimizations = [tf.lite.Optimize.DEFAULT] # 设置代表性数据集生成器 converter.representative_dataset = lambda: self.representative_dataset( representative_data_dir, input_size=self.input_shape ) # 设置输入输出类型 converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8] converter.inference_input_type = tf.uint8 
converter.inference_output_type = tf.uint8 # 转换模型 tflite_model = converter.convert() # 保存模型 with open(output_path, 'wb') as f: f.write(tflite_model) print(f"TFLite模型已保存到: {output_path}") # 添加元数据 self.add_tflite_metadata(output_path) return output_path def representative_dataset(self, data_dir=None, input_size=(224, 224), num_samples=100): """ 生成代表性数据集用于量化 :param data_dir: 真实数据目录 :param input_size: 输入尺寸 (height, width) :param num_samples: 样本数量 """ # 优先使用真实数据 if data_dir and os.path.exists(data_dir): image_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] # 限制样本数量 image_files = image_files[:min(len(image_files), num_samples)] print(f"使用 {len(image_files)} 张真实图像进行量化校准") for img_path in tqdm(image_files, desc="量化校准"): try: # 读取并预处理图像 img = cv2.imread(img_path) if img is None: continue img = cv2.resize(img, (input_size[1], input_size[0])) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype(np.float32) / 255.0 # 转换为float32并归一化 img = np.expand_dims(img, axis=0) yield [img] except Exception as e: print(f"处理图像 {img_path} 时出错: {str(e)}") else: # 使用随机数据作为备选 print(f"使用随机数据生成 {num_samples} 个样本进行量化校准") for _ in range(num_samples): # 生成随机图像,归一化到[0,1]范围,使用float32类型 data = np.random.rand(1, input_size[0], input_size[1], 3).astype(np.float32) yield [data] def add_tflite_metadata(self, model_path): """为TFLite模型添加元数据""" # 创建标签文件 labels_path = os.path.join(os.path.dirname(model_path), "labels.txt") with open(labels_path, 'w') as f: for label in self.class_labels: f.write(f"{label}\n") # 创建元数据 metadata_path = os.path.join(os.path.dirname(model_path), "metadata.json") self.create_metadata(metadata_path) print(f"元数据已创建: {metadata_path}") print(f"标签文件已创建: {labels_path}") # 使用示例 if __name__ == "__main__": # 类别标签 CLASS_LABELS = ['book', 'cup', 'glasses', 'phone', 'shoe'] # 初始化部署器 deployer = ObjectRecognitionDeployer( model_path='optimized_model.keras', class_labels=CLASS_LABELS ) # 转换为带元数据的TFLite格式 tflite_path = 'model_quantized.tflite' # 使用真实数据目录进行量化校准 REPRESENTATIVE_DATA_DIR = 'path/to/representative_dataset' # 替换为实际路径 deployer.convert_to_tflite_with_metadata( tflite_path, quantize=True, representative_data_dir=REPRESENTATIVE_DATA_DIR ) # 重新加载带元数据的模型 tflite_deployer = ObjectRecognitionDeployer( model_path=tflite_path, class_labels=CLASS_LABELS ) # 测试预测 test_image = 'test_image.jpg' class_name, confidence = tflite_deployer.predict(test_image) print(f"预测结果: {class_name}, 置信度: {confidence:.2f}") # 性能测试 avg_time, mem_usage = tflite_deployer.benchmark(test_image) print(f"平均推理时间: {avg_time:.2f} ms") print(f"内存占用: {mem_usage:.2f} MB") # 创建元数据文件 metadata = deployer.create_metadata('model_metadata.json') print("模型元数据:", json.dumps(metadata, indent=4)) 上述代码我已经成功执行,并且我的ObjectRecognitionDeployer类路径导入代码是from 计算机视觉.test2 import ObjectRecognitionDeployer

class QuantLinear(nn.Module): """ Quantized Module that can perform quantized convolution or normal convolution. To activate quantization, please use set_quant_state function. """ def __init__( self, org_module: nn.Linear, weight_quant_params: dict = {}, act_quant_params: dict = {}, disable_input_quant=False, ): super().__init__() self.fwd_kwargs = dict() self.fwd_func = F.linear self.register_buffer('weight',org_module.weight) if org_module.bias is not None: self.register_buffer('bias',org_module.bias) else: self.bias = None self.in_features = org_module.in_features self.out_features = org_module.out_features # de-activate the quantized forward default self.use_weight_quant = False self.use_act_quant = False # initialize quantizer self.weight_quantizer = UniformAffineQuantizer(**weight_quant_params,shape=org_module.weight.shape) if not disable_input_quant: self.act_quantizer = UniformAffineQuantizer(**act_quant_params) else: self.act_quantizer = None self.disable_input_quant = disable_input_quant self.use_temporary_parameter = False def forward(self, input: torch.Tensor): if self.use_temporary_parameter: weight = self.temp_weight bias = self.temp_bias elif self.use_weight_quant: weight = self.weight_quantizer(self.weight) bias = self.bias else: weight = self.weight bias = self.bias if self.use_act_quant and not self.disable_input_quant: input = self.act_quantizer(input) out = self.fwd_func(input, weight, bias, **self.fwd_kwargs) return out def set_quant_state(self, weight_quant: bool = False, act_quant: bool = False): self.use_weight_quant = weight_quant self.use_act_quant = act_quant

import tensorrt as trt import numpy as np import time import pycuda.driver as cuda import pycuda.autoinit # 自定义INT8校准器类 class MyCalibrator(trt.IInt8EntropyCalibrator2): def __init__(self, batch_size=1, input_shape=(1, 3, 224, 224), cache_file="calibration.cache"): trt.IInt8EntropyCalibrator2.__init__(self) self.batch_size = batch_size self.input_shape = input_shape self.cache_file = cache_file # 生成随机校准数据(实际应用中应使用真实数据集) self.data = np.random.randn(100, *input_shape).astype(np.float32) self.current_index = 0 # 分配设备内存 self.device_input = cuda.mem_alloc(self.data.nbytes * self.batch_size) def get_batch_size(self): return self.batch_size def get_batch(self, names): if self.current_index + self.batch_size > len(self.data): return None batch = self.data[self.current_index:self.current_index+self.batch_size] self.current_index += self.batch_size # 将数据复制到设备 cuda.memcpy_htod(self.device_input, batch.tobytes()) return [int(self.device_input)] def read_calibration_cache(self): # 如果存在缓存,跳校准 try: with open(self.cache_file, "rb") as f: return f.read() except FileNotFoundError: return None def write_calibration_cache(self, cache): with open(self.cache_file, "wb") as f: f.write(cache) # 创建简单网络(使用 add_pooling_nd 替代 add_pooling) def create_network(): logger = trt.Logger(trt.Logger.WARNING) builder = trt.Builder(logger) network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)) # 定义输入 input_tensor = network.add_input("input", trt.DataType.FLOAT, (1, 3, 224, 224)) # 添加卷积层(模拟ResNet-18第一层) conv1 = network.add_convolution_nd( input=input_tensor, num_output_maps=64, kernel_shape=(7, 7), kernel=np.random.randn(64, 3, 7, 7).astype(np.float32).ravel(), bias=np.random.randn(64).astype(np.float32) ) # 设置步长和填充(直接赋值属性) conv1.stride_nd = [2, 2] conv1.padding_nd = [3, 3] # 添加池化层(使用 add_pooling_nd) window_shape = trt.Dims([3, 3]) # 修改:使用 trt.Dims pool1 = network.add_pooling_nd( input=conv1.get_output(0), type=trt.PoolingType.MAX, window_size=window_shape ) pool1.stride_nd = [2, 2] # 设置池化步长 # 展平输出为二维张量(N, C*H*W) flatten = network.add_shuffle(pool1.get_output(0)) output_shape = (1, 64 * 56 * 56) # 假设输入是 (1, 64, 56, 56) flatten.reshape_dims = tuple(output_shape) # 定义权重和偏置 kernel = np.random.randn(1000, 64 * 56 * 56).astype(np.float32).ravel() bias = np.random.randn(1000).astype(np.float32) # 添加常量张量作为权重和偏置 w = network.add_constant((1000, 64 * 56 * 56), kernel) b = network.add_constant((1000,), bias) # 矩阵乘法:[input] @ [weights.T] + [bias] matmul = network.add_matrix_multiply(flatten.get_output(0), trt.MatrixOperation.NONE, w.get_output(0), trt.MatrixOperation.TRANSPOSE) bias_add = network.add_elementwise(matmul.get_output(0), b.get_output(0), trt.ElementWiseOperation.SUM) # 标记输出 bias_add.get_output(0).name = "output" network.mark_output(bias_add.get_output(0)) return builder, network, logger # 主测试函数 def int8_perf_test(): # 创建网络 builder, network, logger = create_network() # 配置INT8量化 config = builder.create_builder_config() config.set_flag(trt.BuilderFlag.INT8) config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) # 1GB工作空间 # 创建校准器并设置给配置 calib = MyCalibrator(batch_size=1, input_shape=(1, 3, 224, 224), cache_file="calibration.cache") config.int8_calibrator = calib # ✅ 正确方式:直接赋值属性 # 构建引擎(新API) engine = builder.build_engine(network, config) if not engine: print("Engine构建失败!") return # 创建执行上下文 context = engine.create_execution_context() # 准备输入输出缓冲区 input_shape = (1, 3, 224, 224) output_shape = (1, 1000) host_input = np.random.randn(*input_shape).astype(np.float32) host_output = 
np.empty(output_shape, dtype=np.float32) # 分配设备内存 d_input = cuda.mem_alloc(host_input.nbytes) d_output = cuda.mem_alloc(host_output.nbytes) bindings = [int(d_input), int(d_output)] # 创建CUDA流 stream = cuda.Stream() # 预热 for _ in range(5): cuda.memcpy_htod_async(d_input, host_input, stream) context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) cuda.memcpy_dtoh_async(host_output, d_output, stream) stream.synchronize() # 正式测试 start = time.time() for _ in range(100): cuda.memcpy_htod_async(d_input, host_input, stream) context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) cuda.memcpy_dtoh_async(host_output, d_output, stream) stream.synchronize() total_time = time.time() - start # 计算INT8 TOPS(基于ResNet-18的3.9G MACs) # 注意:INT8操作数 = MACs × 2(乘加各算一次操作) ops = 3.9e9 * 2 # ResNet-18的INT8操作数 tops = (ops * 100) / (total_time * 1e12) # 100次推理 print(f"INT8 TOPS: {tops:.2f} TOPS") # 清理资源 del context del engine if __name__ == "__main__": int8_perf_test() 报错 jp@jp-Super-Server:~/test$ python3 TensorRT_int8.py /home/jp/test/TensorRT_int8.py:113: DeprecationWarning: Use Deprecated in TensorRT 10.1. Superseded by explicit quantization. instead. config.int8_calibrator = calib # ✅ 正确方式:直接赋值属性 Traceback (most recent call last): File "/home/jp/test/TensorRT_int8.py", line 166, in <module> int8_perf_test() File "/home/jp/test/TensorRT_int8.py", line 116, in int8_perf_test engine = builder.build_engine(network, config) AttributeError: 'tensorrt_bindings.tensorrt.Builder' object has no attribute 'build_engine' 给出修改后的完整代码

import os import torch import transformers from transformers import ( AutoModelForCausalLM, AutoTokenizer, TrainingArguments, DataCollatorForLanguageModeling, BitsAndBytesConfig, Trainer ) from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training from datasets import load_dataset import logging import psutil import gc from datetime import datetime # === 配置区域 === MODEL_NAME = "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/models/Yi-6B" DATASET_PATH = "./data/train_lora_formatted.jsonl" OUTPUT_DIR = "./yi6b-lora-optimized" DEVICE_MAP = "auto" # 使用自动设备映射 # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) # === 内存优化配置 === os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" # 减少内存碎片 torch.backends.cuda.cufft_plan_cache.clear() # 清理CUDA缓存 # === 增强的日志系统 === def setup_logging(output_dir): """配置日志系统,支持文件和TensorBoard""" logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 文件日志处理器 file_handler = logging.FileHandler(os.path.join(output_dir, "training.log")) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) # TensorBoard日志目录 tensorboard_log_dir = os.path.join(output_dir, "logs", datetime.now().strftime("%Y%m%d-%H%M%S")) os.makedirs(tensorboard_log_dir, exist_ok=True) # 安装TensorBoard回调 tb_writer = None try: from torch.utils.tensorboard import SummaryWriter tb_writer = SummaryWriter(log_dir=tensorboard_log_dir) logger.info(f"TensorBoard日志目录: {tensorboard_log_dir}") except ImportError: logger.warning("TensorBoard未安装,可视化功能不可用") return logger, tb_writer logger, tb_writer = setup_logging(OUTPUT_DIR) # === 量化配置 - 使用更高效的配置 === quant_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) # === 加载模型 === logger.info("加载预训练模型...") model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, attn_implementation="flash_attention_2" # 使用FlashAttention优化内存 ) # === 分词器处理 === logger.info("加载分词器...") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) tokenizer.padding_side = "right" if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # === 准备模型训练 === model = prepare_model_for_kbit_training( model, use_gradient_checkpointing=True # 启用梯度检查点以节省内存 ) # === LoRA 配置 - 优化内存使用 === logger.info("配置LoRA...") lora_config = LoraConfig( r=64, # 降低rank以减少内存使用 lora_alpha=32, # 降低alpha值 target_modules=["q_proj", "v_proj"], # 减少目标模块 lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) # 记录可训练参数 trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_params = sum(p.numel() for p in model.parameters()) logger.info(f"可训练参数: {trainable_params:,} / 总参数: {total_params:,} ({trainable_params / total_params:.2%})") # === 加载并预处理数据集 === logger.info("加载和预处理数据集...") dataset = load_dataset("json", data_files=DATASET_PATH, split="train") # 文本过滤函数 def is_valid_text(example): text = example.get("text", "") return text is not None and len(text.strip()) > 200 # 增加最小长度要求 dataset = dataset.filter(is_valid_text) logger.info(f"过滤后数据集大小: {len(dataset)} 条") # 动态填充的分词函数 - 节省内存 
def tokenize_function(examples): tokenized = tokenizer( examples["text"], padding=True, # 使用动态填充 truncation=True, max_length=1024, # 降低上下文长度以减少内存使用 ) # 创建 labels - 因果语言建模需要 labels = input_ids tokenized["labels"] = tokenized["input_ids"].copy() return tokenized tokenized_dataset = dataset.map( tokenize_function, batched=True, remove_columns=["text"], batch_size=64, # 降低批处理大小以减少内存峰值 num_proc=4, # 减少进程数以降低内存开销 ) # === 数据整理器 === data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False # 因果语言建模 ) # === 训练参数 - 优化内存使用 === report_to_list = ["tensorboard"] if tb_writer else [] training_args = TrainingArguments( output_dir=OUTPUT_DIR, per_device_train_batch_size=4, # 大幅降低批次大小 gradient_accumulation_steps=4, # 增加梯度累积步数以保持有效批次大小 learning_rate=2e-5, num_train_epochs=3, logging_steps=50, save_strategy="steps", save_steps=500, bf16=True, optim="paged_adamw_32bit", report_to=report_to_list, warmup_ratio=0.05, gradient_checkpointing=True, # 启用梯度检查点 fp16=False, max_grad_norm=0.3, # 降低梯度裁剪阈值 remove_unused_columns=True, # 移除未使用的列以节省内存 dataloader_num_workers=4, # 减少数据加载工作线程 evaluation_strategy="steps", eval_steps=500, save_total_limit=2, # 减少保存的检查点数量 logging_dir=os.path.join(OUTPUT_DIR, "logs"), load_best_model_at_end=True, ddp_find_unused_parameters=False, logging_first_step=True, group_by_length=True, lr_scheduler_type="cosine", weight_decay=0.01, ) # === GPU监控工具 === def monitor_gpu(): """监控GPU使用情况""" if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 mem_reserved = torch.cuda.memory_reserved(device) / 1024 ** 3 mem_total = torch.cuda.get_device_properties(device).total_memory / 1024 ** 3 return { "allocated": f"{mem_alloc:.2f} GB", "reserved": f"{mem_reserved:.2f} GB", "total": f"{mem_total:.2f} GB", "utilization": f"{mem_alloc / mem_total * 100:.1f}%" } return {} # === 创建训练器 === eval_dataset = None if len(tokenized_dataset) > 100: eval_dataset = tokenized_dataset.select(range(100)) trainer = Trainer( model=model, tokenizer=tokenizer, args=training_args, train_dataset=tokenized_dataset, eval_dataset=eval_dataset, data_collator=data_collator, ) # === 训练前验证 === def validate_data_and_model(): """验证数据和模型是否准备好训练""" logger.info("\n=== 训练前验证 ===") # 检查样本格式 sample = tokenized_dataset[0] logger.info(f"样本键: {list(sample.keys())}") logger.info(f"input_ids 长度: {len(sample['input_ids'])}") # 创建单个样本测试批次 test_batch = data_collator([sample]) # 移动数据到设备 test_batch = {k: v.to(model.device) for k, v in test_batch.items()} # 前向传播测试 model.train() outputs = model(**test_batch) loss_value = outputs.loss.item() logger.info(f"测试批次损失: {loss_value:.4f}") # 记录到TensorBoard if tb_writer: tb_writer.add_scalar("debug/test_loss", loss_value, 0) # 反向传播测试 outputs.loss.backward() logger.info("反向传播成功!") # 重置梯度 model.zero_grad() logger.info("验证完成,准备开始训练\n") # 记录初始GPU使用情况 gpu_status = monitor_gpu() logger.info(f"初始GPU状态: {gpu_status}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("system/initial_gpu", str(gpu_status), 0) validate_data_and_model() # === 自定义回调 - 监控资源使用 === class ResourceMonitorCallback(transformers.TrainerCallback): def __init__(self, tb_writer=None): self.tb_writer = tb_writer self.start_time = datetime.now() self.last_log_time = datetime.now() def on_step_end(self, args, state, control, **kwargs): current_time = datetime.now() time_diff = (current_time - self.last_log_time).total_seconds() # 每分钟记录一次资源使用情况 if time_diff > 60: self.last_log_time = current_time # GPU监控 gpu_status = monitor_gpu() logger.info(f"Step {state.global_step} - 
GPU状态: {gpu_status}") # CPU和内存监控 cpu_percent = psutil.cpu_percent() mem = psutil.virtual_memory() logger.info( f"CPU使用率: {cpu_percent}%, 内存使用: {mem.used / 1024 ** 3:.2f}GB/{mem.total / 1024 ** 3:.2f}GB") # 记录到TensorBoard if self.tb_writer: # GPU显存使用 if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 self.tb_writer.add_scalar("system/gpu_mem", mem_alloc, state.global_step) # CPU使用率 self.tb_writer.add_scalar("system/cpu_usage", cpu_percent, state.global_step) # 系统内存使用 self.tb_writer.add_scalar("system/ram_usage", mem.used / 1024 ** 3, state.global_step) def on_log(self, args, state, control, logs=None, **kwargs): """记录训练指标到TensorBoard""" if self.tb_writer and logs is not None: for metric_name, metric_value in logs.items(): if "loss" in metric_name or "lr" in metric_name or "grad_norm" in metric_name: self.tb_writer.add_scalar(f"train/{metric_name}", metric_value, state.global_step) def on_train_end(self, args, state, control, **kwargs): """训练结束时记录总时间""" training_time = datetime.now() - self.start_time logger.info(f"训练总时间: {training_time}") if self.tb_writer: self.tb_writer.add_text("system/total_time", str(training_time)) # 添加回调 trainer.add_callback(ResourceMonitorCallback(tb_writer=tb_writer)) # === 内存清理函数 === def clear_memory(): """清理内存和GPU缓存""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() logger.info("内存清理完成") # === 启动训练 === try: logger.info("开始训练...") # 分阶段训练以减少内存峰值 num_samples = len(tokenized_dataset) chunk_size = 1000 # 每次处理1000个样本 for i in range(0, num_samples, chunk_size): end_idx = min(i + chunk_size, num_samples) logger.info(f"训练样本 {i} 到 {end_idx - 1} / {num_samples}") # 创建子数据集 chunk_dataset = tokenized_dataset.select(range(i, end_idx)) # 更新训练器 trainer.train_dataset = chunk_dataset # 训练当前块 trainer.train() # 清理内存 clear_memory() # 保存训练指标 metrics = trainer.evaluate() trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) # 保存最佳模型 trainer.save_model(OUTPUT_DIR) tokenizer.save_pretrained(OUTPUT_DIR) logger.info(f"训练完成! 模型保存在: {OUTPUT_DIR}") # 记录最终指标到TensorBoard if tb_writer: for metric_name, metric_value in metrics.items(): tb_writer.add_scalar(f"final/{metric_name}", metric_value) tb_writer.close() except Exception as e: logger.error(f"训练出错: {e}") import traceback logger.error(traceback.format_exc()) # 尝试更小批量训练 logger.info("\n尝试更小批量训练...") small_dataset = tokenized_dataset.select(range(50)) trainer.train_dataset = small_dataset trainer.train() # 保存模型 trainer.save_model(f"{OUTPUT_DIR}_small") tokenizer.save_pretrained(f"{OUTPUT_DIR}_small") logger.info(f"小批量训练完成! 
模型保存在: {OUTPUT_DIR}_small") # 记录错误到TensorBoard if tb_writer: tb_writer.add_text("error/exception", traceback.format_exc()) # 清理内存 clear_memory() # === 训练后验证 === def validate_final_model(): """验证训练后的模型""" logger.info("\n=== 训练后验证 ===") # 加载保存的模型 from peft import PeftModel # 仅加载基础模型配置 base_model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, load_in_4bit=True ) # 加载LoRA适配器 peft_model = PeftModel.from_pretrained(base_model, OUTPUT_DIR) # 不再合并LoRA权重,直接使用 peft_model 推理 peft_model.eval() # 测试生成 prompt = "中国的首都是" inputs = tokenizer(prompt, return_tensors="pt").to(peft_model.device) outputs = peft_model.generate( **inputs, max_new_tokens=50, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"提示: {prompt}") logger.info(f"生成结果: {generated}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("validation/sample", f"提示: {prompt}\n生成: {generated}") # 更全面的测试 test_prompts = [ "人工智能的未来发展趋势是", "如何学习深度学习?", "写一个关于太空探索的短故事:" ] for i, test_prompt in enumerate(test_prompts): inputs = tokenizer(test_prompt, return_tensors="pt").to(peft_model.device) outputs = peft_model.generate( **inputs, max_new_tokens=100, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"\n提示: {test_prompt}\n生成: {generated_text}\n{'=' * 50}") # 记录到TensorBoard if tb_writer: tb_writer.add_text(f"validation/test_{i}", f"提示: {test_prompt}\n生成: {generated_text}") logger.info("验证完成") # 执行验证 validate_final_model() # 关闭TensorBoard写入器 if tb_writer: tb_writer.close() logger.info("TensorBoard日志已关闭")

import logging import torch from datasets import load_dataset from peft import get_peft_model, LoraConfig, TaskType from transformers import ( AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling, ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class LoRATrainer: def __init__(self, config): self.config = config self.model_path = config['model_path'] self.train_file = config['train_file'] self.output_dir = config['output_dir'] self.max_steps = config.get('max_steps', 100) self._load_tokenizer() self._load_dataset() self._load_model() def _load_tokenizer(self): logger.info("[1] 加载 Tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained( self.model_path, trust_remote_code=True ) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token def _load_dataset(self): logger.info("[2] 加载并 Tokenize 数据...") raw_dataset = load_dataset("text", data_files={"train": self.train_file}) def tokenize_function(example): return self.tokenizer( example["text"], truncation=True, max_length=2048, # 移除padding padding=False ) self.tokenized_dataset = raw_dataset.map( tokenize_function, batched=True, remove_columns=["text"] )["train"] def _load_model(self): logger.info("[3] 加载模型 + LoRA 适配...") # 配置4-bit量化 self.model = AutoModelForCausalLM.from_pretrained( self.model_path, device_map="auto", torch_dtype=torch.float16, trust_remote_code=True ) lora_config = LoraConfig( r=8, lora_alpha=32, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none", task_type=TaskType.CAUSAL_LM ) self.model = get_peft_model(self.model, lora_config) # 启用梯度检查点 self.model.gradient_checkpointing_enable() self.data_collator = DataCollatorForLanguageModeling( tokenizer=self.tokenizer, mlm=False, pad_to_multiple_of=8 # 提升GPU效率 ) def train(self): logger.info("[4] 开始 LoRA 微调训练...") training_args = TrainingArguments( output_dir=self.output_dir, per_device_train_batch_size=1, gradient_accumulation_steps=4, logging_steps=10, max_steps=self.max_steps, save_strategy="steps", save_steps=50, # 定期保存 report_to="none", fp16=True # 混合精度训练 ) trainer = Trainer( model=self.model, args=training_args, train_dataset=self.tokenized_dataset, tokenizer=self.tokenizer, data_collator=self.data_collator ) trainer.train() if __name__ == "__main__": config = { "model_path": r"C:\Users\vipuser\Documents\ai_writer_project_final_with_fixed_output_ui\models\Yi-6B", "train_file": "./data/train_cleaned.txt", "output_dir": "./output/lora", "max_steps": 100 } trainer = LoRATrainer(config) trainer.train()
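这个 LoRATrainer 在 trainer.train() 之后没有保存 LoRA 适配器。下面是一个最小草图(save_pretrained 与 merge_and_unload 都是 peft/transformers 的真实接口,函数名与保存路径为示例,仅供参考):

import os

def save_lora_artifacts(model, tokenizer, output_dir):
    """训练结束后保存 LoRA 适配器,并可选地合并回基础模型导出完整权重。"""
    adapter_dir = os.path.join(output_dir, "adapter")
    model.save_pretrained(adapter_dir)         # PeftModel 只保存 LoRA 增量权重,体积很小
    tokenizer.save_pretrained(adapter_dir)
    merged = model.merge_and_unload()          # 合并 LoRA 权重,需要足够的内存/显存
    merged.save_pretrained(os.path.join(output_dir, "merged"))

在 train() 末尾调用 save_lora_artifacts(self.model, self.tokenizer, self.output_dir) 即可。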

class UniformAffineQuantizer(nn.Module): def __init__( self, n_bits: int = 8, symmetric: bool = False, per_channel_axes=[], metric="minmax", dynamic=False, dynamic_method="per_cluster", group_size=None, shape=None, lwc=False, disable_zero_point=False, ): """ support cluster quantize dynamic_method support per_token and per_cluster """ super().__init__() self.symmetric = symmetric self.disable_zero_point = disable_zero_point assert 2 <= n_bits <= 16, "bitwidth not supported" self.n_bits = n_bits if self.disable_zero_point: self.qmin = -(2 ** (n_bits - 1)) self.qmax = 2 ** (n_bits - 1) - 1 else: self.qmin = 0 self.qmax = 2 ** (n_bits) - 1 self.per_channel_axes = per_channel_axes self.metric = metric self.cluster_counts = None self.cluster_dim = None self.scale = None self.zero_point = None self.round_zero_point = None self.cached_xmin = None self.cached_xmax = None self.dynamic = dynamic self.dynamic_method = dynamic_method self.deficiency = 0 self.lwc = lwc init_value = 4. # inti value of learnable weight clipping if lwc: if group_size: dim1 = int(shape[0]*math.ceil(shape[1]/group_size)) self.deficiency = shape[-1]%group_size if self.deficiency > 0: self.deficiency = group_size - self.deficiency assert self.symmetric # support for mlc-llm symmetric quantization else: dim1 = shape[0] self.upbound_factor = nn.Parameter(torch.ones((dim1,1))*init_value) self.lowbound_factor = nn.Parameter(torch.ones((dim1,1))*init_value) self.sigmoid = nn.Sigmoid() self.enable = True self.group_size = group_size def change_n_bits(self, n_bits): self.n_bits = n_bits if self.disable_zero_point: self.qmin = -(2 ** (n_bits - 1)) self.qmax = 2 ** (n_bits - 1) - 1 else: self.qmin = 0 self.qmax = 2 ** (n_bits) - 1 def fake_quant(self, x, scale, round_zero_point): if self.deficiency > 0: pad_zeros = torch.zeros((x.shape[0],self.deficiency),dtype=x.dtype,device=x.device) x = torch.cat((x,pad_zeros),dim=1) if self.group_size: assert len(x.shape)==2, "only support linear layer now" dim1, dim2 = x.shape x = x.reshape(-1, self.group_size) x_int = round_ste(x / scale) if round_zero_point is not None: x_int = x_int.add(round_zero_point) x_int = x_int.clamp(self.qmin, self.qmax) x_dequant = x_int if round_zero_point is not None: x_dequant = x_dequant.sub(round_zero_point) x_dequant = x_dequant.mul(scale) if self.group_size: x_dequant = x_dequant.reshape(dim1, dim2) if self.deficiency > 0: x_dequant = x_dequant[:,:-self.deficiency] return x_dequant def forward(self, x: torch.Tensor): if self.n_bits >= 16 or not self.enable: return x if self.metric == "fix0to1": return x.mul_(2**self.n_bits-1).round_().div_(2**self.n_bits-1) if self.dynamic_method == "per_token" or self.dynamic_method == "per_channel": self.per_token_dynamic_calibration(x) else: raise NotImplementedError() x_dequant = self.fake_quant(x, self.scale, self.round_zero_point) return x_dequant def per_token_dynamic_calibration(self, x): if self.group_size: if self.deficiency == 0: x = x.reshape(-1,self.group_size) else: pad_zeros = torch.zeros((x.shape[0],self.deficiency),dtype=x.dtype,device=x.device) x = torch.cat((x,pad_zeros),dim=1) x = x.reshape(-1,self.group_size) reduce_shape = [-1] xmin = x.amin(reduce_shape, keepdim=True) xmax = x.amax(reduce_shape, keepdim=True) if self.lwc: xmax = self.sigmoid(self.upbound_factor)*xmax xmin = self.sigmoid(self.lowbound_factor)*xmin if self.symmetric: abs_max = torch.max(xmax.abs(),xmin.abs()) scale = abs_max / (2**(self.n_bits-1)-1) self.scale = scale.clamp(min=CLIPMIN, max=1e4) zero_point = 
(2**(self.n_bits-1)-1)*torch.ones_like(self.scale) else: range = xmax - xmin scale = range / (2**self.n_bits-1) self.scale = scale.clamp(min=CLIPMIN, max=1e4) zero_point = -(xmin) / (self.scale) if self.disable_zero_point: self.round_zero_point = None else: self.round_zero_point = zero_point.clamp(min=-1e4, max=1e4).round() def register_scales_and_zeros(self): self.register_buffer('scales', self.scale) self.register_buffer('zeros', self.round_zero_point) del self.scale del self.round_zero_point
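上面的 fake_quant 和 per_token_dynamic_calibration 用到了这段代码之外定义的 round_ste 和 CLIPMIN。作为参考,下面给出一个常见的直通估计(straight-through estimator)取整实现的最小草图,属于假设性补充,实际项目中的定义可能不同:

import torch

CLIPMIN = 1e-5  # 假设值:scale 的下限,防止出现除零

def round_ste(x: torch.Tensor) -> torch.Tensor:
    """前向做取整、反向梯度直通的取整操作(STE)。"""
    return (x.round() - x).detach() + x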

import cv2 import numpy as np from rknn.api import RKNN import os ONNX_MODEL = 'model_Mnist_fixed.onnx' RKNN_MODEL = 'mnist.rknn' class RKNNConverterV1: def __init__(self): self.rknn = RKNN(verbose=True) self.sdk_version = "1.6.0" # 手动指定版本 def convert(self): """兼容1.x版本的转换流程""" try: self._check_files() self._configure_model() self._load_model() self._build_model() self._export_model() self._validate() except Exception as e: print(f"转换失败: {str(e)}") finally: self.rknn.release() def _check_files(self): """文件检查""" if not os.path.exists(ONNX_MODEL): raise FileNotFoundError(f"ONNX文件 {ONNX_MODEL} 不存在") def _configure_model(self): """版本兼容的配置方法""" print('\n[1/5] 配置模型参数') # 根据训练参数计算预处理值 # 原始归一化参数:mean=0.1307, std=0.3081 # 转换公式:input = (raw/255 - mean) / std # 等效RKNN配置:mean=0.1307*255=33.3285 → 33 # std=0.3081 → 1/0.3081=3.246 → 1/3.246≈0.308 config = { 'mean_values': [[33]], 'std_values': [[3.246]], # 修正后的正确值 'target_platform': 'rk3588', 'dynamic_input': [[1, 1, 28, 28]] # 1.x版本参数名 } # 版本特定配置 if self.sdk_version.startswith('1.'): config['optimization_level'] = 0 # 1.x版本需要关闭优化 ret = self.rknn.config(**config) if ret != 0: raise RuntimeError(f"配置失败,错误码: {ret}") print('✅ 配置完成') def _load_model(self): """加载ONNX模型""" print('\n[2/5] 加载模型') ret = self.rknn.load_onnx( model=ONNX_MODEL, input_size_list=[[1, 1, 28, 28]] # 固定输入形状 ) if ret != 0: raise RuntimeError(f"加载失败,错误码: {ret}") print('✅ 加载完成') def _build_model(self): """构建模型""" print('\n[3/5] 构建模型') ret = self.rknn.build( do_quantization=False, # 1.x版本需要额外参数 batch_size=-1, enable_mask=False ) if ret != 0: raise RuntimeError(f"构建失败,错误码: {ret}") print('✅ 构建完成') def _export_model(self): """导出模型""" print('\n[4/5] 导出模型') ret = self.rknn.export_rknn(RKNN_MODEL) if ret != 0: raise RuntimeError(f"导出失败,错误码: {ret}") print(f'✅ 导出完成: {RKNN_MODEL}') def _validate(self): """验证模型""" print('\n[5/5] 验证模型') if not os.path.exists('test_3.png'): print('⚠️ 跳过验证(缺少测试图片)') return # 初始化运行时 ret = self.rknn.init_runtime() if ret != 0: print("⚠️ 运行时初始化失败") return # 预处理测试图片 img = cv2.imread('test_3.png', cv2.IMREAD_GRAYSCALE) img = cv2.resize(img, (28, 28)) input_data = np.expand_dims(img, axis=(0, 1)).astype(np.float32) # 执行推理 outputs = self.rknn.inference(inputs=[input_data]) pred = np.argmax(outputs[0][0]) print(f'预测结果: {pred} (期望值: 3)') if __name__ == '__main__': converter = RKNNConverterV1() converter.convert() 分析这部分代码
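就预处理参数做一点分析:rknn-toolkit 的 config 一般按 (pixel - mean_values) / std_values 对 0~255 的输入做归一化。若要等效训练时的 (x/255 - 0.1307)/0.3081,应取 mean ≈ 0.1307*255 ≈ 33.33、std ≈ 0.3081*255 ≈ 78.57;上面代码里 std_values 取 3.246 与训练归一化并不等效。下面是据此推算的配置草图(仅为示意,字段名和行为请以所用 SDK 版本文档为准):

# 示意:与 MNIST 训练归一化等效的 RKNN 预处理参数
# (x/255 - 0.1307) / 0.3081 == (x - 0.1307*255) / (0.3081*255)
mean = 0.1307 * 255   # ≈ 33.3285
std = 0.3081 * 255    # ≈ 78.5655
config = {
    'mean_values': [[mean]],
    'std_values': [[std]],
    'target_platform': 'rk3588',
}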

import os import cv2 import sys import argparse import time # add path realpath = os.path.abspath(__file__) _sep = os.path.sep realpath = realpath.split(_sep) try: zoo_root_index = next(i for i, part in enumerate(realpath) if 'rknn_model_zoo' in part) rknn_model_zoo_path = os.path.join(realpath[0]+_sep, *realpath[1:zoo_root_index+1]) sys.path.append(rknn_model_zoo_path) except StopIteration: raise ValueError("Could not find 'rknn_model_zoo' directory in the path: {}".format(os.path.abspath(__file__))) from py_utils.coco_utils import COCO_test_helper import numpy as np OBJ_THRESH = 0.25 NMS_THRESH = 0.45 # The follew two param is for map test # OBJ_THRESH = 0.001 # NMS_THRESH = 0.65 IMG_SIZE = (640, 640) # (width, height), such as (1280, 736) CLASSES = ("car","white") coco_id_list = [1,2] def filter_boxes(boxes, box_confidences, box_class_probs): """Filter boxes with object threshold. """ box_confidences = box_confidences.reshape(-1) candidate, class_num = box_class_probs.shape class_max_score = np.max(box_class_probs, axis=-1) classes = np.argmax(box_class_probs, axis=-1) _class_pos = np.where(class_max_score* box_confidences >= OBJ_THRESH) scores = (class_max_score* box_confidences)[_class_pos] boxes = boxes[_class_pos] classes = classes[_class_pos] return boxes, classes, scores def nms_boxes(boxes, scores): """Suppress non-maximal boxes. # Returns keep: ndarray, index of effective boxes. """ x = boxes[:, 0] y = boxes[:, 1] w = boxes[:, 2] - boxes[:, 0] h = boxes[:, 3] - boxes[:, 1] areas = w * h order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x[i], x[order[1:]]) yy1 = np.maximum(y[i], y[order[1:]]) xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]]) yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]]) w1 = np.maximum(0.0, xx2 - xx1 + 0.00001) h1 = np.maximum(0.0, yy2 - yy1 + 0.00001) inter = w1 * h1 ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= NMS_THRESH)[0] order = order[inds + 1] keep = np.array(keep) return keep def dfl(position): # Distribution Focal Loss (DFL) import torch x = torch.tensor(position) n,c,h,w = x.shape p_num = 4 mc = c//p_num y = x.reshape(n,p_num,mc,h,w) y = y.softmax(2) acc_metrix = torch.tensor(range(mc)).float().reshape(1,1,mc,1,1) y = (y*acc_metrix).sum(2) return y.numpy() def box_process(position): grid_h, grid_w = position.shape[2:4] col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h)) col = col.reshape(1, 1, grid_h, grid_w) row = row.reshape(1, 1, grid_h, grid_w) grid = np.concatenate((col, row), axis=1) stride = np.array([IMG_SIZE[1]//grid_h, IMG_SIZE[0]//grid_w]).reshape(1,2,1,1) position = dfl(position) box_xy = grid +0.5 -position[:,0:2,:,:] box_xy2 = grid +0.5 +position[:,2:4,:,:] xyxy = np.concatenate((box_xy*stride, box_xy2*stride), axis=1) return xyxy def post_process(input_data): boxes, scores, classes_conf = [], [], [] defualt_branch=3 pair_per_branch = len(input_data)//defualt_branch # Python 忽略 score_sum 输出 for i in range(defualt_branch): boxes.append(box_process(input_data[pair_per_branch*i])) classes_conf.append(input_data[pair_per_branch*i+1]) scores.append(np.ones_like(input_data[pair_per_branch*i+1][:,:1,:,:], dtype=np.float32)) def sp_flatten(_in): ch = _in.shape[1] _in = _in.transpose(0,2,3,1) return _in.reshape(-1, ch) boxes = [sp_flatten(_v) for _v in boxes] classes_conf = [sp_flatten(_v) for _v in classes_conf] scores = [sp_flatten(_v) for _v in scores] boxes = np.concatenate(boxes) classes_conf = 
np.concatenate(classes_conf) scores = np.concatenate(scores) # filter according to threshold boxes, classes, scores = filter_boxes(boxes, scores, classes_conf) # nms nboxes, nclasses, nscores = [], [], [] for c in set(classes): inds = np.where(classes == c) b = boxes[inds] c = classes[inds] s = scores[inds] keep = nms_boxes(b, s) if len(keep) != 0: nboxes.append(b[keep]) nclasses.append(c[keep]) nscores.append(s[keep]) if not nclasses and not nscores: return None, None, None boxes = np.concatenate(nboxes) classes = np.concatenate(nclasses) scores = np.concatenate(nscores) return boxes, classes, scores def draw(image, boxes, scores, classes): for box, score, cl in zip(boxes, scores, classes): top, left, right, bottom = [int(_b) for _b in box] print("%s @ (%d %d %d %d) %.3f" % (CLASSES[cl], top, left, right, bottom, score)) cv2.rectangle(image, (top, left), (right, bottom), (255, 0, 0), 2) cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score), (top, left - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) def setup_model(args): model_path = args.model_path if model_path.endswith('.pt') or model_path.endswith('.torchscript'): platform = 'pytorch' from py_utils.pytorch_executor import Torch_model_container model = Torch_model_container(args.model_path) elif model_path.endswith('.rknn'): platform = 'rknn' from py_utils.rknn_executor import RKNN_model_container model = RKNN_model_container(args.model_path, args.target, args.device_id) elif model_path.endswith('onnx'): platform = 'onnx' from py_utils.onnx_executor import ONNX_model_container model = ONNX_model_container(args.model_path) else: assert False, "{} is not rknn/pytorch/onnx model".format(model_path) print('Model-{} is {} model, starting val'.format(model_path, platform)) return model, platform def img_check(path): img_type = ['.jpg', '.jpeg', '.png', '.bmp'] for _type in img_type: if path.endswith(_type) or path.endswith(_type.upper()): return True return False if __name__ == '__main__': # Create a dummy args object class Args: pass args = Args() args.model_path = '/home/cat/NPU/rknn_model_zoo-main/examples/yolov8/model/whitenu8.rknn' args.target = 'rk3576' args.device_id = None # init model model, platform = setup_model(args) co_helper = COCO_test_helper(enable_letter_box=True) # init camera cap = cv2.VideoCapture(0) if not cap.isOpened(): print("Error: Could not open camera.") exit() print("Press 'q' to quit.") # run test while True: start_time = time.time() ret, img_src = cap.read() if not ret: print("Error: Failed to capture frame.") break # Due to rga init with (0,0,0), we using pad_color (0,0,0) instead of (114, 114, 114) pad_color = (0,0,0) img = co_helper.letter_box(im= img_src.copy(), new_shape=(IMG_SIZE[1], IMG_SIZE[0]), pad_color=(0,0,0)) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # preprocee if not rknn model if platform in ['pytorch', 'onnx']: input_data = img.transpose((2,0,1)) input_data = input_data.reshape(1,*input_data.shape).astype(np.float32) input_data = input_data/255. else: input_data = img outputs = model.run([input_data]) boxes, classes, scores = post_process(outputs) img_p = img_src.copy() if boxes is not None: draw(img_p, co_helper.get_real_box(boxes), scores, classes) end_time = time.time() fps = 1 / (end_time - start_time) cv2.putText(img_p, f"FPS: {int(fps)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.imshow("yolov8 detection", img_p) if cv2.waitKey(1) & 0xFF == ord('q'): break # release cap.release() cv2.destroyAllWindows() model.release()
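另外,dfl() 每帧都 import torch 并在 numpy 与 tensor 之间往返,在板端实时推理时开销不小。下面是一个等价的纯 numpy 写法草图,与上面的 torch 版本逐步对应,仅供参考:

import numpy as np

def dfl_numpy(position):
    """DFL 解码的 numpy 实现:按 mc 维做 softmax 后加权求和。"""
    n, c, h, w = position.shape
    p_num = 4
    mc = c // p_num
    y = position.reshape(n, p_num, mc, h, w).astype(np.float32)
    y = np.exp(y - y.max(axis=2, keepdims=True))      # 数值稳定的 softmax
    y = y / y.sum(axis=2, keepdims=True)
    acc = np.arange(mc, dtype=np.float32).reshape(1, 1, mc, 1, 1)
    return (y * acc).sum(axis=2)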

import os import urllib import traceback import time import sys import numpy as np import cv2 from rknn.api import RKNN ONNX_MODEL = '/home/vincent/桌面/x66/final_model/best_sim.onnx' RKNN_MODEL = '/home/vincent/桌面/x66/final_model/rk-yolov5-dual.rknn' IMG_PATH = './bus.jpg' DATASET = '/home/vincent/桌面/x66/lianghua_testdata/lianghua_ceshi.txt' QUANTIZE_ON = True BOX_THESH = 0.6 NMS_THRESH = 0.01 IMG_SIZE = 416 CLASSES = ("tank_1", "tank_2", "zjc_1", "zjc_2", "fsc_1", "fsc_2", "bus", "car", "truck") def sigmoid(x): return 1 / (1 + np.exp(-x)) def xywh2xyxy(x): # Convert [x, y, w, h] to [x1, y1, x2, y2] y = np.copy(x) y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y return y def process(input, mask, anchors): anchors = [anchors[i] for i in mask] grid_h, grid_w = map(int, input.shape[0:2]) box_confidence = sigmoid(input[..., 4]) box_confidence = np.expand_dims(box_confidence, axis=-1) box_class_probs = sigmoid(input[..., 5:]) box_xy = sigmoid(input[..., :2])*2 - 0.5 col = np.tile(np.arange(0, grid_w), grid_w).reshape(-1, grid_w) row = np.tile(np.arange(0, grid_h).reshape(-1, 1), grid_h) col = col.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2) row = row.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2) grid = np.concatenate((col, row), axis=-1) box_xy += grid box_xy *= int(IMG_SIZE/grid_h) box_wh = pow(sigmoid(input[..., 2:4])*2, 2) box_wh = box_wh * anchors box = np.concatenate((box_xy, box_wh), axis=-1) return box, box_confidence, box_class_probs def filter_boxes(boxes, box_confidences, box_class_probs): """Filter boxes with box threshold. It's a bit different with origin yolov5 post process! # Arguments boxes: ndarray, boxes of objects. box_confidences: ndarray, confidences of objects. box_class_probs: ndarray, class_probs of objects. # Returns boxes: ndarray, filtered boxes. classes: ndarray, classes for boxes. scores: ndarray, scores for boxes. """ box_classes = np.argmax(box_class_probs, axis=-1) box_class_scores = np.max(box_class_probs, axis=-1) pos = np.where(box_confidences[..., 0] >= BOX_THESH) boxes = boxes[pos] classes = box_classes[pos] scores = box_class_scores[pos] return boxes, classes, scores def nms_boxes(boxes, scores): """Suppress non-maximal boxes. # Arguments boxes: ndarray, boxes of objects. scores: ndarray, scores of objects. # Returns keep: ndarray, index of effective boxes. 
""" x = boxes[:, 0] y = boxes[:, 1] w = boxes[:, 2] - boxes[:, 0] h = boxes[:, 3] - boxes[:, 1] areas = w * h order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x[i], x[order[1:]]) yy1 = np.maximum(y[i], y[order[1:]]) xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]]) yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]]) w1 = np.maximum(0.0, xx2 - xx1 + 0.00001) h1 = np.maximum(0.0, yy2 - yy1 + 0.00001) inter = w1 * h1 ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= NMS_THRESH)[0] order = order[inds + 1] keep = np.array(keep) return keep def yolov5_post_process(input_data): masks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]] anchors = [[10, 13], [16, 30], [33, 23], [30, 61], [62, 45], [59, 119], [116, 90], [156, 198], [373, 326]] boxes, classes, scores = [], [], [] for input, mask in zip(input_data, masks): b, c, s = process(input, mask, anchors) b, c, s = filter_boxes(b, c, s) boxes.append(b) classes.append(c) scores.append(s) boxes = np.concatenate(boxes) boxes = xywh2xyxy(boxes) classes = np.concatenate(classes) scores = np.concatenate(scores) nboxes, nclasses, nscores = [], [], [] for c in set(classes): inds = np.where(classes == c) b = boxes[inds] c = classes[inds] s = scores[inds] keep = nms_boxes(b, s) nboxes.append(b[keep]) nclasses.append(c[keep]) nscores.append(s[keep]) if not nclasses and not nscores: return None, None, None boxes = np.concatenate(nboxes) classes = np.concatenate(nclasses) scores = np.concatenate(nscores) return boxes, classes, scores def draw(image, boxes, scores, classes): """Draw the boxes on the image. # Argument: image: original image. boxes: ndarray, boxes of objects. classes: ndarray, classes of objects. scores: ndarray, scores of objects. all_classes: all classes name. 
""" for box, score, cl in zip(boxes, scores, classes): top, left, right, bottom = box print('class: {}, score: {}'.format(CLASSES[cl], score)) print('box coordinate left,top,right,down: [{}, {}, {}, {}]'.format(top, left, right, bottom)) top = int(top) left = int(left) right = int(right) bottom = int(bottom) cv2.rectangle(image, (top, left), (right, bottom), (255, 0, 0), 2) cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score), (top, left - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) def letterbox(im, new_shape=(640, 640), color=(0, 0, 0)): # Resize and pad image while meeting stride-multiple constraints shape = im.shape[:2] # current shape [height, width] if isinstance(new_shape, int): new_shape = (new_shape, new_shape) # Scale ratio (new / old) r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) # Compute padding ratio = r, r # width, height ratios new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding dw /= 2 # divide padding into 2 sides dh /= 2 if shape[::-1] != new_unpad: # resize im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR) top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border return im, ratio, (dw, dh) if __name__ == '__main__': ''' # Create RKNN object rknn = RKNN(verbose=True) # pre-process config print('--> Config model') rknn.config(mean_values=[[0, 0, 0]], std_values=[[1, 1, 1]], target_platform='rk3588') print('done') # Load ONNX model print('--> Loading model') ret = rknn.load_onnx(model=ONNX_MODEL, outputs=['output0', '376', '377']) if ret != 0: print('Load model failed!') exit(ret) print('done') # Build model print('--> Building model') ret = rknn.build(do_quantization=QUANTIZE_ON, dataset=DATASET) if ret != 0: print('Build model failed!') exit(ret) print('done') # Export RKNN model print('--> Export rknn model') ret = rknn.export_rknn(RKNN_MODEL) if ret != 0: print('Export rknn model failed!') exit(ret) print('done') # Init runtime environment print('--> Init runtime environment') ret = rknn.init_runtime(target='rk3588') # ret = rknn.init_runtime('rk3566') if ret != 0: print('Init runtime environment failed!') exit(ret) print('done') ''' # load rknn model RKNN_MODEL = '/home/vincent/桌面/x66/end_model/end_0516_sim_255_500data-416-416_rk3588.rknn' rknn = RKNN(verbose=True) rknn.load_rknn(path=RKNN_MODEL) print('Finished loading model!') # Init runtime environment print('--> Init runtime environment') ret = rknn.init_runtime(target='rk3588', core_mask=RKNN.NPU_CORE_0_1_2, async_mode=True) CCD_PATH = '/home/vincent/wh/code/20240910_x66_code/pyqt_tensorrt_thread/data/XN4_car/JPEGImages_ccd' HW_PATH = '/home/vincent/wh/code/20240910_x66_code/pyqt_tensorrt_thread/data/XN4_car/JPEGImages_hw' total_infer_time=0 count=0 # rknn inference for ccd_png, hw_png in zip(os.listdir(CCD_PATH), os.listdir(HW_PATH)): print(os.path.join(CCD_PATH, ccd_png)) img_ccd = cv2.imread(os.path.join(CCD_PATH, ccd_png), cv2.IMREAD_GRAYSCALE) img_hw = cv2.imread(os.path.join(HW_PATH, hw_png), cv2.IMREAD_GRAYSCALE) ori_h, ori_w = img_ccd.shape # im_zero = np.zeros((ori_h,ori_w)).astype(np.uint8) im_zero = np.zeros_like(img_ccd) # img_yolo_ori = np.dstack((img_hw, im_zero, img_ccd))#BGR img_yolo_ori = cv2.merge([img_hw, im_zero, img_ccd]) img_yolo_resize=cv2.resize(img_yolo_ori, (416, 416)) # img_yolo = img_yolo_resize[:, :, 
::-1].transpose(2, 0, 1) # BGR to RGB img_yolo=img_yolo_resize.astype(np.float32) img_yolo = np.ascontiguousarray(img_yolo) # img_yolo /= 255.0 # 0 - 255 to 0.0 - 1.0 img_yolo = np.expand_dims(img_yolo, 0) # (1, 416, 416, 3) # Inference start_time = time.time() print('--> Running model') outputs = rknn.inference(inputs=[img_yolo], data_format=['nhwc']) # print('!!!!', outputs[0].shape) end_time = time.time() print('process_time:', (end_time - start_time) * 1000) total_infer_time+=(end_time-start_time) count+=1 np.save('./onnx_yolov5_0.npy', outputs[0]) np.save('./onnx_yolov5_1.npy', outputs[1]) np.save('./onnx_yolov5_2.npy', outputs[2]) print('done') # post process input0_data = outputs[0] input1_data = outputs[1] input2_data = outputs[2] input0_data = input0_data.reshape([3, -1]+list(input0_data.shape[-2:])) input1_data = input1_data.reshape([3, -1]+list(input1_data.shape[-2:])) input2_data = input2_data.reshape([3, -1]+list(input2_data.shape[-2:])) input_data = list() input_data.append(np.transpose(input0_data, (2, 3, 0, 1))) input_data.append(np.transpose(input1_data, (2, 3, 0, 1))) input_data.append(np.transpose(input2_data, (2, 3, 0, 1))) boxes, classes, scores = yolov5_post_process(input_data) img_ = np.squeeze(img_yolo, axis=0) img_ = img_ / 255 img_1 = cv2.cvtColor(img_, cv2.COLOR_RGB2BGR) if boxes is not None: draw(img_1, boxes, scores, classes) # show output cv2.imshow("post process result", img_1) cv2.waitKey(1) # cv2.destroyAllWindows() rknn.release() print("avg infer time->{:.2f}ms".format((total_infer_time) * 1000/count))

#include <opencv2/opencv.hpp> #include <vector> #include <random> #include <bitset> void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message) { // 量化矩阵 (需要转置为OpenCV的存储顺序) cv::Mat QUANTIZATION_MATRIX = (cv::Mat_<float>(8,8) << 16, 11, 10, 16, 24, 40, 51, 61, 12, 12, 14, 19, 26, 58, 60, 55, 14, 13, 16, 24, 40, 57, 69, 56, 14, 17, 22, 29, 51, 87, 80, 62, 18, 22, 37, 56, 68, 109, 103, 77, 24, 35, 55, 64, 81, 104, 113, 92, 49, 64, 78, 87, 103, 121, 120, 101, 72, 92, 95, 98, 112, 100, 103, 99); // 转换秘密信息为二进制位序列 std::string secret_bits; for (char c : secret_message) { secret_bits += std::bitset<8>(c).to_string(); } // 转换到YCrCb颜色空间 cv::Mat yuv_image; cv::cvtColor(image, yuv_image, cv::COLOR_BGR2YCrCb); std::vector<cv::Mat> channels(3); cv::split(yuv_image, channels); cv::Mat& y_channel = channels[0]; const int block_size = 8; const int rows = y_channel.rows; const int cols = y_channel.cols; // 生成块坐标列表并洗牌 std::vector<cv::Point> blocks; for (int y = 0; y < rows; y += block_size) { for (int x = 0; x < cols; x += block_size) { blocks.emplace_back(x, y); } } std::random_device rd; std::shuffle(blocks.begin(), blocks.end(), std::mt19937(rd())); size_t bit_idx = 0; for (const auto& block : blocks) { if (bit_idx >= secret_bits.size()) break; cv::Rect roi(block.x, block.y, block_size, block_size); if (roi.x + roi.width > cols || roi.y + roi.height > rows) continue; cv::Mat block_data = y_channel(roi).clone(); block_data.convertTo(block_data, CV_32F); // DCT变换 cv::Mat dct_coeffs; cv::dct(block_data, dct_coeffs); // 量化和嵌入 for (int i = 0; i < block_size; ++i) { for (int j = 0; j < block_size; ++j) { if (i == 0 && j == 0) continue; // 跳过直流分量 // 量化 float q_val = QUANTIZATION_MATRIX.at<float>(i,j); float quantized = std::round(dct_coeffs.at<float>(i,j) / q_val); if (quantized == 0) continue; if (bit_idx >= secret_bits.size()) break; // 嵌入信息 int target_bit = secret_bits[bit_idx] - '0'; int current_bit = static_cast<int>(quantized) % 2; if (current_bit != target_bit) { quantized += (quantized > 0) ? -1 : 1; } // 反量化并更新系数 dct_coeffs.at<float>(i,j) = quantized * q_val; ++bit_idx; } } // 逆DCT cv::Mat idct_block; cv::idct(dct_coeffs, idct_block); idct_block.convertTo(idct_block, y_channel.type()); idct_block.copyTo(y_channel(roi)); } // 合并通道并转换回BGR cv::merge(channels, yuv_image); cv::cvtColor(yuv_image, image, cv::COLOR_YCrCb2BGR); }根据上面代码转换提取代码std::string f5ExtractColorImage(Mat& img)

// 嵌入时写入长度 std::string message = "hello"; uint32_t msg_length = htonl(message.size()); // 转换为网络字节序 fwrite(&msg_length, sizeof(uint32_t), 1, output_file); // 先写长度头 fwrite(message.data(), 1, message.size(), output_file); // 再写数据 // 提取时读取长度头 uint32_t extracted_length; fread(&extracted_length, sizeof(uint32_t), 1, input_file); extracted_length = ntohl(extracted_length); 将上面代码加入我下面代码中,再返回完整代码给我 void f5EmbedColorImage(cv::Mat& image, const std::string& secret_message) { // 转换秘密信息为二进制位序列 std::string secret_bits; for (char c : secret_message) { secret_bits += std::bitset<8>(c).to_string(); } // 转换到YCrCb颜色空间 cv::Mat yuv_image; cv::cvtColor(image, yuv_image, cv::COLOR_BGR2YCrCb); std::vector<cv::Mat> channels(3); cv::split(yuv_image, channels); cv::Mat& y_channel = channels[0]; const int block_size = 8; const int rows = y_channel.rows; const int cols = y_channel.cols; // 生成块坐标列表并洗牌 std::vector<cv::Point> blocks; for (int y = 0; y < rows; y += block_size) { for (int x = 0; x < cols; x += block_size) { blocks.emplace_back(x, y); } } std::random_device rd; std::shuffle(blocks.begin(), blocks.end(), std::mt19937(12345)); size_t bit_idx = 0; for (const auto& block : blocks) { if (bit_idx >= secret_bits.size()) break; cv::Rect roi(block.x, block.y, block_size, block_size); if (roi.x + roi.width > cols || roi.y + roi.height > rows) continue; cv::Mat block_data = y_channel(roi).clone(); block_data.convertTo(block_data, CV_32F); // DCT变换 cv::Mat dct_coeffs; cv::dct(block_data, dct_coeffs); // 量化和嵌入 for (int i = 0; i < block_size; ++i) { for (int j = 0; j < block_size; ++j) { if (i == 0 && j == 0) continue; // 跳过直流分量 // 量化 float q_val = QUANTIZATION_MATRIX.at<float>(i,j); float quantized = std::round(dct_coeffs.at<float>(i,j) / q_val); if (quantized == 0) continue; if (bit_idx >= secret_bits.size()) break; // 嵌入信息 int target_bit = secret_bits[bit_idx] - '0'; int current_bit = static_cast<int>(quantized) % 2; if (current_bit != target_bit) { quantized += (quantized > 0) ? -1 : 1; } // 反量化并更新系数 dct_coeffs.at<float>(i,j) = quantized * q_val; ++bit_idx; } } // 逆DCT cv::Mat idct_block; cv::idct(dct_coeffs, idct_block); idct_block.convertTo(idct_block, y_channel.type()); idct_block.copyTo(y_channel(roi)); } // 合并通道并转换回BGR cv::merge(channels, yuv_image); cv::cvtColor(yuv_image, image, cv::COLOR_YCrCb2BGR); }

uint32_t extracted_length; fread(&extracted_length, sizeof(uint32_t), 1, input_file); extracted_length = ntohl(extracted_length);提取代码也加入上面代码,进行修改std::string f5ExtractColorImage(cv::Mat& img) { // 转换到YCrCb颜色空间 cv::Mat yuv_image; cv::cvtColor(img, yuv_image, cv::COLOR_BGR2YCrCb); std::vector<cv::Mat> channels; cv::split(yuv_image, channels); cv::Mat y_channel = channels[0].clone(); // 生成块坐标并洗牌(必须与嵌入时相同的随机逻辑) const int block_size = 8; std::vector<cv::Point> blocks; for (int y = 0; y < y_channel.rows; y += block_size) { for (int x = 0; x < y_channel.cols; x += block_size) { blocks.emplace_back(x, y); } } std::mt19937 rng(12345); // 固定种子确保洗牌顺序一致 std::shuffle(blocks.begin(), blocks.end(), rng); std::string extracted_bits; // 遍历每个块提取信息 for (const auto& block : blocks) { cv::Rect roi(block.x, block.y, block_size, block_size); if (roi.br().x > y_channel.cols || roi.br().y > y_channel.rows) continue; cv::Mat block_data = y_channel(roi).clone(); block_data.convertTo(block_data, CV_32F); // DCT变换 cv::Mat dct_coeffs; cv::dct(block_data, dct_coeffs); // 提取量化系数的LSB for (int i = 0; i < block_size; ++i) { for (int j = 0; j < block_size; ++j) { if (i == 0 && j == 0) continue; // 跳过直流分量 float q_val = QUANTIZATION_MATRIX.at<float>(i, j); float quantized = std::round(dct_coeffs.at<float>(i, j) / q_val); if (quantized == 0) continue; // 跳过未嵌入的系数 // 提取最低有效位 extracted_bits += (static_cast<int>(quantized) % 2) ? '1' : '0'; } } } // 二进制转字符串 std::string secret_message; for (size_t i = 0; i < extracted_bits.size(); i += 8) { if (i + 8 > extracted_bits.size()) break; secret_message += static_cast<char>( std::bitset<8>(extracted_bits.substr(i, 8)).to_ulong() ); } // 移除可能存在的填充空字符 size_t null_pos = secret_message.find('\0'); return (null_pos != std::string::npos) ? secret_message.substr(0, null_pos) : secret_message; }提取代码也修改一下
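把长度头写进隐写数据本身(而不是 fwrite 到外部文件)的思路是:嵌入端先把 4 字节大端序的消息长度展开成 32 个比特拼在消息比特流前面;提取端先还原这 32 位得到消息长度,再只读取对应数量的比特并在凑够后提前停止。为说明这个流程,下面给出一个 Python 草图(函数名均为示意,需按同样逻辑移植进上面的 f5EmbedColorImage / f5ExtractColorImage):

import struct

def add_length_header_bits(message: bytes) -> str:
    """嵌入端:在消息比特流前拼接 32 位大端序长度头。"""
    payload = struct.pack(">I", len(message)) + message   # 4 字节长度头 + 消息体
    return ''.join(format(b, '08b') for b in payload)

def parse_length_header_bits(bits: str) -> bytes:
    """提取端:先解析长度头,再按长度截取消息比特。"""
    length = int(bits[:32], 2)                             # 前 32 位为消息字节数
    body = bits[32:32 + length * 8]
    return bytes(int(body[i:i + 8], 2) for i in range(0, len(body), 8))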

# -*- coding: utf-8 -*- """ Created on Fri Mar 5 19:13:21 2021 @author: LXM """ import torch import torch.nn as nn from torch.autograd import Function class UpdateRange(nn.Module): def __init__(self, device): super(UpdateRange, self).__init__() self.device = device self.flag = 0 self.fmin = torch.zeros((1), dtype = torch.float32, device = self.device) self.fmax = torch.zeros((1), dtype = torch.float32, device = self.device) def Update(self, fmin, fmax): if self.flag == 0: self.flag = 1 new_fmin = fmin new_fmax = fmax else: new_fmin = torch.min(fmin, self.fmin) new_fmax = torch.max(fmax, self.fmax) self.fmin.copy_(new_fmin) self.fmax.copy_(new_fmax) @torch.no_grad() def forward(self, input): fmin = torch.min(input) fmax = torch.max(input) self.Update(fmin, fmax) class Round(Function): @staticmethod def forward(self, input): # output = torch.round(input) # output = torch.floor(input) output = input.int().float() return output @staticmethod def backward(self, output): input = output.clone() return input class Quantizer(nn.Module): def __init__(self, bits, device): super(Quantizer, self).__init__() self.bits = bits self.scale = 1 self.UpdateRange = UpdateRange(device) self.qmin = torch.tensor((-((1 << (bits - 1)) - 1)), device = device) self.qmax = torch.tensor((+((1 << (bits - 1)) - 1)), device = device) def round(self, input): output = Round.apply(input) return output def Quantization(self): quant_range = float(1 << (self.bits - 1)) float_range = torch.max(torch.abs(self.UpdateRange.fmin), torch.abs(self.UpdateRange.fmax)) scale = 1 for i in range(32): if torch.round(float_range * (1 << i)) < quant_range: scale = 1 << i else: break self.scale = scale def forward(self, input): if self.training: self.UpdateRange(input) self.Quantization() output = (torch.clamp(self.round(input * self.scale), self.qmin, self.qmax)) / self.scale return output
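Quantization() 中的循环本质上是在找最大的 2 的幂次 scale,使得 round(float_range * scale) 仍小于 2^(bits-1)。用一个数值例子更直观(纯 Python 草图,与上面逻辑一致):

# 示意:bits=8、float_range=0.3 时的 scale 搜索
bits = 8
float_range = 0.3
quant_range = float(1 << (bits - 1))    # 128
scale = 1
for i in range(32):
    if round(float_range * (1 << i)) < quant_range:
        scale = 1 << i                  # i=8 时 0.3*256=76.8 -> 77 < 128,scale 更新为 256
    else:
        break                           # i=9 时 0.3*512=153.6 -> 154 >= 128,停止
print(scale)                            # 输出 256,即量化时放大 256 倍再取整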

import os import json from flask import Flask, jsonify, request, render_template from datetime import datetime import logging import glob import time app = Flask(__name__) # 配置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 定义两个基础路径 TRACK_BASE_PATH = r"C:\Users\l30078648\Desktop\250730\.track" # 原始路径,用于Pre Build ID JSON_BASE_PATH = r"C:\Users\l30078648\Desktop\250730\json" # 新路径,用于性能指标 # 定义芯片和模型组路径映射(两个不同的路径) TRACK_PATHS = { "Ascend610Lite": { "rl_nn": os.path.join(TRACK_BASE_PATH, "Ascend610Lite", "rl_nn"), "rsc_nn": os.path.join(TRACK_BASE_PATH, "Ascend610Lite", "rsc_nn"), "prediction_nn": os.path.join(TRACK_BASE_PATH, "Ascend610Lite", "prediction_nn") }, "BS9SX1A": { "rl_nn": os.path.join(TRACK_BASE_PATH, "BS9SX1A", "rl_nn"), "rsc_nn": os.path.join(TRACK_BASE_PATH, "BS9SX1A", "rsc_nn"), "prediction_nn": os.path.join(TRACK_BASE_PATH, "BS9SX1A", "prediction_nn") } } JSON_PATHS = { "Ascend610Lite": { "rl_nn": os.path.join(JSON_BASE_PATH, "Ascend610Lite", "rl_nn"), "rsc_nn": os.path.join(JSON_BASE_PATH, "Ascend610Lite", "rsc_nn"), "prediction_nn": os.path.join(JSON_BASE_PATH, "Ascend610Lite", "prediction_nn") }, "BS9SX1A": { "rl_nn": os.path.join(JSON_BASE_PATH, "BS9SX1A", "rl_nn"), "rsc_nn": os.path.join(JSON_BASE_PATH, "BS9SX1A", "rsc_nn"), "prediction_nn": os.path.join(JSON_BASE_PATH, "BS9SX1A", "prediction_nn") } } def get_prebuild_id_data(chip, group): """从原始路径获取Pre Build ID数据""" prebuild_data = {} # 检查芯片和组是否有效 if chip not in TRACK_PATHS: logger.error(f"不支持的芯片类型: {chip}") return prebuild_data if group not in TRACK_PATHS[chip]: logger.error(f"不支持的组: {group} for chip {chip}") return prebuild_data # 获取对应组的路径 group_path = TRACK_PATHS[chip][group] # 检查路径是否存在 if not os.path.exists(group_path): logger.error(f"原始路径不存在: {group_path}") return prebuild_data # 查找该路径下的所有JSON文件 try: json_files = glob.glob(os.path.join(group_path, "*.json")) if not json_files: logger.warning(f"在原始路径 {group_path} 下未找到JSON文件") return prebuild_data # 处理每个JSON文件,提取Pre Build ID for json_file in json_files: try: with open(json_file, 'r') as f: data = json.load(f) model_name = os.path.splitext(os.path.basename(json_file))[0] prebuild_id = data.get('prebuild_id') if prebuild_id: prebuild_data[model_name] = prebuild_id except Exception as e: logger.error(f"解析原始文件 {json_file} 时出错: {str(e)}") except Exception as e: logger.error(f"处理原始文件时出错: {str(e)}") return prebuild_data def parse_json_file(file_path): """解析新路径的JSON文件,提取性能数据""" try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) except Exception as e: logger.error(f"Error reading {file_path}: {e}") return None # 提取模型名称(文件名) model_name = os.path.splitext(os.path.basename(file_path))[0] # 提取时延:查找以'.om'结尾的键 latency = None for key in data: if key.endswith('.om'): latency = data[key] break # 提取带宽:从mean_ddr字段获取 bandwidth = None if 'mean_ddr' in data: bandwidth = data['mean_ddr'] else: # 尝试在嵌套结构中查找 for key, value in data.items(): if isinstance(value, dict) and 'mean_ddr' in value: bandwidth = value['mean_ddr'] break # 获取最后修改时间 file_stat = os.stat(file_path) last_modified = datetime.fromtimestamp(file_stat.st_mtime).isoformat() # 返回性能数据(Pre Build ID稍后合并) return { "model_name": model_name, "latency": latency, "bandwidth": bandwidth, "quantization_rate": None, # 示例JSON中没有量化率数据 "last_modified": last_modified } def get_performance_data(chip, group): """根据芯片类型和请求的组读取性能数据""" # 创建响应数据结构 performance_data = { "models": [], "timestamp": datetime.now().isoformat(), "chip_type": chip, 
"group": group, "json_path": "", "track_path": "", "file_count": 0 } # 1. 从原始路径获取Pre Build ID数据 prebuild_id_map = get_prebuild_id_data(chip, group) performance_data["track_path"] = TRACK_PATHS[chip].get(group, "") # 2. 从新路径获取性能数据 # 检查芯片和组是否有效 if chip not in JSON_PATHS: logger.error(f"不支持的芯片类型: {chip}") performance_data["error"] = f"无效芯片类型。可用选项: {', '.join(JSON_PATHS.keys())}" return performance_data if group not in JSON_PATHS[chip]: logger.error(f"不支持的组: {group} for chip {chip}") performance_data["error"] = f"无效模型组。可用选项: {', '.join(JSON_PATHS[chip].keys())}" return performance_data # 获取对应组的路径 group_path = JSON_PATHS[chip][group] performance_data["json_path"] = group_path # 检查路径是否存在 if not os.path.exists(group_path): logger.error(f"新路径不存在: {group_path}") performance_data["error"] = "新路径不存在" return performance_data # 查找该路径下的所有JSON文件 try: json_files = glob.glob(os.path.join(group_path, "*.json")) performance_data["file_count"] = len(json_files) if not json_files: logger.warning(f"在新路径 {group_path} 下未找到JSON文件") return performance_data # 处理每个JSON文件 for json_file in json_files: model_data = parse_json_file(json_file) if model_data: # 合并Pre Build ID model_name = model_data["model_name"] model_data["prebuild_id"] = prebuild_id_map.get(model_name, "") performance_data["models"].append(model_data) except Exception as e: logger.error(f"处理新文件时出错: {str(e)}") performance_data["error"] = str(e) return performance_data @app.route('/api/performance', methods=['GET']) def performance_api(): """性能数据API接口""" start_time = time.time() try: # 获取请求参数 device = request.args.get('device', 'Ascend610Lite') group = request.args.get('type', 'rl_nn') logger.info(f"API请求 - 设备: {device}, 组: {group}") performance_data = get_performance_data(device, group) # 记录处理时间 process_time = time.time() - start_time performance_data["process_time"] = round(process_time, 4) return jsonify(performance_data) except Exception as e: logger.exception("处理请求时出错") return jsonify({ "error": "服务器内部错误", "details": str(e), "process_time": round(time.time() - start_time, 4) }), 500 @app.route('/api/prebuild_ids', methods=['GET']) def prebuild_ids_api(): """专用接口:返回所有prebuild_id(保留原有功能)""" start_time = time.time() try: device = request.args.get('device', 'Ascend610Lite') group = request.args.get('type', 'rl_nn') logger.info(f"Prebuild ID请求 - 设备: {device}, 组: {group}") # 获取Pre Build ID数据 prebuild_id_map = get_prebuild_id_data(device, group) prebuild_ids = list(prebuild_id_map.values()) # 构建专用响应 response = { "status": "success", "prebuild_ids": prebuild_ids, "count": len(prebuild_ids), "timestamp": datetime.now().isoformat(), "process_time": round(time.time() - start_time, 4) } return jsonify(response) except Exception as e: logger.exception("获取prebuild_id时出错") return jsonify({ "status": "error", "error": "服务器内部错误", "details": str(e), "process_time": round(time.time() - start_time, 4) }), 500 @app.route('/health', methods=['GET']) def health_check(): """健康检查端点""" return jsonify({ "status": "ok", "timestamp": datetime.now().isoformat() }) @app.route('/debug/paths', methods=['GET']) def debug_paths(): """调试接口:返回路径配置信息""" debug_data = { "track_base_path": TRACK_BASE_PATH, "json_base_path": JSON_BASE_PATH, "track_paths": {}, "json_paths": {} } # 原始路径状态 for chip, group_dict in TRACK_PATHS.items(): debug_data["track_paths"][chip] = {} for group, path in group_dict.items(): exists = os.path.exists(path) debug_data["track_paths"][chip][group] = { "path": path, "exists": exists } if exists: debug_data["track_paths"][chip][group]["file_count"] = len( 
glob.glob(os.path.join(path, "*.json")) ) # 新路径状态 for chip, group_dict in JSON_PATHS.items(): debug_data["json_paths"][chip] = {} for group, path in group_dict.items(): exists = os.path.exists(path) debug_data["json_paths"][chip][group] = { "path": path, "exists": exists } if exists: debug_data["json_paths"][chip][group]["file_count"] = len( glob.glob(os.path.join(path, "*.json")) ) return jsonify(debug_data) @app.route('/') def home(): """首页路由 - 渲染前端页面""" return render_template('index.html') if __name__ == '__main__': app.run(host="127.0.0.1", port=8080, debug=True) 在这个后端脚本上修改
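按脚本里定义的路由,可以先用一个简单的客户端自测各接口的返回(URL 与参数取自脚本默认值,仅为用法示意):

import requests

BASE = "http://127.0.0.1:8080"

# 健康检查
print(requests.get(f"{BASE}/health").json())

# 查询 Ascend610Lite / rl_nn 组的性能数据
resp = requests.get(f"{BASE}/api/performance",
                    params={"device": "Ascend610Lite", "type": "rl_nn"})
data = resp.json()
print(data.get("file_count"), "个JSON文件,", len(data.get("models", [])), "个模型")

# 只取 prebuild_id 列表
print(requests.get(f"{BASE}/api/prebuild_ids",
                   params={"device": "BS9SX1A", "type": "rsc_nn"}).json())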
