"""
Module for parsing precursor ion data from various formats.
"""
import math
import os
import re
import warnings
from typing import Optional

import pandas as pd
[docs]
def aggregate_modification_column(
    input_string_seq: str,
    input_string_modifications: str,
    special_locations: Optional[dict] = None,
) -> str:
    """
    Build a modified-sequence string by inserting bracketed modification names.

    Each entry of ``input_string_modifications`` has the form
    ``"<name> (<location>)"`` and entries are separated by ``"; "``.
    ``<location>`` is either a key of ``special_locations`` (terminal
    modification) or one leading character followed by an integer position
    (e.g. ``"M5"``); the modification is inserted directly after that many
    characters of the sequence.

    Parameters
    ----------
    input_string_seq : str
        The unmodified sequence.
    input_string_modifications : str
        ``"; "``-separated modification entries. An empty string returns the
        sequence unchanged.
    special_locations : dict, optional
        Maps terminal location labels to an insertion index. A value of ``-1``
        marks a C-terminal label (rendered as ``-[name]`` at the end of the
        sequence); any other value is treated as N-terminal and rendered as
        ``[name]-`` at that index. Defaults to the "Any"/"Protein" N-term and
        C-term labels.

    Returns
    -------
    str
        The sequence with all modifications inserted.

    Raises
    ------
    IndexError
        If an entry does not contain ``" ("``.
    ValueError
        If a non-terminal location does not end in an integer.
    """
    if special_locations is None:
        special_locations = {
            "Any N-term": 0,
            "Any C-term": -1,
            "Protein N-term": 0,
            "Protein C-term": -1,
        }

    # Each item: (text to insert, insertion index, tie-break rank).
    insertions = []
    for entry in input_string_modifications.split("; "):
        if not entry:
            continue
        # Split on the LAST " (" so modification names that contain
        # parentheses themselves (e.g. "Label (13C) (K7)") stay intact.
        parts = entry.rsplit(" (", 1)
        mod_location = parts[1].rstrip(")")
        mod_name = parts[0]
        if mod_location in special_locations:
            if special_locations[mod_location] == -1:  # C-term
                insertions.append((f"-[{mod_name}]", len(input_string_seq), 1))
            else:  # N-term
                insertions.append((f"[{mod_name}]-", special_locations[mod_location], 0))
            continue
        # Drop the leading residue letter, keep the numeric position.
        insertions.append((f"[{mod_name}]", int(mod_location[1:]), 0))

    # Insert from the back of the sequence so earlier indices stay valid.
    # On equal indices the C-terminal text is inserted first, which leaves it
    # to the right of a modification on the last residue.
    insertions.sort(key=lambda item: (item[1], item[2]), reverse=True)
    for text, index, _rank in insertions:
        input_string_seq = input_string_seq[:index] + text + input_string_seq[index:]
    return input_string_seq
[docs]
def aggregate_modification_sites_column(
    input_string_seq: str,
    input_string_modifications: str,
    input_string_sites: str,
) -> str:
    """
    Build a modified-sequence string from parallel modification and site lists.

    ``input_string_modifications`` and ``input_string_sites`` are
    ``";"``-separated and paired by position. Only the part of each
    modification before ``"@"`` is used as its name.

    Parameters
    ----------
    input_string_seq : str
        The unmodified sequence.
    input_string_modifications : str
        ``";"``-separated modifications (``"<name>@<target>"``). NaN or an
        empty string means "no modifications".
    input_string_sites : str
        ``";"``-separated integer sites. ``0`` denotes the N-terminus
        (rendered as ``[name]-`` in front of the sequence), ``-1`` the
        C-terminus (rendered as ``-[name]`` after the sequence); any other
        value inserts ``[name]`` after that many characters.

    Returns
    -------
    str
        The sequence with all modifications inserted.

    Raises
    ------
    ValueError
        If a site belonging to a non-empty modification is not numeric.
    """
    # Missing values arrive as float NaN; an empty string is treated the same.
    if isinstance(input_string_modifications, float) and math.isnan(input_string_modifications):
        return input_string_seq
    if not input_string_modifications:
        return input_string_seq

    mods_list = input_string_modifications.split(";")
    sites_list = str(input_string_sites).split(";")
    # Skip empty modification entries before parsing their site. Going through
    # float() also accepts sites formatted like "3.0".
    mods_and_sites = [(mod, int(float(site))) for mod, site in zip(mods_list, sites_list) if mod]

    # Highest site first so earlier indices stay valid; 0 and -1 sort last, so
    # the terminal modifications are attached after all residue modifications.
    mods_and_sites.sort(key=lambda pair: pair[1], reverse=True)
    for mod, site in mods_and_sites:
        mod_name = mod.split("@")[0]
        if site == 0:
            input_string_seq = f"[{mod_name}]-" + input_string_seq
        elif site == -1:
            # Append at the very end; slicing with -1 would place the
            # modification in front of the last residue.
            input_string_seq = input_string_seq + f"-[{mod_name}]"
        else:
            input_string_seq = input_string_seq[:site] + f"[{mod_name}]" + input_string_seq[site:]
    return input_string_seq
[docs]
def count_chars(input_string: str, isalpha: bool = True, isupper: bool = True) -> int:
    """
    Count the characters of a string that satisfy the enabled criteria.

    Parameters
    ----------
    input_string : str
        The string to scan.
    isalpha : bool, optional
        Require ``str.isalpha()`` to hold for a character. Defaults to True.
    isupper : bool, optional
        Require ``str.isupper()`` to hold for a character. Defaults to True.

    Returns
    -------
    int
        Number of characters meeting every enabled criterion. When both
        criteria are disabled the function returns ``None``.
    """
    if not (isalpha or isupper):
        # No criterion selected: nothing is counted and None is returned.
        return None

    matching = 0
    for char in input_string:
        if isalpha and not char.isalpha():
            continue
        if isupper and not char.isupper():
            continue
        matching += 1
    return matching
[docs]
def get_stripped_seq(input_string: str, isalpha: bool = True, isupper: bool = True) -> str:
    """
    Keep only the characters of a string that satisfy the enabled criteria.

    Parameters
    ----------
    input_string : str
        The string to filter.
    isalpha : bool, optional
        Require ``str.isalpha()`` to hold for a character. Defaults to True.
    isupper : bool, optional
        Require ``str.isupper()`` to hold for a character. Defaults to True.

    Returns
    -------
    str
        The retained characters in their original order. When both criteria
        are disabled the function returns ``None``.
    """
    checks = []
    if isalpha:
        checks.append(str.isalpha)
    if isupper:
        checks.append(str.isupper)
    if not checks:
        # No criterion selected: nothing is filtered and None is returned.
        return None

    kept = [char for char in input_string if all(check(char) for check in checks)]
    return "".join(kept)
[docs]
def match_brackets(
    input_string: str,
    pattern: str = r"\[([^]]+)\]",
    isalpha: bool = True,
    isupper: bool = True,
) -> tuple:
    """
    Find bracketed modifications and the sequence position preceding each one.

    Parameters
    ----------
    input_string : str
        The string to search.
    pattern : str, optional
        Regular expression locating a modification. The default matches
        ``[``, one or more characters other than ``]``, then ``]``.
    isalpha : bool, optional
        Forwarded to ``count_chars`` when computing positions. Defaults to True.
    isupper : bool, optional
        Forwarded to ``count_chars`` when computing positions. Defaults to True.

    Returns
    -------
    tuple
        ``(mods, positions)``, both generators (each can be consumed once).
        ``mods`` yields the full matched text of every hit; ``positions``
        yields, for the same hits, the ``count_chars`` result for the text in
        front of the hit.
    """
    # The regex search runs eagerly; only the per-hit work below is lazy.
    hits = list(re.finditer(pattern, input_string))
    mods = (hit.group() for hit in hits)
    positions = (
        count_chars(input_string[: hit.start()], isalpha=isalpha, isupper=isupper) for hit in hits
    )
    return mods, positions
[docs]
def to_lowercase(match) -> str:
    """
    Return the full text of a regular-expression match in lowercase.

    The signature fits the replacement-callable form of ``re.sub``.

    Parameters
    ----------
    match : re.Match
        A match object produced by the ``re`` module.

    Returns
    -------
    str
        The entire matched text, lowercased.
    """
    matched_text = match.group()
    return matched_text.lower()
def _load_maxquant(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated MaxQuant output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the MaxQuant output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, unmodified.
    """
    maxquant_table = pd.read_csv(input_csv, low_memory=False, sep="\t")
    return maxquant_table
def _load_alphapept(input_csv: str) -> pd.DataFrame:
    """
    Read a comma-separated AlphaPept output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the AlphaPept output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, with the ``charge`` column parsed as integers.
    """
    column_types = {"charge": int}
    alphapept_table = pd.read_csv(input_csv, dtype=column_types, low_memory=False)
    return alphapept_table
def _load_sage(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated Sage output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the Sage output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, unmodified.
    """
    sage_table = pd.read_csv(input_csv, low_memory=False, sep="\t")
    return sage_table
def _load_fragpipe(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated FragPipe output file and merge its protein columns.

    Parameters
    ----------
    input_csv : str
        Location of the FragPipe output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, with ``Protein`` replaced by
        ``"<Protein>,<Mapped Proteins>"``. Rows without mapped proteins
        therefore end in a trailing comma.
    """
    fragpipe_table = pd.read_csv(input_csv, sep="\t", low_memory=False)
    protein_prefix = fragpipe_table["Protein"] + ","
    # Missing "Mapped Proteins" values contribute an empty string.
    mapped_proteins = fragpipe_table["Mapped Proteins"].fillna("")
    fragpipe_table["Protein"] = protein_prefix + mapped_proteins
    return fragpipe_table
def _load_wombat(input_csv: str) -> pd.DataFrame:
    """
    Read a comma-separated WOMBAT output file and normalise its protein groups.

    Every comma-separated member of ``protein_group`` is looked up in the
    ``gene_name`` -> ``description`` table stored in
    ``io_parse_settings/mapper.csv`` next to this module. Members without an
    entry are kept as they are, and the members are re-joined with ``";"``.

    Parameters
    ----------
    input_csv : str
        Location of the WOMBAT output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content with ``protein_group`` rewritten and a ``proforma``
        column copied from ``modified_peptide``.

    Raises
    ------
    AttributeError
        If a ``protein_group`` value is not a string (e.g. a missing value).
    """
    input_data_frame = pd.read_csv(input_csv, low_memory=False, sep=",")

    mapper_path = os.path.join(os.path.dirname(__file__), "io_parse_settings/mapper.csv")
    mapper_df = pd.read_csv(mapper_path).set_index("gene_name")
    mapper = mapper_df["description"].to_dict()

    input_data_frame["protein_group"] = input_data_frame["protein_group"].map(
        lambda group: ";".join(mapper.get(protein, protein) for protein in group.split(","))
    )
    input_data_frame["proforma"] = input_data_frame["modified_peptide"]
    return input_data_frame
def _load_prolinestudio_msangel(input_csv: str) -> pd.DataFrame:
    """
    Read the "Quantified peptide ions" sheet of a MSAngel/ProlineStudio export.

    The sheet is reduced to one row per precursor, identified by ``proforma``
    and ``master_quant_peptide_ion_charge``, with the accessions of all its
    rows collected in ``proteins``.

    Parameters
    ----------
    input_csv : str
        Location of the MSAngel/ProlineStudio Excel file, passed straight to
        ``pandas.read_excel`` (calamine engine).

    Returns
    -------
    pd.DataFrame
        One row per precursor. ``proforma`` is built with
        ``aggregate_modification_column``; ``proteins`` holds the
        ``"; "``-joined accessions; every other column keeps the first value
        of its group.
    """
    quant_table = pd.read_excel(
        input_csv,
        sheet_name="Quantified peptide ions",
        header=0,
        index_col=None,
        engine="calamine",
    )
    # Missing values become empty strings so the string handling below works.
    for column in ("modifications", "subsets_accessions"):
        quant_table[column] = quant_table[column].fillna("")

    quant_table["proforma"] = quant_table.apply(
        lambda row: aggregate_modification_column(row.sequence, row.modifications),
        axis=1,
    )

    # Sameset accessions followed by the subset accessions, if there are any.
    sameset_accessions = quant_table["samesets_accessions"]
    subset_suffix = quant_table["subsets_accessions"].apply(
        lambda accessions: "; " + accessions if len(accessions) > 0 else ""
    )
    quant_table["proteins"] = sameset_accessions + subset_suffix
    # Sort the accessions of each row (duplicates within a row are kept).
    quant_table["proteins"] = quant_table["proteins"].apply(
        lambda joined: "; ".join(sorted(joined.split("; ")))
    )

    # Remove rows that repeat the same precursor/accession combination.
    quant_table.drop_duplicates(
        subset=["proforma", "master_quant_peptide_ion_charge", "proteins"],
        inplace=True,
    )

    # Proline reports one row per precursor and accession: collapse them into
    # a single row per precursor.
    group_cols = ["proforma", "master_quant_peptide_ion_charge"]
    not_first = group_cols + ["proteins"]
    aggregations = {"proteins": lambda accessions: "; ".join(accessions)}
    aggregations.update({col: "first" for col in quant_table.columns if col not in not_first})
    quant_table = quant_table.groupby(group_cols).agg(aggregations).reset_index()
    return quant_table
def _load_i2masschroq(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated i2MassChroQ output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the i2MassChroQ output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content plus a ``proforma`` column copied from ``ProForma``.
    """
    masschroq_table = pd.read_csv(input_csv, sep="\t", low_memory=False)
    return masschroq_table.assign(proforma=masschroq_table["ProForma"])
def _load_custom(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated file in the custom format into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the custom-format file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content plus a ``proforma`` column copied from
        ``Modified sequence``.
    """
    custom_table = pd.read_csv(input_csv, sep="\t", low_memory=False)
    return custom_table.assign(proforma=custom_table["Modified sequence"])
def _load_diann(input_csv: str) -> pd.DataFrame:
    """
    Read a DIA-NN report, either as parquet or as a tab-separated file.

    Parameters
    ----------
    input_csv : str
        Path of the DIA-NN report, or an opened file object exposing a
        ``name`` attribute (e.g. a streamlit upload). A name ending in
        ``.parquet`` selects the parquet reader; anything else is read as
        tab-separated text.

    Returns
    -------
    pd.DataFrame
        The file content, unmodified.
    """
    # Non-string inputs are expected to carry their file name in ``.name``.
    source_name = input_csv if isinstance(input_csv, str) else input_csv.name
    if source_name.endswith(".parquet"):
        return pd.read_parquet(input_csv)
    return pd.read_csv(input_csv, low_memory=False, sep="\t")
def _load_alphadia(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated AlphaDIA output file and derive ``proforma``.

    Every ``";"``-separated member of ``genes`` is looked up in the
    ``gene_name`` -> ``description`` table stored in
    ``io_parse_settings/mapper.csv`` next to this module; members without an
    entry are kept as they are.

    Parameters
    ----------
    input_csv : str
        Location of the AlphaDIA output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content with ``genes`` rewritten and a ``proforma`` column
        built per row from ``sequence``, ``mods`` and ``mod_sites`` with
        ``aggregate_modification_sites_column``.
    """
    alphadia_table = pd.read_csv(input_csv, sep="\t", low_memory=False)

    settings_dir = os.path.dirname(__file__)
    mapper_table = pd.read_csv(os.path.join(settings_dir, "io_parse_settings/mapper.csv"))
    mapper = mapper_table.set_index("gene_name")["description"].to_dict()

    def _translate(gene_field):
        # Replace each gene by its mapped description when one exists.
        return ";".join([mapper.get(gene, gene) for gene in gene_field.split(";")])

    alphadia_table["genes"] = alphadia_table["genes"].map(_translate)
    alphadia_table["proforma"] = alphadia_table.apply(
        lambda row: aggregate_modification_sites_column(row.sequence, row.mods, row.mod_sites),
        axis=1,
    )
    return alphadia_table
def _load_fragpipe_diann_quant(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated FragPipe (DIA-NN quant) output file.

    Parameters
    ----------
    input_csv : str
        Location of the FragPipe (DIA-NN) output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content with ``Protein.Names`` set to the ``description``
        found for each ``Protein.Ids`` value in
        ``io_parse_settings/mapper.csv`` (keyed by ``gene_name``). Values
        without an entry become missing.
    """
    fragpipe_table = pd.read_csv(input_csv, sep="\t", low_memory=False)

    settings_dir = os.path.dirname(__file__)
    mapper_table = pd.read_csv(os.path.join(settings_dir, "io_parse_settings/mapper.csv"))
    mapper = mapper_table.set_index("gene_name")["description"].to_dict()

    protein_ids = fragpipe_table["Protein.Ids"]
    fragpipe_table["Protein.Names"] = protein_ids.map(mapper)
    return fragpipe_table
def _load_spectronaut(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated Spectronaut output file and normalise key columns.

    If ``FG.Quantity`` is parsed as an object column, the file is read a
    second time with ``","`` as the decimal separator.

    Parameters
    ----------
    input_csv : str
        Path of the Spectronaut output file, or an opened file object.

    Returns
    -------
    pd.DataFrame
        The file content with underscores stripped from both ends of
        ``FG.LabeledSequence`` and every ``";"``-separated member of
        ``PG.ProteinGroups`` replaced by its ``description`` from
        ``io_parse_settings/mapper.csv`` (keyed by ``gene_name``) when one
        exists.
    """
    spectronaut_table = pd.read_csv(input_csv, sep="\t", low_memory=False)

    quantity_is_text = spectronaut_table["FG.Quantity"].dtype == object
    if quantity_is_text:
        try:
            # Rewind file-like inputs before reading them a second time.
            input_csv.seek(0)
        except AttributeError:
            # Path-like inputs (e.g. from the io util functions) cannot be
            # rewound and do not need to be.
            pass
        spectronaut_table = pd.read_csv(input_csv, sep="\t", decimal=",", low_memory=False)

    spectronaut_table["FG.LabeledSequence"] = spectronaut_table["FG.LabeledSequence"].str.strip("_")

    settings_dir = os.path.dirname(__file__)
    mapper_table = pd.read_csv(os.path.join(settings_dir, "io_parse_settings/mapper.csv"))
    mapper = mapper_table.set_index("gene_name")["description"].to_dict()

    # Split, translate each member, then join again.
    group_members = spectronaut_table["PG.ProteinGroups"].str.split(";")
    group_members = group_members.map(lambda members: [mapper.get(member, member) for member in members])
    spectronaut_table["PG.ProteinGroups"] = group_members.str.join(";")
    return spectronaut_table
def _load_metamorpheus(input_csv: str) -> pd.DataFrame:
    """
    Read a MetaMorpheus output file (FlashLFQ: AllQuantifiedPeaks.tsv).

    Parameters
    ----------
    input_csv : str
        Location of the MetaMorpheus output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        Only the rows whose ``Full Sequences Mapped`` equals 1, with an added
        ``Proteins`` column: the ``";"``-separated members of
        ``Protein Group``, each replaced by its ``description`` from
        ``io_parse_settings/mapper.csv`` (keyed by ``gene_name``) when one
        exists.
    """
    metamorpheus_table = pd.read_csv(input_csv, sep="\t", low_memory=False)

    settings_dir = os.path.dirname(__file__)
    mapper_table = pd.read_csv(os.path.join(settings_dir, "io_parse_settings/mapper.csv"))
    mapper = mapper_table.set_index("gene_name")["description"].to_dict()

    def _translate(group):
        # Replace each member by its mapped description when one exists.
        return ";".join([mapper.get(protein, protein) for protein in group.split(";")])

    metamorpheus_table["Proteins"] = metamorpheus_table["Protein Group"].map(_translate)

    # TODO: discuss how to handle multiple mapped precursors
    single_mapping = metamorpheus_table["Full Sequences Mapped"] == 1
    return metamorpheus_table[single_mapping]
def _load_msaid(input_csv: str) -> pd.DataFrame:
    """
    Read a tab-separated MSAID output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the MSAID output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, unmodified.
    """
    msaid_table = pd.read_csv(input_csv, sep="\t", low_memory=False)
    return msaid_table
def _load_peaks(input_csv: str) -> pd.DataFrame:
    """
    Read a comma-separated PEAKS output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the PEAKS output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content, unmodified.
    """
    peaks_table = pd.read_csv(input_csv, sep=",", low_memory=False)
    return peaks_table
def _load_quantms(input_csv: str) -> pd.DataFrame:
    """
    Read a comma-separated QuantMS output file into a dataframe.

    Parameters
    ----------
    input_csv : str
        Location of the QuantMS output file, passed straight to
        ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The file content plus two columns, ``proforma`` and ``Sequence``.
        Both hold ``PeptideSequence`` with every parenthesised group removed.
    """
    quantms_table = pd.read_csv(input_csv, low_memory=False)

    # NOTE(review): the replacement is empty, so "proforma" carries no
    # modification information and equals "Sequence" - confirm this is intended.
    without_parentheses = quantms_table["PeptideSequence"].str.replace(
        r"\(([^)]+)\)",
        r"",
        regex=True,
    )
    quantms_table = quantms_table.assign(proforma=without_parentheses)
    quantms_table["Sequence"] = without_parentheses
    return quantms_table
# Dispatch table: name of the producing software -> loader function for its
# output file. "ProlineStudio" and "MSAngel" share one loader.
_LOAD_FUNCTIONS = {
    "MaxQuant": _load_maxquant,
    "AlphaPept": _load_alphapept,
    "Sage": _load_sage,
    "FragPipe": _load_fragpipe,
    "WOMBAT": _load_wombat,
    "ProlineStudio": _load_prolinestudio_msangel,
    "MSAngel": _load_prolinestudio_msangel,
    "i2MassChroQ": _load_i2masschroq,
    "Custom": _load_custom,
    "DIA-NN": _load_diann,
    "AlphaDIA": _load_alphadia,
    "FragPipe (DIA-NN quant)": _load_fragpipe_diann_quant,
    "Spectronaut": _load_spectronaut,
    "MSAID": _load_msaid,
    "PEAKS": _load_peaks,
    "quantms": _load_quantms,
    "MetaMorpheus": _load_metamorpheus,
}