chore: fix test and processing
MyPyDavid committed Dec 10, 2023
1 parent 6dc5254 commit 79af303
Showing 6 changed files with 135 additions and 95 deletions.
2 changes: 1 addition & 1 deletion src/raman_fitting/deconvolution_models/base_peak.py
@@ -248,7 +248,7 @@ def _main():
)
models[model_name] = lmfit_comp_model
print(lmfit_comp_model)

Codecov warning: added lines #L249-L250 were not covered by tests.
breakpoint()
# breakpoint()


if __name__ == "__main__":
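
Aside: the model accumulation in _main() follows lmfit's standard composition pattern. A minimal, self-contained sketch of the same idea; the Lorentzian components and prefixes are illustrative placeholders, not the project's actual peak definitions:

    from lmfit.models import LorentzianModel

    # Two components combined into a CompositeModel, mirroring how
    # _main() accumulates peak models into lmfit_comp_model.
    lmfit_comp_model = LorentzianModel(prefix="D_") + LorentzianModel(prefix="G_")
    print(lmfit_comp_model)
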
165 changes: 91 additions & 74 deletions src/raman_fitting/processing/baseline_subtraction.py
@@ -1,91 +1,108 @@
import logging
from typing import Optional

import numpy as np
from scipy.stats import linregress

from .splitter import get_default_spectrum_windows
from .splitter import get_default_spectrum_windows, split_spectrum_data_in_windows
from .spectrum_template import SpectrumData

logger = logging.getLogger(__name__)


def subtract_baseline_per_window(spec, label, windows_data, window_limits):
ramanshift = spec.ramanshift
intensity = spec.intensity
window_name = spec.window_name
if not ramanshift.any():
return intensity, (0, 0)

Codecov warning: added line #L18 was not covered by tests.
# breakpoint()
lbl_1st_order = list(filter(lambda x: "1st_order" in x, windows_data.keys()))
if any(i in label for i in ("full", "norm")) and lbl_1st_order:
i_fltrd_dspkd_fit = windows_data.get(lbl_1st_order[0]).intensity
else:
i_fltrd_dspkd_fit = intensity
window_config = window_limits.get(window_name)

bl_linear = linregress(
ramanshift[[0, -1]],
[
np.mean(i_fltrd_dspkd_fit[0 : window_config.min]),
np.mean(i_fltrd_dspkd_fit[window_config.max : :]),
],
)
i_blcor = intensity - (bl_linear[0] * ramanshift + bl_linear[1])
return i_blcor, bl_linear
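
subtract_baseline_per_window fits a straight baseline through the averaged edge regions of a window and subtracts it from the intensity. A standalone sketch of the same two-point linregress idea, with an illustrative synthetic peak:

    import numpy as np
    from scipy.stats import linregress

    ramanshift = np.linspace(900, 2000, 1100)
    intensity = 0.05 * ramanshift + 100 * np.exp(-((ramanshift - 1580) ** 2) / 500)

    # Fit a line through the mean intensity of the two window edges,
    # then subtract it from the full trace.
    bl = linregress(ramanshift[[0, -1]], [intensity[:50].mean(), intensity[-50:].mean()])
    corrected = intensity - (bl.slope * ramanshift + bl.intercept)
    print(round(corrected[0], 3), round(corrected[-1], 3))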


def get_normalization_factor(data, norm_method="simple") -> float:
try:
if norm_method == "simple":
normalization_intensity = np.nanmax(data["normalization"].intensity)
elif norm_method == "fit":
raise NotImplementedError("NormalizeFit not yet implemented")

Codecov warning: added lines #L42-L43 were not covered by tests.
# IDEA not implemented
# normalization = NormalizeFit(
# self.blcorr_data["1st_order"], plotprint=False
# ) # IDEA still implement this NormalizeFit
# normalization_intensity = normalization["IG"]
else:
logger.warning(f"unknown normalization method {norm_method}")
normalization_intensity = 1

Codecov warning: added lines #L50-L51 were not covered by tests.
except Exception as exc:
logger.error(f"normalization error {exc}")
normalization_intensity = 1

return normalization_intensity


def normalize_data(windows_data, norm_factor, label: Optional[str] = None) -> dict:
ret = {}
for window_name, spec in windows_data.items():
norm_label = f"norm_blcorr_{window_name}"
if label:
norm_label = f"{label}_{norm_label}"

Codecov warning: added line #L64 was not covered by tests.
_data = SpectrumData(
spec.ramanshift, spec.intensity * norm_factor, norm_label, window_name
)
ret.update(**{window_name: _data})
return ret


def subtract_loop(windows_data: dict, window_limits: dict, label=None):
_blcorr = {}
_info = {}
for window_name, spec in windows_data.items():
blcorr_int, blcorr_lin = subtract_baseline_per_window(
spec, window_name, windows_data, window_limits
)
label = f"blcorr_{window_name}"
if label:
label = f"{label}_{label}"
_data = SpectrumData(spec.ramanshift, blcorr_int, label, window_name)
_blcorr.update(**{window_name: _data})
_info.update(**{window_name: blcorr_lin})
return _blcorr, _info


class BaselineSubtractorNormalizer:
"""
For baseline subtraction as well as normalization of a spectrum
"""

def __init__(self, *args, **kws):
self.split_spectrum_data_in_windows()
self.windowlimits = get_default_spectrum_windows()
blcorr_data, blcorr_info = self.subtract_loop()
def __init__(self, ramanshift: np.ndarray, intensity: np.ndarray, label: Optional[str] = None):
self._ramanshift = ramanshift
self._intensity = intensity
self._label = label
self.windows_data = split_spectrum_data_in_windows(
ramanshift=ramanshift, intensity=intensity, label=label
)
self.window_limits = get_default_spectrum_windows()
blcorr_data, blcorr_info = subtract_loop(
self.windows_data, self.window_limits, label=self._label
)
self.blcorr_data = blcorr_data
self.blcorr_info = blcorr_info
normalization_intensity = self.get_normalization_factor()
normalization_intensity = get_normalization_factor(self.blcorr_data)
self.norm_factor = 1 / normalization_intensity
self.norm_data = self.normalize_data(self.blcorr_data, self.norm_factor)

def subtract_loop(self):
_blcorr = {}
_info = {}
for windowname, spec in self.windows_data.items():
blcorr_int, blcorr_lin = self.subtract_baseline_per_window(windowname, spec)
label = f"blcorr_{windowname}"
if self.label:
label = f"{self.label}_{label}"
_data = self.data(spec.ramanshift, blcorr_int, label)
_blcorr.update(**{windowname: _data})
_info.update(**{windowname: blcorr_lin})
return _blcorr, _info

def subtract_baseline_per_window(self, windowname, spec):
rs = spec.ramanshift
if not rs.any():
return spec.intensity, (0, 0)

if windowname[0:4] in ("full", "norm"):
i_fltrd_dspkd_fit = self.windows_data.get("1st_order").intensity
else:
i_fltrd_dspkd_fit = spec.intensity
_limits = self.windowlimits.get(windowname)

bl_linear = linregress(
rs[[0, -1]],
[
np.mean(i_fltrd_dspkd_fit[0 : _limits[0]]),
np.mean(i_fltrd_dspkd_fit[_limits[1] : :]),
],
)
i_blcor = spec.intensity - (bl_linear[0] * rs + bl_linear[1])
return i_blcor, bl_linear

def get_normalization_factor(self, norm_method="simple") -> float:
try:
if norm_method == "simple":
normalization_intensity = np.nanmax(
self.blcorr_data["normalization"].intensity
)
elif norm_method == "fit":
raise NotImplementedError("NormalizeFit not yet implemented")
# IDEA not implemented
# normalization = NormalizeFit(
# self.blcorr_data["1st_order"], plotprint=False
# ) # IDEA still implement this NormalizeFit
# normalization_intensity = normalization["IG"]
else:
logger.warning(f"unknown normalization method {norm_method}")
normalization_intensity = 1
except Exception as exc:
logger.error(f"normalization error {exc}")
normalization_intensity = 1

return normalization_intensity

def normalize_data(self, data, norm_factor) -> dict:
ret = {}
for windowname, spec in data.items():
label = f"norm_blcorr_{windowname}"
if self.label:
label = f"{self.label}_{label}"

_data = self.data(spec.ramanshift, spec.intensity * self.norm_factor, label)
ret.update(**{windowname: _data})
return ret
self.norm_data = normalize_data(self.blcorr_data, self.norm_factor)
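
A hedged usage sketch of the refactored class, which now receives the arrays directly instead of pulling them from instance state; the import path follows this diff, while the synthetic spectrum is purely illustrative:

    import numpy as np

    from raman_fitting.processing.baseline_subtraction import (
        BaselineSubtractorNormalizer,
    )

    # Illustrative noisy spectrum with a linear drift; real data comes from files.
    ramanshift = np.linspace(200, 3600, 3000)
    intensity = 0.01 * ramanshift + np.random.default_rng(42).random(3000)

    corrector = BaselineSubtractorNormalizer(ramanshift, intensity, label="despiked")
    # Keys carry the label prefix, e.g. "despiked_window_full".
    print(sorted(corrector.norm_data))
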
12 changes: 8 additions & 4 deletions src/raman_fitting/processing/post_processing.py
@@ -27,12 +27,13 @@ def split_data(self, on_lbl="filtered"):
def spectrum_methods_delegator(self):
for method, on_lbl, out_lbl in POST_PROCESS_METHODS:
try:
breakpoint()
# breakpoint()
getattr(self, method)(on_lbl=on_lbl, out_lbl=out_lbl)
except Exception as exc:
logger.error(

Codecov warning: added lines #L32-L33 were not covered by tests.
f"spectrum_methods_delegator, {self._qcnm} {method} failed for {self.file} with {exc}"
)
raise exc from exc

Codecov warning: added line #L36 was not covered by tests.

self.set_clean_data_df()
self.set_df_from_register()
@@ -54,7 +55,10 @@ def baseline_correction(self, on_lbl="despiked", out_lbl="clean_data"):
_r, _int, _lbl = self.register.get(on_lbl)
_baseline_corrected = BaselineSubtractorNormalizer(_r, _int, label="despiked")
self._baseline_corrected = _baseline_corrected

_fullspec = _baseline_corrected.norm_data["full"]
self.register_spectrum(_fullspec.ramanshift, _fullspec.intensity, out_lbl)
full_keys = list(
filter(lambda x: x.endswith("full"), _baseline_corrected.norm_data)
)
if full_keys:
_fullspec = _baseline_corrected.norm_data[full_keys[0]]
self.register_spectrum(_fullspec.ramanshift, _fullspec.intensity, out_lbl)
self.clean_data = _baseline_corrected.norm_data
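
The endswith lookup replaces the old norm_data["full"] access because normalize_data now keys its output by the prefixed window labels built in splitter.py; a sketch of the expected key shapes (keys illustrative, values elided):

    # A plain norm_data["full"] lookup would raise a KeyError here.
    norm_data = {
        "despiked_window_full": ...,
        "despiked_window_full_1st_2nd": ...,
        "despiked_window_1st_order": ...,
    }
    full_keys = list(filter(lambda x: x.endswith("full"), norm_data))
    print(full_keys)  # ['despiked_window_full']; "full_1st_2nd" ends with "2nd"
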
3 changes: 3 additions & 0 deletions src/raman_fitting/processing/spectrum_constructor.py
@@ -93,6 +93,9 @@ def load_data_delegator(self):
self.info = {**self.info, **self.run_kwargs}

def set_clean_data_df(self):
if self.clean_data is None:
return

Codecov warning: added line #L97 was not covered by tests.

self.clean_df = {
k: pd.DataFrame(
{"ramanshift": val.ramanshift, f"int_{self.SamplePos}": val.intensity}
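
For context, set_clean_data_df builds one DataFrame per window from the SpectrumData entries in clean_data; a minimal sketch, assuming a single hypothetical window and an illustrative sample position "1":

    import numpy as np
    import pandas as pd

    from raman_fitting.processing.spectrum_template import SpectrumData

    clean_data = {
        "full": SpectrumData(
            np.array([1000.0, 1500.0]), np.array([0.2, 0.9]), "blcorr_full", "full"
        )
    }
    clean_df = {
        k: pd.DataFrame({"ramanshift": v.ramanshift, "int_1": v.intensity})
        for k, v in clean_data.items()
    }
    print(clean_df["full"])
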
10 changes: 10 additions & 0 deletions src/raman_fitting/processing/spectrum_template.py
@@ -1,11 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import namedtuple
from dataclasses import dataclass

import numpy as np

SpecTemplate = namedtuple("Spectrum", "ramanshift intensity label")


@dataclass
class SpectrumData:
ramanshift: np.ndarray
intensity: np.ndarray
label: str
window_name: str


class SpectrumTemplate:
def __init__(self, spec_name="spectrum_info"):
self.spec_name = spec_name
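
A quick construction sketch for the new dataclass (values illustrative):

    import numpy as np

    from raman_fitting.processing.spectrum_template import SpectrumData

    spec = SpectrumData(
        ramanshift=np.array([1000.0, 1580.0, 2700.0]),
        intensity=np.array([0.1, 1.0, 0.4]),
        label="despiked_window_1st_order",
        window_name="1st_order",
    )
    print(spec.label, spec.intensity.max())
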
38 changes: 22 additions & 16 deletions src/raman_fitting/processing/splitter.py
@@ -2,16 +2,17 @@
import numpy as np

from pydantic import BaseModel
from .spectrum_template import SpectrumData

SPECTRUM_WINDOWS = [
{"name": "full", "min": 200, "max": 3600},
{"name": "full_1st_2nd", "min": 800, "max": 3500},
{"name": "low", "min": 150, "max": 850, "extra_margin": 10},
{"name": "1st_order", "min": 900, "max": 2000},
{"name": "mid", "min": 1850, "max": 2150, "extra_margin": 10},
{"name": "2nd_order", "min": 2150, "max": 3380},
{"name": "normalization", "min": 1500, "max": 1675, "extra_margin": 10},
]
SPECTRUM_WINDOWS = {
"full": {"min": 200, "max": 3600},
"full_1st_2nd": {"min": 800, "max": 3500},
"low": {"min": 150, "max": 850, "extra_margin": 10},
"1st_order": {"min": 900, "max": 2000},
"mid": {"min": 1850, "max": 2150, "extra_margin": 10},
"2nd_order": {"min": 2150, "max": 3380},
"normalization": {"min": 1500, "max": 1675, "extra_margin": 10},
}


class SpectrumWindow(BaseModel):
@@ -23,14 +24,14 @@ class SpectrumWindow(BaseModel):

def get_default_spectrum_windows() -> Dict[str, SpectrumWindow]:
windows = {}
for window_config in SPECTRUM_WINDOWS:
windows[window_config["name"]] = SpectrumWindow(**window_config)
for window_name, window_config in SPECTRUM_WINDOWS.items():
windows[window_name] = SpectrumWindow(name=window_name, **window_config)
return windows


def split_spectrum_data_in_windows(
ramanshift, intensity, spec_windows=None, label=None
) -> Dict:
) -> Dict[str, SpectrumData]:
"""
For splitting spectra into the several SpectrumWindows;
the names of the windows are taken from SPECTRUM_WINDOWS
@@ -40,12 +41,17 @@
if spec_windows is None:
spec_windows = get_default_spectrum_windows()
windows_data = {}
for windowname, window in spec_windows.items():
for window_name, window in spec_windows.items():
# find indices of window in ramanshift array
ind = (ramanshift >= np.min(window.min)) & (ramanshift <= np.max(window.max))
window_lbl = f"window_{windowname}"
window_lbl = f"window_{window_name}"
if label is not None:
window_lbl = f"{label}_{window_lbl}"
_data = {"ramanshift": ramanshift[ind], "intensity": intensity[ind]}
windows_data[window_lbl] = _data
_data = {
"ramanshift": ramanshift[ind],
"intensity": intensity[ind],
"label": window_lbl,
"window_name": window_name,
}
windows_data[window_lbl] = SpectrumData(**_data)
return windows_data
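
Usage sketch for the updated splitter, which now returns SpectrumData instances keyed by the prefixed window label; the input arrays are illustrative:

    import numpy as np

    from raman_fitting.processing.splitter import split_spectrum_data_in_windows

    ramanshift = np.linspace(150, 3600, 500)
    intensity = np.ones_like(ramanshift)

    windows = split_spectrum_data_in_windows(ramanshift, intensity, label="despiked")
    first = windows["despiked_window_1st_order"]
    print(first.window_name, first.ramanshift.min(), first.ramanshift.max())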
