
An In-Depth Look at the Four Core Papers on the H-Store Parallel Database

ZIP file · 2.73 MB
H-store is a distributed, main-memory transaction processing system designed for modern data-intensive workloads. It adopts a new architecture aimed at raising transaction-processing performance, especially under high concurrency. The four papers collected here present the system's parallel execution mechanism and its architectural design, and they introduced ideas that have shaped subsequent database research. The main points they cover are summarized below.

First, H-store's parallel execution model is one of its defining features. In traditional database systems, much of transaction processing is effectively serialized by shared data structures, which limits concurrency. H-store instead uses a shared-nothing design: data is split into partitions spread across nodes (and across cores within a node), and each transaction is routed to the partition or partitions that hold its data, so the system can exploit multicore hardware and scale out horizontally. The papers explain how transactions and data are partitioned and how consistency and isolation are preserved when a transaction spans more than one partition. (A minimal sketch of this routing-plus-serial-execution idea appears at the end of this description.)

Second, the architecture targets high-performance online transaction processing (OLTP). Rather than relying on fine-grained locking, H-store serves each partition with a single-threaded execution engine that runs its transactions one at a time, which eliminates lock and latch contention inside a partition while many partitions run in parallel. All data is kept in main memory in a row-oriented layout suited to short OLTP transactions, avoiding buffer-pool and disk-I/O overhead on the critical path. The papers describe the storage engine, the query executor, and the transaction coordinator in detail.

The papers also cover how H-store orders transactions and resolves conflicts. Transactions are typically expressed as pre-declared stored procedures, and transaction identifiers derived from timestamps give the coordinators a global order for scheduling multi-partition transactions and resolving conflicts between them. Together these techniques cut per-operation latency and raise throughput substantially. (A sketch of timestamp-ordered transaction IDs is included below.)

Data partitioning strategy and replication are discussed as well. A good partitioning scheme balances load across nodes and avoids hot spots; H-store chooses partitioning keys from the workload's access patterns and transaction mix so that most transactions touch only a single partition. Replication provides high availability and enables fast recovery when a node fails.

Performance optimization and fault recovery round out the papers. Durability comes from combining periodic snapshots (checkpoints) of the in-memory state with logging, so a crashed node can be rebuilt by loading the latest snapshot and replaying the log; pre-compiled execution plans for stored procedures and efficient main-memory indexes keep query execution fast. (A simplified snapshot-plus-log-replay sketch is given below.)

H-store sustains large numbers of concurrent transactions largely because execution within each partition is a lightweight, single-threaded loop: there is no per-transaction locking, latching, or thread scheduling, so resource overhead stays low and responsiveness stays high, and overall concurrency comes from running many such partitions in parallel.

In summary, the four H-store papers span system architecture, parallel execution, transaction management, storage optimization, fault recovery, and performance tuning. Studying them gives a clear picture of H-store's design philosophy, implementation techniques, and target applications, and they remain a valuable reference for database researchers and practitioners. As data volumes continue to grow, efficient parallel OLTP systems of this kind keep attracting attention.
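To make the shared-nothing execution model concrete, here is a minimal Python sketch. It is not H-store's actual code (H-store is a Java system); class and function names such as `PartitionExecutor` and `route` are hypothetical. It only illustrates the two ideas described above: a transaction is routed to a partition by hashing its key, and each partition executes its transactions one at a time on a single thread, so no locks are needed inside a partition.

```python
import hashlib
import queue
import threading

class PartitionExecutor:
    """One single-threaded executor per partition: transactions queued here
    run strictly one after another, so no row locks are needed."""
    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.rows = {}                # primary key -> row dict (main-memory store)
        self.inbox = queue.Queue()    # serial transaction queue
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            txn, done = self.inbox.get()
            result = txn(self.rows)   # execute the whole transaction serially
            done.put(result)

    def submit(self, txn):
        done = queue.Queue(maxsize=1)
        self.inbox.put((txn, done))
        return done.get()             # block until this partition finishes it

def route(key, n_partitions):
    """Hash-partition on the primary key so a transaction lands on one site."""
    digest = hashlib.md5(str(key).encode()).hexdigest()
    return int(digest, 16) % n_partitions

# Usage: a single-partition "insert order" style transaction.
partitions = [PartitionExecutor(i) for i in range(4)]

def insert_order(order_id, amount):
    def txn(rows):
        rows[order_id] = {"amount": amount}
        return order_id
    return partitions[route(order_id, len(partitions))].submit(txn)

print(insert_order(42, 99.5))
```

Multi-partition transactions, which require coordination across several such executors, are precisely the harder case the papers devote much of their attention to.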
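The timestamp-based ordering mentioned above can be illustrated with a small generator that packs wall-clock milliseconds, a per-millisecond counter, and the initiating site's id into one integer; comparing two such IDs yields a global order that a coordinator can use to schedule multi-partition transactions. This is only a sketch under assumed parameters: the bit layout and field widths are illustrative and do not reflect H-store's actual transaction-ID format.

```python
import time

class TimestampIdGenerator:
    """Builds monotonically increasing transaction IDs from
    (milliseconds, per-millisecond counter, initiating site id).
    Comparing two IDs therefore yields a global timestamp order.
    The bit layout below is illustrative, not H-store's real format."""
    def __init__(self, site_id, site_bits=10, counter_bits=14):
        self.site_id = site_id
        self.site_bits = site_bits
        self.counter_bits = counter_bits
        self.last_ms = -1
        self.counter = 0

    def next_id(self):
        now_ms = int(time.time() * 1000)
        if now_ms == self.last_ms:
            self.counter += 1          # several transactions in the same millisecond
        else:
            self.last_ms = now_ms
            self.counter = 0
        return ((now_ms << (self.counter_bits + self.site_bits))
                | (self.counter << self.site_bits)
                | self.site_id)

# Usage: IDs from two sites still sort into one global timestamp order.
gen_a, gen_b = TimestampIdGenerator(1), TimestampIdGenerator(2)
t1, t2 = gen_a.next_id(), gen_b.next_id()
print(sorted([t2, t1]))
```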
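Finally, the snapshot-plus-log recovery idea can be sketched as follows: every committed operation is appended to a durable log, a periodic snapshot writes out the full in-memory state and truncates the log, and recovery reloads the latest snapshot and replays whatever the log still contains. File names and the JSON log format here are assumptions for illustration only; they do not reflect H-store's on-disk formats.

```python
import json
import os

SNAPSHOT = "snapshot.json"   # hypothetical file names for this sketch
CMD_LOG = "commands.log"

def apply(store, cmd):
    """Apply one logged command (here: simple key/value puts and deletes)."""
    if cmd["op"] == "put":
        store[cmd["key"]] = cmd["value"]
    elif cmd["op"] == "del":
        store.pop(cmd["key"], None)

def log_command(cmd):
    """Durably record the command before acknowledging the transaction."""
    with open(CMD_LOG, "a") as f:
        f.write(json.dumps(cmd) + "\n")
        f.flush()
        os.fsync(f.fileno())

def take_snapshot(store):
    """Write the full in-memory state, then truncate the command log."""
    with open(SNAPSHOT, "w") as f:
        json.dump(store, f)
    open(CMD_LOG, "w").close()

def recover():
    """Rebuild memory state: load the last snapshot, then replay newer commands."""
    store = {}
    if os.path.exists(SNAPSHOT):
        with open(SNAPSHOT) as f:
            store = json.load(f)
    if os.path.exists(CMD_LOG):
        with open(CMD_LOG) as f:
            for line in f:
                apply(store, json.loads(line))
    return store

# Usage: log two commands, snapshot, log one more, then recover after a "crash".
log_command({"op": "put", "key": "a", "value": 1})
log_command({"op": "put", "key": "b", "value": 2})
take_snapshot(recover())
log_command({"op": "del", "key": "a"})
print(recover())   # {'b': 2}
```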

