Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 71 additions & 5 deletions alphaquant/cluster/cluster_missingval.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,60 @@

PVALUE_THRESHOLD_FOR_INTENSITY_BASED_COUNTING = 0.1

# Determines at which level missing value testing is performed.
# Set once based on tree structure, then reused.
MISSINGVAL_TEST_LEVEL = None


def determine_missingval_test_level(root_node):
"""Determine the appropriate level for missing value statistical testing.

Inspects the tree structure rooted at *root_node* and sets the module-level
global ``MISSINGVAL_TEST_LEVEL`` to one of:

* ``"mod_seq_charge"`` -- fragment-level data where mod_seq_charge nodes exist.
* ``"base"`` -- all other hierarchies (precursor-only, peptide-only, gene-only).

Scenarios:
1) ``mod_seq_charge`` nodes exist in the tree -> test at ``mod_seq_charge`` level.
2) Leaf parent type is ``mod_seq`` -> test at ``base`` ion level.
3) Leaf parent type is ``seq`` -> test at ``base`` ion level.
4) Leaf parent type is ``gene`` -> test at ``base`` ion level.

Args:
root_node (anytree.Node): Root of a protein tree. Must have at least
one leaf with a parent.

Raises:
ValueError: If the leaf parent type does not match any expected pattern.

Side effects:
Sets the module-level global ``MISSINGVAL_TEST_LEVEL``.
"""
global MISSINGVAL_TEST_LEVEL

# Check if mod_seq_charge nodes exist (fragment-level data)
mod_seq_charge_nodes = anytree.search.findall(root_node, filter_=lambda node: node.type == "mod_seq_charge")
if len(mod_seq_charge_nodes) > 0:
MISSINGVAL_TEST_LEVEL = "mod_seq_charge"
return

# For all other cases, check what's one level above leaves
leaf_parent_type = root_node.leaves[0].parent.type

if leaf_parent_type == "mod_seq":
# Scenario 2: charged peptides without fragments
MISSINGVAL_TEST_LEVEL = "base"
elif leaf_parent_type == "seq":
# Scenario 3: peptides without charge info
MISSINGVAL_TEST_LEVEL = "base"
elif leaf_parent_type == "gene":
# Scenario 4: simplest hierarchy, leaves directly under gene
MISSINGVAL_TEST_LEVEL = "base"
else:
raise ValueError(f"Unexpected tree structure: leaf parent type is '{leaf_parent_type}'. "
f"Expected one of: 'mod_seq', 'seq', 'gene', or tree with 'mod_seq_charge' nodes.")

def create_protnode_from_missingval_ions(gene_name,diffions, normed_c1, normed_c2):
return MissingValProtNodeCreator(gene_name, diffions, normed_c1, normed_c2).prot_node

Expand Down Expand Up @@ -76,11 +130,21 @@ def _assign_properties_to_missingval_base_ions(self, root_node):


@staticmethod
def _get_nodes_to_test(root_node): #get the nodes in the lowest level that is relevant for the binomial test
if root_node.leaves[0].parent.type == "mod_seq": #when AlphaQuant works with precursors only (not fragments), the precursors themselves are the "base ions" and the "mod_seq_charge" node does not exist
return root_node.children
else:
def _get_nodes_to_test(root_node):
"""Get the nodes at which to perform the missing value statistical test.

Uses MISSINGVAL_TEST_LEVEL which is set once based on tree structure.
"""
global MISSINGVAL_TEST_LEVEL

# Set the test level if not already determined
if MISSINGVAL_TEST_LEVEL is None:
determine_missingval_test_level(root_node)

if MISSINGVAL_TEST_LEVEL == "mod_seq_charge":
return anytree.search.findall(root_node, filter_=lambda node: node.type == "mod_seq_charge")
else: # "base"
return root_node.leaves


def _propagate_properties_to_nodes_to_test(self,nodes_to_test): #goes through each node to test and merges the properties from it's base to the node itself
Expand Down Expand Up @@ -134,7 +198,9 @@ def _aggregate_node_properties_missingval(self, node):
node.c1_has_values = any(child.c1_has_values for child in childs)
node.c2_has_values = any(child.c2_has_values for child in childs)
if hasattr(childs[0], "z_val"):
node.z_val = aq_cluster_utils.sum_and_re_scale_zvalues([child.z_val for child in childs])
node.z_val = aq_cluster_utils.sum_and_re_scale_zvalues(
[child.z_val for child in childs]
)
node.p_val = aq_cluster_utils.transform_znormed_to_pval(node.z_val)


Expand Down
177 changes: 133 additions & 44 deletions alphaquant/diffquant/background_distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,160 @@
from numba import njit
from statistics import NormalDist
import alphaquant.diffquant.diffutils as aqdiffutils

import alphaquant.config.variables as aqvariables




class ConditionBackgrounds():
"""Orchestrates background distribution calculation.

For single-pool mode, delegates to one ``_BackgroundCalculation``.
When ``split_by_ion_type=True`` and the config defines both fragment ions
and MS1 isotopes, runs a separate calculation per ion type and combines
the resulting per-ion dicts.
"""

def __init__(self, normed_condition_df, p2z, ion2varscore=None, split_by_ion_type=False):
"""Initialise condition backgrounds.

def __init__(self, normed_condition_df, p2z):
self.backgrounds = []
Args:
normed_condition_df (pd.DataFrame): Normalised intensity matrix
(ions x samples) for one condition.
p2z (dict): Pre-computed p-value to z-value lookup.
ion2varscore (dict | None): Optional per-ion variance-predictor
scores used for sorting ions before background partitioning.
When None, ions are sorted by median intensity.
split_by_ion_type (bool): If True **and** the current config
defines both fragment ions and MS1 isotopes, build separate
background pools for each ion type. Defaults to False.
"""
self.ion2background = {}
self.ion2nonNanvals = {}
self.ion2allvals = {}
self.idx2ion = {}
self.init_ion2nonNanvals(normed_condition_df)
self.context_ranges = []
self.select_intensity_ranges(p2z)

if split_by_ion_type and self._has_multiple_ion_types():
self._build_split(normed_condition_df, p2z, ion2varscore)
else:
self._build_single(normed_condition_df, p2z, ion2varscore)

self.all_intensities = np.concatenate(list(self.ion2nonNanvals.values()))
self.num_replicates = len(next(iter(self.ion2allvals.values())))

def _build_single(self, normed_condition_df, p2z, ion2varscore):
"""Build backgrounds from a single pool of all ions."""
calc = _BackgroundCalculation(normed_condition_df, p2z, ion2varscore)
self._update_backgrounds(calc)

def _build_split(self, normed_condition_df, p2z, ion2varscore):
"""Run a separate calculation for fragment ions and MS1 isotopes.

Falls back to single-pool when either subset has fewer than 10 ions.
"""
ion_type_group = self._split_by_ion_type(normed_condition_df.index)
if ion_type_group["FRGION"].sum() < 10 or ion_type_group["MS1ISOTOPES"].sum() < 10:
self._build_single(normed_condition_df, p2z, ion2varscore)
return

for marker, mask in ion_type_group.items():
sub_df = normed_condition_df.loc[mask].copy()
LOGGER.info(f"Building background for ion type '{marker}' ({len(sub_df)} ions)")
calc = _BackgroundCalculation(sub_df, p2z, ion2varscore=ion2varscore)
self._update_backgrounds(calc)

def _update_backgrounds(self, calc):
"""Merge results from a ``_BackgroundCalculation`` into this instance."""
self.ion2background.update(calc.ion2background)
self.ion2nonNanvals.update(calc.ion2nonNanvals)
self.ion2allvals.update(calc.ion2allvals)

@staticmethod
def _has_multiple_ion_types():
"""Return True if the config defines both fragment ions and MS1 isotopes."""
if aqvariables.CONFIG_DICT is None:
return False
ion_hierarchy = aqvariables.CONFIG_DICT.get("ion_hierarchy", {})
return "fragion" in ion_hierarchy and "ms1iso" in ion_hierarchy

@staticmethod
def _split_by_ion_type(index):
"""Split an index into fragment-ion and MS1-isotope subsets."""
index_str = np.array([str(x) for x in index])
frgion_mask = np.array(["FRGION" in s for s in index_str])
ms1_mask = np.array(["MS1ISOTOPES" in s for s in index_str])
return {"FRGION": frgion_mask, "MS1ISOTOPES": ms1_mask}


class _BackgroundCalculation():
"""Computes background distributions for a single pool of ions.

Sorts the ions (by variance-predictor score or median intensity),
partitions them into overlapping intensity ranges, and creates a
BackGroundDistribution for each range.

After construction the following dicts are populated:
``ion2background``, ``ion2nonNanvals``, ``ion2allvals``.
"""

def __init__(self, normed_condition_df, p2z, ion2varscore=None):
self.ion2background = {}
self.ion2nonNanvals = {}
self.ion2allvals = {}

self._sort_and_index(normed_condition_df, ion2varscore)
self._create_background_distributions(p2z)

def _sort_and_index(self, normed_condition_df, ion2varscore):
"""Sort ions and build index-to-ion mappings.

def init_ion2nonNanvals(self, normed_condition_df):
normed_condition_df['median'] = normed_condition_df.median(numeric_only=True, axis=1)
normed_condition_df = normed_condition_df.sort_values(by='median').drop('median', axis=1)
self.normed_condition_df = normed_condition_df
#nonan_array = get_nonna_array(normed_condition_df.to_numpy())
#self.ion2nonNanvals = dict(zip(normed_condition_df.index, nonan_array))
When *ion2varscore* is provided, ions are sorted by their
variance-predictor score (rank-based). Otherwise, falls back to
sorting by row-wise median intensity.

After sorting, populates ``ion2nonNanvals``, ``ion2allvals``, and
the private ``_idx2ion`` mapping.

Args:
normed_condition_df (pd.DataFrame): Normalised intensity matrix
(ions x samples).
ion2varscore (dict | None): Mapping from ion id to a combined
variance-predictor score, or None for median-intensity sorting.
"""
if ion2varscore is not None:
sort_scores = normed_condition_df.index.map(
lambda x: ion2varscore.get(x, 0.5)
)
normed_condition_df = normed_condition_df.assign(
_sort_score=sort_scores
).sort_values(by='_sort_score').drop('_sort_score', axis=1)
else:
normed_condition_df = normed_condition_df.assign(
median=normed_condition_df.median(numeric_only=True, axis=1)
).sort_values(by='median').drop('median', axis=1)
self._normed_condition_df = normed_condition_df
self.ion2nonNanvals = aqutils.get_non_nas_from_pd_df(normed_condition_df)
self.ion2allvals = aqutils.get_ionints_from_pd_df(normed_condition_df)
self.idx2ion = dict(zip(range(len(normed_condition_df.index)), normed_condition_df.index))
self._idx2ion = dict(zip(range(len(normed_condition_df.index)), normed_condition_df.index))

def _create_background_distributions(self, p2z):
"""Partition sorted ions into overlapping intensity ranges and build backgrounds.

def select_intensity_ranges(self, p2z):
Creates ``BackGroundDistribution`` objects for overlapping windows
of ions and assigns every ion to one of these distributions via
``self.ion2background``.

Args:
p2z (dict): Pre-computed p-value to z-value lookup (passed through
to ``BackGroundDistribution``).
"""
total_available_comparisons =0
num_contexts = 10
cumulative_counts = np.zeros(self.normed_condition_df.shape[0])
num_contexts = aqvariables.NUM_BG_CONTEXTS
cumulative_counts = np.zeros(self._normed_condition_df.shape[0])

for idx ,count in enumerate(self.normed_condition_df.count(axis=1)):
for idx ,count in enumerate(self._normed_condition_df.count(axis=1)):
total_available_comparisons+=count-1
cumulative_counts[idx] = int(total_available_comparisons/2)


#assign the context sizes
context_size = np.max([1000, int(total_available_comparisons/(1+num_contexts/2))])
if context_size> total_available_comparisons:
context_size = int(total_available_comparisons/2)
Expand All @@ -64,28 +174,20 @@ def select_intensity_ranges(self, p2z):
middle_idx = int(np.searchsorted(cumulative_counts, halfcontext_size))
end_idx = int(np.searchsorted(cumulative_counts, context_size))


context_boundaries[0] = 0
context_boundaries[1] = middle_idx
context_boundaries[2] = end_idx
while context_boundaries[1] < len(cumulative_counts):
bgdist = BackGroundDistribution(context_boundaries[0], context_boundaries[2], self.ion2nonNanvals, self.idx2ion, p2z)
self.context_ranges.append([context_boundaries[0], context_boundaries[2]])
self.assign_ions2bgdists(context_boundaries[0], context_boundaries[2], bgdist)
self.backgrounds.append(bgdist)
bgdist = BackGroundDistribution(context_boundaries[0], context_boundaries[2], self.ion2nonNanvals, self._idx2ion, p2z)
for idx in range(context_boundaries[0], context_boundaries[2]):
self.ion2background[self._idx2ion[idx]] = bgdist
context_boundaries[0] = context_boundaries[1]
context_boundaries[1] = context_boundaries[2]
end_idx = np.searchsorted(cumulative_counts, context_size + cumulative_counts[context_boundaries[0]])
if end_idx > len(cumulative_counts)-(context_boundaries[1]-context_boundaries[0])/1.5:
end_idx = len(cumulative_counts)
context_boundaries[2] = end_idx

def assign_ions2bgdists(self, boundaries1, boundaries2, bgdist):
ion2bg_local = {} #dict(map(lambda _idx : (self.normed_condition_df.index.values[_idx], bgdist), range(boundaries1, boundaries2)))
for idx in range(boundaries1, boundaries2):
ion2bg_local.update({self.idx2ion.get(idx) : bgdist})
self.ion2background.update(ion2bg_local)

# Cell
import numpy as np
import random
Expand Down Expand Up @@ -451,10 +553,6 @@ def get_doublediff_bg(deed_ion1, deed_ion2, deedpair2doublediffdist, p2z):

return subtr_bg

def invert_deedkey(deedkey):
return (deedkey[1], deedkey[0])


# Cell
from numba import njit

Expand Down Expand Up @@ -509,12 +607,3 @@ def transform_cumulative_into_fc2count(cumulative, min_fc):
fcs, counts = _transform_cumulative_vectorized(cumulative, min_fc)
return dict(zip(fcs, counts))

# Cell
@njit
def get_cumul_from_freq(freq):
res = np.zeros(len(freq), dtype=np.int64)
res[0] = freq[0]
for i in range(1,len(freq)):
res[i] = res[i-1] + freq[i]

return res
Loading
Loading