# 找出 example_usage 中未匹配上的行 merged = example_usage.merge( example_myexample_usage[id_list], on=id_list, how='left', indicator=True ) example_not_data = merged[merged['_merge'] == 'left_only'][example_usage.columns] # 找出 myexample_usage 中未匹配上的行 merged_my = myexample_usage.merge( example_myexample_usage[id_list], on=id_list, how='left', indicator=True ) myexample_not_data = merged_my[merged_my['_merge'] == 'left_only'][myexample_usage.columns]帮我详细解释一下

时间: 2025-09-03 12:15:41 AIGC 浏览: 4
当然可以,下面我将 **逐行详细解释这段代码** 的含义和作用。 --- ## 🎯 目标 你有两个 DataFrame: - `example_usage`:主表 A - `myexample_usage`:主表 B 你已经通过 `id_list`(比如 `['id', 'date']`)进行了 **inner join**,得到了 `example_myexample_usage`,即两个表中 **都匹配上的数据**。 现在你想分别找出: - `example_usage` 中 **没有匹配上** 的行 - `myexample_usage` 中 **没有匹配上** 的行 --- ## 🧠 解决思路 ### ✅ 方法:使用 `merge(..., how='left', indicator=True)` 通过 **左连接(left join)** 将原表与“已匹配的表”进行连接,并使用 `indicator=True` 来标记每行的来源。 这样你就可以通过 `_merge` 列判断哪些行是只存在于原表但未匹配上的。 --- ## 📌 第一步:找出 `example_usage` 中未匹配上的行 ```python merged = example_usage.merge( example_myexample_usage[id_list], on=id_list, how='left', indicator=True ) example_not_data = merged[merged['_merge'] == 'left_only'][example_usage.columns] ``` ### 🔍 逐行解释: #### 1. `example_myexample_usage[id_list]` - 从 `example_myexample_usage` 中只取 `id_list` 指定的列(如 `['id', 'date']`)。 - 这些列是复合键,用于判断是否在 `example_usage` 中存在匹配。 #### 2. `merge(..., on=id_list, how='left')` - 将 `example_usage` 与 `example_myexample_usage[id_list]` 进行左连接。 - 左连接意味着:保留 `example_usage` 中的所有行。 - 如果某行在 `example_myexample_usage` 中找不到匹配的复合键,则相关列为 `NaN`。 #### 3. `indicator=True` - 添加一列 `_merge`,它的值可以是: - `'both'`:该行在左右两个表中都有匹配 - `'left_only'`:该行只在左表(`example_usage`)中存在,未在右表中匹配到 #### 4. `merged['_merge'] == 'left_only'` - 筛选出那些在 `example_usage` 中存在,但在 `example_myexample_usage` 中 **没有匹配上的行**。 #### 5. `[example_usage.columns]` - 最后只保留 `example_usage` 的原始列,去掉 `_merge` 和其他无关列。 --- ## 📌 第二步:找出 `myexample_usage` 中未匹配上的行 ```python merged_my = myexample_usage.merge( example_myexample_usage[id_list], on=id_list, how='left', indicator=True ) myexample_not_data = merged_my[merged_my['_merge'] == 'left_only'][myexample_usage.columns] ``` ### 🔍 逐行解释: 这一段逻辑与上面完全一致,只是将对象换成了 `myexample_usage`: - 用 `myexample_usage` 去和 `example_myexample_usage[id_list]` 做左连接 - 筛选出 `_merge == 'left_only'` 的行 - 保留 `myexample_usage` 的原始列 --- ## ✅ 总结对比 | 步骤 | 说明 | 使用的技术 | |------|------|-------------| | 找出 `example_usage` 中未匹配的行 | 使用左连接 + `_merge == 'left_only'` | `merge(..., how='left', indicator=True)` | | 找出 `myexample_usage` 中未匹配的行 | 同上,换一个主表 | 同上 | --- ## 🧪 示例说明 假设: ```python example_usage = pd.DataFrame({ 'id': [1, 2, 3, 4], 'date': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04'], 'value1': [10, 20, 30, 40] }) myexample_usage = pd.DataFrame({ 'id': [1, 2, 5, 6], 'date': ['2024-01-01', '2024-01-02', '2024-01-05', '2024-01-06'], 'value2': [100, 200, 500, 600] }) id_list = ['id', 'date'] ``` 运行后: - `example_usage` 中未匹配上的行是: ```python id=1, date=2024-01-01 → 匹配上 id=2, date=2024-01-02 → 匹配上 id=3, date=2024-01-03 → 未匹配上 id=4, date=2024-01-04 → 未匹配上 ``` - `myexample_usage` 中未匹配上的行是: ```python id=5, date=2024-01-05 → 未匹配上 id=6, date=2024-01-06 → 未匹配上 ``` --- ## ✅ 推荐写法封装成函数(可选) 你可以将这个逻辑封装成函数,方便复用: ```python def get_unmatched_rows(left_df, matched_df, key_columns): merged = left_df.merge( matched_df[key_columns], on=key_columns, how='left', indicator=True ) return merged[merged['_merge'] == 'left_only'][left_df.columns] ``` 调用方式: ```python example_not_data = get_unmatched_rows(example_usage, example_myexample_usage, id_list) myexample_not_data = get_unmatched_rows(myexample_usage, example_myexample_usage, id_list) ``` --- ###
阅读全文

相关推荐

import pandas as pd import math as mt import numpy as np from sklearn.model_selection import train_test_split from Recommenders import SVDRecommender triplet_dataset_sub_song_merged = triplet_dataset_sub_song_mergedpd triplet_dataset_sub_song_merged_sum_df = triplet_dataset_sub_song_merged[['user','listen_count']].groupby('user').sum().reset_index() triplet_dataset_sub_song_merged_sum_df.rename(columns={'listen_count':'total_listen_count'},inplace=True) triplet_dataset_sub_song_merged = pd.merge(triplet_dataset_sub_song_merged,triplet_dataset_sub_song_merged_sum_df) triplet_dataset_sub_song_merged['fractional_play_count'] = triplet_dataset_sub_song_merged['listen_count']/triplet_dataset_sub_song_merged small_set = triplet_dataset_sub_song_merged user_codes = small_set.user.drop_duplicates().reset_index() song_codes = small_set.song.drop_duplicates().reset_index() user_codes.rename(columns={'index':'user_index'}, inplace=True) song_codes.rename(columns={'index':'song_index'}, inplace=True) song_codes['so_index_value'] = list(song_codes.index) user_codes['us_index_value'] = list(user_codes.index) small_set = pd.merge(small_set,song_codes,how='left') small_set = pd.merge(small_set,user_codes,how='left') mat_candidate = small_set[['us_index_value','so_index_value','fractional_play_count']] data_array = mat_candidate.fractional_play_count.values row_array = mat_candidate.us_index_value.values col_array = mat_candidate.so_index_value.values data_sparse = coo_matrix((data_array, (row_array, col_array)),dtype=float) K=50 urm = data_sparse MAX_PID = urm.shape[1] MAX_UID = urm.shape[0] recommender = SVDRecommender(K) U, S, Vt = recommender.fit(urm) Compute recommendations for test users uTest = [1,6,7,8,23] uTest_recommended_items = recommender.recommend(uTest, urm, 10) Output recommended songs in a dataframe recommendations = pd.DataFrame(columns=['user','song', 'score','rank']) for user in uTest: rank = 1 for song_index in uTest_recommended_items[user, 0:10]: song = small_set.loc[small_set['so_index_value'] == song_index].iloc[0] # Get song details recommendations = recommendations.append({'user': user, 'song': song['title'], 'score': song['fractional_play_count'], 'rank': rank}, ignore_index=True) rank += 1 display(recommendations)这段代码报错了,为什么?给出修改后的 代码

import torch from transformers import AutoModelForCausalLM, AutoTokenizer from peft import PeftModel import os # === 配置路径 === BASE_MODEL_PATH = "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/models/Yi-6B" LORA_ADAPTER_PATH = "./yi6b-lora-optimized" MERGED_OUTPUT_PATH = "./yi6b-merged-fp16" # === 创建输出目录 === os.makedirs(MERGED_OUTPUT_PATH, exist_ok=True) # === 加载原始大模型 (FP16) === print("🔄 Loading base model...") base_model = AutoModelForCausalLM.from_pretrained( BASE_MODEL_PATH, torch_dtype=torch.float16, device_map="auto", trust_remote_code=True ) # === 加载 LoRA Adapter 并合并权重 === print("🔄 Loading LoRA adapter...") peft_model = PeftModel.from_pretrained(base_model, LORA_ADAPTER_PATH) print("🔁 Merging adapter into base model...") merged_model = peft_model.merge_and_unload() # === 保存合并后的模型 === print(f"💾 Saving merged model to: {MERGED_OUTPUT_PATH}") merged_model.save_pretrained(MERGED_OUTPUT_PATH) # === 保存分词器 === tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_PATH, trust_remote_code=True) tokenizer.save_pretrained(MERGED_OUTPUT_PATH) # === 推理验证 === print("🧪 Running inference test...") try: prompt = "中国的首都是" inputs = tokenizer(prompt, return_tensors="pt").to(merged_model.device) outputs = merged_model.generate( **inputs, max_new_tokens=50, temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) print(f"📝 Prompt: {prompt}") print(f"✅ Generated: {generated}") except RuntimeError as e: print("❌ RuntimeError during inference:") print(str(e)) if "mat1 and mat2 shapes cannot be multiplied" in str(e): print("🚨 Tip: Don't use merge_and_unload() on 4bit models. Try using full-precision (FP16) base model.") exit(1) print("🎉 Merge and test completed successfully!")

将上述代码放入了Recommenders.py文件中,作为一个自定义工具包。将下列代码中调用scipy包中svd的部分。转为使用Recommenders.py工具包中封装的svd方法。给出修改后的完整代码。import pandas as pd import math as mt import numpy as np from sklearn.model_selection import train_test_split from Recommenders import * from scipy.sparse.linalg import svds from scipy.sparse import coo_matrix from scipy.sparse import csc_matrix # Load and preprocess data triplet_dataset_sub_song_merged = triplet_dataset_sub_song_mergedpd # load dataset triplet_dataset_sub_song_merged_sum_df = triplet_dataset_sub_song_merged[['user','listen_count']].groupby('user').sum().reset_index() triplet_dataset_sub_song_merged_sum_df.rename(columns={'listen_count':'total_listen_count'},inplace=True) triplet_dataset_sub_song_merged = pd.merge(triplet_dataset_sub_song_merged,triplet_dataset_sub_song_merged_sum_df) triplet_dataset_sub_song_merged['fractional_play_count'] = triplet_dataset_sub_song_merged['listen_count']/triplet_dataset_sub_song_merged['total_listen_count'] # Convert data to sparse matrix format small_set = triplet_dataset_sub_song_merged user_codes = small_set.user.drop_duplicates().reset_index() song_codes = small_set.song.drop_duplicates().reset_index() user_codes.rename(columns={'index':'user_index'}, inplace=True) song_codes.rename(columns={'index':'song_index'}, inplace=True) song_codes['so_index_value'] = list(song_codes.index) user_codes['us_index_value'] = list(user_codes.index) small_set = pd.merge(small_set,song_codes,how='left') small_set = pd.merge(small_set,user_codes,how='left') mat_candidate = small_set[['us_index_value','so_index_value','fractional_play_count']] data_array = mat_candidate.fractional_play_count.values row_array = mat_candidate.us_index_value.values col_array = mat_candidate.so_index_value.values data_sparse = coo_matrix((data_array, (row_array, col_array)),dtype=float) # Compute SVD def compute_svd(urm, K): U, s, Vt = svds(urm, K) dim = (len(s), len(s)) S = np.zeros(dim, dtype=np.float32) for i in range(0, len(s)): S[i,i] = mt.sqrt(s[i]) U = csc_matrix(U, dtype=np.float32) S = csc_matrix(S, dtype=np.float32) Vt = csc_matrix(Vt, dtype=np.float32) return U, S, Vt def compute_estimated_matrix(urm, U, S, Vt, uTest, K, test): rightTerm = S*Vt max_recommendation = 10 estimatedRatings = np.zeros(shape=(MAX_UID, MAX_PID), dtype=np.float16) recomendRatings = np.zeros(shape=(MAX_UID,max_recommendation ), dtype=np.float16) for userTest in uTest: prod = U[userTest, :]*rightTerm estimatedRatings[userTest, :] = prod.todense() recomendRatings[userTest, :] = (-estimatedRatings[userTest, :]).argsort()[:max_recommendation] return recomendRatings K=50 # number of factors urm = data_sparse MAX_PID = urm.shape[1] MAX_UID = urm.shape[0] U, S, Vt = compute_svd(urm, K) # Compute recommendations for test users # Compute recommendations for test users uTest = [1,6,7,8,23] uTest_recommended_items = compute_estimated_matrix(urm, U, S, Vt, uTest, K, True) # Output recommended songs in a dataframe recommendations = pd.DataFrame(columns=['user','song', 'score','rank']) for user in uTest: rank = 1 for song_index in uTest_recommended_items[user, 0:10]: song = small_set.loc[small_set['so_index_value'] == song_index].iloc[0] # Get song details recommendations = recommendations.append({'user': user, 'song': song['title'], 'score': song['fractional_play_count'], 'rank': rank}, ignore_index=True) rank += 1 display(recommendations)

import pandas as pd import os def process_item_mapping(): try: # 路径配置 input_purchases = 'delete/TranLine_Item_Purchases.csv' input_mapping = 'Mapping.csv' output_file = 'add/Updated_TranLine_Item_Purchases.csv' output_unmatched = 'add/Unmatched_Records.csv' # 自动创建输出目录 os.makedirs('add', exist_ok=True) # 读取数据(处理常见编码问题) purchases = pd.read_csv(input_purchases, encoding='utf-8-sig') mapping = pd.read_csv(input_mapping, encoding='utf-8-sig') # 列存在性验证 required_columns = {'Line Unique Key', 'Item'} if not required_columns.issubset(purchases.columns): missing = required_columns - set(purchases.columns) raise ValueError(f"购买表中缺少必要列:{missing}") # 执行左连接 merged = pd.merge( purchases, mapping[['Line Unique Key', 'Item Internal ID']], on='Line Unique Key', how='left', validate='one_to_one' # 确保键唯一性 ) # 列位置调整 if 'Item' in merged.columns: item_pos = merged.columns.get_loc('Item') + 1 cols = list(merged.columns) cols.insert(item_pos, cols.pop(cols.index('Item Internal ID'))) merged = merged[cols] # 生成未匹配报告 unmatched = merged[merged['Item Internal ID'].isna()] if not unmatched.empty: unmatched.to_csv(output_unmatched, index=False) print(f"发现{len(unmatched)}条未匹配记录,已保存到:{output_unmatched}") # 保存结果 merged.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"处理完成!结果已保存到:{output_file}") print(f"总记录数:{len(merged)} | 匹配成功率:{merged['Item Internal ID'].notna().mean():.2%}") except FileNotFoundError as e: print(f"文件不存在错误:{str(e)}") except pd.errors.EmptyDataError: print("错误:CSV文件内容为空或格式错误") except Exception as e: print(f"未预料的错误:{str(e)}") if __name__ == "__main__": process_item_mapping(),帮我优化一下,遍历delete文件夹里的所有文件,不再是某一个文件

import pandas as pd import numpy as np from datetime import datetime, timedelta, time import os from typing import List, Dict, Optional # ====================== 基础参数 ====================== CHANGE_TIME = 3.5 # 换模时间(小时) REST_PERIODS = [ # 每日休息时段(小时) (12, 12.5), # 12:00-12:30 (16.5, 17), # 16:30-17:00 (0, 1) # 0:00-1:00 ] # ====================== 类定义 ====================== class Order: def __init__(self, order_id, product_id, quantity, delivery_date, capacity, weight, attr): self.order_id = order_id # 订单号 self.product_id = product_id # 产品号 self.quantity = quantity # 数量(pcs) self.delivery_date = delivery_date # 发货日期(datetime) self.capacity = capacity # 产能(pcs/h) self.weight = weight # 产品重量(g) self.processing_hours = quantity / capacity # 生产工时(h) self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.delay_days = 0 # 延期天数 self.attr = attr # 产品属性(1:易搭配,3:难生产) self.merged_id = None # 合单序号 self.plan_number = None # 计划序号 self.forward_plan = None # 前置任务计划序号 class MergedOrder: def __init__(self, product_id, total_quantity, delivery_dates, capacity, weight, attr, original_orders, merged_id): self.product_id = product_id # 产品号 self.total_quantity = total_quantity # 合并后总数量 self.delivery_dates = delivery_dates # 原始订单发货日期列表 self.capacity = capacity # 产能 self.weight = weight # 产品重量 self.processing_hours = total_quantity / capacity # 总生产工时 self.attr = attr # 产品属性 self.original_orders = original_orders # 原始订单对象列表 self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.merged_id = merged_id # 合单序号 self.plan_number = None # 计划序号 class Machine: def __init__(self, machine_id, initial_time=None): self.machine_id = machine_id # 机台号 self.available_time = initial_time or datetime(2025, 3, 1) # 初始可用时间 self.last_product = None # 上一生产产品 self.adjacent = [] # 相邻机台列表 self.schedule = [] # 排程计划(任务列表) self.last_plan_number = None # 上一任务计划序号 # ====================== 数据加载 ====================== def load_official_data(data_dir: str = "."): """ 加载附件数据 需修改data_dir为附件所在目录(例如:"C:/Users/你的用户名/Desktop/附件") """ try: # 读取附件1 attachment1 = pd.ExcelFile(os.path.join(data_dir, "附件1(1).xlsx")) orders_df = attachment1.parse( sheet_name="订单表", parse_dates=["发货日期(DeliveryDate)"], usecols=["订单号", "产品号", "订单数量", "发货日期(DeliveryDate)", "Weight/pcs(g)"] ) machine_initial_df = attachment1.parse( sheet_name="机台初始工作状态表", parse_dates=["生产开始时间"] ) holidays_df = attachment1.parse(sheet_name="放假日期表") holidays = {pd.to_datetime(row["放假日期"]).date() for _, row in holidays_df.iterrows()} # 读取附件2 attachment2 = pd.ExcelFile(os.path.join(data_dir, "附件2(1).xlsx")) product_capacity_df = attachment2.parse(sheet_name="产品工时计算参数表") product_machine_df = attachment2.parse(sheet_name="产品机台生产关系表") machine_relation_df = attachment2.parse(sheet_name="机台关系表") product_attr_df = attachment2.parse(sheet_name="产品属性表") # 构建数据映射 product_capacity = {row["产品号"]: row["Capacity(pcs/h)"] for _, row in product_capacity_df.iterrows()} product_weight = {row["产品号"]: row["Weight/pcs(g)"] for _, row in product_capacity_df.iterrows()} product_machines = {} for _, row in product_machine_df.iterrows(): valid_machines = [col for col in product_machine_df.columns if col != "产品号" and row[col] == 1] product_machines[row["产品号"]] = valid_machines machine_adjacent = {} for _, row in machine_relation_df.iterrows(): adjacent_machines = [col for col in machine_relation_df.columns if col != "机台号" and row[col] == 1] machine_adjacent[row["机台号"]] = adjacent_machines product_attrs = {row["产品号"]: row["属性"] for _, row in product_attr_df.iterrows()} # 补充订单表产能和重量 orders_df["Capacity(pcs/h)"] = orders_df["产品号"].map(product_capacity) orders_df["Weight/pcs(g)"] = orders_df["产品号"].map(product_weight) # 初始化机台 machine_initial_times = {row["机台号"]: row["生产开始时间"] for _, row in machine_initial_df.iterrows()} machines = [] for mid in machine_initial_times.keys(): machine = Machine(mid, machine_initial_times[mid]) machine.adjacent = machine_adjacent.get(mid, []) machines.append(machine) return orders_df, product_machines, product_attrs, machines, holidays except Exception as e: print(f"数据加载失败,请检查附件路径和格式:{e}") raise # ====================== 合单逻辑 ====================== def merge_orders(orders: List[Order], merge_days: int) -> tuple[List[MergedOrder], int]: merged_tasks = [] merge_count = 0 merged_id_counter = 1 product_groups = {} for order in orders: if order.product_id not in product_groups: product_groups[order.product_id] = [] product_groups[order.product_id].append(order) for product_id, group_orders in product_groups.items(): group_orders.sort(key=lambda x: x.delivery_date) current_merge = [group_orders[0]] for i in range(1, len(group_orders)): date_diff = (group_orders[i].delivery_date - current_merge[0].delivery_date).days if date_diff <= merge_days: current_merge.append(group_orders[i]) else: merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, weight=current_merge[0].weight, attr=current_merge[0].attr, original_orders=current_merge, merged_id=merged_id_counter )) merge_count += 1 if len(current_merge) > 1 else 0 merged_id_counter += 1 current_merge = [group_orders[i]] merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, weight=current_merge[0].weight, attr=current_merge[0].attr, original_orders=current_merge, merged_id=merged_id_counter )) if len(current_merge) > 1: merge_count += 1 merged_id_counter += 1 return merged_tasks, merge_count # ====================== 排程逻辑 ====================== def calculate_end_time(start_time: datetime, processing_hours: float, holidays: set) -> datetime: current = start_time remaining = processing_hours while remaining > 0: if current.date() in holidays: current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) continue day_start = datetime.combine(current.date(), time(0, 0)) intervals = [] prev_end = day_start for rest_start, rest_end in REST_PERIODS: rest_start_time = day_start + timedelta(hours=rest_start) rest_end_time = day_start + timedelta(hours=rest_end) if prev_end < rest_start_time: intervals.append((prev_end, rest_start_time)) prev_end = rest_end_time if prev_end < day_start + timedelta(hours=24): intervals.append((prev_end, day_start + timedelta(hours=24))) for (s, e) in intervals: if current < s: current = s if current >= e: continue available = (e - current).total_seconds() / 3600 use = min(remaining, available) current += timedelta(hours=use) remaining -= use if remaining <= 0: return current current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) return current def problem3_scheduling(merged_tasks: List[MergedOrder], product_machines: Dict, machines: List[Machine], holidays: set) -> tuple[List[Order], List[Dict]]: merged_tasks.sort(key=lambda x: (x.attr, min(x.delivery_dates))) machine_map = {m.machine_id: m for m in machines} plan_number = 1 detailed_results = [] for task in merged_tasks: candidate_machines = [m for m in machines if m.machine_id in product_machines.get(task.product_id, [])] if task.attr == 1: m03 = next((m for m in candidate_machines if m.machine_id == "M03"), None) if m03: candidate_machines = [m03] + [m for m in candidate_machines if m.machine_id != "M03"] best_machine = None best_end = None best_start = None for machine in candidate_machines: change = CHANGE_TIME if machine.last_product != task.product_id else 0 initial_start = machine.available_time + timedelta(hours=change) end_time = calculate_end_time(initial_start, task.processing_hours, holidays) adjusted_start = initial_start if task.attr == 3: for adj_id in machine.adjacent: adj_machine = machine_map.get(adj_id) if adj_machine and adj_machine.schedule: last_task = adj_machine.schedule[-1] if (last_task["product_attr"] == 3 and not (end_time <= last_task["start"] or adjusted_start >= last_task["end"])): adjusted_start = max(adjusted_start, last_task["end"]) end_time = calculate_end_time(adjusted_start, task.processing_hours, holidays) if best_end is None or end_time < best_end: best_end = end_time best_start = adjusted_start best_machine = machine if best_machine: task.start_time = best_start task.end_time = best_end task.machine = best_machine.machine_id task.plan_number = plan_number for original_order in task.original_orders: original_order.start_time = best_start original_order.end_time = best_end original_order.machine = best_machine.machine_id original_order.merged_id = task.merged_id original_order.plan_number = plan_number original_order.forward_plan = best_machine.last_plan_number original_order.delay_days = max(0, (best_end.date() - original_order.delivery_date.date()).days) detailed_results.append({ "计划序号(Plan Number)": plan_number, "生产计划安排机台号": best_machine.machine_id, "订单号PO": original_order.order_id, "产品号": original_order.product_id, "需要工时 (Requested Time--Hours)": round(original_order.processing_hours, 2), "生产计划开始时间": original_order.start_time.strftime("%Y-%m-%d %H:%M:%S"), "生产计划预计完成时间": original_order.end_time.strftime("%Y-%m-%d %H:%M:%S"), "发货日期(DeliveryDate)": original_order.delivery_date.strftime("%Y-%m-%d"), "订单数量": original_order.quantity, "Weight/pcs(g)": original_order.weight, "Capacity(pcs/h)": original_order.capacity, "最迟开始时间 (Late Start Time)": (original_order.delivery_date - timedelta( hours=original_order.processing_hours )).strftime("%Y-%m-%d %H:%M:%S"), "合单序号(Joined Order Number)": task.merged_id, "前置任务生产计划序号": original_order.forward_plan or "", "是否延期": "是" if original_order.delay_days > 0 else "否", "延期天数": original_order.delay_days }) best_machine.available_time = best_end best_machine.last_product = task.product_id best_machine.last_plan_number = plan_number best_machine.schedule.append({ "product": task.product_id, "start": best_start, "end": best_end, "product_attr": task.attr }) plan_number += 1 return detailed_results # ====================== 导出结果 ====================== def export_results(detailed_results: List[Dict], output_path: str = "附件5_生产排程计划表.xlsx"): """ 导出完整排程结果到Excel 需修改output_path为期望的导出路径(例如:"C:/Users/你的用户名/Desktop/附件5_结果.xlsx") """ df = pd.DataFrame(detailed_results) column_order = [ "计划序号(Plan Number)", "生产计划安排机台号", "订单号PO", "产品号", "需要工时 (Requested Time--Hours)", "生产计划开始时间", "生产计划预计完成时间", "发货日期(DeliveryDate)", "订单数量", "Weight/pcs(g)", "Capacity(pcs/h)", "最迟开始时间 (Late Start Time)", "合单序号(Joined Order Number)", "前置任务生产计划序号", "是否延期", "延期天数" ] df = df[column_order] df.to_excel(output_path, index=False) print(f"结果已导出至:{os.path.abspath(output_path)}") # ====================== 主函数 ====================== def main(): # ---------------------- 需要您修改的部分 ---------------------- # 1. 附件所在目录:将"."改为附件实际存放路径(例如:"C:/Users/你的用户名/Desktop/附件") DATA_DIR = "." # 2. 导出文件路径:将"附件5_生产排程计划表.xlsx"改为期望的导出路径(例如:"C:/Users/你的用户名/Desktop/结果.xlsx") OUTPUT_PATH = "附件5_生产排程计划表.xlsx" # 3. 合单天数:可根据需要修改(当前为7天,可改为15或30天) MERGE_DAYS = 7 # ------------------------------------------------------------- try: orders_df, product_machines, product_attrs, machines, holidays = load_official_data(DATA_DIR) original_orders = [] for _, row in orders_df.iterrows(): try: order = Order( order_id=row["订单号"], product_id=row["产品号"], quantity=row["订单数量"], delivery_date=row["发货日期(DeliveryDate)"], capacity=row["Capacity(pcs/h)"], weight=row["Weight/pcs(g)"], attr=product_attrs.get(row["产品号"], 2) ) original_orders.append(order) except Exception as e: print(f"订单初始化失败: {e} - 行数据: {row}") if not original_orders: raise ValueError("没有有效订单,请检查数据格式") merged_tasks, merge_count = merge_orders(original_orders, MERGE_DAYS) print(f"合单完成,共合并 {merge_count} 次") detailed_results = problem3_scheduling(merged_tasks, product_machines, machines, holidays) print(f"排程完成,共生成 {len(detailed_results)} 条计划") export_results(detailed_results, OUTPUT_PATH) except Exception as e: print(f"程序执行失败: {e}") if __name__ == "__main__": main() 最终结果呈现应该要有计划序号生产计划安排机台号,订单号,产品号需要工时生产,计划开始时间,生产计划预计完成时间,发货日期,订单数量,重量,产能,最迟开始时间,合单序号,前置任务生产计划序号,是否延期,延期天数

def get_range_cross_ranges(range_start_, range_end_, range_list_): # 将输入的 range_start_ 和 range_end_ 转换为 (min_row, min_col, max_row, max_col) 形式 min_row, min_col = range_start_ max_row, max_col = range_end_ # 遍历 range_list_ 中的每一个 range for r in range_list_: # 将 range_list_ 中的每一个 range 转换为 (min_row, min_col, max_row, max_col) 形式 (r_min_row, r_min_col), (r_max_row, r_max_col) = r # 判断输入的 range 是否与当前 range 有重合 if not (max_row < r_min_row or min_row > r_max_row or max_col < r_min_col or min_col > r_max_col): return r # 如果没有找到与输入 range 重合的 range,返回 False return [] def merge_ranges2_to_ranges1(range_list_1, range_list_2): merged_range_list_1 = [] for range1 in range_list_1: cross_range = get_range_cross_ranges(*range1, range_list_2) if cross_range: range1 = ((min(range1[0][0], cross_range[0][0]), min(range1[0][1], cross_range[0][1])), (max(range1[1][0], cross_range[1][0]), max(range1[1][1], cross_range[1][1]))) merged_range_list_1.append(range1) return merge_ranges_to_self(merged_range_list_1) def pop_items_by_indices(original_list, indices): # 将索引列表按降序排序,以便从后往前移除项目 indices.sort(reverse=True) pop_list = [] # 遍历索引列表,并从原始列表中移除相应的项目 for index in indices: if 0 <= index < len(original_list): pop_list.insert(0, original_list.pop(index)) return original_list, pop_list def merge_ranges_to_self(range_list): _merged_ranges = [] for range1 in range_list: _mix_indexes = get_range_cross_ranges_indexes(*range1, _merged_ranges) if _mix_indexes: _merged_ranges, need_merge_ranges = pop_items_by_indices(_merged_ranges, _mix_indexes) need_merge_min_rows = [i[0][0] for i in need_merge_ranges] need_merge_min_cols = [i[0][1] for i in need_merge_ranges] need_merge_max_rows = [i[1][0] for i in need_merge_ranges] need_merge_max_cols = [i[1][1] for i in need_merge_ranges] _merged_ranges.append( ((min(range1[0][0], *need_merge_min_rows), min(range1[0][1], *need_merge_min_cols)), (max(range1[1][0], *need_merge_max_rows), max(range1[1][1], *need_merge_max_cols))) ) else: _merged_ranges.append(range1) return _merged_ranges def get_range_cross_ranges_indexes(range_start_, range_end_, range_list_): # 将输入的 range_start_ 和 range_end_ 转换为 (min_row, min_col, max_row, max_col) 形式 min_row, min_col = range_start_ max_row, max_col = range_end_ result = [] # 遍历 range_list_ 中的每一个 range for idx, r in enumerate(range_list_): # 将 range_list_ 中的每一个 range 转换为 (min_row, min_col, max_row, max_col) 形式 (r_min_row, r_min_col), (r_max_row, r_max_col) = r # 判断输入的 range 是否与当前 range 有重合 if not (max_row < r_min_row or min_row > r_max_row or max_col < r_min_col or min_col > r_max_col): result.append(idx) # 如果没有找到与输入 range 重合的 range,返回 False return result 注释

import pandas as pd import numpy as np # 计算用户对歌曲的播放比例 triplet_dataset_sub_song_merged_sum_df = triplet_dataset_sub_song_mergedpd[['user', 'listen_count']].groupby('user').sum().reset_index() triplet_dataset_sub_song_merged_sum_df.rename(columns={'listen_count': 'total_listen_count'}, inplace=True) triplet_dataset_sub_song_merged = pd.merge(triplet_dataset_sub_song_mergedpd, triplet_dataset_sub_song_merged_sum_df) triplet_dataset_sub_song_mergedpd['fractional_play_count'] = triplet_dataset_sub_song_mergedpd['listen_count'] / triplet_dataset_sub_song_merged['total_listen_count'] # 将用户和歌曲编码为数字 small_set = triplet_dataset_sub_song_mergedpd user_codes = small_set.user.drop_duplicates().reset_index() song_codes = small_set.song.drop_duplicates().reset_index() user_codes.rename(columns={'index': 'user_index'}, inplace=True) song_codes.rename(columns={'index': 'song_index'}, inplace=True) song_codes['so_index_value'] = list(song_codes.index) user_codes['us_index_value'] = list(user_codes.index) small_set = pd.merge(small_set, song_codes, how='left') small_set = pd.merge(small_set, user_codes, how='left') # 将数据转换为稀疏矩阵形式 from scipy.sparse import coo_matrix mat_candidate = small_set[['us_index_value', 'so_index_value', 'fractional_play_count']] data_array = mat_candidate.fractional_play_count.values row_array = mat_candidate.us_index_value.values col_array = mat_candidate.so_index_value.values data_sparse = coo_matrix((data_array, (row_array, col_array)), dtype=float) # 使用SVD方法进行矩阵分解并进行推荐 from scipy.sparse import csc_matrix from scipy.sparse.linalg import svds import math as mt def compute_svd(urm, K): U, s, Vt = svds(urm, K) dim = (len(s), len(s)) S = np.zeros(dim, dtype=np.float32) for i in range(0, len(s)): S[i, i] = mt.sqrt(s[i]) U = csc_matrix(U, dtype=np.float32) S = csc_matrix(S, dtype=np.float32) Vt = csc_matrix(Vt, dtype=np.float32) return U, S, Vt def compute_estimated_matrix(urm, U, S, Vt, uTest, K, test): rightTerm = S * Vt max_recommendation = 250 estimatedRatings = np.zeros(shape=(MAX_UID, MAX_PID), dtype=np.float16) recomendRatings = np.zeros(shape=(MAX_UID, max_recommendation), dtype=np.float16) for userTest in uTest: prod = U[userTest, :] * rightTerm estimatedRatings[userTest, :] = prod.todense() recomendRatings[userTest, :] = (-estimatedRatings[userTest, :]).argsort()[:max_recommendation] return recomendRatings K = 50 urm = data_sparse MAX_PID = urm.shape[1] MAX_UID = urm.shape[0] U, S, Vt = compute_svd(urm, K) uTest = [4, 5, 6, 7, 8, 73, 23] # uTest=[1b5bb32767963cbc215d27a24fef1aa01e933025] uTest_recommended_items = compute_estimated_matrix(urm, U, S, Vt 继续将这段代码输出完整

import pandas as pd import numpy as np from prophet import Prophet # 1. 新增:导入评估指标计算所需的库 from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error # 1. 优化数据读取和预处理 def preprocess_data(): # 读取数据 df_items = pd.read_excel(r"D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C题\附件1.xlsx", engine='openpyxl') df_sales = pd.read_excel(r"D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C题\附件2.xlsx", engine='openpyxl') # 合并数据集 df_merged = pd.merge(df_sales, df_items, on='单品编码', how='left') # 计算金额并处理退货 df_merged['金额'] = df_merged['销量(千克)'] * df_merged['销售单价(元/千克)'] mask = df_merged['销售类型'] == '退货' df_merged.loc[mask, '金额'] = -df_merged.loc[mask, '金额'] df_merged.loc[mask, '销量(千克)'] = -df_merged.loc[mask, '销量(千克)'] return df_merged, df_items # 2. 优化数据聚合和透视 def create_pivot_tables(df_merged, df_items): # 计算每日指标 daily_metrics = df_merged.groupby(['单品编码', '单品名称', '销售日期']).agg( 平均价格=('销售单价(元/千克)', 'mean'), 总销量=('销量(千克)', 'sum'), 总金额=('金额', 'sum') ).reset_index() # 计算退货率 return_rate = df_merged.groupby('单品编码').apply( lambda x: x[x['销售类型']=='退货']['金额'].sum() / x[x['销售类型']=='销售']['金额'].sum() if x[x['销售类型']=='销售']['金额'].sum() != 0 else 0 ).reset_index(name='退货率') # 创建透视表 pivot_func = lambda metric: daily_metrics.pivot_table( index=['单品编码', '单品名称'], columns='销售日期', values=metric, fill_value=0 ).reset_index() price_pivot = pivot_func('平均价格') vol_pivot = pivot_func('总销量') sales_pivot = pivot_func('总金额') # 合并品类和退货率信息 final_dfs = {} for df, name in zip([price_pivot, vol_pivot, sales_pivot], ['price', 'vol', 'sales']): result = pd.merge( df_items[['单品编码', '单品名称', '分类名称']], return_rate, on='单品编码', how='left' ) final_dfs[name] = pd.merge( result, df, on=['单品编码', '单品名称'], how='left' ) return final_dfs # 3. 优化文件保存 def save_results(final_dfs): final_dfs['price'].to_excel( r'D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C235问题二output_price.xlsx', index=False, engine='openpyxl' ) final_dfs['vol'].to_excel( r'D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C235问题二output_vol.xlsx', index=False, engine='openpyxl' ) final_dfs['sales'].to_excel( r'D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C235问题二output_sales.xlsx', index=False, engine='openpyxl' ) # 4. 新增:定义模型性能评估函数 def evaluate_prophet_performance(y_true, y_pred): """计算Prophet模型预测的R², MAE和RMSE。""" r2 = r2_score(y_true, y_pred) mae = mean_absolute_error(y_true, y_pred) rmse = np.sqrt(mean_squared_error(y_true, y_pred)) return r2, mae, rmse # 5. 修改:优化Prophet分析流程并集成评估 def prophet_analysis(): # 读取数据并重命名列 # 假设'预处理后的数据表.xlsx'中,第一列是日期'ds',后续列是各个品类的销售数据 input_df_sales = pd.read_excel( r"D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C题\预处理后的数据表.xlsx", sheet_name="Sheet1" ).rename(columns={'销售日期': 'ds'}) df_price = pd.read_excel( r"D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C题\p3_预处理.xlsx" ).rename(columns={'销售日期': 'ds'}) # 用于存储最终评估结果的列表 evaluation_results = [] # 分析每个品类 for col in input_df_sales.columns[1:]: # 准备数据 df_sales = input_df_sales[['ds', col]].rename(columns={col: 'y'}) df_sales = df_sales[df_sales['y'] > 0] merged_df = pd.merge(df_sales, df_price, on='ds') # 初始化模型 model = Prophet( yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False ) model.add_country_holidays(country_name='CN') # 动态添加回归量 # 假设df_price中包含与品类名相同的列作为回归量 if col in df_price.columns: model.add_regressor(col) # 训练模型 model.fit(merged_df) # 预测(仅对训练数据进行预测以评估拟合水平) future = model.make_future_dataframe(periods=0) future = pd.merge(future, df_price, on='ds') forecast = model.predict(future) # 评估模型性能 y_true = merged_df['y'] y_pred = forecast['yhat'] r2, mae, rmse = evaluate_prophet_performance(y_true, y_pred) # 将结果存入列表 evaluation_results.append({ '品类名称': col, 'R²': r2, 'MAE': mae, 'RMSE': rmse }) print(f"已完成对品类 '{col}' 的分析和评估。") # 将结果列表转换为DataFrame results_df = pd.DataFrame(evaluation_results) results_df = results_df.set_index('品类名称') # 将品类名称设为索引 return results_df # 6. 优化成本利润率计算 def calculate_profit_margin(): # 读取数据 price_df = pd.read_excel( r'D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C235问题二output_price.xlsx', engine='openpyxl' ) cost_df = pd.read_excel( r"D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C题\附件3.xlsx", engine='openpyxl' ) # 转换成本数据为宽格式 pivot_cost = cost_df.pivot_table( index='单品编码', columns='日期', values='批发价格(元/千克)', fill_value=0 ).reset_index() # 合并价格和成本数据 merged_df = pd.merge( price_df[['单品编码']], pivot_cost, on='单品编码', how='left' ) # 计算成本利润率 date_cols = [col for col in price_df.columns if isinstance(col, pd.Timestamp)] for date in date_cols: if date in merged_df: # 确保分母不为0 price_series = price_df[date] cost_series = merged_df[date] # 使用np.divide来安全地处理除以0的情况 margin = np.divide(price_series - cost_series, cost_series, where=cost_series!=0) merged_df[date] = margin merged_df[date] = merged_df[date].replace([np.inf, -np.inf], np.nan) # 替换无穷大值 # 保存结果 merged_df.to_excel( r'D:\虚拟c盘\大学\竞赛\数学建模\2023年C题\C235问题二cost_profit_margin.xlsx', index=False, engine='openpyxl' ) return merged_df # 主流程 def main(): # 数据预处理 # df_merged, df_items = preprocess_data() # 创建透视表 # final_dfs = create_pivot_tables(df_merged, df_items) # 保存结果 # save_results(final_dfs) # Prophet分析与评估 prophet_results_df = prophet_analysis() # 按照图片格式输出结果 print("\n表3 各品类蔬菜的 Prophet 模型拟合水平") # 将DataFrame中的数值四舍五入到小数点后两位 print(prophet_results_df.round(2)) # 计算成本利润率 # profit_margin_df = calculate_profit_margin() # print("\n已完成成本利润率计算并保存文件。") if __name__ == "__main__": main()

import pandas as pd import numpy as np from datetime import datetime, timedelta, time import os 基础参数(文档约束) CHANGE_TIME = 3.5 # 换模时间(小时) REST_PERIODS = [ # 每日休息时段(小时) (12, 12.5), # 12:00-12:30 (16.5, 17), # 16:30-17:00 (0, 1) # 0:00-1:00 ] class Order: “”“订单类:封装订单属性与计算逻辑”“” def init(self, order_id, product_id, quantity, delivery_date, capacity, attr): self.order_id = order_id # 订单号 self.product_id = product_id # 产品号 self.quantity = quantity # 数量(pcs) self.delivery_date = delivery_date # 发货日期(datetime) self.capacity = capacity # 产能(pcs/h) self.processing_hours = quantity / capacity # 生产工时(h) self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.delay_days = 0 # 延期天数 self.satisfaction = 10.0 # 满意度(初始10分) self.attr = attr # 产品属性(1:易搭配,3:难生产) self.merged_orders = [order_id] # 合单包含的原始订单ID(合单专用") class MergedOrder: “”“合单任务类:封装合并后的生产任务”“” def init(self, product_id, total_quantity, delivery_dates, capacity, attr, original_orders): self.product_id = product_id # 产品号 self.total_quantity = total_quantity # 合并后总数量 self.delivery_dates = delivery_dates # 原始订单发货日期列表 self.capacity = capacity # 产能 self.processing_hours = total_quantity / capacity # 总生产工时 self.attr = attr # 产品属性 self.original_orders = original_orders # 原始订单对象列表 self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 class Machine: “”“机台类:封装机台属性与状态”“” def init(self, machine_id, initial_time=None): self.machine_id = machine_id # 机台号 # 设置初始可用时间 if initial_time is None: self.available_time = datetime(2025, 3, 1) else: self.available_time = initial_time self.last_product = None # 上一生产产品 self.adjacent = [] # 相邻机台列表 self.schedule = [] # 排程计划(任务列表) def load_official_data(): “”“加载正式数据(使用相对路径)”“” try: # 1. 读取附件1(1)数据 attachment1 = pd.ExcelFile(r"C:\Users\彭婉\Desktop\2025-08\附件1(1).xlsx") # 1.1 订单表 orders_df = attachment1.parse( sheet_name=“订单表”, parse_dates=[“发货日期(DeliveryDate)”] ) # 1.2 机台初始工作状态表 machine_initial_df = attachment1.parse( sheet_name=“机台初始工作状态表”, parse_dates=[“生产开始时间”] ) machine_initial_times = { row[“机台号”]: row[“生产开始时间”] for _, row in machine_initial_df.iterrows() } # 1.3 放假日期表 holidays_df = attachment1.parse(sheet_name=“放假日期表”) holidays = # 2. 读取附件2(1)数据 attachment2 = pd.ExcelFile(r"C:\Users\彭婉\Desktop\2025-08\附件2(1).xlsx") # 2.1 产品工时计算参数表 product_capacity_df = attachment2.parse(sheet_name="产品工时计算参数表") product_capacity = { row["产品号"]: row["Capacity(pcs/h)"] for _, row in product_capacity_df.iterrows() } # 2.2 产品机台生产关系表 product_machine_df = attachment2.parse(sheet_name="产品机台生产关系表") product_machines = {} for _, row in product_machine_df.iterrows(): product_id = row["产品号"] # 找出值为1的机台列 valid_machines = [col for col in product_machine_df.columns if col != "产品号" and row[col] == 1] product_machines[product_id] = valid_machines # 2.3 机台关系表 machine_relation_df = attachment2.parse(sheet_name="机台关系表") machine_adjacent = {} for _, row in machine_relation_df.iterrows(): machine_id = row["机台号"] # 找出值为1的相邻机台列 adjacent_machines = [col for col in machine_relation_df.columns if col != "机台号" and row[col] == 1] machine_adjacent[machine_id] = adjacent_machines # 2.4 产品属性表 product_attr_df = attachment2.parse(sheet_name="产品属性表") product_attrs = { row["产品号"]: row["属性"] for _, row in product_attr_df.iterrows() } # 3. 初始化订单数据(补充产能列) orders_df["Capacity(pcs/h)"] = orders_df["产品号"].map(product_capacity) # 4. 初始化机台对象 machines = [] for mid in machine_initial_times.keys(): # 确保每个机台都有初始时间 initial_time = machine_initial_times.get(mid, datetime(2025, 3, 1)) machine = Machine(mid, initial_time) # 设置相邻机台关系 machine.adjacent = machine_adjacent.get(mid, []) machines.append(machine) return orders_df, product_machines, product_attrs, machines, holidays except Exception as e: print(f"加载数据错误: {e}") # 返回空数据集避免崩溃 return pd.DataFrame(), {}, {}, [], set() def merge_orders(orders, merge_days): “”“合单逻辑:相同产品号的订单,发货日期在merge_days内的合并为一个生产任务”“” merged_tasks = [] merge_count = 0 # 合单次数(合并的组数) # 按产品号分组 product_groups = {} for order in orders: if order.product_id not in product_groups: product_groups[order.product_id] = [] product_groups[order.product_id].append(order) # 对每个产品组内的订单按发货日期排序并合单 for product_id, group_orders in product_groups.items(): # 按发货日期升序排序 group_orders.sort(key=lambda x: x.delivery_date) if len(group_orders) == 1: # 单个订单无需合单,直接作为一个任务 merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=group_orders[0].quantity, delivery_dates=[group_orders[0].delivery_date], capacity=group_orders[0].capacity, attr=group_orders[0].attr, original_orders=[group_orders[0]] )) continue # 合并逻辑:滑动窗口检查发货日期差 current_merge = [group_orders[0]] for i in range(1, len(group_orders)): # 计算当前订单与组内第一个订单的发货日期差 date_diff = (group_orders[i].delivery_date - current_merge[0].delivery_date).days if date_diff <= merge_days: current_merge.append(group_orders[i]) else: # 超过合单窗口,生成合并任务 merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, attr=current_merge[0].attr, original_orders=current_merge )) merge_count += 1 # 记录一次合单 current_merge = [group_orders[i]] # 处理最后一组合单 if len(current_merge) >= 1: merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, attr=current_merge[0].attr, original_orders=current_merge )) if len(current_merge) > 1: merge_count += 1 # 仅多订单合并才计数 return merged_tasks, merge_count def calculate_end_time(start_time, processing_hours, holidays): “”“计算考虑休息和放假后的生产结束时间”“” current = start_time remaining = processing_hours while remaining > 0: # 跳过放假日期 if current.date() in holidays: # 直接跳到第二天0点 current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) continue # 计算当前时间点 current_time = current.time() current_date = current.date() day_start = datetime.combine(current_date, time(0, 0)) # 计算剩余工作时间 work_hours_today = 24.0 for rest_start, rest_end in REST_PERIODS: work_hours_today -= (rest_end - rest_start) # 如果剩余工时小于一天工作量 if remaining <= work_hours_today: # 按小时累加,跳过休息时间 for hour in np.arange(0, 24, 0.1): # 检查是否在休息时间 in_rest = False for rest_start, rest_end in REST_PERIODS: if rest_start <= hour < rest_end: in_rest = True break if not in_rest: current += timedelta(hours=0.1) remaining -= 0.1 if remaining <= 0: return current else: # 直接消耗全天工作时间 remaining -= work_hours_today # 跳到第二天0点 current = datetime.combine(current_date + timedelta(days=1), time(0, 0)) return current def problem3_scheduling(merged_tasks, product_machines, product_attrs, machines, holidays): “”“问题三排程:考虑产品属性、机台关系及合单”“” # 排序:优先易搭配>难生产>无限制,同属性按最早发货日期排序 merged_tasks.sort(key=lambda x: ( x.attr, # 1:易搭配优先,3:难生产其次,2:无限制最后 min(x.delivery_dates) # 按最早发货日期排序 )) # 机台映射表(便于查询相邻机台) machine_map = {m.machine_id: m for m in machines} for task in merged_tasks: # 筛选可生产该产品的机台 candidate_machines = [ m for m in machines if m.machine_id in product_machines.get(task.product_id, []) ] # 易搭配产品优先分配M03机台 if task.attr == 1: m03 = next((m for m in candidate_machines if m.machine_id == "M03"), None) if m03: candidate_machines = [m03] + [m for m in candidate_machines if m.machine_id != "M03"] best_machine = None best_end = None best_start = None for machine in candidate_machines: # 计算换模时间(首单或不同产品需换模) change = CHANGE_TIME if machine.last_product != task.product_id else 0 initial_start = machine.available_time + timedelta(hours=change) end_time = calculate_end_time(initial_start, task.processing_hours, holidays) adjusted_start = initial_start # 难生产产品约束:相邻机台不能同时生产难生产产品 if task.attr == 3: for adj_id in machine.adjacent: adj_machine = machine_map.get(adj_id) if not adj_machine: continue # 检查相邻机台当前任务(如果有) if adj_machine.schedule: last_task = adj_machine.schedule[-1] # 如果相邻机台正在生产难生产产品且时间冲突 if last_task["product_attr"] == 3 and not (end_time <= last_task["start"] or adjusted_start >= last_task["end"]): # 调整开始时间至冲突任务结束后 adjusted_start = max(adjusted_start, last_task["end"]) end_time = calculate_end_time(adjusted_start, task.processing_hours, holidays) # 更新最优机台(最早结束时间) if best_end is None or end_time < best_end: best_end = end_time best_start = adjusted_start best_machine = machine # 分配机台并更新状态 if best_machine: task.start_time = best_start task.end_time = best_end task.machine = best_machine.machine_id # 更新原始订单的延期和满意度 for original_order in task.original_orders: original_order.start_time = best_start original_order.end_time = best_end original_order.machine = best_machine.machine_id original_order.delay_days = max(0, (best_end.date() - original_order.delivery_date.date()).days) # 按文档公式计算满意度 original_order.satisfaction = max(0, 10.0 - (original_order.delay_days // 3) * 0.1) # 更新机台状态 best_machine.available_time = best_end best_machine.last_product = task.product_id best_machine.schedule.append({ "product": task.product_id, "start": best_start, "end": best_end, "product_attr": task.attr, "original_orders": [o.order_id for o in task.original_orders] }) # 收集所有原始订单用于统计 all_original_orders = [] for task in merged_tasks: all_original_orders.extend(task.original_orders) # 统计结果 delay_orders = [o for o in all_original_orders if o.delay_days > 0] max_delay = max([o.delay_days for o in delay_orders]) if delay_orders else 0 avg_delay = round(np.mean([o.delay_days for o in delay_orders]) if delay_orders else 0, 1) avg_satisfaction = round(np.mean([o.satisfaction for o in all_original_orders]), 2) min_satisfaction = round(min([o.satisfaction for o in all_original_orders]), 2) return all_original_orders, { "延期订单总数": len(delay_orders), "最长延期天数": max_delay, "平均延期天数": avg_delay, "订单平均满意度": avg_satisfaction, "订单最差满意度": min_satisfaction } def export_problem3_results(results, output_path=r"C:\Users\彭婉\Desktop\2025-08\工作簿1.xlsx"): “”“导出问题三结果到Excel(按指定格式)”“” # 创建结果DataFrame result_df = pd.DataFrame(results) # 重命名列以符合要求 result_df.rename(columns={ "合单类别": "合单类别", "合单次数": "合单次数", "延期订单总数": "延期订单总数", "最长延期天数": "最长前期天数", # 注意这里按要求改为"最长前期天数" "平均延期天数": "平均延期天数", "订单平均满意度": "订单平均满意度", "订单最差满意度": "订单最差满意度" }, inplace=True) # 设置列顺序 column_order = [ "合单类别", "合单次数", "延期订单总数", "最长前期天数", "平均延期天数", "订单平均满意度", "订单最差满意度" ] result_df = result_df[column_order] # 导出到Excel with pd.ExcelWriter(output_path, engine="openpyxl") as writer: result_df.to_excel(writer, sheet_name="生产排程结果", index=False) print(f"结果已导出至:{os.path.abspath(output_path)}") return result_df def main(): # 加载数据 orders_df, product_machines, product_attrs, original_machines, holidays = load_official_data() # 检查数据是否加载成功 if orders_df.empty or not original_machines: print("数据加载失败,请检查文件路径和格式") return # 初始化原始订单对象 original_orders = [] for _, row in orders_df.iterrows(): try: order = Order( row["订单号PO"], row["产品号"], row["订单数量"], row["发货日期(DeliveryDate)"], row["Capacity(pcs/h)"], product_attrs.get(row["产品号"], 2) # 默认属性为2(无限制) ) original_orders.append(order) except Exception as e: print(f"订单初始化错误: {e} - 行数据: {row}") # 问题三:分别处理7天、15天、30天合单 merge_days_list = [7, 15, 30] problem3_results = [] for merge_days in merge_days_list: print(f"\n===== 处理 {merge_days} 天合单 =====") # 复制机台(避免状态污染) machines = [] for m in original_machines: # 正确复制机台初始时间 new_machine = Machine(m.machine_id, m.available_time) new_machine.adjacent = m.adjacent.copy() new_machine.last_product = m.last_product machines.append(new_machine) # 1. 合单 merged_tasks, merge_count = merge_orders(original_orders.copy(), merge_days) print(f"合单次数:{merge_count}") # 2. 排程 try: all_orders, stats = problem3_scheduling( merged_tasks, product_machines, product_attrs, machines, holidays ) except Exception as e: print(f"排程错误: {e}") continue # 3. 收集结果 problem3_results.append({ "合单类别": f"{merge_days}天合单", "合单次数": merge_count, "延期订单总数": stats["延期订单总数"], "最长延期天数": stats["最长延期天数"], "平均延期天数": stats["平均延期天数"], "订单平均满意度": stats["订单平均满意度"], "订单最差满意度": stats["订单最差满意度"] }) # 打印结果 print("\n===== 问题三最终结果 =====") result_df = export_problem3_results(problem3_results) # 控制台输出表格 print("\n生产排程结果表:") print(result_df.to_string(index=False)) if name == “main”: main() 补充修改一下这个代码,这个代码本身会输出一个内容,不要修改,它会输出的,但是增加一些,它可以多输出的一些东西,就满足我上述的这个要求,最终结果呈现应该要有订单号PO,产品号,需要工时 (Requested Time--Hours),生产计划开始时间(Plan Start Time),生产计划预计完成时间(Plan Finish Time),发货日期(DeliveryDate),订单数量,Weight/pcs(g),Capacity(pcs/h),最迟开始时间 (Late Start Time),合单序号(Joined Order Number),前置任务生产计划序号(Forward Plan Number),是否延期,延期天数

import os from docx import Document from openpyxl import Workbook from openpyxl.styles import Alignment from openpyxl.utils import get_column_letter def get_merged_regions(table): """获取表格所有合并区域信息""" merged = [] for merge in table._tbl.xpath('.//w:gridSpan | .//w:vMerge'): cell = merge.getparent().getparent() row_idx = int(cell.xpath('count(ancestor::w:tr/preceding-sibling::w:tr)')) col_idx = int(cell.xpath('count(preceding-sibling::w:tc)')) # 处理水平合并 if 'gridSpan' in merge.tag: colspan = int(merge.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 1)) merged.append(('h', row_idx, col_idx, colspan)) # 处理垂直合并 elif 'vMerge' in merge.tag: if merge.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val') != 'continue': rowspan = 1 next_row = row_idx + 1 while next_row < len(table.rows): next_cell = table.cell(next_row, col_idx)._tc if next_cell.xpath('.//w:vMerge'): v_merge_val = next_cell.xpath('.//w:vMerge')[0].get( '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val') if v_merge_val == 'continue': rowspan += 1 next_row += 1 else: break else: break merged.append(('v', row_idx, col_idx, rowspan)) return merged def get_table_data_range(table, start_text, end_text): """获取表格中从start_text到end_text之间的数据范围""" start_row = None end_row = None for row_idx, row in enumerate(table.rows): for cell in row.cells: if start_text in cell.text: start_row = row_idx if end_text in cell.text: end_row = row_idx if start_row is not None and end_row is not None: break return start_row, end_row def convert_table_to_excel(table, ws, start_row=None, end_row=None): """将Word表格转换为Excel工作表(保留合并和样式)""" # 确定处理的行范围 if start_row is None: start_row = 0 if end_row is None: end_row = len(table.rows) # 获取合并区域信息 merged_regions = get_merged_regions(table) # 创建数据矩阵,处理合并单元格的值 data_matrix = [] last_values = {} # 跟踪每列的最后一个非空值,处理合并单元格 for row_idx, row in enumerate(table.rows[start_row:end_row]): original_row_idx = start_row + row_idx row_data = [] for col_idx, cell in enumerate(row.cells): # 检查当前单元格是否属于垂直合并区域 is_in_merged = False for merge_type, m_row, m_col, m_span in merged_regions: if merge_type == 'v' and m_col == col_idx and m_row <= original_row_idx < m_row + m_span: is_in_merged = True # 如果是合并区域的起始行,使用当前单元格的值 if original_row_idx == m_row: value = cell.text.strip() last_values[col_idx] = value # 否则使用该列的最后一个非空值 else: value = last_values.get(col_idx, "") break # 如果不是合并区域,直接使用当前单元格的值 if not is_in_merged: value = cell.text.strip() last_values[col_idx] = value row_data.append(value) data_matrix.append(row_data) # 调整合并区域的行索引以匹配数据矩阵 adjusted_merged_regions = [] for merge_type, row, col, span in merged_regions: if start_row <= row < end_row: new_row = row - start_row adjusted_merged_regions.append((merge_type, new_row, col, span)) # 写入数据并合并单元格 for row_idx, row_data in enumerate(data_matrix): for col_idx, value in enumerate(row_data): cell = ws.cell(row=row_idx + 1, column=col_idx + 1, value=value) cell.alignment = Alignment(horizontal='center', vertical='center') # 应用合并区域 for merge_type, row, col, span in adjusted_merged_regions: if merge_type == 'h': start_col = col + 1 end_col = start_col + span - 1 ws.merge_cells(start_row=row + 1, start_column=start_col, end_row=row + 1, end_column=end_col) elif merge_type == 'v': start_row = row + 1 end_row = start_row + span - 1 ws.merge_cells(start_row=start_row, start_column=col + 1, end_row=end_row, end_column=col + 1) # 调整列宽(智能适应内容) for col_idx, _ in enumerate(data_matrix[0]): max_length = 0 for row in data_matrix: try: cell_value = row[col_idx] except IndexError: continue if cell_value and len(cell_value) > max_length: max_length = len(cell_value) adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[get_column_letter(col_idx + 1)].width = adjusted_width # 使用示例 path = './岗位说明书' os.makedirs("EXCEL", exist_ok=True) for file in os.listdir(path): if not file.endswith(('.docx', '.doc')): continue file_path = os.path.join(path, file) try: doc = Document(file_path) except Exception as e: print(f"无法处理文件 {file}: {e}") continue wb = Workbook() wb.remove(wb.active) # 查找目标表格 target_table = None start_row = None end_row = None for table in doc.tables: start, end = get_table_data_range(table, "主要职责及评价标准:", "岗位法律及规范风险点:") if start is not None: target_table = table start_row = start end_row = end break if target_table and start_row is not None: ws = wb.create_sheet("岗位职责") # 如果找到了结束标记,处理到结束标记前一行 if end_row is not None: convert_table_to_excel(target_table, ws, start_row, end_row) else: # 如果没有找到结束标记,处理从开始标记到表格末尾 convert_table_to_excel(target_table, ws, start_row) excel_name = os.path.splitext(file)[0] + "_转换结果.xlsx" wb.save(os.path.join("EXCEL", excel_name)) print(f"已成功转换: {excel_name}") else: print(f"未找到'主要职责及评价标准'表格或'岗位法律及规范风险点'标记: {file}") "工作职责及目的(描述该岗位主要活动及要达到的结果,每一应负责任请依其重要性排列)" 列中序号需要保留在内容中,“重要性”和"工作领域列"中如果为内容重复列就合并

最新推荐

recommend-type

perl-Pod-Coverage-TrustPod-0.100005-7.el8.tar.gz

# 适用操作系统:Centos8 #Step1、解压 tar -zxvf xxx.el8.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm
recommend-type

HTML时间格式化工具及测试页面介绍

标题 "BoolStudio.github.io" 暗示这是一个与GitHub相关的在线资源,具体来说是与BoolStudio相关的网页地址。GitHub是一个著名的代码托管平台,它支持Git版本控制系统,允许用户在云端存储和共享代码。BoolStudio可能是GitHub上的一个用户或组织账户名称,而该页面可能是他们托管的项目或个人页面的入口。 描述中的信息包含了HTML元素和JavaScript代码片段。这段描述展示了一个测试页文件的部分代码,涉及到HTML的标题(title)和内嵌框架(iframe)的使用,以及JavaScript中Date对象的扩展功能。 从描述中我们可以分析出以下知识点: 1. HTML标题(Title): 在HTML中,`<title>`标签用于定义网页的标题,它会显示在浏览器的标题栏或页面的标签上。在描述中出现了`<title>现在时间</title>`,这表明网页的标题被设置为了“现在时间”。 2. 微软时间: 这可能指的是在网页中嵌入微软产品的日期和时间显示。尽管这部分内容在描述中被删除了,但微软时间通常与Windows操作系统的日期和时间显示相关联。 3. iframe元素: `<iframe>`标签定义了一个内嵌框架,可以在网页中嵌入另一个文档。在描述中出现的是`<iframe src"></iframe>`,这表示创建了一个空的iframe元素,其src属性为空,实际上没有嵌入任何内容。通常src属性会被设置为另一个HTML文档的URL,用来在当前页面中显示外部页面的内容。 4. JavaScript日期格式化: 描述中包含了一段JavaScript代码,这段代码扩展了Date对象的功能,允许它根据提供的格式字符串(fmt)返回格式化的日期和时间。例如,如果fmt是'y年M月d日 h时m分s秒',则该函数会按照这个格式返回当前日期和时间。 具体到代码实现,以下步骤展示了如何在JavaScript中扩展Date对象并格式化日期: - 首先创建了一个对象o,该对象包含日期和时间的不同部分,例如年(y)、月(M)、日(d)、时(h)、分(m)、秒(s)。 - 使用正则表达式检查格式字符串fmt中是否包含年份的占位符(y+),如果存在则将其替换为四位数的年份,不足部分用0补齐。 - 使用for...in循环遍历对象o中的每一个键值对,并使用正则表达式测试这些键(如年、月、日等)是否在格式字符串中存在。如果存在,则替换为相应的日期时间值。 这个知识点展示了在JavaScript中如何自定义对象的方法,以及如何使用正则表达式进行字符串的搜索和替换。 标签 "HTML" 表明这个文件主要涉及到超文本标记语言(HTML),这是构建网页内容的标准标记语言,它允许开发者使用标签来定义网页的结构和内容。HTML文件通常包含各种元素,如段落、链接、图片、表格和表单等。 压缩包子文件的文件名称列表中的 "BoolStudio.github.io-main" 可能指的是一个包含 "BoolStudio.github.io" 网站主要文件的压缩包。通常,网站项目会包含多个文件和资源,为了便于管理和分发,开发者会将这些文件打包成一个压缩文件。在本例中,"main" 可能表示该压缩包包含了网站的主要文件和代码。 通过对给定文件信息的分析,我们可以了解到网页设计和开发的相关知识点,特别是HTML文档结构、JavaScript对象和函数扩展以及如何在网站项目中组织文件。这些知识对于前端开发者来说是非常重要的,能够帮助他们创建更加动态和功能丰富的网页应用。
recommend-type

Elixir测试:从用例执行到覆盖率分析

### Elixir 测试:从用例执行到覆盖率分析 #### 1. 测试用例执行 在测试中,以 `Test` 结尾且包含 `use ExUnit` 的模块被视为测试用例。测试用例本质上是设置回调和测试的集合。测试用例的执行方式有两种:并发执行或顺序执行。使用 `async: true` 选项的测试用例将并发执行,其余的则按定义顺序(默认随机)顺序执行。 测试用例的执行流程如下: 1. 执行所有 `setup_all` 回调。 2. 执行所有测试及其特定的设置和拆卸回调。 `setup_all` 回调按定义顺序在同一进程中执行。在运行任何测试之前,测试用例将生成一个进程,在该进程中运行所有
recommend-type

Android Studio 时间延时

<think> 我们参考多个引用,总结在Android Studio中实现时间延时的方法。根据引用[1]和[2],我们知道有几种常见的延时方法,包括使用Handler的postDelayed方法、使用Timer和TimerTask以及使用Thread.sleep。引用[3]和[5]则展示了具体代码示例。 具体方法如下: 1. **使用Handler的postDelayed方法**(推荐在主线程中使用,避免直接操作UI线程的问题): ```java new Handler().postDelayed(new Runnable() { @Override
recommend-type

IMS Open Corpus Workbench:打造高效大型文本语料库管理工具

IMS Open Corpus Workbench(以下简称CWB)是一个强大的开源工具集,它专门用于管理和查询大型的、带有语言注释的文本语料库。这项工具有着广泛的应用领域,包括语言学研究、自然语言处理、人文科学研究等。 ### 标题知识点: #### 大型文本语料库的索引和查询工具 大型文本语料库指的是含有大量文本数据的数据库,其中包含的文本量通常以百万计。这些数据可能是书面文本、口语录音文字转写等形式。对于如此庞大的数据集,索引是必要的,它可以帮助研究者快速定位到感兴趣的片段,而查询工具则提供了从这些大量数据中提取特定信息的能力。 #### 开源 CWB作为一个开源工具,意味着其源代码对所有人开放,并且可以免费使用和修改。开源项目通常是由社区驱动,有着活跃的开发者和用户群体,不断对工具进行改进和拓展。这种模式促进了创新,并且有利于长期维护和升级。 ### 描述知识点: #### 管理和查询带有语言注释的文本 在语料库中,文本数据经常会被加上各种形式的语言注释,比如句法结构、词性标注、语义角色等。CWB支持管理这类富含语言信息的语料库,使其不仅仅保存原始文本信息,还整合了深层的语言知识。此外,CWB提供了多种查询语言注释数据的方式,使得用户可以针对特定的注释信息进行精确查询。 #### 核心组件:CQP(Corpus Query Processor) CQP是CWB中的核心组件,是一个高度灵活和高效的查询处理器。它支持在终端会话中交互式地使用,这为熟悉命令行界面的用户提供了一个强大的工具。同时,CQP也可以嵌入到其他程序中,比如Perl脚本,从而提供编程式的语料库访问方式。这为高级用户提供了一个强大的平台,可以编写复杂的查询,并将查询结果集成到其他程序中。 #### 基于Web的GUI CQPweb 除了命令行界面外,CWB还提供了一个基于Web的图形用户界面CQPweb,使得不熟悉命令行的用户也能够方便地使用CWB的强大功能。CQPweb通常允许用户通过网页直接构建查询,并展示查询结果,极大地降低了使用门槛。 ### 标签知识点: #### 开源软件 CWB作为开源软件,其主要特点和优势包括: - **社区支持**:开放源代码鼓励了全球开发者共同参与,提供错误修正、功能增强、新特性开发等。 - **定制化**:用户可以根据自己的需求对源代码进行修改,从而实现定制化的功能。 - **透明性**:源代码的开放确保了软件工作的透明性,用户可以清楚了解软件的工作原理和数据处理方式。 - **可靠性**:由于代码的公开性,很多用户和开发者可以共同审查代码,提高了软件的可靠性和安全性。 - **成本效益**:开源软件通常不需要支付昂贵的许可费用,对预算有限的个人和机构特别友好。 ### 压缩包子文件的文件名称列表知识点: #### cwb-3.0.0-osx-10.5-universal 这个文件名提供了关于该软件包的重要信息: - **cwb**:表示这是IMS Open Corpus Workbench的软件包。 - **3.0.0**:表示这个包的版本号,了解版本信息对于获取支持、查看更新日志、了解新特性等方面很重要。 - **osx**:表示这个软件包是为Mac OS X操作系统设计的。 - **10.5**:这个数字指明了这个软件包支持的操作系统版本至少是Mac OS X 10.5。 - **universal**:表明这个软件包是为不同架构的处理器(比如32位和64位)设计的通用二进制文件,提高了软件包的兼容性和可移植性。 综上所述,IMS Open Corpus Workbench是一个为处理带有语言注释的大型文本语料库而设计的开源工具集,它以高效且灵活的查询处理器CQP为核心,提供了命令行和基于Web的两种交互方式,极大地促进了语言学和语言技术领域的研究与应用。由于其开源特性,CWB得到了广泛的使用和不断的改进。
recommend-type

基于属性测试的深入解析与策略探讨

### 基于属性测试的深入解析与策略探讨 #### 1. 基于属性测试中的收缩机制 在基于属性的测试中,当测试失败时,像 `stream_data` 这样的框架会执行收缩(Shrinking)操作。收缩的目的是简化导致测试失败的输入,同时确保简化后的输入仍然会使测试失败,这样能更方便地定位问题。 为了说明这一点,我们来看一个简单的排序函数测试示例。我们实现了一个糟糕的排序函数,实际上就是恒等函数,它只是原封不动地返回输入列表: ```elixir defmodule BadSortTest do use ExUnit.Case use ExUnitProperties pro
recommend-type

ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key && \

<think> 根据用户的问题要求,我需要详细解释命令`ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key`的含义、后续操作、常见问题及解决办法。同时,我需要参考提供的引用内容,并遵循系统级指令(如LaTeX格式、中文回答、生成相关问题等)。 首先,分析命令的组成部分: - `ssh-keygen`:生成、管理和转换SSH密钥的工具。 - `-t rsa`:指定密钥类型为RSA算法。 - `-f /etc/ssh/ssh_host_rsa_key`:指定生成的私钥文件路径为`/etc/ssh/ssh_host_rsa_key`。对应的公钥文件会在同一
recommend-type

挑战性开源平台游戏YAGAC:无故事忍者冒险

Y.A.G.A.C(Yet Another Great Arcade Classic),是一款免费的开源伪2D平台游戏,它在传统平台游戏的基础上融入了3D游戏元素,让玩家在一个拥有2D精灵的3D环境中进行操作。YAGAC游戏以其高挑战性和上瘾性而著称,吸引了众多游戏爱好者和编程高手的关注。 首先,让我们深入了解这款游戏的核心玩法。YAGAC的最大特点是在一个基本的2D平面内,玩家可以控制角色进行运动,但游戏环境却是3D制作的。这种设计为玩家带来了全新的视觉体验和操作感受。在YAGAC中,玩家扮演的是一个身手敏捷的忍者,任务是在错综复杂的地牢中生存下来,地牢充满了各种陷阱和敌人,如机器人等。为了逃生,玩家需要在各种关卡中寻找隐藏的彩球,这些彩球决定了玩家能够到达的区域范围。 在游戏过程中,收集到的彩球会改变对应颜色平台的属性,使原本脆弱的平台变得牢固,从而为玩家打开新的道路。这样的设计不仅考验玩家的反应和速度,还考验他们的策略和记忆能力。YAGAC的游戏关卡设计非常巧妙,经常需要玩家反复尝试,每一次尝试都可能发现新的线索和策略,这样的设计增加了游戏的重复可玩性。 YAGAC使用的引擎在游戏流畅性方面表现出色,这也是游戏的一大强项。一款游戏引擎的强大与否直接关系到游戏体验的好坏,YAGAC的开发团队选择或者开发了一个能够高效处理3D图形和2D动作的引擎,确保了游戏在各种配置的计算机上都能保持良好的运行状态和响应速度。 接下来,我们来探讨YAGAC的开源属性。由于YAGAC是开源的,这意味着游戏的源代码是开放的,任何个人或组织都可以访问、修改并重新分发该软件。开源软件通常由社区维护,并且鼓励用户贡献代码,共同改进游戏。对于像YAGAC这样的游戏来说,开源可以吸引更多的开发者参与进来,共同完善游戏体验。玩家和开发者可以对游戏进行本地化、修改游戏机制,甚至是增加新的内容和关卡。 开源平台游戏的概念不仅限于提供免费的游戏体验,它还鼓励创新和协作。开源文化促进知识共享和技术进步,这在软件开发领域尤为重要。同时,玩家也可以从中学到编程知识,因为游戏的开源性质允许用户查看和学习游戏背后的代码。 最后,关于YAGAC的文件名称“yagac”,这很可能是游戏的主程序文件名或者主目录名。当玩家在计算机上进行下载和安装时,可能会遇到这样的文件名,识别这一点有助于玩家快速定位和管理游戏文件。 总的来说,Y.A.G.A.C.作为一款开源平台游戏,在游戏玩法设计、开源文化和3D游戏引擎的流畅性等方面,都显示出其独特的优势。游戏的挑战性和上瘾性,以及社区驱动的开发模式,都为游戏爱好者和开源爱好者提供了一个理想的游戏和学习平台。对于对游戏开发感兴趣的玩家来说,YAGAC无疑是一个研究和实践的良好起点。
recommend-type

状态化属性测试与测试数据随机化及测试生命周期解析

# 状态化属性测试与测试数据随机化及测试生命周期解析 ## 状态化属性测试 ### 状态化系统测试的引入 之前我们主要探讨了针对纯无状态函数的属性测试,这类函数接收输入并返回输出。然而,属性测试同样适用于有状态系统的测试。有状态系统是指那些会携带状态的系统,例如数据库。 在之前的示例中,我们仅使用属性测试来生成数据,然后将其输入到代码中并对结果进行断言。但对于有状态系统,情况有所不同:我们需要处理状态的设置,并仅在系统处于特定状态时执行某些操作。 ### 对有状态系统进行建模 我们可以利用属性测试框架生成随机数据的能力,为有状态系统生成随机命令。例如,若有状态系统是数据库,我们就能生成针
recommend-type

icpc英语字典

<think> 首先,用户的问题是:“我想查找icpc相关的英语字典 请问icpc英语专业字典”。用户想查找与ICPC(International Collegiate Programming Contest)相关的英语字典,特别是专业字典。 现在,回顾系统指令: - 所有行内数学表达式必须使用$...$格式。 - 独立公式使用$$...$$。 - LaTeX语法正确。 - 使用中文回答。 - 生成相关问题。 - 在回答中引用的段落末尾自然地添加引用标识。 - 回答结构清晰,帮助用户逐步解决问题。 - 保持真实可靠。 用户提供了几个引用: - 引用[1]:关于填字游戏和寻找变形词的程序。