Skip to content

Commit 3a714e3

Browse files
authored
Merge pull request #114 from yueqixuan/dev
fix: improve quantms convert
2 parents 3d4d085 + 49f3bb1 commit 3a714e3

File tree

5 files changed

+160
-247
lines changed

5 files changed

+160
-247
lines changed

quantmsio/core/quantms/feature.py

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,9 @@
88

99
# MsstatsIN functionality now available in MzTabIndexer
1010
from quantmsio.core.quantms.mztab import MzTabIndexer
11-
from quantmsio.core.quantms.psm import Psm
12-
from quantmsio.core.sdrf import SDRFHandler
13-
from quantmsio.operate.tools import get_protein_accession, get_ahocorasick
11+
from quantmsio.operate.tools import get_protein_accession
1412
from quantmsio.utils.file_utils import (
15-
close_file,
1613
extract_protein_list,
17-
save_slice_file,
1814
ParquetBatchWriter,
1915
)
2016

@@ -61,11 +57,17 @@ def extract_psm_msg(self, protein_str=None):
6157
"""
6258
# Get PSMs from the indexer
6359
try:
60+
self.logger.info("Get PSMs from the indexer.")
6461
psms_df = self._indexer.get_psms()
6562
if psms_df.empty:
6663
return {}, {}
6764

65+
self.logger.info("Get metadata from the indexer.")
6866
metadata_df = self._indexer.get_metadata()
67+
68+
self.logger.info(
69+
"Calling get_ms_run_reference_map to retrieve MS run reference map."
70+
)
6971
ms_reference_map = self.get_ms_run_reference_map(psms_df, metadata_df)
7072
psms_df["reference_file_name"] = psms_df["spectra_ref_file"].map(
7173
ms_reference_map
@@ -81,6 +83,7 @@ def extract_psm_msg(self, protein_str=None):
8183
pep_dict = {}
8284

8385
# Create mapping dictionaries
86+
self.logger.info("Start creating mapping dictionaries.")
8487
for _, row in psms_df.iterrows():
8588
# Map key: (reference_file_name, peptidoform, precursor_charge)
8689
reference_file_name = row.get("reference_file_name", "")
@@ -116,6 +119,8 @@ def extract_psm_msg(self, protein_str=None):
116119
scan_info[1] if len(scan_info) > 1 else "",
117120
]
118121

122+
self.logger.info("Finished creating mapping dictionaries.")
123+
119124
return map_dict, pep_dict
120125

121126
except Exception as e:
@@ -367,6 +372,17 @@ def transform_msstats_in(self, file_num=10, protein_str=None):
367372
]
368373

369374
if not batch.empty:
375+
376+
# Unique peptide indicator from PSM Table
377+
unique_peptide_df = self._indexer.get_unique_from_psm_table()
378+
379+
batch = pd.merge(
380+
batch,
381+
unique_peptide_df,
382+
on=["pg_accessions", "peptidoform"],
383+
how="left",
384+
)
385+
370386
# Aggregate data to create feature-level records with intensities array
371387
aggregated_features = self._aggregate_msstats_to_features(
372388
batch, experiment_type
@@ -412,7 +428,7 @@ def _aggregate_msstats_to_features(self, msstats_batch, experiment_type):
412428
if "channel" in row and row["channel"] is not None
413429
else ("LFQ" if experiment_type == "LFQ" else "Unknown")
414430
),
415-
"intensity": float(row.get("Intensity", 0.0)),
431+
"intensity": float(row.get("intensity", 0.0)),
416432
}
417433
intensities.append(intensity_entry)
418434

@@ -428,6 +444,7 @@ def _aggregate_msstats_to_features(self, msstats_batch, experiment_type):
428444
"pg_accessions": [protein_name] if protein_name else [],
429445
"anchor_protein": protein_name or "",
430446
"rt": first_row.get("rt", None),
447+
"unique": float(first_row.get("unique", 1)),
431448
# Will add more fields in subsequent processing steps
432449
}
433450

@@ -471,8 +488,16 @@ def merge_psm(rows, index):
471488
)
472489

473490
def generate_feature(self, file_num=10, protein_str=None):
    """Yield feature tables built from msstats report batches.

    Args:
        file_num: Number of files per batch passed through to
            ``generate_feature_report``.
        protein_str: Optional protein accession filter pattern, passed
            through to ``generate_feature_report``.

    Yields:
        One transformed feature table per msstats batch, as produced by
        ``self.transform_feature``.
    """
    feature_count = 0
    for msstats in self.generate_feature_report(file_num, protein_str):
        feature = self.transform_feature(msstats)
        feature_count += len(feature)
        # Lazy %-formatting: the message is only built when INFO is enabled.
        # Also fixes the garbled grammar of the original message
        # ("the total to" -> "bringing the total to").
        self.logger.info(
            "Generated %d features, bringing the total to %d.",
            len(feature),
            feature_count,
        )
        yield feature
477502

478503
def generate_feature_report(self, file_num=10, protein_str=None):
@@ -525,34 +550,6 @@ def merge_psm(rows, index):
525550
axis=1,
526551
)
527552

528-
@staticmethod
529-
def slice(df, partitions):
530-
cols = df.columns
531-
if not isinstance(partitions, list):
532-
raise Exception(f"{partitions} is not a list")
533-
if len(partitions) == 0:
534-
raise Exception(f"{partitions} is empty")
535-
for partion in partitions:
536-
if partion not in cols:
537-
raise Exception(f"{partion} does not exist")
538-
for key, df in df.groupby(partitions):
539-
yield key, df
540-
541-
def generate_slice_feature(
542-
self,
543-
partitions,
544-
file_num=10,
545-
protein_str=None,
546-
duckdb_max_memory="16GB",
547-
duckdb_threads=4,
548-
):
549-
for msstats in self.generate_feature_report(
550-
file_num, protein_str, duckdb_max_memory, duckdb_threads
551-
):
552-
for key, df in self.slice(msstats, partitions):
553-
feature = self.transform_feature(df)
554-
yield key, feature
555-
556553
@staticmethod
557554
def transform_feature(df):
558555
return pa.Table.from_pandas(df, schema=FEATURE_SCHEMA)
@@ -577,10 +574,7 @@ def write_feature_to_file(
577574
try:
578575
for feature_df in self.generate_feature(file_num, protein_str):
579576
if feature_df.num_rows > 0:
580-
# The schema is applied when creating the table
581-
feature_df = feature_df.to_pandas()
582-
records = feature_df.to_dict("records")
583-
batch_writer.write_batch(records)
577+
batch_writer.write_batch(feature_df.to_pylist())
584578
finally:
585579
batch_writer.close()
586580

@@ -592,36 +586,6 @@ def write_feature_to_file(
592586
# Clean up the temporary MzTabIndexer
593587
self._indexer.cleanup_duckdb()
594588

595-
# def write_features_to_file(
596-
# self,
597-
# output_folder,
598-
# filename,
599-
# partitions,
600-
# file_num=10,
601-
# protein_file=None,
602-
# duckdb_max_memory="16GB",
603-
# duckdb_threads=4,
604-
# ):
605-
# logger = logging.getLogger("quantmsio.core.feature")
606-
607-
# # Log input and output paths
608-
# logger.info(f"Input mzTab file: {self._indexer._mztab_path}")
609-
# logger.info(f"Output folder: {output_folder}")
610-
# logger.info(f"Base filename: {filename}")
611-
# if protein_file:
612-
# logger.info(f"Protein filter file: {protein_file}")
613-
614-
# pqwriters = {}
615-
# protein_list = extract_protein_list(protein_file) if protein_file else None
616-
# protein_str = "|".join(protein_list) if protein_list else None
617-
# for key, feature in self.generate_slice_feature(
618-
# partitions, file_num, protein_str, duckdb_max_memory, duckdb_threads
619-
# ):
620-
# pqwriters = save_slice_file(
621-
# feature, pqwriters, output_folder, key, filename
622-
# )
623-
# close_file(pqwriters)
624-
625589
@staticmethod
626590
def generate_best_scan(rows, pep_dict):
627591
key = (rows["peptidoform"], rows["precursor_charge"])
@@ -679,7 +643,6 @@ def add_additional_msg(self, msstats, pep_dict):
679643
msstats.loc[:, "ion_mobility"] = None
680644
msstats.loc[:, "start_ion_mobility"] = None
681645
msstats.loc[:, "stop_ion_mobility"] = None
682-
msstats.loc[:, "unique"] = None # Will be set based on protein mapping
683646

684647
def _extract_sequence_from_peptidoform(self, peptidoform):
685648
"""Extract plain sequence from peptidoform by removing modifications"""
@@ -839,8 +802,13 @@ def convert_to_parquet_format(res):
839802
for col in complex_columns:
840803
if col in res.columns:
841804
# Ensure proper structure for complex fields
842-
if col == "intensities" or col == "modifications":
805+
if (
806+
col == "intensities"
807+
or col == "modifications"
808+
or col == "additional_scores"
809+
):
843810
res[col] = res[col].apply(Feature._ensure_list_type)
811+
844812
elif col == "file_metadata":
845813
# file_metadata should be a dict for each record
846814
res[col] = res[col].apply(Feature._ensure_dict_type)

quantmsio/core/quantms/mztab.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import pandas as pd
1313
import pyarrow as pa
1414
import pyarrow.parquet as pq
15+
import re
1516

1617
# Local application imports
1718
from quantmsio.core.common import OPENMS_PROTEIN_QVALUE_WORDS
@@ -2929,3 +2930,50 @@ def optimize_database(self):
29292930
self.vacuum_database()
29302931

29312932
self.logger.info("Database optimization completed")
2933+
2934+
def get_unique_from_psm_table(self):
2935+
database_query = """
2936+
SELECT COUNT(DISTINCT database) AS database_count
2937+
FROM psms;
2938+
"""
2939+
2940+
if self._duckdb.execute(database_query).fetchone()[0] > 1:
2941+
print(
2942+
"Cannot calculate unique values when multiple databases are present in the PSM table."
2943+
)
2944+
2945+
else:
2946+
unique_query = """
2947+
SELECT DISTINCT
2948+
accession,
2949+
"opt_global_cv_MS:1000889_peptidoform_sequence" AS peptidoform,
2950+
"unique"
2951+
FROM psms;
2952+
"""
2953+
2954+
unique_peptide = self._duckdb.execute(unique_query).df()
2955+
2956+
unique_peptide["pg_accessions"] = unique_peptide["accession"].apply(
2957+
dedup_accession
2958+
)
2959+
unique_peptide = (
2960+
unique_peptide[["pg_accessions", "peptidoform", "unique"]]
2961+
.drop_duplicates()
2962+
.reset_index(drop=True)
2963+
)
2964+
2965+
return unique_peptide
2966+
2967+
2968+
def dedup_accession(accession_str):
    """Remove duplicate accessions from a ';'- or ','-separated string.

    Tokens are split on ';' or ',', stripped of surrounding whitespace,
    de-duplicated preserving first-seen order, and re-joined with ';'.
    NaN/None inputs are returned unchanged.
    """
    if pd.isna(accession_str):
        return accession_str
    tokens = (token.strip() for token in re.split(r"[;,]", accession_str))
    # dict.fromkeys preserves insertion order, giving an order-preserving dedup.
    return ";".join(dict.fromkeys(tokens))

0 commit comments

Comments
 (0)