怎样将订单信息保存到文件中,并且在程序启动时读取文件中的信息?

时间: 2025-09-04 15:13:22 AIGC 浏览: 3
在C语言中,可以使用文件操作函数将宅急送订单管理系统中的订单信息保存到文件中,并且在程序启动时读取文件中的订单信息。以下是在之前代码基础上进行修改,添加文件保存和读取功能的示例: ```c #include <stdio.h> #include <stdlib.h> #include <string.h> #define MAX_ORDERS 100 // 定义订单结构体 typedef struct { int order_id; char sender[50]; char receiver[50]; float weight; char status[20]; } Order; Order orders[MAX_ORDERS]; int order_count = 0; // 添加订单 void add_order() { if (order_count >= MAX_ORDERS) { printf("订单数量已达到上限,无法添加!\n"); return; } Order new_order; printf("请输入订单编号: "); scanf("%d", &new_order.order_id); printf("请输入发货人姓名: "); scanf("%s", new_order.sender); printf("请输入收货人姓名: "); scanf("%s", new_order.receiver); printf("请输入货物重量: "); scanf("%f", &new_order.weight); printf("请输入订单状态: "); scanf("%s", new_order.status); orders[order_count++] = new_order; printf("订单添加成功!\n"); } // 显示所有订单 void display_orders() { if (order_count == 0) { printf("暂无订单信息!\n"); return; } printf("订单编号\t发货人\t收货人\t货物重量\t订单状态\n"); for (int i = 0; i < order_count; i++) { printf("%d\t%s\t%s\t%.2f\t%s\n", orders[i].order_id, orders[i].sender, orders[i].receiver, orders[i].weight, orders[i].status); } } // 将订单信息保存到文件 void save_orders_to_file() { FILE *file = fopen("orders.txt", "w"); if (file == NULL) { printf("无法打开文件!\n"); return; } for (int i = 0; i < order_count; i++) { fprintf(file, "%d %s %s %.2f %s\n", orders[i].order_id, orders[i].sender, orders[i].receiver, orders[i].weight, orders[i].status); } fclose(file); printf("订单信息已保存到文件!\n"); } // 从文件中读取订单信息 void load_orders_from_file() { FILE *file = fopen("orders.txt", "r"); if (file == NULL) { printf("未找到订单文件,将使用空订单列表。\n"); return; } order_count = 0; while (order_count < MAX_ORDERS && fscanf(file, "%d %s %s %f %s", &orders[order_count].order_id, orders[order_count].sender, orders[order_count].receiver, &orders[order_count].weight, orders[order_count].status) != EOF) { order_count++; } fclose(file); printf("订单信息已从文件中加载!\n"); } int main() { load_orders_from_file(); int choice; do { printf("\n宅急送订单管理系统\n"); printf("1. 添加订单\n"); printf("2. 显示所有订单\n"); printf("3. 保存订单信息到文件\n"); printf("4. 退出\n"); printf("请输入你的选择: "); scanf("%d", &choice); switch (choice) { case 1: add_order(); break; case 2: display_orders(); break; case 3: save_orders_to_file(); break; case 4: save_orders_to_file(); printf("退出系统!\n"); break; default: printf("无效的选择,请重新输入!\n"); } } while (choice != 4); return 0; } ``` ### 代码解释 - `save_orders_to_file` 函数:使用 `fopen` 以写入模式打开文件 `orders.txt`,然后使用 `fprintf` 将每个订单的信息逐行写入文件,最后使用 `fclose` 关闭文件[^1]。 - `load_orders_from_file` 函数:使用 `fopen` 以读取模式打开文件 `orders.txt`,使用 `fscanf` 逐行读取文件中的订单信息,并将其存储到 `orders` 数组中,直到文件结束或达到最大订单数,最后使用 `fclose` 关闭文件[^1]。
阅读全文

相关推荐

【题目描述】航空客运订票的核心业务包括:查询航线、机票预定和办理退票等。每条航线所涉及的基本信息有:航班号、航空公司名称、起点及终点站名、飞机型号、飞行日期、最大乘客数量、余票量、实际票价(注意票价应根据离飞机飞行时间长短或其它原因进行动态折扣),原价等。客户基本信息包括身份证号、姓名、电话号码等。客户登录系统可进行线上订票或退票,管理员登录系统可查询某航班已订票的所有乘客信息,也可编辑航班信息。试设计一个航空客运订票管理系统,模拟航班查询、订票、退票等业务。 【具体要求】 1.建立航班信息,保存到文件,运行时能从文件读取航班信息,运行结束后能自动更新数据到文件。航班信息的读入和输出利用重载的运算符“》”和“《”进行。 2.编辑航班信息:可查询航班信息,可修改、新增、删除航班信息,也可选择根据航班号或航班名称等对航班信息进行排序。 3.订票业务:根据客户提出的要求(航班号、订票数额)查询该航班票额情况,若尚有余票,则可为客户办理订票手续,输出座位号;若已满员或余票额少于订票额,则需重新询问客户要求:提示用户重新选择其它航班,若需要可选择排队候补; 4.退票业务:根据客户提供的情况(日期、航班),为客户办理退票手续,然后查询该航班是否有人排队候补,如果有,首先询问排在第一的客户,若所退票额能满足他的要求,则为他办理订票手续,否则依次询问其他排队候补的客户。 5.进行面向对象程序设计,需用到继承、多态、重载、虚基类、虚函数、文件读写等特性。采用菜单功能,界面友好。

#include<bits/stdc++.h> using namespace std; #define MAX_USER 10000 //最大用户数量 #define MAX_SHOP 850 //最大商家数量 #define MAX_SPECIALITY 100 //各商家最大特色菜数量 #define MAX_COMMENTS 20 //各商家最大评论数量 #define M 1000 //散列表长度 #define P 997 //小于表长的最大质数 struct User{ char *account; char *password; char *phone; };//用户 typedef struct LNode{ User *users; int length; }UserList;//用户顺序表 void InitialUserList(UserList &user_list){ //初始化user_list } void LoadUser(UserList &user_list,const char *filename){ //从文件中读取用户信息 } int UserLogin(UserList &user_list){ //实现用户登录功能 //使用折半查找方式用户账号是否存在 } struct Food{ char *food_id; char *food_name; double food_price; };//菜品 struct Shop{ char *shop_type; char *shop_id; char *shop_name; char *shop_password; double avg_score; double ave_price; char *address; char *phone; Food *speciality; char **comments; };//商店 typedef struct Node{ Shop *shops; int length; }ShopList;//商店的顺序表 void InitialShopList(ShopList &shop_list){ //商家顺序表初始化 } void LoadShop(ShopList &shop_list,const char *filename){ //读取文件,将商家信息插入顺序表shopL中 } typedef struct HNode{ Shop data; //商家结点的数据域 struct HNode *next; }HNode,*HLink; HLink HashList[M]; //链地址散列表的存储表示,M个单链表 int Hash(const char *name){ //散列函数:采用除留余数法,key值取前两个汉字的国标编码 int key=0; for(int i=0;i<4;i++) //取前两个汉字的国标编码作为关键值key key=key*100+(int)name[i]+96; if(key<0) key*=-1; return key%P; //返回哈希函数值 } void InitialHashList(){ //链地址法散列表初始化 } void CreateHashList(const ShopList &shop_list){ //根据顺序表shopL创建散列表 } HNode *SearchHashLink(const char *key){ //在散列表HashList中查找商家名称等于key的元素 //若找到,则返回当前结点,否则空结点 } struct Order{ char *user; char *shop_name; char *date; bool is_finish; };//订单 typedef struct QNode{ Order data; struct QNode *next; }QNode,*QueuePtr;//队列节点 typedef struct{ QueuePtr front; QueuePtr rear; }LinkQueue;//订单队列 void InitialQueue(LinkQueue &Q){ //构造一个空队列Q } void LoadQueue(LinkQueue &Q,const char *filename){ //读文件,调用EnQueue函数将所有订单信息插入队列Q中 } void SaveOrder(LinkQueue &Q,const char *filename){ //将新的订单信息写入文件order.txt } void CreateOrderByUser(LinkQueue &Q,char *user){ //为当前用户创建新的预定订单 } 补全代码 平台会对你编写的代码进行测试: 测试输入:190010,234345,麻里麻里(融科店),12月24日晚餐; 预期输出: 预定成功!

eco.gateway.enablePcWebApiAppCode=true eco.gateway.pcWebApiAppCode=alipay.planet.xxxapp The configuration you provided appears to be related to enabling and setting up a specific application code for a gateway service, likely in a backend or microservices architecture. Here's a breakdown of what each line might mean: Configuration Breakdown: eco.gateway.enablePcWebApiAppCode=true Purpose: This setting likely enables the use of an application-specific API code for the "PC Web" gateway. Implication: When set to true, the gateway will allow or process requests associated with the pcWebApiAppCode. If set to false, this feature would be disabled. eco.gateway.pcWebApiAppCode=alipay.planet.xxxapp Purpose: This specifies the application code (alipay.planet.xxxapp) that the gateway should recognize when handling PC Web API requests. Implication: The gateway will use this code to identify, route, or authenticate requests related to the "PC Web" platform. The placeholder xxxapp suggests this is a placeholder and should be replaced with the actual application identifier. Possible Use Case: Gateway Service: This configuration seems to belong to a gateway service that handles API requests for different platforms (e.g., PC Web, Mobile). Application Code: The pcWebApiAppCode is likely used to distinguish between different applications or modules accessing the gateway. For example, an "Alipay" app might have a specific code to identify its requests. Enable/Disable: The enablePcWebApiAppCode flag allows toggling this feature, which could be useful for testing or security purposes. Common Questions: What happens if enablePcWebApiAppCode is set to false? The gateway might reject or ignore requests with the pcWebApiAppCode, depending on how the service is implemented. Is alipay.planet.xxxapp a valid application code? The xxxapp part seems like a placeholder. It should be replaced with a valid, unique identifier for the application (e.g., alipay.planet.myapp). Where is this configuration used? This is typically part of a backend configuration file (e.g., application.properties, application.yml, or environment variables). It would be read by the gateway service at runtime. What other related configurations might exist? Similar settings for other platforms (e.g., Mobile Web, iOS, Android) might exist, such as eco.gateway.enableMobileWebApiAppCode or eco.gateway.mobileWebApiAppCode. Security Considerations: Ensure that the pcWebApiAppCode is kept confidential if it is sensitive (e.g., used for authentication or authorization). Verify that enabling this feature does not inadvertently expose the gateway to unauthorized requests. If you need further clarification or help with implementing this configuration, feel free to provide more context! 第二行这个是什么 第二行配置: eco.gateway.pcWebApiAppCode=alipay.planet.xxxapp 解释: eco.gateway.pcWebApiAppCode 这是一个配置键(Key),用于定义某个特定的应用代码(Application Code)。 eco.gateway 可能是一个命名空间(Namespace),用于网关(Gateway)相关的配置。 pcWebApiAppCode 表示这个配置项是用于 "PC Web API" 的应用代码。 alipay.planet.xxxapp 这是配置的值(Value),表示一个具体的应用代码(App Code)。 在这个例子中,alipay.planet.xxxapp 可能是一个占位符(Placeholder),表示某个具体的应用标识。 实际使用时,通常需要替换为真实的应用程序标识符(例如 alipay.planet.myapplication)。 作用: 这个配置项可能是用于标识某个特定的应用程序或服务,当网关处理 "PC Web API" 请求时,会根据这个应用代码进行相应的处理(例如路由、鉴权等)。 示例场景: 假设这是一个支付网关服务,alipay.planet.xxxapp 可能是某个支付宝支付应用的应用代码。网关会根据这个代码识别请求的来源,并进行相应的处理。 如果你有更多上下文或具体问题,可以进一步说明! # ------------ PcWebApi ------------- eco.gateway.enablePcWebApiAppCode=true eco.gateway.pcWebApiAppCode=alipay.planet.${spring.application.name} 这段配置是关于一个名为 PcWebApi 的功能模块,具体说明如下: 配置项解析 1. eco.gateway.enablePcWebApiAppCode=true 含义:启用 PcWebApi 相关的应用代码(App Code)功能。 作用:当设置为 true 时,网关会开启对 PcWebApiAppCode 的处理逻辑(例如验证、路由等)。如果设置为 false,则禁用该功能。 2. eco.gateway.pcWebApiAppCode=alipay.planet.${spring.application.name} 含义: 这是一个动态配置项,${spring.application.name} 是 Spring Boot 的内置变量,表示当前应用的名称(在 application.properties 或环境变量中定义)。 配置值的最终结果是 alipay.planet.<当前应用名>。例如: 如果 spring.application.name=my-app,则 eco.gateway.pcWebApiAppCode 的值为 alipay.planet.my-app。 如果 spring.application.name=payment-service,则值为 alipay.planet.payment-service。 作用:为每个应用生成唯一的应用代码(App Code),便于网关根据代码识别请求来源。 可能的使用场景 应用标识: 网关通过 pcWebApiAppCode 来标识不同应用的请求,确保请求被正确路由或处理。 例如:不同业务模块(如支付、订单、会员)的应用会使用不同的 spring.application.name, @RequestMapping(value = "/test/CertificateListQueryReq", method = RequestMethod.POST)如果我想访问这个,我需要如何拼接url 要访问这个接口,你需要按照以下步骤拼接 URL: 1. 基础 URL 首先,你需要知道你的服务的基础 URL(通常是服务的域名或 IP 地址,例如 http://localhost:8080 或 https://api.example.com)。 2. 接口路径 根据你的代码,接口的路径是: /test/CertificateListQueryReq 3. 完整的 URL 将基础 URL 和接口路径拼接起来。例如: 如果是本地服务(端口 8080): http://localhost:8080/test/CertificateListQueryReq 如果是生产环境(假设域名是 https://api.example.com): https://api.example.com/test/CertificateListQueryReq 4. HTTP 方法 这个接口使用的是 POST 方法,所以你需要使用 POST 请求来访问它。 示例请求 使用 curl 发送请求: bash curl -X POST \ http://localhost:8080/test/CertificateListQueryReq \ -H "Content-Type: application/json" \ -d '{"key":"value"}' 使用 Postman: 选择 POST 方法。 输入 URL(例如 http://localhost:8080/test/CertificateListQueryReq)。 在 Body 部分输入 JSON 数据(如果接口需要参数)。 设置 Content-Type 为 application/json。 注意事项 如果你的服务部署在某个上下文路径(context path)下(例如 /myapp),需要在 URL 中加上。例如: http://localhost:8080/myapp/test/CertificateListQueryReq 确保接口的权限和认证配置正确,否则可能会返回 403 或 401 错误。 如果你的服务使用了 HTTPS,记得使用 https:// 而不是 http://。 如果你还有其他问题(例如如何测试、如何传递参数等),可以继续提问! quarzt如何从持久化的数据库中读取需要执行的任务吗,来个代码实例 在使用 Quartz 调度器时,可以通过持久化存储(如数据库)来管理任务的调度。以下是一个示例代码,展示如何从数据库中读取任务并启动 Quartz 调度器。 1. 配置 Quartz 数据源 首先,需要在 quartz.properties 文件中配置 Quartz 使用数据库存储: properties # quartz.properties org.quartz.scheduler.instanceName = MyScheduler org.quartz.scheduler.instanceId = AUTO org.quartz.threadPool.threadCount = 3 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.useProperties = false org.quartz.jobStore.dataSource = myDS org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.dataSource.myDS.driver = com.mysql.cj.jdbc.Driver org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/quartz org.quartz.dataSource.myDS.user = root org.quartz.dataSource.myDS.password = password org.quartz.dataSource.myDS.maxConnections = 5 2. 创建 Quartz 数据库表 Quartz 提供了一些 SQL 脚本来创建所需的表(在 quartz 数据库中运行这些脚本)。以下是 MySQL 的例子: sql -- 创建 Quartz 数据表的 SQL 脚本 CREATE TABLE QRTZ_JOB_DETAILS ( SCHED_NAME VARCHAR(120) NOT NULL, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, DESCRIPTION VARCHAR(250) NULL, JOB_CLASS_NAME VARCHAR(250) NOT NULL, IS_DURABLE VARCHAR(1) NOT NULL, IS_NONCONCURRENT VARCHAR(1) NOT NULL, IS_UPDATE_DATA VARCHAR(1) NOT NULL, REQUESTS_RECOVERY VARCHAR(1) NOT NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME, JOB_NAME, JOB_GROUP) ); CREATE TABLE QRTZ_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, DESCRIPTION VARCHAR(250) NULL, NEXT_FIRE_TIME BIGINT NULL, PREV_FIRE_TIME BIGINT NULL, PRIORITY INTEGER NULL, TRIGGER_STATE VARCHAR(16) NOT NULL, TRIGGER_TYPE VARCHAR(8) NOT NULL, START_TIME BIGINT NOT NULL, END_TIME BIGINT NULL, CALENDAR_NAME VARCHAR(200) NULL, MISFIRE_INSTR SMALLINT NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME, JOB_NAME, JOB_GROUP) REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME, JOB_NAME, JOB_GROUP) ); -- 其他表可以参考 Quartz 官方文档 3. Java 代码示例 以下是一个完整的示例代码,展示如何从数据库中读取任务并启动 Quartz 调度器: 3.1 创建一个简单的任务类 java import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class HelloJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("Hello, Quartz! 时间: " + new java.util.Date()); } } 3.2 创建一个 Quartz 调度器并启动任务 java import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; public class QuartzExample { public static void main(String[] args) { try { // 1. 创建 Scheduler Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // 2. 启动 Scheduler scheduler.start(); // 3. 定义一个 JobDetail JobDetail job = JobBuilder.newJob(HelloJob.class) .withIdentity("job1", "group1") .build(); // 4. 定义一个 Trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(5) .repeatForever()) .build(); // 5. 将 Job 和 Trigger 注册到 Scheduler scheduler.scheduleJob(job, trigger); // 6. Scheduler 会根据配置的表存储任务 System.out.println("Scheduler 已启动..."); } catch (SchedulerException e) { e.printStackTrace(); } } } 3.3 从数据库中读取任务 当 Quartz 调度器启动时,它会自动从 QRTZ_JOB_DETAILS 和 QRTZ_TRIGGERS 表中读取任务和触发器的定义,并开始执行。如果你在数据库中手动插入了任务,Quartz 会自动加载它们。 4. 数据库中的任务示例 假设你在数据库中插入了以下任务: sql INSERT INTO QRTZ_JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA) VALUES ('MyScheduler', 'job1', 'group1', 'com.example.HelloJob', 'Y', 'N', 'N', 'N', NULL); INSERT INTO QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, START_TIME, NEXT_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, REPEAT_INTERVAL, REPEAT_COUNT) VALUES ('MyScheduler', 'trigger1', 'group1', 'job1', 'group1', 1697040000000, 1697040005000, 'WAITING', 'SIMPLE', 5000, -1); 当 Quartz 调度器启动时,它会自动加载这些任务并开始执行。 总结 配置 Quartz 使用数据库存储(通过 quartz.properties)。 创建 Quartz 数据表。 使用 Java 代码定义任务并启动调度器。 Quartz 会自动从数据库中读取任务和触发器的定义。 希望这个示例对你有帮助!如果有任何问题,请随时提问。 我的问题是,如果我将一些任务信息保存到数据库,quartz如何从数据库中读取这些配置,然后去依次执行这些任务

我正在编写一个闲鱼客服,以上是XianyuAgent.py代码,以下是两个子代码文件:context_manager.py:import sqlite3 import os import json from datetime import datetime from loguru import logger class ChatContextManager: """ 聊天上下文管理器 负责存储和检索用户与商品之间的对话历史,使用SQLite数据库进行持久化存储。 支持按会话ID检索对话历史,以及议价次数统计。 """ def __init__(self, max_history=100, db_path="data/chat_history.db"): """ 初始化聊天上下文管理器 Args: max_history: 每个对话保留的最大消息数 db_path: SQLite数据库文件路径 """ self.max_history = max_history self.db_path = db_path self._init_db() def _init_db(self): """初始化数据库表结构""" # 确保数据库目录存在 db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建消息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, item_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, chat_id TEXT ) ''') # 检查是否需要添加chat_id字段(兼容旧数据库) cursor.execute("PRAGMA table_info(messages)") columns = [column[1] for column in cursor.fetchall()] if 'chat_id' not in columns: cursor.execute('ALTER TABLE messages ADD COLUMN chat_id TEXT') logger.info("已为messages表添加chat_id字段") # 创建索引以加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_user_item ON messages (user_id, item_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_chat_id ON messages (chat_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_timestamp ON messages (timestamp) ''') # 创建基于会话ID的议价次数表 cursor.execute(''' CREATE TABLE IF NOT EXISTS chat_bargain_counts ( chat_id TEXT PRIMARY KEY, count INTEGER DEFAULT 0, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建商品信息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS items ( item_id TEXT PRIMARY KEY, data TEXT NOT NULL, price REAL, description TEXT, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建人工标准回答表 cursor.execute(''' CREATE TABLE IF NOT EXISTS manual_answers ( id INTEGER PRIMARY KEY AUTOINCREMENT, question TEXT NOT NULL, answer TEXT NOT NULL, item_id TEXT NOT NULL, chat_id TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(question, item_id) ) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_question ON manual_answers (question) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_item_id ON manual_answers (item_id) ''') conn.commit() conn.close() logger.info(f"聊天历史数据库初始化完成: {self.db_path}") def save_item_info(self, item_id, item_data): """ 保存商品信息到数据库 Args: item_id: 商品ID item_data: 商品信息字典 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 从商品数据中提取有用信息 price = float(item_data.get('soldPrice', 0)) description = item_data.get('desc', '') # 将整个商品数据转换为JSON字符串 data_json = json.dumps(item_data, ensure_ascii=False) cursor.execute( """ INSERT INTO items (item_id, data, price, description, last_updated) VALUES (?, ?, ?, ?, ?) ON CONFLICT(item_id) DO UPDATE SET data = ?, price = ?, description = ?, last_updated = ? """, ( item_id, data_json, price, description, datetime.now().isoformat(), data_json, price, description, datetime.now().isoformat() ) ) conn.commit() logger.debug(f"商品信息已保存: {item_id}") except Exception as e: logger.error(f"保存商品信息时出错: {e}") conn.rollback() finally: conn.close() def get_item_info(self, item_id): """ 从数据库获取商品信息 Args: item_id: 商品ID Returns: dict: 商品信息字典,如果不存在返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT data FROM items WHERE item_id = ?", (item_id,) ) result = cursor.fetchone() if result: return json.loads(result[0]) return None except Exception as e: logger.error(f"获取商品信息时出错: {e}") return None finally: conn.close() def add_message_by_chat(self, chat_id, user_id, item_id, role, content): """ 基于会话ID添加新消息到对话历史 Args: chat_id: 会话ID user_id: 用户ID (用户消息存真实user_id,助手消息存卖家ID) item_id: 商品ID role: 消息角色 (user/assistant) content: 消息内容 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 插入新消息,使用chat_id作为额外标识 cursor.execute( "INSERT INTO messages (user_id, item_id, role, content, timestamp, chat_id) VALUES (?, ?, ?, ?, ?, ?)", (user_id, item_id, role, content, datetime.now().isoformat(), chat_id) ) # 检查是否需要清理旧消息(基于chat_id) cursor.execute( """ SELECT id FROM messages WHERE chat_id = ? ORDER BY timestamp DESC LIMIT ?, 1 """, (chat_id, self.max_history) ) oldest_to_keep = cursor.fetchone() if oldest_to_keep: cursor.execute( "DELETE FROM messages WHERE chat_id = ? AND id < ?", (chat_id, oldest_to_keep[0]) ) conn.commit() except Exception as e: logger.error(f"添加消息到数据库时出错: {e}") conn.rollback() finally: conn.close() def get_context_by_chat(self, chat_id): """ 基于会话ID获取对话历史 Args: chat_id: 会话ID Returns: list: 包含对话历史的列表 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( """ SELECT role, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC LIMIT ? """, (chat_id, self.max_history) ) messages = [{"role": role, "content": content} for role, content in cursor.fetchall()] # 获取议价次数并添加到上下文中 bargain_count = self.get_bargain_count_by_chat(chat_id) if bargain_count > 0: messages.append({ "role": "system", "content": f"议价次数: {bargain_count}" }) except Exception as e: logger.error(f"获取对话历史时出错: {e}") messages = [] finally: conn.close() return messages def increment_bargain_count_by_chat(self, chat_id): """ 基于会话ID增加议价次数 Args: chat_id: 会话ID """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用UPSERT语法直接基于chat_id增加议价次数 cursor.execute( """ INSERT INTO chat_bargain_counts (chat_id, count, last_updated) VALUES (?, 1, ?) ON CONFLICT(chat_id) DO UPDATE SET count = count + 1, last_updated = ? """, (chat_id, datetime.now().isoformat(), datetime.now().isoformat()) ) conn.commit() logger.debug(f"会话 {chat_id} 议价次数已增加") except Exception as e: logger.error(f"增加议价次数时出错: {e}") conn.rollback() finally: conn.close() def get_bargain_count_by_chat(self, chat_id): """ 基于会话ID获取议价次数 Args: chat_id: 会话ID Returns: int: 议价次数 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT count FROM chat_bargain_counts WHERE chat_id = ?", (chat_id,) ) result = cursor.fetchone() return result[0] if result else 0 except Exception as e: logger.error(f"获取议价次数时出错: {e}") return 0 finally: conn.close() def update_manual_answer(self, question, new_answer, item_id, chat_id=None): """ 更新已保存的人工标准回答 Args: question: 原始问题 new_answer: 新的回答内容 item_id: 商品ID chat_id: 会话ID(可选) Returns: bool: 更新是否成功 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "UPDATE manual_answers SET answer = ?, created_at = ? WHERE question = ? AND item_id = ?", (new_answer, datetime.now().isoformat(), question, item_id) ) conn.commit() if cursor.rowcount > 0: logger.info(f"人工标准回答已更新: {question[:30]}...") return True else: logger.warning(f"未找到匹配的人工标准回答: {question[:30]}...") return False except Exception as e: logger.error(f"更新人工标准回答时出错: {e}") conn.rollback() return False finally: conn.close() def save_manual_answer(self, question, answer, item_id, chat_id=None): """ 保存人工标准回答到数据库 Args: question: 用户问题 answer: 人工回答 item_id: 商品ID chat_id: 会话ID(可选) """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "INSERT OR IGNORE INTO manual_answers (question, answer, item_id, chat_id, created_at) VALUES (?, ?, ?, ?, ?)", (question, answer, item_id, chat_id, datetime.now().isoformat()) ) conn.commit() logger.info(f"人工标准回答已保存: {question[:30]}...") except Exception as e: logger.error(f"保存人工标准回答时出错: {e}") conn.rollback() finally: conn.close() def get_manual_answer(self, question, item_id): """ 从数据库获取人工标准回答 Args: question: 用户问题 item_id: 商品ID Returns: str: 匹配的人工回答,如果没有找到则返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用LIKE进行模糊匹配 cursor.execute( "SELECT answer FROM manual_answers WHERE item_id = ? AND question LIKE ? ORDER BY created_at DESC LIMIT 1", (item_id, f"%{question}%") ) result = cursor.fetchone() return result[0] if result else None except Exception as e: logger.error(f"获取人工标准回答时出错: {e}") return None finally: conn.close() main.py: import base64 import json import asyncio import time import os import websockets from loguru import logger from dotenv import load_dotenv from XianyuApis import XianyuApis import sys from utils.xianyu_utils import generate_mid, generate_uuid, trans_cookies, generate_device_id, decrypt from XianyuAgent import XianyuReplyBot from context_manager import ChatContextManager class XianyuLive: def __init__(self, cookies_str): self.xianyu = XianyuApis() self.base_url = 'wss://wss-goofish.dingtalk.com/' self.cookies_str = cookies_str self.cookies = trans_cookies(cookies_str) self.xianyu.session.cookies.update(self.cookies) # 直接使用 session.cookies.update self.myid = self.cookies['unb'] self.device_id = generate_device_id(self.myid) self.context_manager = ChatContextManager() # 心跳相关配置 self.heartbeat_interval = int(os.getenv("HEARTBEAT_INTERVAL", "15")) # 心跳间隔,默认15秒 self.heartbeat_timeout = int(os.getenv("HEARTBEAT_TIMEOUT", "5")) # 心跳超时,默认5秒 self.last_heartbeat_time = 0 self.last_heartbeat_response = 0 self.heartbeat_task = None self.ws = None # Token刷新相关配置 self.token_refresh_interval = int(os.getenv("TOKEN_REFRESH_INTERVAL", "3600")) # Token刷新间隔,默认1小时 self.token_retry_interval = int(os.getenv("TOKEN_RETRY_INTERVAL", "300")) # Token重试间隔,默认5分钟 self.last_token_refresh_time = 0 self.current_token = None self.token_refresh_task = None self.connection_restart_flag = False # 连接重启标志 # 人工接管相关配置 self.manual_mode_conversations = set() # 存储处于人工接管模式的会话ID self.manual_mode_timeout = int(os.getenv("MANUAL_MODE_TIMEOUT", "3600")) # 人工接管超时时间,默认1小时 self.manual_mode_timestamps = {} # 记录进入人工模式的时间 # 消息过期时间配置 self.message_expire_time = int(os.getenv("MESSAGE_EXPIRE_TIME", "300000")) # 消息过期时间,默认5分钟 # 人工接管关键词,从环境变量读取 self.toggle_keywords = os.getenv("TOGGLE_KEYWORDS", "。") async def refresh_token(self): """刷新token""" try: logger.info("开始刷新token...") # 获取新token(如果Cookie失效,get_token会直接退出程序) token_result = self.xianyu.get_token(self.device_id) if 'data' in token_result and 'accessToken' in token_result['data']: new_token = token_result['data']['accessToken'] self.current_token = new_token self.last_token_refresh_time = time.time() logger.info("Token刷新成功") return new_token else: logger.error(f"Token刷新失败: {token_result}") return None except Exception as e: logger.error(f"Token刷新异常: {str(e)}") return None async def token_refresh_loop(self): """Token刷新循环""" while True: try: current_time = time.time() # 检查是否需要刷新token if current_time - self.last_token_refresh_time >= self.token_refresh_interval: logger.info("Token即将过期,准备刷新...") new_token = await self.refresh_token() if new_token: logger.info("Token刷新成功,准备重新建立连接...") # 设置连接重启标志 self.connection_restart_flag = True # 关闭当前WebSocket连接,触发重连 if self.ws: await self.ws.close() break else: logger.error("Token刷新失败,将在{}分钟后重试".format(self.token_retry_interval // 60)) await asyncio.sleep(self.token_retry_interval) # 使用配置的重试间隔 continue # 每分钟检查一次 await asyncio.sleep(60) except Exception as e: logger.error(f"Token刷新循环出错: {e}") await asyncio.sleep(60) async def send_msg(self, ws, cid, toid, text): text = { "contentType": 4, "html": { "html": text } } text_base64 = str(base64.b64encode(json.dumps(text).encode('utf-8')), 'utf-8') msg = { "lwp": "/r/MessageSend/sendByReceiverScope", "headers": { "mid": generate_mid() }, "body": [ { "uuid": generate_uuid(), "cid": f"{cid}@goofish", "conversationType": 1, "content": { "contentType": 101, "custom": { "type": 1, "data": text_base64 } }, "redPointPolicy": 0, "extension": { "extJson": "{}" }, "ctx": { "appVersion": "1.0", "platform": "web" }, "mtags": {}, "msgReadStatusSetting": 1 }, { "actualReceivers": [ f"{toid}@goofish", f"{self.myid}@goofish" ] } ] } await ws.send(json.dumps(msg)) async def init(self, ws): # 如果没有token或者token过期,获取新token if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: logger.info("获取初始token...") await self.refresh_token() if not self.current_token: logger.error("无法获取有效token,初始化失败") raise Exception("Token获取失败") msg = { "lwp": "/reg", "headers": { "cache-header": "app-key token ua wv", "app-key": "444e9908a51d1cb236a27862abc769c9", "token": self.current_token, "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5", "dt": "j", "wv": "im:3,au:3,sy:6", "sync": "0,0;0;0;", "did": self.device_id, "mid": generate_mid() } } await ws.send(json.dumps(msg)) # 等待一段时间,确保连接注册完成 await asyncio.sleep(1) msg = {"lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": "5701741704675979 0"}, "body": [ {"pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0, "pts": int(time.time() * 1000) * 1000, "seq": 0, "timestamp": int(time.time() * 1000)}]} await ws.send(json.dumps(msg)) logger.info('连接注册完成') def is_chat_message(self, message): """判断是否为用户聊天消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], dict) # 确保是字典类型 and "10" in message["1"] and isinstance(message["1"]["10"], dict) # 确保是字典类型 and "reminderContent" in message["1"]["10"] ) except Exception: return False def is_sync_package(self, message_data): """判断是否为同步包消息""" try: return ( isinstance(message_data, dict) and "body" in message_data and "syncPushPackage" in message_data["body"] and "data" in message_data["body"]["syncPushPackage"] and len(message_data["body"]["syncPushPackage"]["data"]) > 0 ) except Exception: return False def is_typing_status(self, message): """判断是否为用户正在输入状态消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], list) and len(message["1"]) > 0 and isinstance(message["1"][0], dict) and "1" in message["1"][0] and isinstance(message["1"][0]["1"], str) and "@goofish" in message["1"][0]["1"] ) except Exception: return False def is_system_message(self, message): """判断是否为系统消息""" try: return ( isinstance(message, dict) and "3" in message and isinstance(message["3"], dict) and "needPush" in message["3"] and message["3"]["needPush"] == "false" ) except Exception: return False def check_toggle_keywords(self, message): """检查消息是否包含切换关键词""" message_stripped = message.strip() return message_stripped in self.toggle_keywords def is_manual_mode(self, chat_id): """检查特定会话是否处于人工接管模式""" if chat_id not in self.manual_mode_conversations: return False # 检查是否超时 current_time = time.time() if chat_id in self.manual_mode_timestamps: if current_time - self.manual_mode_timestamps[chat_id] > self.manual_mode_timeout: # 超时,自动退出人工模式 self.exit_manual_mode(chat_id) return False return True def enter_manual_mode(self, chat_id): """进入人工接管模式""" self.manual_mode_conversations.add(chat_id) self.manual_mode_timestamps[chat_id] = time.time() def exit_manual_mode(self, chat_id): """退出人工接管模式""" self.manual_mode_conversations.discard(chat_id) if chat_id in self.manual_mode_timestamps: del self.manual_mode_timestamps[chat_id] def toggle_manual_mode(self, chat_id): """切换人工接管模式""" if self.is_manual_mode(chat_id): self.exit_manual_mode(chat_id) return "auto" else: self.enter_manual_mode(chat_id) return "manual" async def handle_message(self, message_data, websocket): """处理所有类型的消息""" try: try: message = message_data ack = { "code": 200, "headers": { "mid": message["headers"]["mid"] if "mid" in message["headers"] else generate_mid(), "sid": message["headers"]["sid"] if "sid" in message["headers"] else '', } } if 'app-key' in message["headers"]: ack["headers"]["app-key"] = message["headers"]["app-key"] if 'ua' in message["headers"]: ack["headers"]["ua"] = message["headers"]["ua"] if 'dt' in message["headers"]: ack["headers"]["dt"] = message["headers"]["dt"] await websocket.send(json.dumps(ack)) except Exception as e: pass # 如果不是同步包消息,直接返回 if not self.is_sync_package(message_data): return # 获取并解密数据 sync_data = message_data["body"]["syncPushPackage"]["data"][0] # 检查是否有必要的字段 if "data" not in sync_data: logger.debug("同步包中无data字段") return # 解密数据 try: data = sync_data["data"] try: data = base64.b64decode(data).decode("utf-8") data = json.loads(data) # logger.info(f"无需解密 message: {data}") return except Exception as e: # logger.info(f'加密数据: {data}') decrypted_data = decrypt(data) message = json.loads(decrypted_data) except Exception as e: logger.error(f"消息解密失败: {e}") return try: # 判断是否为订单消息,需要自行编写付款后的逻辑 if message['3']['redReminder'] == '等待买家付款': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'等待买家 {user_url} 付款') return elif message['3']['redReminder'] == '交易关闭': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'买家 {user_url} 交易关闭') return elif message['3']['redReminder'] == '等待卖家发货': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'交易成功 {user_url} 等待卖家发货') return except: pass # 判断消息类型 if self.is_typing_status(message): logger.debug("用户正在输入") return elif not self.is_chat_message(message): logger.debug("其他非聊天消息") logger.debug(f"原始消息: {message}") return # 处理聊天消息 create_time = int(message["1"]["5"]) send_user_name = message["1"]["10"]["reminderTitle"] send_user_id = message["1"]["10"]["senderUserId"] send_message = message["1"]["10"]["reminderContent"] # 时效性验证(过滤5分钟前消息) if (time.time() * 1000 - create_time) > self.message_expire_time: logger.debug("过期消息丢弃") return # 获取商品ID和会话ID url_info = message["1"]["10"]["reminderUrl"] item_id = url_info.split("itemId=")[1].split("&")[0] if "itemId=" in url_info else None chat_id = message["1"]["2"].split('@')[0] if not item_id: logger.warning("无法获取商品ID") return # 检查是否为卖家(自己)发送的控制命令 if send_user_id == self.myid: logger.debug("检测到卖家消息,检查是否为控制命令") # 检查切换命令 if self.check_toggle_keywords(send_message): mode = self.toggle_manual_mode(chat_id) if mode == "manual": logger.info(f"🔴 已接管会话 {chat_id} (商品: {item_id})") else: logger.info(f"🟢 已恢复会话 {chat_id} 的自动回复 (商品: {item_id})") return # 记录卖家人工回复 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", send_message) logger.info(f"卖家人工回复 (会话: {chat_id}, 商品: {item_id}): {send_message}") # 获取用户问题并保存人工标准回答 context = self.context_manager.get_context_by_chat(chat_id) user_question = None for msg in reversed(context): if msg['role'] == 'user': user_question = msg['content'] break if user_question: self.context_manager.save_manual_answer(user_question, send_message, item_id) logger.info(f"已保存人工标准回答 (商品: {item_id}): 问题: {user_question}, 回答: {send_message}") return logger.info(f"用户: {send_user_name} (ID: {send_user_id}), 商品: {item_id}, 会话: {chat_id}, 消息: {send_message}") # 添加用户消息到上下文 self.context_manager.add_message_by_chat(chat_id, send_user_id, item_id, "user", send_message) # 如果当前会话处于人工接管模式,不进行自动回复 if self.is_manual_mode(chat_id): logger.info(f"🔴 会话 {chat_id} 处于人工接管模式,跳过自动回复") return if self.is_system_message(message): logger.debug("系统消息,跳过处理") return # 从数据库中获取商品信息,如果不存在则从API获取并保存 item_info = self.context_manager.get_item_info(item_id) if not item_info: logger.info(f"从API获取商品信息: {item_id}") api_result = self.xianyu.get_item_info(item_id) if 'data' in api_result and 'itemDO' in api_result['data']: item_info = api_result['data']['itemDO'] # 保存商品信息到数据库 self.context_manager.save_item_info(item_id, item_info) else: logger.warning(f"获取商品信息失败: {api_result}") return else: logger.info(f"从数据库获取商品信息: {item_id}") item_description = f"{item_info['desc']};当前商品售卖价格为:{str(item_info['soldPrice'])}" # 获取完整的对话上下文 context = self.context_manager.get_context_by_chat(chat_id) # 生成回复 bot_reply = bot.generate_reply( send_message, item_description, context=context ) # 检查是否为价格意图,如果是则增加议价次数 if bot.last_intent == "price": self.context_manager.increment_bargain_count_by_chat(chat_id) bargain_count = self.context_manager.get_bargain_count_by_chat(chat_id) logger.info(f"用户 {send_user_name} 对商品 {item_id} 的议价次数: {bargain_count}") # 添加机器人回复到上下文 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", bot_reply) logger.info(f"机器人回复: {bot_reply}") await self.send_msg(websocket, chat_id, send_user_id, bot_reply) except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message_data}") async def send_heartbeat(self, ws): """发送心跳包并等待响应""" try: heartbeat_mid = generate_mid() heartbeat_msg = { "lwp": "/!", "headers": { "mid": heartbeat_mid } } await ws.send(json.dumps(heartbeat_msg)) self.last_heartbeat_time = time.time() logger.debug("心跳包已发送") return heartbeat_mid except Exception as e: logger.error(f"发送心跳包失败: {e}") raise async def heartbeat_loop(self, ws): """心跳维护循环""" while True: try: current_time = time.time() # 检查是否需要发送心跳 if current_time - self.last_heartbeat_time >= self.heartbeat_interval: await self.send_heartbeat(ws) # 检查上次心跳响应时间,如果超时则认为连接已断开 if (current_time - self.last_heartbeat_response) > (self.heartbeat_interval + self.heartbeat_timeout): logger.warning("心跳响应超时,可能连接已断开") break await asyncio.sleep(1) except Exception as e: logger.error(f"心跳循环出错: {e}") break async def handle_heartbeat_response(self, message_data): """处理心跳响应""" try: if ( isinstance(message_data, dict) and "headers" in message_data and "mid" in message_data["headers"] and "code" in message_data and message_data["code"] == 200 ): self.last_heartbeat_response = time.time() logger.debug("收到心跳响应") return True except Exception as e: logger.error(f"处理心跳响应出错: {e}") return False async def main(self): while True: try: # 重置连接重启标志 self.connection_restart_flag = False headers = { "Cookie": self.cookies_str, "Host": "wss-goofish.dingtalk.com", "Connection": "Upgrade", "Pragma": "no-cache", "Cache-Control": "no-cache", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", "Origin": "https://www.goofish.com", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", } async with websockets.connect(self.base_url, extra_headers=headers) as websocket: self.ws = websocket await self.init(websocket) # 初始化心跳时间 self.last_heartbeat_time = time.time() self.last_heartbeat_response = time.time() # 启动心跳任务 self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket)) # 启动token刷新任务 self.token_refresh_task = asyncio.create_task(self.token_refresh_loop()) async for message in websocket: try: # 检查是否需要重启连接 if self.connection_restart_flag: logger.info("检测到连接重启标志,准备重新建立连接...") break message_data = json.loads(message) # 处理心跳响应 if await self.handle_heartbeat_response(message_data): continue # 发送通用ACK响应 if "headers" in message_data and "mid" in message_data["headers"]: ack = { "code": 200, "headers": { "mid": message_data["headers"]["mid"], "sid": message_data["headers"].get("sid", "") } } # 复制其他可能的header字段 for key in ["app-key", "ua", "dt"]: if key in message_data["headers"]: ack["headers"][key] = message_data["headers"][key] await websocket.send(json.dumps(ack)) # 处理其他消息 await self.handle_message(message_data, websocket) except json.JSONDecodeError: logger.error("消息解析失败") except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message}") except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket连接已关闭") except Exception as e: logger.error(f"连接发生错误: {e}") finally: # 清理任务 if self.heartbeat_task: self.heartbeat_task.cancel() try: await self.heartbeat_task except asyncio.CancelledError: pass if self.token_refresh_task: self.token_refresh_task.cancel() try: await self.token_refresh_task except asyncio.CancelledError: pass # 如果是主动重启,立即重连;否则等待5秒 if self.connection_restart_flag: logger.info("主动重启连接,立即重连...") else: logger.info("等待5秒后重连...") await asyncio.sleep(5) if __name__ == '__main__': # 加载环境变量 load_dotenv() # 配置日志级别 log_level = os.getenv("LOG_LEVEL", "DEBUG").upper() logger.remove() # 移除默认handler logger.add( sys.stderr, level=log_level, format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>" ) logger.info(f"日志级别设置为: {log_level}") cookies_str = os.getenv("COOKIES_STR") bot = XianyuReplyBot() xianyuLive = XianyuLive(cookies_str) # 常驻进程 asyncio.run(xianyuLive.main()) 现在在我加了techagent知识库检索(knowledge_base中product_summary.excel)的功能之后,客服机器人的消息发布出去了,只是可以把消息回复记录进日志

再加一个功能:1.创建一个按钮显示“完成首拼”,再新创建的excel文件里,会自动生成首拼,用python代码实现import sys import pandas as pd from 连接池到实验 import * from PyQt5.QtWidgets import ( QApplication, QWidget, QLabel, QLineEdit, QPushButton, QFileDialog, QMessageBox, QVBoxLayout, QComboBox, QHBoxLayout ) import numpy as np import os import tempfile import win32com.client as win32 # 用于操作Excel # 主窗口类 class ExcelImporter(QWidget): def __init__(self): super().__init__() self.setWindowTitle("Excel 导入 MySQL 工具") self.resize(600, 400) # 增加窗口高度以容纳新控件 self.last_created_excel = None # 记录最后创建的Excel文件路径 self.templates = { "客户信息": [ "姓名","首拼"], "产品信息": [ "产品名称", "首拼"], "订单记录": ["客户名","首拼"] } self.init_ui() def init_ui(self): layout = QVBoxLayout() # 模板选择区域 layout.addWidget(QLabel("选择模板:")) template_layout = QHBoxLayout() self.template_combo = QComboBox() self.template_combo.addItems(self.templates.keys()) template_layout.addWidget(self.template_combo) self.create_button = QPushButton("创建Excel") self.create_button.clicked.connect(self.create_excel_template) template_layout.addWidget(self.create_button) self.complete_button = QPushButton("我已编辑完成") self.complete_button.clicked.connect(self.on_complete_clicked) self.complete_button.setEnabled(False) # 初始禁用 template_layout.addWidget(self.complete_button) layout.addLayout(template_layout) # 文件路径输入框 layout.addWidget(QLabel("选择 Excel 文件:")) self.file_path_edit = QLineEdit() self.browse_button = QPushButton("浏览") self.browse_button.clicked.connect(self.select_file) file_layout = QHBoxLayout() file_layout.addWidget(self.file_path_edit) file_layout.addWidget(self.browse_button) layout.addLayout(file_layout) # 表名输入框 layout.addWidget(QLabel("目标数据库表名:")) self.table_name_edit = QLineEdit() layout.addWidget(self.table_name_edit) # 导入按钮 self.import_button = QPushButton("导入数据库") self.import_button.clicked.connect(self.import_to_database) layout.addWidget(self.import_button) self.setLayout(layout) def create_excel_template(self): """创建Excel模板并打开""" try: # 获取选中的模板名称 template_name = self.template_combo.currentText() # 创建临时Excel文件 with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp: file_path = tmp.name self.last_created_excel = file_path # 创建DataFrame并写入列标题 headers = self.templates[template_name] df = pd.DataFrame(columns=headers) df.to_excel(self.last_created_excel, index=False) # 使用Excel 2013打开文件 excel = win32.gencache.EnsureDispatch('Excel.Application') excel.Visible = True workbook = excel.Workbooks.Open(os.path.abspath(self.last_created_excel)) # 启用"我已编辑完成"按钮 self.complete_button.setEnabled(True) QMessageBox.information(self, "成功", f"Excel模板已创建,请编辑后点击'我已编辑完成'") except Exception as e: QMessageBox.critical(self, "错误", f"创建Excel失败: {str(e)}") def on_complete_clicked(self): """处理编辑完成按钮点击事件""" if self.last_created_excel and os.path.exists(self.last_created_excel): self.file_path_edit.setText(self.last_created_excel) self.table_name_edit.setText(self.template_combo.currentText()) self.complete_button.setEnabled(False) # 重置按钮状态 else: QMessageBox.warning(self, "警告", "未找到创建的Excel文件") def select_file(self): file_path, _ = QFileDialog.getOpenFileName(self, "选择 Excel 文件", "", "Excel 文件 (*.xlsx *.xls)") if file_path: self.file_path_edit.setText(file_path) def import_to_database(self): file_path = self.file_path_edit.text() table_name = self.table_name_edit.text() if not file_path: QMessageBox.critical(self, "错误", "请选择一个 Excel 文件") return if not table_name: QMessageBox.critical(self, "错误", "请输入目标数据库表名") return try: with POOL.connection() as conn: with conn.cursor() as cursor: df = pd.read_excel(file_path) if df.empty: QMessageBox.critical(self, "错误", "Excel 文件为空") return # 替换所有 NaN、inf、-inf 为 None df.replace([np.nan, np.inf, -np.inf], None, inplace=True) # 构建插入语句 if not table_name.isidentifier(): raise ValueError("表名包含非法字符") columns = ', '.join(df.columns.tolist()) placeholders = ', '.join(['%s'] * len(df.columns)) insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" # 转换为元组列表 data = [tuple(row) for _, row in df.iterrows()] # 批量插入 cursor.executemany(insert_sql, data) conn.commit() QMessageBox.information(self, "成功", f"{cursor.rowcount} 条记录已成功导入到表 '{table_name}'") except Exception as e: try: conn.rollback() except: pass QMessageBox.critical(self, "错误", f"导入失败: {str(e)}") # 启动应用 if __name__ == "__main__": app = QApplication(sys.argv) window = ExcelImporter() window.show() sys.exit(app.exec_())

大家在看

recommend-type

tibco rv 发送与接收Demo

Tibco rv sender && Receiver .net3.5 .net4.0
recommend-type

串口调试助手 XCOM V2.6

如果网速可以,建议搭建去下载微软商店里的串口调试助手
recommend-type

瑞星卡卡kaka小狮子(不含杀软) For Mac,情怀小程序,有动画有声,亲测可用

MAC专用,解压放到「应用程序」里面即可,只有小狮子,不含杀毒软件;有动画有声音;体积小,占用内存小,适合对瑞星狮子有情怀的朋友下载玩,做个存档都是不错的。
recommend-type

convex optimiation教材及课后答案

stanford的stephen boyd教授的最经典教材凸优化及课后解答,学习机器学习同学的至宝。
recommend-type

Frequency-comb-DPLL:数字锁相环软件,用于使用Red Pitaya锁定频率梳

数字锁相环,用于使用红色火龙果锁定频率梳 固件/软件允许使用此硬件来锁相频率梳。 更一般而言,它与硬件一起提供了一个数字控制盒,该数字控制盒可以支持双通道锁相环,包括输入rf信号的前端IQ检测。 因此,虽然此数字控制盒可用于锁相其他系统,但下面的讨论假定用户正在操作频率梳。 入门 从“发布部分”( )下载所需的文件: 可以访问Python GUI的完整源代码存储库; b。 红火龙果的SD卡映像(red_pitaya_dpll_2017-05-31.zip) 阅读并遵循“ RedPitaya DPLL.pdf的说明和操作手册”文件。 软件版本 所需的Python发行版是WinPython-64bit-3.7.2( )。 FPGA Vivado项目在Vivado 2015.4中进行了编译,但是仅使用该软件就不需要安装Vivado。 附加信息 可以从NIST数字控制箱的说明手册中获得更多信

最新推荐

recommend-type

【微信小程序源码】图片预览带后端.zip

资源说明: 1:本资料仅用作交流学习参考,请切勿用于商业用途。 2:一套精品实用微信小程序源码资源,无论是入门练手还是项目复用都超实用,省去重复开发时间,让开发少走弯路! 更多精品资源请访问 https://blog.csdn.net/ashyyyy/article/details/146464041
recommend-type

Docker化部署TS3AudioBot教程与实践

### 标题知识点 #### TS3AudioBot_docker - **Dockerfile的用途与组成**:Dockerfile是一个文本文件,包含了所有构建Docker镜像的命令。开发者可以通过编辑Dockerfile来指定Docker镜像创建时所需的所有指令,包括基础镜像、运行时指令、环境变量、软件安装、文件复制等。TS3AudioBot_docker表明这个Dockerfile与TS3AudioBot项目相关,TS3AudioBot可能是一个用于TeamSpeak 3服务器的音频机器人,用于播放音频或与服务器上的用户进行交互。 - **Docker构建过程**:在描述中,有两种方式来获取TS3AudioBot的Docker镜像。一种是从Dockerhub上直接运行预构建的镜像,另一种是自行构建Docker镜像。自建过程会使用到docker build命令,而从Dockerhub运行则会用到docker run命令。 ### 描述知识点 #### Docker命令的使用 - **docker run**:这个命令用于运行一个Docker容器。其参数说明如下: - `--name tsbot`:为运行的容器指定一个名称,这里命名为tsbot。 - `--restart=always`:设置容器重启策略,这里是总是重启,确保容器在失败后自动重启。 - `-it`:这是一对参数,-i 表示交互式操作,-t 分配一个伪终端。 - `-d`:表示后台运行容器。 - `-v /home/tsBot/data:/data`:将宿主机的/home/tsBot/data目录挂载到容器内的/data目录上,以便持久化存储数据。 - `rofl256/tsaudiobot` 或 `tsaudiobot`:指定Docker镜像名称。前者可能是从DockerHub上获取的带有用户名命名空间的镜像,后者是本地构建或已重命名的镜像。 #### Docker构建流程 - **构建镜像**:使用docker build命令可以将Dockerfile中的指令转化为一个Docker镜像。`docker build . -t tsaudiobot`表示从当前目录中读取Dockerfile,并创建一个名为tsaudiobot的镜像。构建过程中,Docker会按顺序执行Dockerfile中的指令,比如FROM、RUN、COPY等,最终形成一个包含所有依赖和配置的应用镜像。 ### 标签知识点 #### Dockerfile - **Dockerfile的概念**:Dockerfile是一个包含创建Docker镜像所有命令的文本文件。它被Docker程序读取,用于自动构建Docker镜像。Dockerfile中的指令通常包括安装软件、设置环境变量、复制文件等。 - **Dockerfile中的命令**:一些常用的Dockerfile命令包括: - FROM:指定基础镜像。 - RUN:执行命令。 - COPY:将文件或目录复制到镜像中。 - ADD:类似于COPY,但是 ADD 支持从URL下载文件以及解压 tar 文件。 - ENV:设置环境变量。 - EXPOSE:声明端口。 - VOLUME:创建挂载点。 - CMD:容器启动时要运行的命令。 - ENTRYPOINT:配置容器启动时的执行命令。 ### 压缩包子文件的文件名称列表知识点 #### 文件命名 - **TS3AudioBot_docker-main**:此文件名表明了这是一个主要的代码库或Dockerfile的存放位置。在开发中,通常main分支代表当前的主版本或正在积极开发的分支。因此TS3AudioBot_docker-main可能表示这是在Dev分支上开发的Dockerfile的主要代码版本。主分支一般比较稳定,并作为新的特性开发的基础。 ### 综合知识点 - **Docker在DevOps中的角色**:Docker作为一种轻量级的容器化技术,在DevOps领域扮演重要角色。它可以快速部署、一致的运行环境、便于测试和迁移应用。通过Dockerfile的编写和docker build命令,开发者可以构建可移植的容器镜像,通过docker run命令则可以快速启动容器实例。 - **TS3AudioBot与TeamSpeak**:TS3AudioBot很可能是一个针对TeamSpeak 3服务器的自动化解决方案。TeamSpeak是一个语音通讯软件,广泛用于线上游戏团队进行沟通。一个音频机器人可以提供自动化的消息通知、音频流控制等功能,提高游戏社区的交流效率。 - **Docker镜像的版本管理**:镜像的命名通常包括用户名、项目名和标签。如rofl256/tsaudiobot中rofl256可能是一个用户名,tsaudiobot可能是项目名称,这样的命名规则有利于镜像的共享和管理。 在介绍以上知识点后,我们理解了标题、描述、标签及文件名称列表中所含的知识点。这些知识点涵盖了Dockerfile的构建、Docker镜像的使用、TS3AudioBot功能以及版本控制等多个方面,为IT专业人员在使用和开发Docker相关项目时提供了必要的信息。
recommend-type

零售销售数据的探索性分析与DeepAR模型预测

### 零售销售数据的探索性分析与DeepAR模型预测 #### 1. 探索性数据分析 在拥有45家商店的情况下,我们选择了第20号商店,来分析其不同部门在三年间的销售表现。借助DeepAR算法,我们可以了解不同部门商品的销售情况。 在SageMaker中,通过生命周期配置(Lifecycle Configurations),我们可以在笔记本实例启动前自定义安装Python包,避免在执行笔记本前手动跟踪所需的包。为了探索零售销售数据,我们需要安装最新版本(0.9.0)的seaborn库。具体操作步骤如下: 1. 在SageMaker的Notebook下,点击Lifecycle Config
recommend-type

llm agent平台

<think>好的,我现在需要帮用户寻找与LLM代理平台解决方案相关的信息。首先,我得明确用户的需求。用户提到了“LLM agent platform solutions”,也就是大型语言模型代理平台的解决方案。这可能涉及到如何构建、部署或优化基于LLM的代理系统。接下来,我应该回忆之前用户提供的引用内容,看看有没有相关的信息可以利用。 根据引用[1],提到构建LLM应用程序的步骤分解,可能涉及到代理平台的设计。引用[2]讨论了评估LLM的挑战,包括可重复性和开源模型的解决方案,这可能影响代理平台的稳定性和选择。引用[3]则提到大模型相关的岗位和面试题,可能涉及实际应用中的技术问题。 接下
recommend-type

Docker实现OAuth2代理:安全的HTTPS解决方案

### 知识点详细说明: #### Dockerfile基础 Dockerfile是一种文本文件,它包含了用户创建Docker镜像所需的命令和参数。Docker通过读取Dockerfile中的指令自动构建镜像。Dockerfile通常包含了如下载基础镜像、安装软件包、执行脚本等指令。 #### Dockerfile中的常用指令 1. **FROM**: 指定基础镜像,所有的Dockerfile都必须以FROM开始。 2. **RUN**: 在构建过程中执行命令,如安装软件。 3. **CMD**: 设置容器启动时运行的命令,可以被docker run命令后面的参数覆盖。 4. **EXPOSE**: 告诉Docker容器在运行时监听指定的网络端口。 5. **ENV**: 设置环境变量。 6. **ADD**: 将本地文件复制到容器中,如果是tar归档文件会自动解压。 7. **ENTRYPOINT**: 设置容器启动时的默认命令,不会被docker run命令覆盖。 8. **VOLUME**: 创建一个挂载点以挂载外部存储,如磁盘或网络文件系统。 #### OAuth 2.0 Proxy OAuth 2.0 Proxy 是一个轻量级的认证代理,用于在应用程序前提供OAuth认证功能。它主要通过HTTP重定向和回调机制,实现对下游服务的安全访问控制,支持多种身份提供商(IdP),如Google, GitHub等。 #### HTTPS和SSL/TLS HTTPS(HTTP Secure)是HTTP的安全版本,它通过SSL/TLS协议加密客户端和服务器之间的通信。使用HTTPS可以保护数据的机密性和完整性,防止数据在传输过程中被窃取或篡改。SSL(Secure Sockets Layer)和TLS(Transport Layer Security)是用来在互联网上进行通信时加密数据的安全协议。 #### Docker容器与HTTPS 为了在使用Docker容器时启用HTTPS,需要在容器内配置SSL/TLS证书,并确保使用443端口。这通常涉及到配置Nginx或Apache等Web服务器,并将其作为反向代理运行在Docker容器内。 #### 临时分叉(Fork) 在开源领域,“分叉”指的是一种特殊的复制项目的行为,通常是为了对原项目进行修改或增强功能。分叉的项目可以独立于原项目发展,并可选择是否合并回原项目。在本文的语境下,“临时分叉”可能指的是为了实现特定功能(如HTTPS支持)而在现有Docker-oauth2-proxy项目基础上创建的分支版本。 #### 实现步骤 要实现HTTPS支持的docker-oauth2-proxy,可能需要进行以下步骤: 1. **准备SSL/TLS证书**:可以使用Let's Encrypt免费获取证书或自行生成。 2. **配置Nginx/Apache服务器**:在Dockerfile中添加配置,以使用SSL证书和代理设置。 3. **修改OAuth2 Proxy设置**:调整OAuth2 Proxy配置以使用HTTPS连接。 4. **分叉Docker-oauth2-proxy项目**:创建项目的分支副本,以便进行修改。 5. **编辑Dockerfile**:在分叉的项目中编写或修改Dockerfile,包括下载基础镜像、设置环境变量、添加SSL证书、配置Nginx/Apache和OAuth2 Proxy等步骤。 6. **构建和测试新镜像**:使用Docker构建镜像,并在安全环境中进行测试,确保HTTPS配置正确,并且OAuth2 Proxy功能正常工作。 7. **部署到生产环境**:在确认无误后,将配置好的镜像部署到生产环境中。 #### 压缩包子文件的文件名称列表 - **docker-oauth2-proxy-master**: 这可能是指在GitHub等代码托管平台上,docker-oauth2-proxy项目的主分支或主仓库。名称列表中的“master”暗示了该文件夹包含的是主分支的代码。 总结来说,要实现一个支持HTTPS的docker-oauth2-proxy,开发者需要进行一系列的配置和编码工作,包括使用Dockerfile来构建自定义的Docker镜像,配置SSL/TLS证书,分叉并修改现有的开源项目代码。通过这些步骤,可以确保OAuth2 Proxy能够安全地处理HTTPS请求,并为下游服务提供安全认证功能。
recommend-type

利用AmazonSageMaker进行图像分类:从理论到实践

# 利用 Amazon SageMaker 进行图像分类:从理论到实践 ## 1. 主题建模与图像分类概述 在数据科学领域,从大量非结构化数据中提取信息和主题至关重要。像 SageMaker 的神经主题模型(NTM)这类主题建模技术,提供了线性和非线性学习方法,能帮助我们深入挖掘数据中的潜在主题。它通过特定的架构和内部机制工作,还支持分布式训练,将数据集分割成多个块进行并行处理。训练完成后,我们可以将模型部署为端点并进行推理,例如解读安然邮件中的主题。 图像分类在过去五年中一直是热门研究领域,因为它能解决众多行业的实际业务问题,如自动驾驶汽车行业就高度依赖图像分类和目标检测模型的准确性。A
recommend-type

wed前端开发网页设计

<think>好的,用户需要寻找关于Web前端开发和网页设计的相关资源或教程。首先,我需要理解用户的具体需求,他们可能刚入门,想要系统学习,或者有一定基础,需要进阶教程。根据用户的问题,他们提到了“教程”和“资源”,可能希望推荐书籍、在线课程、框架文档以及社区论坛等。 接下来,我需要参考用户提供的引用内容。引用[1]提到了周文洁的《HTML5网页前端设计实战》,这是一本配套的实战项目教程,适合有基础的读者,可能可以作为书籍推荐之一。引用[2]概述了Web前端开发的技术分类,包括客户端和服务器端技术,以及常用框架如Bootstrap、React等。引用[3]是关于Delphi的TMS WEB
recommend-type

eosforce下的scatter API应用实例教程

### eosforce使用分散API #### 知识点一:什么是EOSForce EOSForce是以EOSIO为技术基础,旨在为区块链应用提供高性能的公链解决方案。它类似于EOS,也使用了EOSIO软件套件,开发者可以基于EOSIO构建DAPP应用,同时它可能拥有与EOS不同的社区治理结构和经济模型。对于开发者来说,了解EOSForce的API和功能是非常关键的,因为它直接影响到应用的开发与部署。 #### 知识点二:scatter API的介绍 scatter API 是一个开源的JavaScript库,它的目的是为了简化EOSIO区块链上各类操作,包括账户管理和交易签名等。scatter旨在提供一个更为便捷、安全的用户界面,通过API接口与EOSIO区块链进行交互。用户无需保存私钥即可与区块链进行交互,使得整个过程更加安全,同时开发者也能够利用scatter实现功能更加强大的应用。 #### 知识点三:scatter API在EOSForce上的应用 在EOSForce上使用scatter API可以简化开发者对于区块链交互的工作,无需直接处理复杂的私钥和签名问题。scatter API提供了一整套用于与区块链交互的方法,包括但不限于账户创建、身份验证、签名交易、数据读取等。通过scatter API,开发者可以更加专注于应用逻辑的实现,而不必担心底层的区块链交互细节。 #### 知识点四:安装和运行scatter_demo项目 scatter_demo是基于scatter API的一个示例项目,通过它可以学习如何将scatter集成到应用程序中。根据提供的描述,安装该项目需要使用npm,即Node.js的包管理器。首先需要执行`npm install`来安装依赖,这个过程中npm会下载scatter_demo项目所需的所有JavaScript包。安装完成后,可以通过运行`npm run dev`命令启动项目,该命令通常与项目中的开发环境配置文件(如webpack.config.js)相对应,用于启动本地开发服务器和热重载功能,以便开发者实时观察代码修改带来的效果。 #### 知识点五:配置eosforce到scatter 在scatter_demo项目中,将eosforce配置到scatter需要进入scatter的设置界面。scatter提供了一个可视化的界面,允许用户管理自己的区块链网络配置。在scatter设置中选择“网络”一栏,然后选择“新建”,在此步骤中需要选择“eos”作为区块链类型。之后,将eosforce的节点配置信息填入对应区域,完成网络的设置。这样,scatter就能够连接到eosforce区块链,用户可以通过scatter API与eosforce区块链进行交互。 #### 知识点六:npm包管理器及安装命令 npm是Node.js的包管理器,它在Node.js项目中扮演着至关重要的角色,用来安装、管理和分享项目所需的代码包。`npm install`是npm的常用命令,用于安装项目依赖。在执行此命令时,npm会根据项目根目录中的`package.json`文件自动下载并安装所需的所有依赖包。这个文件描述了项目的依赖关系,包括每个依赖的版本范围。通过这种方式,开发者可以确保项目在不同的环境中都能够有相同的行为表现。 #### 结论 了解和掌握如何在EOSForce中使用scatter API对于区块链应用开发者来说是极其重要的。scatter简化了与EOSIO区块链的交互过程,提高开发效率,降低了实现复杂功能的技术门槛。在实际操作中,开发者需要关注如何正确安装scatter_demo项目,如何配置eosforce节点到scatter中,以及如何利用scatter API开发出功能完善的区块链应用。通过这些知识点的了解,开发者可以更好地把握EOSForce生态中的开发实践,并在项目中实现安全、便捷的区块链交互。
recommend-type

AI模型评估与应用拓展

# AI模型评估与应用拓展 ## 1. 模型性能评估 ### 1.1 实验假设 我们有三个模型,假设第一个基于周一和周二点击数据训练的模型,在预测周四后期应用下载量方面,不如第二和第三个模型。第二个基于周一到周三点击数据训练的模型,性能也不如第三个基于周一到周四大部分时间点击数据训练的模型。 ### 1.2 特征重要性分析 以下代码用于分析每个模型中重要的特征: ```python exp_lst = ['exp1', 'exp2', 'exp3'] for exp in exp_lst: model_file = os.path.join(sm_output_loc, exp,
recommend-type

elasticsearch serverless

<think> 首先,用户要求了解 Elasticsearch Serverless 的相关信息,包括介绍、使用和优势等。参考的引用内容提供了背景信息,我需要基于这些引用进行回答。引用 [1] 到 [5] 提供了关键点。 关键引用内容: - [^1]: Elasticsearch Serverless 基于 Elasticsearch 的优势,满足现代工作负载的需求,如大数据集、AI 搜索和不可预测的流量。 - [^2]: Elasticsearch Serverless 是云端全托管的服务,基于云原生 Serverless 技术,提供自动弹性和免运维能力,解决资源成本问题,兼容 ELK 生