diff --git a/larch/io/columnfile.py b/larch/io/columnfile.py index 69b904bc5..735792571 100644 --- a/larch/io/columnfile.py +++ b/larch/io/columnfile.py @@ -612,6 +612,7 @@ def write_group(filename, group, scalars=None, arrays=None, def read_fdmnes(filename, **kwargs): """read [FDMNES](http://fdmnes.neel.cnrs.fr/) ascii files""" + shift_energy = kwargs.pop("shift_energy", True) group = read_ascii(filename, **kwargs) group.header_dict = dict(filetype='FDMNES', energy_units='eV') for headline in group.header: @@ -622,7 +623,8 @@ def read_fdmnes(filename, **kwargs): vals_names = headline.split(" = ")[1].split(", ") group.header_dict.update(dict(zip(vals_names, vals))) group.name = f'FDMNES file {filename}' - group.energy += group.header_dict["E_edge"] + if shift_energy: + group.energy += group.header_dict["E_edge"] #fix _arrlabel -> arrlabel for ilab, lab in enumerate(group.array_labels): if lab.startswith("_"): diff --git a/larch/xrd/struct2xas.py b/larch/xrd/struct2xas.py index aa11542bd..0c7c31b8d 100644 --- a/larch/xrd/struct2xas.py +++ b/larch/xrd/struct2xas.py @@ -4,11 +4,15 @@ """ Struct2XAS: convert CIFs and XYZs files to FDMNES and FEFF inputs """ + # main imports import os import json import time -import tempfile +from dataclasses import dataclass +from typing import Union, List # , Any, Dict + +# import tempfile import numpy as np # pymatgen @@ -19,7 +23,8 @@ import larch.utils.logging as logging from larch.utils import mkdir, unixpath -from larch.utils.strutils import fix_filename, unique_name, strict_ascii + +# from larch.utils.strutils import fix_filename, unique_name, strict_ascii from larch.site_config import user_larchdir from larch.io import read_ascii from larch.math.convolution1D import lin_gamma, conv @@ -43,7 +48,7 @@ __author__ = ["Beatriz G. Foschiani", "Mauro Rovezzi"] __email__ = ["beatrizgfoschiani@gmail.com", "mauro.rovezzi@esrf.fr"] __credits__ = ["Jade Chongsathapornpong", "Marius Retegan"] -__version__ = "2023.3.0" +__version__ = "2024.1.0" # initialize the logger @@ -109,10 +114,21 @@ def structure_folders(): return folders +@dataclass +class Job: + num: int #: index of the job + description: str #: description + type: Union[str, None] #: 'fdmnes', 'feff' + timestamp: str #: creation timestamp + path: Union[str, None] #: path for the calculation + + class Struct2XAS: """Class to convert data from CIF and XYZ files to FDMNES and FEFF inputs""" - def __init__(self, file, abs_atom) -> None: + def __init__( + self, file: str, abs_atom: str, parent_path: Union[str, None] = None + ) -> None: """ Arguments @@ -134,7 +150,7 @@ def __init__(self, file, abs_atom) -> None: Structures from xyz files are always considered non-symmetric for the lack of lattice information. For creating the object from an XYZ file, a non-symmetric `structure` object from pymatgen is - generated (spacegroup: P1) from a `molecule` one via + generated (spacegroup: P1) from a `molecule` via :func:`xyz2struct`. The lattice parameters chosen for this structure are arbitrary and are based on the size of the molecule, as are the fractional coordinates. Therefore, the analysis of this @@ -145,7 +161,6 @@ def __init__(self, file, abs_atom) -> None: For creating the object from cif file, a pymatgen structure is generated with symmetry information from cif file. """ - self.file = file self.abs_atom = abs_atom self.frame = 0 @@ -156,9 +171,18 @@ def __init__(self, file, abs_atom) -> None: self.nabs_sites = len(self.get_abs_sites()) self.elems = self._get_elems() self.species = self._get_species() - self.parent_path = user_larchdir + if parent_path is None: + self.parent_path = user_larchdir + else: + self.parent_path = parent_path self.folders = structure_folders() - logger.info( + self.calc_path = None + self.njob = 0 + self.jobs = [] + self.tmpl_fdmnes = None + self.tmpl_feff = None + self.tmpl_sbatch = None + logger.debug( f"Frames: {self.nframes}, Absorbing sites: {self.nabs_sites}. (Indexes for frames and abs_sites start at 0)" ) @@ -243,9 +267,10 @@ def read_structure(self, file): self.molecules = None self.mol = None self.nframes = 1 - self.cif = CifParser(self.file) + self.cif = CifParser(self.file, occupancy_tolerance=10, site_tolerance=5e-3) self.struct = Structure.from_file(self.file) - # self.struct = self.cif.get_structures()[0] #: NOT WORKING! + # self.struct = self.cif.parse_structures()[0] #: TODO + # self.struct = self.cif.get_structures()[0] #: DEPRECATED / NOT WORKING! logger.debug("structure created from a CIF file") elif self.file_ext == ".xyz": self.is_xyz = True @@ -368,7 +393,7 @@ def get_abs_sites(self): The lists inside the list contain the following respective information: [ - abs_idx, # index identifier of the absorber site + abs_idx, # index identifier of the absorber site # used by self.set_abs_site(). species_string, # The specie for absorber sites. frac_coords, # Fractional coordinate position @@ -617,6 +642,7 @@ def get_coord_envs(self): """ abs_sites = self.get_abs_sites() idx_abs_site = abs_sites[self.abs_site][-1] + coord_env_list = [] if self.is_cif: from pymatgen.analysis.bond_valence import BVAnalyzer @@ -641,7 +667,6 @@ def get_coord_envs(self): except ValueError: valences = "undefined" - coord_env_list = [] se = lgf.compute_structure_environments( max_cn=6, valences=valences, @@ -658,32 +683,26 @@ def get_coord_envs(self): strategy=strategy, structure_environments=se ) coord_env_ce = lse.coordination_environments[idx_abs_site] - ngbs_sites = lse._all_nbs_sites - coord_env_list.append( - [ - f"Coord. Env. for Site {abs_sites[self.abs_site][0]}", - coord_env_ce, - ngbs_sites, - ] - ) + coord_env = lse._all_nbs_sites if self.is_xyz: from pymatgen.analysis.local_env import CrystalNN obj = CrystalNN() - coord_env_list = [] + coord_env = obj.get_nn_info(self.struct, idx_abs_site) for site in coord_env: site["site"].cart_coords = self.struct[site["site_index"]].coords coord_dict = obj.get_cn_dict(self.struct, idx_abs_site) - coord_env_list.append( - [ - f"Coord. Env. for Site {abs_sites[self.abs_site][0]}", - {"ce_symbol": f"Elements Dict = {coord_dict}"}, - coord_env, - ] - ) + coord_env_ce = {"ce_symbol": f"Elements Dict = {coord_dict}"} + coord_env_list.append( + [ + f"Coord. Env. for Site {abs_sites[self.abs_site][0]}", + coord_env_ce, + coord_env, + ] + ) return coord_env_list def get_coord_envs_info(self): @@ -695,7 +714,11 @@ def get_coord_envs_info(self): abs_site_coord = self.get_abs_sites()[self.abs_site][4] elems_dist = [] - for site in coord_env[2]: + sites = coord_env[2] + if len(sites) == 0: + logger.warning("Empty coordination environment") + return None + for site in sites: if self.is_cif: coord_sym = [ coord_env[1][i]["ce_symbol"] for i in range(len(coord_env[1])) @@ -768,20 +791,28 @@ def make_cluster(self, radius): return atoms def make_input_fdmnes( - self, radius=7, parent_path=None, template=None, green=True, **kwargs + self, + parent_path: str = None, + newjob: Union[str, None] = None, + template: str = None, + radius: float = 7, + green: str = True, + **kwargs, ): """ Create a fdmnes input from a template. Arguments --------- - radius : float, [7] - radius for fdmnes calculation in Angstrom parent_path : str, [None] path to the parent directory where the input files are stored - if None it will create a temporary directory + (under `fdmnes` directory) + newjob : str or None [None] + if a string, a new job is created with the given description template : str, [None] full path to the template file + radius : float, [7] + radius for fdmnes calculation in Angstrom green : boolean [True] True: use `Green` (muffin-tin potentials, faster) False: use finite-difference method (slower) @@ -792,28 +823,52 @@ def make_input_fdmnes( Returns ------- None -> writes FDMNES input to disk - directory structure: {parent_path}/fdmnes/{self.file_name}/{self.abs_atom}/frame{self.frame}/site{self.abs_site}/ + directory structure: + + {parent_path} + |+ fdmnes + ||+ {file_name} + |||+ {abs_atom} + ||||+ frame{nframe} + |||||+ site{abs_site} + ||||||+ job{njob} """ - replacements = {} - replacements.update(**kwargs) - replacements["version"] = __version__ - - if template is None: - template = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "templates", "fdmnes.tmpl" - ) - if parent_path is None: - parent_path = self.folders["fdmnes"] - - self.outdir = os.path.join( - parent_path, + parent_path = self.parent_path + + if (newjob is not None) or (self.njob == 0): + self.njob += 1 + job = Job( + num=self.njob, + description=newjob, + type="fdmnes", + timestamp=_get_timestamp(), + path=None, + ) + self.jobs.append(job) + else: + job = self.jobs[self.njob - 1] + calc_path = os.path.join( + "fdmnes", self.file_name, self.abs_atom, f"frame{self.frame}", f"site{self.abs_site}", + f"job{self.njob}", ) + job.path = calc_path + self.outdir = os.path.join(parent_path, calc_path) + self.calc_path = calc_path + + if template is None: + template = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "templates", "fdmnes.tmpl" + ) + assert os.path.isfile(template), "wrong template path" + + replacements = {} + replacements.update(**kwargs) + replacements["version"] = __version__ method = "green" if green else "" absorber = "" @@ -928,7 +983,8 @@ def make_input_fdmnes( try: os.makedirs(self.outdir, mode=0o755) except FileExistsError: - pass + errmsg = f"`job{job.num}` already exists -> use `newjob` or remove it" + raise FileExistsError(errmsg) # Write the input file. fnout = os.path.join(self.outdir, "job_inp.txt") @@ -940,17 +996,40 @@ def make_input_fdmnes( with open(os.path.join(self.outdir, "fdmfile.txt"), "w") as fp: fp.write("1\njob_inp.txt") - logger.info(f"written FDMNES input -> {fnout}") + logger.info(f"written input for `/{job.path}`") + + def make_sbatch(self, template: str, **kwargs): + """Generates a SBATCH file (SLURM workload manager) using a template + + Arguments + --------- + template: str + path to the SBATCH template file + **kwargs + keyword arguments to be replaced in the template file + + Returns + ------- + None: writes `job.sbatch` + + """ + assert os.path.isfile(template), "wrong template path" + batch_script = os.path.join(self.outdir, "job.sbatch") + with open(batch_script, "w") as fp, open(template) as tp: + fp.write(tp.read().format(**kwargs)) + logger.info(f"written {fp.name}") + os.chmod(batch_script, 0o755) def make_input_feff( self, - radius=7, - parent_path=None, - template=None, - feff_comment="*", - edge="K", - sig2=None, - debye=None, + parent_path: str = None, + newjob: Union[str, None] = None, + template: str = None, + radius: float = 7, + feff_comment: str = "*", + edge: str = "K", + sig2: Union[float, None] = None, + debye: Union[List[float], None] = None, **kwargs, ): """ @@ -958,13 +1037,15 @@ def make_input_feff( Arguments --------- - radius : float, [7] - radius for feff calculation [Angstrom]. parent_path : str, [None] path to the parent directory where the input files are stored - if None it will create a temporary directory + (under "feff" directory) + newjob : str or None [None] + if a string, a new job is created with the given description template : str, [None] full path to the template file + radius : float, [7] + radius for feff calculation [Angstrom]. feff_coment : str, ["*"] comment character used in the input file sig2 : float or None, [None] @@ -982,21 +1063,42 @@ def make_input_feff( Returns ------- None -> writes FEFF input to disk - directory structure: {parent_path}/feff/{self.file_name}/{self.abs_atom}/frame{self.frame}/site{self.abs_site}/ + directory structure: + + {parent_path} + |+ feff + ||+ {file_name} + |||+ {abs_atom} + ||||+ frame{nframe} + |||||+ site{abs_site} + ||||||+ job{njob} """ - replacements = {} - replacements.update(**kwargs) - replacements["version"] = __version__ if parent_path is None: - parent_path = self.folders["feff"] - self.outdir = os.path.join( - parent_path, + parent_path = self.parent_path + + if (newjob is not None) or (self.njob == 0): + self.njob += 1 + job = Job( + num=self.njob, + description=newjob, + type="fdmnes", + timestamp=_get_timestamp(), + path=None, + ) + self.jobs.append(job) + else: + job = self.jobs[self.njob - 1] + calc_path = os.path.join( + "feff", self.file_name, self.abs_atom, f"frame{self.frame}", f"site{self.abs_site}", + f"job{self.njob}", ) + job.path = calc_path + self.outdir = os.path.join(parent_path, calc_path) + self.calc_path = calc_path if template is None: template = os.path.join( @@ -1004,6 +1106,11 @@ def make_input_feff( "templates", "feff_exafs.tmpl", ) + assert os.path.isfile(template), "wrong template path" + + replacements = {} + replacements.update(**kwargs) + replacements["version"] = __version__ if sig2 is None: use_sig2 = "*" @@ -1343,7 +1450,9 @@ def convolve_data( return group -def save_cif_from_mp(api_key, material_id, parent_path=None): +def save_cif_from_mp( + api_key: str, material_id: str, parent_path: str = None +) -> list[str, str]: """Collect a CIF file from the Materials Project Database, given the material id Parameters @@ -1353,8 +1462,8 @@ def save_cif_from_mp(api_key, material_id, parent_path=None): material id : str material id (format mp-xxxx) from Materials Project parent_path : str - path where to store the CIF files - if None, a temporary one is created + output where to store the CIF files + if None, user_larchdir + 'mp_structs' is used Returns ------- @@ -1377,7 +1486,7 @@ def save_cif_from_mp(api_key, material_id, parent_path=None): return [parent_path, outfn] -def save_mp_structure(api_key, material_id, parent_path=None): +def save_mp_structure(api_key: str, material_id: str, parent_path: str = None) -> str: """Save structure from Materials Project Database as json, given the material id Parameters @@ -1387,7 +1496,7 @@ def save_mp_structure(api_key, material_id, parent_path=None): material id : str material id (format mp-xxxx) from Materials Project parent_path : str - path where to store the Structure files + output path where to store the Structure files if None, user_larchdir + 'mp_structs' is used Returns