diff --git a/src/raman_fitting/delegating/main_delegator.py b/src/raman_fitting/delegating/main_delegator.py
index 28f1326..3a962b4 100644
--- a/src/raman_fitting/delegating/main_delegator.py
+++ b/src/raman_fitting/delegating/main_delegator.py
@@ -21,7 +21,7 @@
 from raman_fitting.exports.exporter import ExportManager
 from raman_fitting.imports.files.file_indexer import (
     RamanFileIndex,
-    initialize_index,
+    initialize_index_from_source_files,
     groupby_sample_group,
     groupby_sample_id,
     IndexSelector,
@@ -62,7 +62,7 @@ class MainDelegator:
     fit_model_specific_names: Sequence[str] | None = None
     sample_IDs: Sequence[str] = field(default_factory=list)
     sample_groups: Sequence[str] = field(default_factory=list)
-    index: RamanFileIndex = field(default_factory=initialize_index)
+    index: RamanFileIndex = field(default_factory=initialize_index_from_source_files)
     selection: Sequence[RamanFileInfo] = field(init=False)
     selected_models: Sequence[RamanFileInfo] = field(init=False)
 
@@ -125,7 +125,7 @@ def select_fitting_model(
         try:
             return self.lmfit_models[window_name][model_name]
         except KeyError as exc:
-            raise ValueError(f"Model {window_name} {model_name} not found.") from exc
+            raise KeyError(f"Model {window_name} {model_name} not found.") from exc
 
     def main_run(self):
         selection = self.select_samples_from_index()
@@ -214,4 +214,4 @@ def make_examples():
 
 
 if __name__ == "__main__":
-    RamanIndex = make_examples()
+    example_run = make_examples()
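Review note: the `select_fitting_model` change above swaps ValueError for KeyError while keeping the `from exc` chaining. A minimal, self-contained sketch of the pattern, with a plain dict standing in for `self.lmfit_models` and illustrative window/model names:

lmfit_models = {"first_order": {"2peaks": object()}}  # stand-in, not the real registry

def select_fitting_model(window_name: str, model_name: str):
    try:
        return lmfit_models[window_name][model_name]
    except KeyError as exc:
        # Re-raising KeyError keeps the exception type consistent with a
        # failed mapping lookup; the original error stays on __cause__.
        raise KeyError(f"Model {window_name} {model_name} not found.") from exc

try:
    select_fitting_model("first_order", "missing")
except KeyError as err:
    print(err, "| cause:", repr(err.__cause__))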
diff --git a/src/raman_fitting/imports/files/file_indexer.py b/src/raman_fitting/imports/files/file_indexer.py
index 07f4069..86e357f 100644
--- a/src/raman_fitting/imports/files/file_indexer.py
+++ b/src/raman_fitting/imports/files/file_indexer.py
@@ -1,6 +1,8 @@
 """ Indexer for raman data files """
 from itertools import groupby, filterfalse
 from typing import List, Sequence
+from typing import TypeAlias
+
 from pathlib import Path
 
 from pydantic import (
@@ -16,58 +18,62 @@
 from raman_fitting.imports.collector import collect_raman_file_infos
 from raman_fitting.imports.models import RamanFileInfo
-from .utils import load_dataset_from_file
+from raman_fitting.imports.files.utils import load_dataset_from_file
 
 from loguru import logger
 from tablib import Dataset
 
+RamanFileInfoSet: TypeAlias = Sequence[RamanFileInfo]
+
+
 class RamanFileIndex(BaseModel):
     model_config = ConfigDict(arbitrary_types_allowed=True)
 
-    source: NewPath | FilePath = Field(None, validate_default=False)
-    raman_files: Sequence[RamanFileInfo] = Field(None)
+    index_file: NewPath | FilePath = Field(None, validate_default=False)
+    raman_files: RamanFileInfoSet = Field(None)
     dataset: Dataset = Field(None)
     force_reload: bool = Field(True, validate_default=False)
 
     @model_validator(mode="after")
     def read_or_load_data(self) -> "RamanFileIndex":
-        if not any([self.source, self.raman_files, self.dataset]):
+        if not any([self.index_file, self.raman_files, self.dataset]):
             raise ValueError("Not all fields should be empty.")
 
-        if self.source is not None:
-            if self.source.exists() and not self.force_reload:
-                self.dataset = load_dataset_from_file(self.source)
+        if self.index_file is not None:
+            if self.index_file.exists() and not self.force_reload:
+                self.dataset = load_dataset_from_file(self.index_file)
                 self.raman_files = parse_dataset_to_index(self.dataset)
                 return self
-            elif self.source.exists() and self.force_reload:
-                logger.info(
-                    f"Index source file {self.source} exists and will be overwritten."
+            elif self.index_file.exists() and self.force_reload:
+                logger.warning(
+                    f"Index file {self.index_file} exists and will be overwritten."
                 )
-            elif not self.source.exists() and self.force_reload:
+            elif not self.index_file.exists() and self.force_reload:
                 logger.info(
-                    "Index source file does not exists but was asked to reload from it."
+                    "Index file does not exist but a reload from it was requested."
                 )
-            elif not self.source.exists() and not self.force_reload:
+            elif not self.index_file.exists() and not self.force_reload:
                 pass
         else:
-            logger.debug("Index source file not provided.")
+            logger.debug("Index file not provided, index will not be persisted.")
 
         if self.raman_files is not None:
-            if self.dataset is None:
-                self.dataset = cast_raman_files_to_dataset(self.raman_files)
-            else:
-                dataset_rf = cast_raman_files_to_dataset(self.raman_files)
+            dataset_rf = cast_raman_files_to_dataset(self.raman_files)
+            if self.dataset is not None:
                 assert (
                     dataset_rf == self.dataset
-                ), "Both dataset and raman_files provider but are different."
-        if self.dataset is not None:
-            self.raman_files = parse_dataset_to_index(self.dataset)
-        elif self.dataset is None and self.raman_files is None:
-            raise ValueError(
-                "Index error, both raman_files and dataset are not provided."
-            )
+                ), "Both dataset and raman_files were provided but they differ."
+            self.dataset = dataset_rf
+        else:
+            if self.dataset is not None:
+                self.raman_files = parse_dataset_to_index(self.dataset)
+            else:
+                raise ValueError(
+                    "Index error, both raman_files and dataset are not provided."
+                )
+
         return self
 
 
 def cast_raman_files_to_dataset(raman_files: List[RamanFileInfo]) -> Dataset:
@@ -157,7 +163,7 @@ def select_index(
 
 def collect_raman_file_index_info(
     raman_files: Sequence[Path] | None = None, **kwargs
-) -> List[RamanFileInfo]:
+) -> RamanFileInfoSet:
     """loops over the files and scrapes the index data from each file"""
     if not raman_files:
         raman_files = list(settings.internal_paths.example_fixtures.glob("*.txt"))
@@ -166,7 +172,9 @@
     return index
 
 
-def initialize_index(files: Sequence[Path] | None = None, force_reload: bool = False):
+def initialize_index_from_source_files(
+    files: Sequence[Path] | None = None, force_reload: bool = False
+) -> RamanFileIndex:
     index_file = settings.destination_dir.joinpath("index.csv")
     raman_files = collect_raman_file_index_info(raman_files=files)
     index_data = {
@@ -199,4 +207,4 @@ def main():
 
 
 if __name__ == "__main__":
-    RamanIndex = main()
+    main()
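Review note: a self-contained sketch of the reconciliation order that `read_or_load_data` now implements, using plain lists in place of the pydantic model, the tablib Dataset, and the `cast_raman_files_to_dataset`/`parse_dataset_to_index` helpers (all names below are illustrative stand-ins):

def reconcile(raman_files=None, dataset=None):
    # raman_files wins: derive the dataset from it and check any dataset
    # passed alongside for consistency.
    if raman_files is not None:
        derived = [dict(rf) for rf in raman_files]  # stands in for cast_raman_files_to_dataset
        if dataset is not None:
            assert derived == dataset, "Both dataset and raman_files were provided but they differ."
        return raman_files, derived
    # Otherwise fall back to the dataset, parsing entries from it.
    if dataset is not None:
        return [dict(row) for row in dataset], dataset  # stands in for parse_dataset_to_index
    raise ValueError("Index error, both raman_files and dataset are not provided.")

files, data = reconcile(raman_files=[{"filename_id": "a"}])
assert data == [{"filename_id": "a"}]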
diff --git a/src/raman_fitting/imports/models.py b/src/raman_fitting/imports/models.py
index f34e4d9..58e8aac 100644
--- a/src/raman_fitting/imports/models.py
+++ b/src/raman_fitting/imports/models.py
@@ -6,7 +6,7 @@
     ConfigDict,
 )
 
-from .samples.sample_id_helpers import parse_sample_from_filepath
+from .samples.sample_id_helpers import extract_sample_metadata_from_filepath
 
 from .files.metadata import FileMetaData, get_file_metadata
 from .files.index_helpers import get_filename_id_from_path
@@ -29,7 +29,7 @@ def set_filename_id(self) -> "RamanFileInfo":
 
     @model_validator(mode="after")
     def parse_and_set_sample_from_file(self) -> "RamanFileInfo":
-        sample = parse_sample_from_filepath(self.file)
+        sample = extract_sample_metadata_from_filepath(self.file)
         self.sample = sample
         return self
 
@@ -38,13 +38,3 @@ def parse_and_set_metadata_from_filepath(self) -> "RamanFileInfo":
         file_metadata = get_file_metadata(self.file)
         self.file_metadata = FileMetaData(**file_metadata)
         return self
-
-
-# def extra_assign_export_dir_on_index(result_dir, index: List[RamanFileInfo]):
-#     """assign the DestDir column to index and sets column values as object type"""
-#     _index = []
-
-#     for rf_info in index:
-#         rf_info.export_dir = result_dir.joinpath(rf_info.sample.group)
-#         _index.append(rf_info)
-#     return _index
diff --git a/src/raman_fitting/imports/samples/sample_id_helpers.py b/src/raman_fitting/imports/samples/sample_id_helpers.py
index 315c11e..a417cdb 100644
--- a/src/raman_fitting/imports/samples/sample_id_helpers.py
+++ b/src/raman_fitting/imports/samples/sample_id_helpers.py
@@ -35,66 +35,72 @@ def parse_string_to_sample_id_and_position(
     _lensplit = len(split)
 
     if _lensplit == 0:
-        sID, position = split[0], 0
+        sample_id, position = split[0], 0
     elif len(split) == 1:
-        sID, position = split[0], 0
+        sample_id, position = split[0], 0
     elif len(split) == 2:
-        sID = split[0]
+        sample_id = split[0]
         _pos_strnum = "".join(i for i in split[1] if i.isnumeric())
         if _pos_strnum:
             position = int(_pos_strnum)
         else:
             position = split[1]
     elif len(split) >= 3:
-        sID = "_".join(split[0:-1])
+        sample_id = "_".join(split[0:-1])
         position = int("".join(filter(str.isdigit, split[-1])))
-    return (sID, position)
+    return (sample_id, position)
 
 
-def sID_to_sgrpID(sID: str, max_len=4) -> str:
+def extract_sample_group_from_sample_id(sample_id: str, max_len=4) -> str:
     """adding the extra sample Group key from sample ID"""
-    _len = len(sID)
+    _len = len(sample_id)
     _maxalphakey = min(
-        [n for n, i in enumerate(sID) if not str(i).isalpha()], default=_len
+        [n for n, i in enumerate(sample_id) if not str(i).isalpha()], default=_len
     )
     _maxkey = min((_len, _maxalphakey, max_len))
-    sgrpID = "".join([i for i in sID[0:_maxkey] if i.isalpha()])
-    return sgrpID
+    sample_group_id = "".join([i for i in sample_id[0:_maxkey] if i.isalpha()])
+    return sample_group_id
 
 
-def overwrite_sID_from_mapper(sID: str, mapper: dict) -> str:
-    """Takes an sID and potentially overwrites from a mapper dict"""
-    _sID_map = mapper.get(sID, None)
-    if _sID_map:
-        sID = _sID_map
-    return sID
+def overwrite_sample_id_from_mapper(sample_id: str, mapper: dict) -> str:
+    """Takes a sample_id and potentially overwrites it from a mapper dict"""
+    sample_id_map = mapper.get(sample_id)
+    if sample_id_map is not None:
+        return sample_id_map
+    return sample_id
 
 
-def overwrite_sgrpID_from_parts(parts: List[str], sgrpID: str, mapper: dict) -> str:
+def overwrite_sample_group_id_from_parts(
+    parts: List[str], sample_group_id: str, mapper: dict
+) -> str:
     for k, val in mapper.items():
         if k in parts:
-            sgrpID = val
-    return sgrpID
+            sample_group_id = val
+    return sample_group_id
 
 
-def parse_sample_from_filepath(
+def extract_sample_metadata_from_filepath(
     filepath: Path, sample_name_mapper: Optional[Dict[str, Dict[str, str]]] = None
 ) -> SampleMetaData:
-    """parse the sID, position and sgrpID from stem"""
+    """parse the sample_id, position and sample_group_id from the filepath stem"""
     stem = filepath.stem
     parts = filepath.parts
-    sID, position = parse_string_to_sample_id_and_position(stem)
+    sample_id, position = parse_string_to_sample_id_and_position(stem)
     if sample_name_mapper is not None:
         sample_id_mapper = sample_name_mapper.get("sample_id", {})
-        sID = overwrite_sID_from_mapper(sID, sample_id_mapper)
-    sgrpID = sID_to_sgrpID(sID)
+        sample_id = overwrite_sample_id_from_mapper(sample_id, sample_id_mapper)
+    sample_group_id = extract_sample_group_from_sample_id(sample_id)
     if sample_name_mapper is not None:
         sample_grp_mapper = sample_name_mapper.get("sample_group_id", {})
-        sgrpID = overwrite_sgrpID_from_parts(parts, sgrpID, sample_grp_mapper)
+        sample_group_id = overwrite_sample_group_id_from_parts(
+            parts, sample_group_id, sample_grp_mapper
+        )
 
-    sample = SampleMetaData(**{"id": sID, "group": sgrpID, "position": position})
+    sample = SampleMetaData(
+        **{"id": sample_id, "group": sample_group_id, "position": position}
+    )
     return sample
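Review note: a quick worked example of the renamed group-extraction helper. The function body is copied from the patch; the sample IDs are made up:

def extract_sample_group_from_sample_id(sample_id: str, max_len=4) -> str:
    """adding the extra sample Group key from sample ID"""
    _len = len(sample_id)
    _maxalphakey = min(
        [n for n, i in enumerate(sample_id) if not str(i).isalpha()], default=_len
    )
    _maxkey = min((_len, _maxalphakey, max_len))
    sample_group_id = "".join([i for i in sample_id[0:_maxkey] if i.isalpha()])
    return sample_group_id

# The leading alphabetic run, capped at max_len characters, becomes the group:
assert extract_sample_group_from_sample_id("DW38") == "DW"
assert extract_sample_group_from_sample_id("testDW38C") == "test"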
diff --git a/src/raman_fitting/interfaces/cli.py b/src/raman_fitting/interfaces/cli.py
index ddf7cd1..0a0f1d9 100644
--- a/src/raman_fitting/interfaces/cli.py
+++ b/src/raman_fitting/interfaces/cli.py
@@ -4,6 +4,8 @@
 import argparse
 
 from raman_fitting.config.settings import RunModes
+
+from loguru import logger
 
 _RUN_MODES = ["normal", "testing", "debug", "make_index", "make_examples"]
 
@@ -88,6 +90,6 @@ def main():
         extra_kwargs.update(
             {"fit_model_specific_names": ["2peaks", "3peaks", "4peaks"]}
         )
-    print(f"CLI args: {args}")
+    logger.info(f"Starting raman_fitting with CLI args:\n{args}")
     kwargs = {**vars(args), **extra_kwargs}
     _main_run = rf.MainDelegator(**kwargs)
diff --git a/src/raman_fitting/models/deconvolution/base_model.py b/src/raman_fitting/models/deconvolution/base_model.py
index 388cbf8..e5ebc0b 100644
--- a/src/raman_fitting/models/deconvolution/base_model.py
+++ b/src/raman_fitting/models/deconvolution/base_model.py
@@ -153,7 +153,6 @@ def get_models_and_peaks_from_definitions(
 
 
 def main():
     models = get_models_and_peaks_from_definitions()
-    # breakpoint()
     print("Models: ", len(models))
 
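Review note: `print` is replaced with loguru's `logger.info` above. A minimal sketch of what the CLI now emits; the argument parser here is a stand-in, not the project's real one:

import argparse
from loguru import logger

parser = argparse.ArgumentParser()
parser.add_argument("--run", default="normal")  # illustrative flag only
args = parser.parse_args([])
# Mirrors the call added in cli.py; args renders via argparse.Namespace.__repr__
logger.info(f"Starting raman_fitting with CLI args:\n{args}")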
- """ - self.result = {} - self.peaks = set( - [i.prefix for i in self.comps] - ) # peaks is prefix from components - - self.make_result() - - def make_result(self): - self.prep_params() - self.prep_components() - self.FitReport = self.model_result.fit_report(show_correl=False) - - self.extra_info = {} - self.prep_extra_info() - self.FitResult = self.fit_result_template( - self.FitComponents, - self.FitParameters, - self.FitReport, - self.extra_info, - self.model_name_lbl, - self.raw_data_lbl, - ) - - def prep_extra_info(self): - self.extra_info = {} - _destfitcomps = self.model_result._info["DestFittingComps"] - _model_destdir = _destfitcomps.joinpath( - f'{self.model_name_lbl}_{self.model_result._info["SampleID"]}' - ) - self.extra_info = { - **self.model_result._info, - **{"DestFittingModel": _model_destdir}, - } - - def prep_params(self): - try: - self.add_ratio_params() - except Exception as e: - logger.error(f"{self._qcnm} extra prep params failed\n\t{e}\n") - - self.result.update( - {"_run_date_YmdH": dt.datetime.now().strftime(format="%Y-%m-%d %H:00")} - ) - self.FitParameters = pd.DataFrame(self.result, index=[self.model_name_lbl]) - - def prep_components(self): - # FittingParams = pd.DataFrame(fit_params_od,index=[peak_model]) - _fit_comps_data = OrderedDict({"RamanShift": self.model_result.userkws["x"]}) - _fit_comps_data.update(self.model_result.eval_components()) - - # IDEA take out - # print('===/n',self.model_result, '/n') - # print('===/n',self.model_result.__dict__.keys(), '/n') - - _fit_comps_data.update( - { - self.model_name_lbl: self.model_result.best_fit, - "residuals": self.model_result.residual, - self.model_result._int_lbl: self.model_result.data, - } - ) - FittingComps = pd.DataFrame(_fit_comps_data) - self.FitComponents = FittingComps - - -def NormalizeFit(model: LMFitModel, norm_cleaner, plotprint=False): # pragma: no cover - # IDEA: optional add normalization seperately to Fitter - x, y = norm_cleaner.spec.ramanshift, norm_cleaner.blcorr_desp_intensity - # Model = InitializeModels("2peaks normalization Lorentzian") - params = model.make_params() - pre_fit = model.fit(y, params, x=x) # 'leastsq' - IG, ID = pre_fit.params["G_height"].value, pre_fit.params["D_height"].value - output = { - "factor": 1 / IG, - "ID/IG": ID / IG, - "ID": ID, - "IG": IG, - "G_center": pre_fit.params["G_center"].value, - "D_center": pre_fit.params["D_center"].value, - "Model": model, - } - # pre_fit = Model.fit(y,params ,x=x,method='differential-evolution') # 'leastsq' - if plotprint: - pre_fit.plot() - print(pre_fit.fit_report()) - return output - - if __name__ == "__main__": from raman_fitting.config.settings import settings