|  | """ | 
					
						
						|  | Simulaciones OrbMol con interfaz estilo Facebook FAIRChem | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | from __future__ import annotations | 
					
						
						|  | import os | 
					
						
						|  | import tempfile | 
					
						
						|  | from pathlib import Path | 
					
						
						|  | import numpy as np | 
					
						
						|  | import ase | 
					
						
						|  | import ase.io | 
					
						
						|  | from ase import units | 
					
						
						|  | from ase.io.trajectory import Trajectory | 
					
						
						|  | from ase.optimize import LBFGS | 
					
						
						|  | from ase.filters import FrechetCellFilter | 
					
						
						|  | from ase.md import MDLogger | 
					
						
						|  | from ase.md.velocitydistribution import MaxwellBoltzmannDistribution | 
					
						
						|  | from ase.md.verlet import VelocityVerlet | 
					
						
						|  | from ase.md.nose_hoover_chain import NoseHooverChainNVT | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | from orb_models.forcefield import pretrained | 
					
						
						|  | from orb_models.forcefield.calculator import ORBCalculator | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | _model_calc: ORBCalculator | None = None | 
					
						
						|  | _current_task_name = None | 
					
						
						|  |  | 
					
						
						|  | model_name = { | 
					
						
						|  | "OMol": "orb_v3_conservative_omol", | 
					
						
						|  | "OMat": "orb_v3_conservative_inf_omat", | 
					
						
						|  | "OMol-Direct": "orb_v3_direct_omol"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def load_orbmol_model(task_name,device: str = "cpu", precision: str = "float32-high") -> ORBCalculator: | 
					
						
						|  | """Load OrbMol calculator, switches only if another model is required.""" | 
					
						
						|  | global _model_calc, _current_task_name | 
					
						
						|  | if _model_calc is None or _current_task_name != task_name: | 
					
						
						|  | if task_name == "OMol": | 
					
						
						|  | orbff= pretrained.orb_v3_conservative_omol( | 
					
						
						|  | device=device, | 
					
						
						|  | precision=precision, | 
					
						
						|  | ) | 
					
						
						|  | elif task_name == "OMat": | 
					
						
						|  | orbff = pretrained.orb_v3_conservative_inf_omat( | 
					
						
						|  | device=device, | 
					
						
						|  | precision=precision, | 
					
						
						|  | ) | 
					
						
						|  | elif task_name == "OMol-Direct": | 
					
						
						|  | orbff = pretrained.orb_v3_direct_omol( | 
					
						
						|  | device=device, | 
					
						
						|  | precision=precision, | 
					
						
						|  | ) | 
					
						
						|  | else: | 
					
						
						|  | raise ValueError(f"Unknown task_name: {task_name}") | 
					
						
						|  | _model_calc = ORBCalculator(orbff, device=device) | 
					
						
						|  | _current_task_name = task_name | 
					
						
						|  | return _model_calc | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def load_check_ase_atoms(structure_file): | 
					
						
						|  | """COPIA EXACTA de Facebook - valida y carga estructura""" | 
					
						
						|  | if not structure_file: | 
					
						
						|  | raise Exception("You need an input structure file to run a simulation!") | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | atoms = ase.io.read(structure_file) | 
					
						
						|  |  | 
					
						
						|  | if not (all(atoms.pbc) or np.all(~np.array(atoms.pbc))): | 
					
						
						|  | raise Exception( | 
					
						
						|  | "Mixed PBC are not supported yet - please set PBC all True or False in your structure before uploading" | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | if len(atoms) == 0: | 
					
						
						|  | raise Exception("Error: Structure file contains no atoms.") | 
					
						
						|  |  | 
					
						
						|  | if len(atoms) > 2000: | 
					
						
						|  | raise Exception( | 
					
						
						|  | f"Error: Structure file contains {len(atoms)}, which is more than 2000 atoms. Please use a smaller structure for this demo, or run this on a local machine!" | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | atoms.positions -= atoms.get_center_of_mass() | 
					
						
						|  | cell_center = atoms.get_cell().sum(axis=0) / 2 | 
					
						
						|  | atoms.positions += cell_center | 
					
						
						|  |  | 
					
						
						|  | return atoms | 
					
						
						|  | except Exception as e: | 
					
						
						|  | raise Exception(f"Error loading structure with ASE: {str(e)}") | 
					
						
						|  |  | 
					
						
						|  | def run_md_simulation( | 
					
						
						|  | structure_file, | 
					
						
						|  | num_steps, | 
					
						
						|  | num_prerelax_steps, | 
					
						
						|  | md_timestep, | 
					
						
						|  | temperature_k, | 
					
						
						|  | md_ensemble, | 
					
						
						|  | task_name, | 
					
						
						|  | total_charge=0, | 
					
						
						|  | spin_multiplicity=1, | 
					
						
						|  | explanation: str | None = None, | 
					
						
						|  | oauth_token=None, | 
					
						
						|  | progress=None, | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | MD simulation estilo Facebook pero con OrbMol | 
					
						
						|  | """ | 
					
						
						|  | temp_path = None | 
					
						
						|  | traj_path = None | 
					
						
						|  | md_log_path = None | 
					
						
						|  | atoms = None | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | atoms = load_check_ase_atoms(structure_file) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | atoms.info["charge"] = total_charge | 
					
						
						|  | atoms.info["spin"] = spin_multiplicity | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | calc = load_orbmol_model(task_name) | 
					
						
						|  | atoms.calc = calc | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | interval = 1 | 
					
						
						|  | steps = [0] | 
					
						
						|  | expected_steps = num_steps + num_prerelax_steps | 
					
						
						|  |  | 
					
						
						|  | def update_progress(): | 
					
						
						|  | steps[-1] += interval | 
					
						
						|  | if progress: | 
					
						
						|  | progress(steps[-1] / expected_steps) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with tempfile.NamedTemporaryFile(suffix=".traj", delete=False) as traj_f: | 
					
						
						|  | traj_path = traj_f.name | 
					
						
						|  | with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as log_f: | 
					
						
						|  | md_log_path = log_f.name | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | opt = LBFGS(atoms, logfile=md_log_path, trajectory=traj_path) | 
					
						
						|  | if progress: | 
					
						
						|  | opt.attach(update_progress, interval=interval) | 
					
						
						|  | opt.run(fmax=0.05, steps=num_prerelax_steps) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | MaxwellBoltzmannDistribution(atoms, temperature_K=temperature_k * 2) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if md_ensemble == "NVE": | 
					
						
						|  | dyn = VelocityVerlet(atoms, timestep=md_timestep * units.fs) | 
					
						
						|  | elif md_ensemble == "NVT": | 
					
						
						|  | dyn = NoseHooverChainNVT( | 
					
						
						|  | atoms, | 
					
						
						|  | timestep=md_timestep * units.fs, | 
					
						
						|  | temperature_K=temperature_k, | 
					
						
						|  | tdamp=10 * md_timestep * units.fs, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | traj = Trajectory(traj_path, "a", atoms) | 
					
						
						|  | dyn.attach(traj.write, interval=1) | 
					
						
						|  | if progress: | 
					
						
						|  | dyn.attach(update_progress, interval=interval) | 
					
						
						|  | dyn.attach( | 
					
						
						|  | MDLogger( | 
					
						
						|  | dyn, | 
					
						
						|  | atoms, | 
					
						
						|  | md_log_path, | 
					
						
						|  | header=True, | 
					
						
						|  | stress=False, | 
					
						
						|  | peratom=True, | 
					
						
						|  | mode="a", | 
					
						
						|  | ), | 
					
						
						|  | interval=10, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | dyn.run(num_steps) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | reproduction_script = f""" | 
					
						
						|  | import ase.io | 
					
						
						|  | from ase.md.velocitydistribution import MaxwellBoltzmannDistribution | 
					
						
						|  | from ase.md.verlet import VelocityVerlet | 
					
						
						|  | from ase.optimize import LBFGS | 
					
						
						|  | from ase.io.trajectory import Trajectory | 
					
						
						|  | from ase.md import MDLogger | 
					
						
						|  | from ase import units | 
					
						
						|  | from orb_models.forcefield import pretrained | 
					
						
						|  | from orb_models.forcefield.calculator import ORBCalculator | 
					
						
						|  |  | 
					
						
						|  | # Read the atoms object from ASE read-able file | 
					
						
						|  | atoms = ase.io.read('input_file.traj') | 
					
						
						|  | # Set the total charge and spin multiplicity | 
					
						
						|  | atoms.info["charge"] = {total_charge} | 
					
						
						|  | atoms.info["spin"] = {spin_multiplicity} | 
					
						
						|  | # Set up the OrbMol calculator | 
					
						
						|  | orbff = pretrained.{model_name[task_name]}(device='cpu', precision='float32-high') | 
					
						
						|  | atoms.calc = ORBCalculator(orbff, device='cpu') | 
					
						
						|  | # Do a quick pre-relaxation to make sure the system is stable | 
					
						
						|  | opt = LBFGS(atoms, trajectory="relaxation_output.traj") | 
					
						
						|  | opt.run(fmax=0.05, steps={num_prerelax_steps}) | 
					
						
						|  | # Initialize the velocity distribution; we set twice the temperature since we did a relaxation and | 
					
						
						|  | # much of the kinetic energy will partition to the potential energy right away | 
					
						
						|  | MaxwellBoltzmannDistribution(atoms, temperature_K={temperature_k}*2) | 
					
						
						|  | # Initialize the integrator; NVE is shown here as an example | 
					
						
						|  | dyn = VelocityVerlet(atoms, timestep={md_timestep} * units.fs) | 
					
						
						|  | # Set up trajectory and MD logger | 
					
						
						|  | dyn.attach(MDLogger(dyn, atoms, 'md.log', header=True, stress=False, peratom=True, mode="w"), interval=10) | 
					
						
						|  | traj = Trajectory("md_output.traj", "w", atoms) | 
					
						
						|  | dyn.attach(traj.write, interval=1) | 
					
						
						|  | # Run the simulation! | 
					
						
						|  | dyn.run({num_steps}) | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with open(md_log_path, "r") as md_log_file: | 
					
						
						|  | md_log = md_log_file.read() | 
					
						
						|  |  | 
					
						
						|  | if explanation is None: | 
					
						
						|  | explanation = f"MD simulation of {len(atoms)} atoms for {num_steps} steps with a timestep of {md_timestep} fs at {temperature_k} K in the {md_ensemble} ensemble using OrbMol. You submitted this simulation, so I hope you know what you're looking for or what it means!" | 
					
						
						|  |  | 
					
						
						|  | return traj_path, md_log, reproduction_script, explanation | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | raise Exception( | 
					
						
						|  | f"Error running MD simulation: {str(e)}. Please try again or report this error." | 
					
						
						|  | ) | 
					
						
						|  | finally: | 
					
						
						|  |  | 
					
						
						|  | if temp_path and os.path.exists(temp_path): | 
					
						
						|  | os.remove(temp_path) | 
					
						
						|  |  | 
					
						
						|  | if md_log_path and os.path.exists(md_log_path): | 
					
						
						|  | os.remove(md_log_path) | 
					
						
						|  |  | 
					
						
						|  | if atoms is not None and getattr(atoms, "calc", None) is not None: | 
					
						
						|  | atoms.calc = None | 
					
						
						|  |  | 
					
						
						|  | def run_relaxation_simulation( | 
					
						
						|  | structure_file, | 
					
						
						|  | num_steps, | 
					
						
						|  | fmax, | 
					
						
						|  | task_name, | 
					
						
						|  | total_charge: float = 0, | 
					
						
						|  | spin_multiplicity: float = 1, | 
					
						
						|  | relax_unit_cell=False, | 
					
						
						|  | explanation: str | None = None, | 
					
						
						|  | oauth_token=None, | 
					
						
						|  | progress=None, | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Relaxation simulation estilo Facebook pero con OrbMol | 
					
						
						|  | """ | 
					
						
						|  | temp_path = None | 
					
						
						|  | traj_path = None | 
					
						
						|  | opt_log_path = None | 
					
						
						|  | atoms = None | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | atoms = load_check_ase_atoms(structure_file) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | atoms.info["charge"] = total_charge | 
					
						
						|  | atoms.info["spin"] = spin_multiplicity | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | calc = load_orbmol_model(task_name) | 
					
						
						|  | atoms.calc = calc | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with tempfile.NamedTemporaryFile(suffix=".traj", delete=False) as traj_f: | 
					
						
						|  | traj_path = traj_f.name | 
					
						
						|  | with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as log_f: | 
					
						
						|  | opt_log_path = log_f.name | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | optimizer = LBFGS( | 
					
						
						|  | FrechetCellFilter(atoms) if relax_unit_cell else atoms, | 
					
						
						|  | trajectory=traj_path, | 
					
						
						|  | logfile=opt_log_path, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if progress: | 
					
						
						|  | interval = 1 | 
					
						
						|  | steps = [0] | 
					
						
						|  | def update_progress(steps): | 
					
						
						|  | steps[-1] += interval | 
					
						
						|  | progress(steps[-1] / num_steps) | 
					
						
						|  | optimizer.attach(update_progress, interval=interval, steps=steps) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | optimizer.run(fmax=fmax, steps=num_steps) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | reproduction_script = f""" | 
					
						
						|  | import ase.io | 
					
						
						|  | from ase.optimize import LBFGS | 
					
						
						|  | from ase.filters import FrechetCellFilter | 
					
						
						|  | from orb_models.forcefield import pretrained | 
					
						
						|  | from orb_models.forcefield.calculator import ORBCalculator | 
					
						
						|  |  | 
					
						
						|  | # Read the atoms object from ASE read-able file | 
					
						
						|  | atoms = ase.io.read('input_file.traj') | 
					
						
						|  | # Set the total charge and spin multiplicity | 
					
						
						|  | atoms.info["charge"] = {total_charge} | 
					
						
						|  | atoms.info["spin"] = {spin_multiplicity} | 
					
						
						|  | # Set up the OrbMol calculator | 
					
						
						|  | orbff = pretrained.{model_name[task_name]}(device='cpu', precision='float32-high') | 
					
						
						|  | atoms.calc = ORBCalculator(orbff, device='cpu') | 
					
						
						|  | # Initialize the optimizer | 
					
						
						|  | relax_unit_cell = {relax_unit_cell} | 
					
						
						|  | optimizer = LBFGS(FrechetCellFilter(atoms) if relax_unit_cell else atoms, trajectory="relaxation_output.traj") | 
					
						
						|  | # Run the optimization! | 
					
						
						|  | optimizer.run(fmax={fmax}, steps={num_steps}) | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with open(opt_log_path, "r") as opt_log_file: | 
					
						
						|  | opt_log = opt_log_file.read() | 
					
						
						|  |  | 
					
						
						|  | if explanation is None: | 
					
						
						|  | explanation = f"Relaxation of {len(atoms)} atoms for {num_steps} steps with a force tolerance of {fmax} eV/脜 using OrbMol. You submitted this simulation, so I hope you know what you're looking for or what it means!" | 
					
						
						|  |  | 
					
						
						|  | return traj_path, opt_log, reproduction_script, explanation | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | raise Exception( | 
					
						
						|  | f"Error running relaxation: {str(e)}. Please try again or report this error." | 
					
						
						|  | ) | 
					
						
						|  | finally: | 
					
						
						|  |  | 
					
						
						|  | if temp_path and os.path.exists(temp_path): | 
					
						
						|  | os.remove(temp_path) | 
					
						
						|  | if opt_log_path and os.path.exists(opt_log_path): | 
					
						
						|  | os.remove(opt_log_path) | 
					
						
						|  | if atoms is not None and getattr(atoms, "calc", None) is not None: | 
					
						
						|  | atoms.calc = None | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def atoms_to_xyz(atoms: ase.Atoms) -> str: | 
					
						
						|  | """Convert ASE Atoms to an XYZ string for quick visualization.""" | 
					
						
						|  | lines = [str(len(atoms)), "generated by simulation_scripts_orbmol"] | 
					
						
						|  | for s, (x, y, z) in zip(atoms.get_chemical_symbols(), atoms.get_positions()): | 
					
						
						|  | lines.append(f"{s} {x:.6f} {y:.6f} {z:.6f}") | 
					
						
						|  | return "\n".join(lines) | 
					
						
						|  |  | 
					
						
						|  | def last_frame_xyz_from_traj(traj_path: str | Path) -> str: | 
					
						
						|  | """Read the last frame of an ASE .traj and return it as XYZ string.""" | 
					
						
						|  | tr = Trajectory(str(traj_path)) | 
					
						
						|  | last = tr[-1] | 
					
						
						|  | return atoms_to_xyz(last) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def validate_ase_atoms_and_login(structure_file, login_button_value="", oauth_token=None): | 
					
						
						|  | """Validaci贸n simplificada - sin login UMA""" | 
					
						
						|  | if not structure_file: | 
					
						
						|  | return (False, False, "Missing input structure!") | 
					
						
						|  |  | 
					
						
						|  | if isinstance(structure_file, dict): | 
					
						
						|  | structure_file = structure_file["path"] | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | atoms = ase.io.read(structure_file) | 
					
						
						|  |  | 
					
						
						|  | if len(atoms) == 0: | 
					
						
						|  | return (False, False, "No atoms in the structure file!") | 
					
						
						|  | elif not (all(atoms.pbc) or np.all(~np.array(atoms.pbc))): | 
					
						
						|  | return (False, False, f"Mixed PBC {atoms.pbc} not supported!") | 
					
						
						|  | elif len(atoms) > 2000: | 
					
						
						|  | return (False, False, f"Too many atoms ({len(atoms)}), max 2000!") | 
					
						
						|  | else: | 
					
						
						|  | return (True, True, "Structure loaded successfully - ready for OrbMol simulation!") | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return (False, False, f"Failed to load structure: {str(e)}") |