diff --git a/pyproject.toml b/pyproject.toml index 892312c..ebddc10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,8 @@ pysmiles = "^2.0.0" numba = "^0.61.2" numba-stats = "^1.10.1" optuna = "^4.3.0" +ms-deisotope = "^0.0.60" +pyopenms = "^3.1.0" [tool.poetry.group.dev.dependencies] jupyterlab = "^4.4.3" diff --git a/tests/test_deisotoping.py b/tests/test_deisotoping.py new file mode 100644 index 0000000..514fa75 --- /dev/null +++ b/tests/test_deisotoping.py @@ -0,0 +1,116 @@ +# test deisotoping and isotope generation + +import numpy as np +import pytest + +from vimms.Chemicals import Isotopes, Adducts +from vimms.Common import Formula, ADDUCT_TERMS, POSITIVE, PROTON_MASS +from vimms.Deisotoping import Deisotoper +from vimms.MassSpecUtils import adduct_transformation + + +def test_isotope_distribution_multi_element(): + formula = Formula("C10H16N2O2S") + isotopes = Isotopes(formula) + peaks = isotopes.get_isotopes(total_proportion=0.99) + + proportions = [peak[1] for peak in peaks] + mzs = [peak[0] for peak in peaks] + + assert len(peaks) > 1 + assert np.isclose(sum(proportions), 1.0, atol=1e-6) + assert all(mzs[i] < mzs[i + 1] for i in range(len(mzs) - 1)) + + +def test_isotope_distribution_chlorine_m2_peak(): + formula = Formula("C5H10Cl2") + isotopes = Isotopes(formula) + peaks = isotopes.get_isotopes(total_proportion=0.99) + + mono_mz = peaks[0][0] + deltas = [mz - mono_mz for mz, _, _ in peaks[1:]] + + assert any(np.isclose(delta, 1.997, atol=0.01) for delta in deltas) + + +def test_deisotoper_recovers_mono(): + formula = Formula("C10H16N2O2S") + isotopes = Isotopes(formula) + adducts = Adducts(formula, adduct_prior_dict={POSITIVE: {"M+H": 1.0}}) + adduct_name = adducts.get_adducts()[POSITIVE][0][0] + mul, add = ADDUCT_TERMS[adduct_name] + + peaks = [] + for mz, proportion, _ in isotopes.get_isotopes(total_proportion=0.99): + adducted_mz = adduct_transformation(mz, mul, add) + peaks.append((adducted_mz, proportion * 1e5)) + + deisotoper = Deisotoper(ppm_tolerance=10.0, max_charge=1, min_isotopes=2) + clusters = deisotoper.deisotope(peaks) + + assert len(clusters) == 1 + expected_mz = formula.mass + PROTON_MASS + assert np.isclose(clusters[0].monoisotopic_mz, expected_mz, atol=1e-3) + + +def test_deisotoper_handles_m_plus_2_only(): + peaks = [(100.0, 1e5), (101.997, 6e4)] + deisotoper = Deisotoper(ppm_tolerance=10.0, max_charge=1, min_isotopes=2) + clusters = deisotoper.deisotope(peaks) + + assert len(clusters) == 1 + assert np.isclose(clusters[0].monoisotopic_mz, 100.0, atol=1e-6) + + +def test_pyopenms_deisotope_helper(): + pytest.importorskip("pyopenms") + from vimms.Deisotoping import deisotope_with_pyopenms + + peaks = [(100.0, 1e5), (101.003355, 5e4), (102.00671, 2e4)] + _, mzs, intensities = deisotope_with_pyopenms(peaks, min_isopeaks=2, max_isopeaks=3) + + assert len(mzs) <= len(peaks) + assert len(mzs) == len(intensities) + + +def test_pyopenms_deadduct_helper(): + pytest.importorskip("pyopenms") + import pyopenms as oms + + from vimms.Deisotoping import deadduct_with_pyopenms + + fmap = oms.FeatureMap() + feature = oms.Feature() + feature.setMZ(100.0) + feature.setIntensity(1e5) + fmap.push_back(feature) + + output = deadduct_with_pyopenms(fmap, adducts=["[M+H]+"]) + assert output.size() >= 1 + + +def test_pyopenms_deadduct_multiple_features(): + pytest.importorskip("pyopenms") + import pyopenms as oms + + from vimms.Deisotoping import deadduct_with_pyopenms + + fmap = oms.FeatureMap() + for mz, intensity in [(100.0, 1e5), (122.989218, 4e4)]: + feature = oms.Feature() + feature.setMZ(mz) + feature.setIntensity(intensity) + fmap.push_back(feature) + + output = deadduct_with_pyopenms(fmap, adducts=["[M+H]+", "[M+Na]+"]) + assert output.size() >= 1 + + +def test_pyopenms_deisotope_empty_peaks(): + pytest.importorskip("pyopenms") + from vimms.Deisotoping import deisotope_with_pyopenms + + spectrum, mzs, intensities = deisotope_with_pyopenms([]) + assert spectrum is None + assert mzs.size == 0 + assert intensities.size == 0 diff --git a/vimms/Chemicals.py b/vimms/Chemicals.py index 3bd626b..d167513 100644 --- a/vimms/Chemicals.py +++ b/vimms/Chemicals.py @@ -10,8 +10,6 @@ from collections import deque import numpy as np -import scipy -import scipy.stats from loguru import logger from vimms.ChemicalSamplers import ( @@ -26,14 +24,14 @@ PROTON_MASS, POSITIVE, NEGATIVE, - C12_PROPORTION, C13_MZ_DIFF, - C, MONO, - C13, load_obj, ADDUCT_NAMES_POS, ADDUCT_NAMES_NEG, + ADDUCT_PRIOR_POS, + ADDUCT_PRIOR_NEG, + NATURAL_ISOTOPES, ) from vimms.Noise import GaussianPeakNoise from vimms.Roi import make_roi, RoiBuilderParams @@ -70,15 +68,21 @@ class Isotopes: A class to represent an isotope of a chemical """ - def __init__(self, formula): + def __init__(self, formula, min_prob=1e-12, max_peaks=20, max_states=4000, mass_precision=8): """ Create an Isotope object Args: formula: the formula for the given isotope """ self.formula = formula + self.min_prob = min_prob + self.max_peaks = max_peaks + self.max_states = max_states + self.mass_precision = mass_precision - def get_isotopes(self, total_proportion): + def get_isotopes( + self, total_proportion, min_prob=None, max_peaks=None, max_states=None, mass_precision=None + ): """ Gets the isotope total proportion @@ -87,68 +91,107 @@ def get_isotopes(self, total_proportion): Returns: the computed isotope total proportion - TODO: Add functionality for elements other than Carbon """ - peaks = [() for i in range(len(self._get_isotope_proportions(total_proportion)))] - for i in range(len(peaks)): - peaks[i] += (self._get_isotope_mz(self._get_isotope_names(i)),) - peaks[i] += (self._get_isotope_proportions(total_proportion)[i],) - peaks[i] += (self._get_isotope_names(i),) + peaks = [] + distributions = self._get_isotope_distribution( + total_proportion=total_proportion, + min_prob=self.min_prob if min_prob is None else min_prob, + max_peaks=self.max_peaks if max_peaks is None else max_peaks, + max_states=self.max_states if max_states is None else max_states, + mass_precision=self.mass_precision if mass_precision is None else mass_precision, + ) + base_mz = self.formula._get_mz() + for idx, (mass_shift, proportion) in enumerate(distributions): + name = MONO if idx == 0 else f"M+{idx}" + peaks.append((base_mz + mass_shift, proportion, name)) return peaks - def _get_isotope_proportions(self, total_proportion): - """ - Get isotope proportion by sampling from a binomial pmf - - Args: - total_proportion: the total proportion to compute - - Returns: the computed isotope total proportion - - """ - proportions = [] - while sum(proportions) < total_proportion: - proportions.extend( - [ - scipy.stats.binom.pmf( - len(proportions), self.formula._get_n_element(C), 1 - C12_PROPORTION - ) - ] + def _get_isotope_distribution( + self, total_proportion, min_prob=1e-12, max_peaks=20, max_states=4000, mass_precision=8 + ): + distribution = [(0.0, 1.0)] + for element, count in self.formula.atoms.items(): + if count <= 0: + continue + isotopes = NATURAL_ISOTOPES.get(element) + if not isotopes or len(isotopes) == 1: + continue + mono_mass = isotopes[0][0] + base_distribution = [(mass - mono_mass, abundance) for mass, abundance in isotopes] + element_distribution = self._power_distribution( + base_distribution, + count, + min_prob=min_prob, + max_states=max_states, + mass_precision=mass_precision, + ) + distribution = self._convolve_distributions( + distribution, + element_distribution, + min_prob=min_prob, + max_states=max_states, + mass_precision=mass_precision, ) - normalised_proportions = [ - proportions[i] / sum(proportions) for i in range(len(proportions)) - ] - return normalised_proportions - - def _get_isotope_names(self, isotope_number): - """ - Get the isotope name given the number, e.g. 0 is the monoisotope - Args: - isotope_number: the isotope number - - Returns: the isotope name - - """ - if isotope_number == 0: - return MONO - else: - return str(isotope_number) + C13 - - def _get_isotope_mz(self, isotope): - """ - Get the isotope m/z value - Args: - isotope: the isotope name - - Returns: the isotope m/z value - """ - if isotope == MONO: - return self.formula._get_mz() - elif isotope[-3:] == C13: - return self.formula._get_mz() + float(isotope.split(C13)[0]) * C13_MZ_DIFF - else: - return None + distribution = [(shift, prob) for shift, prob in distribution if prob >= min_prob] + distribution.sort(key=lambda x: x[0]) + + selected = [] + cumulative = 0.0 + for mass_shift, prob in distribution: + selected.append((mass_shift, prob)) + cumulative += prob + if cumulative >= total_proportion or len(selected) >= max_peaks: + break + + total = sum(prob for _, prob in selected) + if total == 0: + return [(0.0, 1.0)] + return [(shift, prob / total) for shift, prob in selected] + + def _power_distribution(self, base_distribution, count, min_prob, max_states, mass_precision): + if count == 1: + return base_distribution + result = [(0.0, 1.0)] + power = base_distribution + remaining = count + while remaining > 0: + if remaining % 2 == 1: + result = self._convolve_distributions( + result, + power, + min_prob=min_prob, + max_states=max_states, + mass_precision=mass_precision, + ) + remaining //= 2 + if remaining: + power = self._convolve_distributions( + power, + power, + min_prob=min_prob, + max_states=max_states, + mass_precision=mass_precision, + ) + return result + + def _convolve_distributions(self, left, right, min_prob, max_states, mass_precision): + new_distribution = {} + for left_shift, left_prob in left: + for right_shift, right_prob in right: + prob = left_prob * right_prob + if prob < min_prob: + continue + shift = left_shift + right_shift + key = round(shift, mass_precision) + new_distribution[key] = new_distribution.get(key, 0.0) + prob + if not new_distribution: + return [] + distribution = list(new_distribution.items()) + if len(distribution) > max_states: + distribution.sort(key=lambda x: x[1], reverse=True) + distribution = distribution[:max_states] + return distribution class Adducts: @@ -156,24 +199,43 @@ class Adducts: A class to represent an adduct of a chemical """ - def __init__(self, formula, adduct_proportion_cutoff=0.05, adduct_prior_dict=None): + def __init__( + self, + formula, + adduct_proportion_cutoff=0.05, + adduct_prior_dict=None, + adduct_profile=None, + adduct_concentration=15.0, + ): """ Create an Adduct class Args: formula: the formula of this adduct adduct_proportion_cutoff: proportion cut-off of the adduct - adduct_prior_dict: custom adduct dictionary, if any + adduct_prior_dict: custom adduct dictionary or callable, if any + adduct_profile: preset profile name or dict of adduct priors + adduct_concentration: dirichlet concentration for adduct sampling """ + if callable(adduct_prior_dict): + adduct_prior_dict = adduct_prior_dict(formula) + + if adduct_prior_dict is None and adduct_profile is not None: + from vimms.Common import ADDUCT_PROFILE_PRESETS + + if isinstance(adduct_profile, str): + adduct_prior_dict = ADDUCT_PROFILE_PRESETS.get(adduct_profile) + if adduct_prior_dict is None: + raise ValueError(f"Unknown adduct profile '{adduct_profile}'") + else: + adduct_prior_dict = adduct_profile + if adduct_prior_dict is None: self.adduct_names = {POSITIVE: ADDUCT_NAMES_POS, NEGATIVE: ADDUCT_NAMES_NEG} self.adduct_prior = { - POSITIVE: np.ones(len(self.adduct_names[POSITIVE])) * 0.1, - NEGATIVE: np.ones(len(self.adduct_names[NEGATIVE])) * 0.1, + POSITIVE: np.array([ADDUCT_PRIOR_POS.get(name, 0.05) for name in ADDUCT_NAMES_POS]), + NEGATIVE: np.array([ADDUCT_PRIOR_NEG.get(name, 0.05) for name in ADDUCT_NAMES_NEG]), } - # give more weight to the first one, i.e. M+H - self.adduct_prior[POSITIVE][0] = 1.0 - self.adduct_prior[NEGATIVE][0] = 1.0 else: assert POSITIVE in adduct_prior_dict or NEGATIVE in adduct_prior_dict self.adduct_names = {k: list(adduct_prior_dict[k].keys()) for k in adduct_prior_dict} @@ -182,6 +244,7 @@ def __init__(self, formula, adduct_proportion_cutoff=0.05, adduct_prior_dict=Non } self.formula = formula self.adduct_proportion_cutoff = adduct_proportion_cutoff + self.adduct_concentration = adduct_concentration def get_adducts(self): """ @@ -204,15 +267,17 @@ def _get_adduct_proportions(self): Returns: adduct proportion after sampling """ - # TODO: replace this with something proper proportions = {} for k in self.adduct_prior: - proportions[k] = np.random.dirichlet(self.adduct_prior[k]) - while max(proportions[k]) < 0.2: - proportions[k] = np.random.dirichlet(self.adduct_prior[k]) + alpha = self.adduct_prior[k] * self.adduct_concentration + alpha = np.where(alpha > 0, alpha, 0.001) + proportions[k] = np.random.dirichlet(alpha) proportions[k][np.where(proportions[k] < self.adduct_proportion_cutoff)] = 0 - proportions[k] = proportions[k] / max(proportions[k]) - proportions[k].tolist() + if proportions[k].sum() == 0: + proportions[k] = np.zeros_like(proportions[k]) + proportions[k][np.argmax(alpha)] = 1.0 + else: + proportions[k] = proportions[k] / proportions[k].sum() assert len(proportions[k]) == len(self.adduct_names[k]) return proportions @@ -625,6 +690,8 @@ def __init__( ms2_sampler=UniformMS2Sampler(), adduct_proportion_cutoff=0.05, adduct_prior_dict=None, + adduct_profile=None, + adduct_concentration=15.0, ): """ Create a mixture of [vimms.Chemicals.KnownChemical][] objects. @@ -642,6 +709,8 @@ def __init__( fragmentation spectra. adduct_proportion_cutoff: proportion of adduct cut-off adduct_prior_dict: custom adduct dictionary + adduct_profile: preset name or dict of adduct priors + adduct_concentration: dirichlet concentration for adduct sampling """ self.formula_sampler = formula_sampler self.rt_and_intensity_sampler = rt_and_intensity_sampler @@ -649,6 +718,8 @@ def __init__( self.ms2_sampler = ms2_sampler self.adduct_proportion_cutoff = adduct_proportion_cutoff self.adduct_prior_dict = adduct_prior_dict + self.adduct_profile = adduct_profile + self.adduct_concentration = adduct_concentration # if self.database is not None: # logger.debug('Sorting database compounds by masses') @@ -691,6 +762,8 @@ def sample(self, n_chemicals, ms_levels, include_adducts_isotopes=True): formula, self.adduct_proportion_cutoff, adduct_prior_dict=self.adduct_prior_dict, + adduct_profile=self.adduct_profile, + adduct_concentration=self.adduct_concentration, ) chemicals.append( diff --git a/vimms/Common.py b/vimms/Common.py index 322189c..56a3f09 100644 --- a/vimms/Common.py +++ b/vimms/Common.py @@ -79,24 +79,25 @@ ADDUCT_NAMES_POS = [ "M+H", - "[M+ACN]+H", - "[M+CH3OH]+H", + "M+NH4", "[M+NH3]+H", "M+Na", "M+K", - "M+2Na-H", "M+ACN+Na", + "M+2Na-H", "M+2K+H", + "[M+ACN]+H", + "[M+CH3OH]+H", "[M+DMSO]+H", "[M+2ACN]+H", "2M+H", - "M+ACN+Na", "2M+NH4", ] -ADDUCT_NAMES_NEG = ["M-H"] +ADDUCT_NAMES_NEG = ["M-H", "M+Cl", "M+FA-H", "M+Ac-H"] ADDUCT_TERMS = { "M+H": (1, PROTON_MASS), + "M+NH4": (1, 18.033823), "[M+ACN]+H": (1, 42.033823), "[M+CH3OH]+H": (1, 33.033489), "[M+NH3]+H": (1, 18.033823), @@ -110,12 +111,71 @@ "2M+H": (2, 1.007276), "2M+NH4": (2, 18), "M-H": (1, -PROTON_MASS), + "M+Cl": (1, 34.96885268), + "M+FA-H": (1, 44.998201), + "M+Ac-H": (1, 59.013851), } # example prior dictionary to be passed when creating an # adducts object to only get M+H adducts out ADDUCT_DICT_POS_MH = {POSITIVE: {"M+H": 1.0}} +ADDUCT_PRIOR_POS = { + "M+H": 1.0, + "M+NH4": 0.3, + "[M+NH3]+H": 0.15, + "M+Na": 0.25, + "M+K": 0.15, + "M+ACN+Na": 0.05, + "M+2Na-H": 0.03, + "M+2K+H": 0.02, + "[M+ACN]+H": 0.08, + "[M+CH3OH]+H": 0.05, + "[M+DMSO]+H": 0.03, + "[M+2ACN]+H": 0.02, + "2M+H": 0.04, + "2M+NH4": 0.02, +} +ADDUCT_PRIOR_NEG = { + "M-H": 1.0, + "M+Cl": 0.2, + "M+FA-H": 0.35, + "M+Ac-H": 0.15, +} + +ADDUCT_PROFILE_PRESETS = { + "default": {POSITIVE: ADDUCT_PRIOR_POS, NEGATIVE: ADDUCT_PRIOR_NEG}, + "esi_positive": {POSITIVE: ADDUCT_PRIOR_POS}, + "esi_negative": {NEGATIVE: ADDUCT_PRIOR_NEG}, +} + +NATURAL_ISOTOPES = { + "C": [(12.0, 0.9893), (13.0033548378, 0.0107)], + "H": [(1.00782503214, 0.999885), (2.01410177812, 0.000115)], + "N": [(14.00307400524, 0.99636), (15.00010889888, 0.00364)], + "O": [ + (15.9949146221, 0.99757), + (16.9991317565, 0.00038), + (17.9991596129, 0.00205), + ], + "S": [ + (31.97207069, 0.9499), + (32.9714585, 0.0075), + (33.96786683, 0.0425), + (35.96708088, 0.0001), + ], + "Cl": [(34.96885268, 0.7576), (36.96590259, 0.2424)], + "Br": [(78.9183376, 0.5069), (80.9162906, 0.4931)], + "Si": [ + (27.9769265327, 0.92223), + (28.976494700, 0.04685), + (29.97377017, 0.03092), + ], + "P": [(30.973761512, 1.0)], + "F": [(18.998403205, 1.0)], + "I": [(126.904468, 1.0)], +} + ATOM_NAMES = ["C", "H", "N", "O", "P", "S", "Cl", "I", "Br", "Si", "F", "D"] ATOM_MASSES = { "C": 12.00000000000, diff --git a/vimms/Deisotoping.py b/vimms/Deisotoping.py new file mode 100644 index 0000000..bfc68f1 --- /dev/null +++ b/vimms/Deisotoping.py @@ -0,0 +1,281 @@ +from dataclasses import dataclass +from typing import Iterable, List, Tuple + +import numpy as np + +from vimms.Common import C13_MZ_DIFF, NATURAL_ISOTOPES + + +@dataclass(frozen=True) +class IsotopeCluster: + monoisotopic_mz: float + charge: int + peak_indices: Tuple[int, ...] + + +class Deisotoper: + def __init__( + self, + ppm_tolerance=10.0, + max_charge=3, + min_isotopes=2, + isotope_mass_diffs=None, + max_relative_intensity_increase=1.5, + max_relative_intensity_increase_heavy=3.0, + heavy_isotope_threshold=1.5, + ): + self.ppm_tolerance = ppm_tolerance + self.max_charge = max_charge + self.min_isotopes = min_isotopes + self.isotope_mass_diffs = ( + tuple(isotope_mass_diffs) + if isotope_mass_diffs is not None + else self._default_isotope_mass_diffs() + ) + self.max_relative_intensity_increase = max_relative_intensity_increase + self.max_relative_intensity_increase_heavy = max_relative_intensity_increase_heavy + self.heavy_isotope_threshold = heavy_isotope_threshold + + def deisotope(self, peaks: Iterable[Tuple[float, float]]) -> List[IsotopeCluster]: + peaks = np.array(list(peaks), dtype=float) + if peaks.size == 0: + return [] + + mzs = peaks[:, 0] + intensities = peaks[:, 1] + order = np.argsort(mzs) + mzs = mzs[order] + intensities = intensities[order] + + assigned = np.full(len(mzs), False) + clusters = [] + + for idx in range(len(mzs)): + if assigned[idx]: + continue + mz = mzs[idx] + charge = self._guess_charge(mzs, mz, idx) + cluster_indices = self._grow_cluster(mzs, intensities, idx, charge) + + if len(cluster_indices) >= self.min_isotopes: + for ci in cluster_indices: + assigned[ci] = True + clusters.append( + IsotopeCluster( + monoisotopic_mz=mz, + charge=charge, + peak_indices=tuple(order[cluster_indices]), + ) + ) + + return clusters + + def _guess_charge(self, mzs: np.ndarray, mz: float, idx: int) -> int: + best_charge = 1 + best_match = None + for charge in range(1, self.max_charge + 1): + match = self._find_isotope_peak(mzs, idx + 1, mz, charge, isotope_idx=1) + if match is None: + continue + _, _, delta = match + if best_match is None or delta < best_match: + best_match = delta + best_charge = charge + return best_charge + + def _grow_cluster( + self, mzs: np.ndarray, intensities: np.ndarray, start_idx: int, charge: int + ) -> List[int]: + cluster_indices = [start_idx] + isotope_idx = 1 + prev_intensity = intensities[start_idx] + while True: + match = self._find_isotope_peak( + mzs, cluster_indices[-1] + 1, mzs[start_idx], charge, isotope_idx + ) + if match is None: + break + match_idx, match_diff, _ = match + max_increase = self.max_relative_intensity_increase + if match_diff >= self.heavy_isotope_threshold: + max_increase = self.max_relative_intensity_increase_heavy + if intensities[match_idx] > prev_intensity * max_increase: + break + cluster_indices.append(match_idx) + prev_intensity = intensities[match_idx] + isotope_idx += 1 + return cluster_indices + + def _find_isotope_peak( + self, mzs: np.ndarray, start_idx: int, base_mz: float, charge: int, isotope_idx: int + ) -> Tuple[int, float, float] | None: + best = None + for diff in self.isotope_mass_diffs: + target = base_mz + (diff / charge) * isotope_idx + match_idx = self._find_peak(mzs, target, start_idx) + if match_idx is None: + continue + delta = abs(mzs[match_idx] - target) + if best is None or delta < best[2]: + best = (match_idx, diff, delta) + return best + + def _find_peak(self, mzs: np.ndarray, target: float, start_idx: int) -> int | None: + if start_idx >= len(mzs): + return None + left = start_idx + right = len(mzs) - 1 + while left <= right: + mid = (left + right) // 2 + if mzs[mid] < target: + left = mid + 1 + else: + right = mid - 1 + candidates = [] + for idx in (left - 1, left, left + 1): + if 0 <= idx < len(mzs): + candidates.append(idx) + for idx in candidates: + if self._ppm_error(mzs[idx], target) <= self.ppm_tolerance: + return idx + return None + + @staticmethod + def _ppm_error(mz: float, target: float) -> float: + return abs(mz - target) / target * 1e6 + + @staticmethod + def _default_isotope_mass_diffs( + min_abundance: float = 0.0005, max_shift: float = 4.0 + ) -> Tuple[float, ...]: + diffs = {round(C13_MZ_DIFF, 6)} + for isotopes in NATURAL_ISOTOPES.values(): + if len(isotopes) <= 1: + continue + mono_mass = isotopes[0][0] + for mass, abundance in isotopes[1:]: + if abundance < min_abundance: + continue + diff = mass - mono_mass + if 0 < diff <= max_shift: + diffs.add(round(diff, 6)) + return tuple(sorted(diffs)) + + +def deisotope_with_ms_deisotope( + peaks: Iterable[Tuple[float, float]], + charge_range: Tuple[int, int] = (1, 3), + averagine: str = "peptide", + ms1_tolerance: float = 10.0, +): + """ + Deisotope peaks using the optional ms_deisotope package. + + Args: + peaks: iterable of (mz, intensity) pairs. + charge_range: inclusive min/max charge range. + averagine: averagine model name used by ms_deisotope. + ms1_tolerance: ppm tolerance for isotopic matching. + + Returns: + The ms_deisotope deconvolution result object. + """ + from ms_deisotope.deconvolution import deconvolute_peaks + + peaks_array = np.array(list(peaks), dtype=float) + if peaks_array.size == 0: + return [] + + return deconvolute_peaks( + peaks_array, + charge_range=charge_range, + averagine=averagine, + ms1_tolerance=ms1_tolerance, + ) + + +def deisotope_with_pyopenms( + peaks: Iterable[Tuple[float, float]], + fragment_tolerance: float = 10.0, + fragment_unit_ppm: bool = True, + min_charge: int = 1, + max_charge: int = 3, + keep_only_deisotoped: bool = True, + min_isopeaks: int = 2, + max_isopeaks: int = 10, + make_single_charged: bool = False, +): + """ + Deisotope peaks using pyopenms Deisotoper on an MS1-like spectrum. + + Args: + peaks: iterable of (mz, intensity) pairs. + fragment_tolerance: m/z tolerance for matching isotopic peaks. + fragment_unit_ppm: whether the tolerance is in ppm. + min_charge: minimum charge to consider. + max_charge: maximum charge to consider. + keep_only_deisotoped: keep only deisotoped peaks in the output. + min_isopeaks: minimum number of isotopic peaks in a cluster. + max_isopeaks: maximum number of isotopic peaks in a cluster. + make_single_charged: convert all features to single charge if True. + + Returns: + Tuple of (spectrum, peak_mzs, peak_intensities) after deisotoping. + """ + import pyopenms as oms + + peaks_array = np.array(list(peaks), dtype=float) + if peaks_array.size == 0: + return None, np.array([]), np.array([]) + + spectrum = oms.MSSpectrum() + spectrum.set_peaks((peaks_array[:, 0], peaks_array[:, 1])) + + oms.Deisotoper.deisotopeAndSingleCharge( + spectrum, + fragment_tolerance, + fragment_unit_ppm, + min_charge, + max_charge, + keep_only_deisotoped, + min_isopeaks, + max_isopeaks, + make_single_charged, + ) + + mzs, intensities = spectrum.get_peaks() + return spectrum, np.array(mzs), np.array(intensities) + + +def deadduct_with_pyopenms( + feature_map, + adducts: List[str] | None = None, + max_charge: int = 3, + ppm_tolerance: float = 10.0, +): + """ + De-adduct features using pyopenms MetaboliteAdductDecharger. + + Args: + feature_map: pyopenms FeatureMap containing detected features. + adducts: list of adduct strings (e.g. ["[M+H]+", "[M+Na]+"]) or None for defaults. + max_charge: maximum charge to consider. + ppm_tolerance: mass tolerance used for matching (ppm). + + Returns: + De-adducted pyopenms FeatureMap. + """ + import pyopenms as oms + + decharger = oms.MetaboliteAdductDecharger() + params = decharger.getParameters() + params.setValue("mass_error_ppm", ppm_tolerance) + params.setValue("charge_max", max_charge) + if adducts: + adduct_info = oms.AdductInfo() + adduct_info.setAdducts(adducts) + decharger.setAdducts(adduct_info) + decharger.setParameters(params) + output = oms.FeatureMap() + decharger.decharge(feature_map, output) + return output