Source code for proteobench.datapoint.quant_datapoint

"""
This module provides functionality for handling and processing quantitative datapoints in the ProteoBench framework.
"""

from __future__ import annotations

import dataclasses
import hashlib
import logging
from collections import ChainMap, defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score

import proteobench
from proteobench.datapoint.datapoint_base import DatapointBase


[docs] def filter_df_numquant_epsilon( row: Dict[str, Any], min_quant: int = 3, metric: str = "median", mode: str = "global" ) -> float | None: """ Extract the 'median_abs_epsilon' value from a row (assumed to be a dictionary). Parameters ---------- row : dict The row from which to extract the value. Expected to be a dictionary. min_quant : int or str, optional The key for the desired value. Defaults to 3. metric : str The metric to be calculated. Should be either median or mean, defaults to median. mode : str, optional The mode of metric calculation, defaults to "global". Returns ------- float or None The 'median_abs_epsilon' value if found, otherwise None. """ if not row: # Handle empty dictionary return None if isinstance(list(row.keys())[0], str): min_quant = str(min_quant) if isinstance(row, dict) and min_quant in row and isinstance(row[min_quant], dict): # Try with mode suffix first (new format) metric_key_with_mode = "{}_abs_epsilon_{}".format(metric, mode) result = row[min_quant].get(metric_key_with_mode) # Fallback to legacy format without mode suffix (for old datapoints) if result is None: metric_key_legacy = "{}_abs_epsilon".format(metric) result = row[min_quant].get(metric_key_legacy) return result return None
[docs] def filter_df_numquant_nr_prec(row: pd.Series, min_quant: int = 3) -> int | None: """ Extract the 'nr_prec' value from a row (assumed to be a dictionary). Parameters ---------- row : pd.Series The row from which to extract the value. Expected to be a dictionary or Series. min_quant : int or str, optional The key for the desired value. Defaults to 3. Returns ------- int, None The 'nr_prec' value if found, otherwise None. """ if isinstance(list(row.keys())[0], str): min_quant = str(min_quant) if isinstance(row, dict) and min_quant in row and isinstance(row[min_quant], dict): return row[min_quant].get("nr_prec") return None
[docs] def compute_roc_auc(df: pd.DataFrame, unchanged_species: str = None) -> float: """ Compute ROC-AUC for distinguishing unchanged from changed species. Uses absolute log2 fold change as the score to separate species that should show no change (e.g., HUMAN with 1:1 ratio) from species that should show change (e.g., YEAST, ECOLI with different ratios). Parameters ---------- df : pd.DataFrame DataFrame with 'species' and 'log2_A_vs_B' columns. Optionally 'log2_expectedRatio' for auto-detecting unchanged species. unchanged_species : str, optional Species name for the unchanged/control group. If None, auto-detects from data as the species with smallest absolute expected log2 ratio. Returns ------- float ROC-AUC score, or np.nan if computation is not possible (e.g., only one class present or all scores are NaN). """ if "species" not in df.columns or "log2_A_vs_B" not in df.columns: return np.nan if len(df) == 0: return np.nan # Auto-detect unchanged species if not provided if unchanged_species is None: unchanged_species = _detect_unchanged_species(df) if unchanged_species is None: return np.nan y_true = (df["species"] != unchanged_species).astype(int) y_score = df["log2_A_vs_B"].abs() # Need both classes and valid scores if len(y_true.unique()) < 2 or y_score.isna().all(): return np.nan # Handle NaN scores by dropping them valid_mask = ~y_score.isna() if valid_mask.sum() < 2: return np.nan return roc_auc_score(y_true[valid_mask], y_score[valid_mask])
def _detect_unchanged_species(df: pd.DataFrame) -> str | None: """ Detect the unchanged species from the data. The unchanged species is the one with the smallest absolute expected log2 ratio (i.e., ratio closest to 1:1). Parameters ---------- df : pd.DataFrame DataFrame with 'species' and 'log2_expectedRatio' columns. Returns ------- str or None Species name for the unchanged group, or None if detection fails. """ if "log2_expectedRatio" not in df.columns or "species" not in df.columns: return None if len(df) == 0: return None # Get unique species-ratio pairs and find the one closest to 1:1 species_ratios = df[["species", "log2_expectedRatio"]].drop_duplicates() idx = species_ratios["log2_expectedRatio"].abs().idxmin() return species_ratios.loc[idx, "species"]
[docs] def compute_roc_auc_directional(df: pd.DataFrame) -> float: """ Compute directional ROC-AUC for distinguishing changed from unchanged species. Unlike the abs-based ROC-AUC, this method accounts for the expected direction of fold change for each species: - For species with positive expected log2 ratio (e.g., YEAST): Uses raw log2_FC - For species with negative expected log2 ratio (e.g., ECOLI): Uses -log2_FC This approach is more robust to systematic bias where the unchanged species may not be centered at zero. Parameters ---------- df : pd.DataFrame DataFrame with 'species', 'log2_A_vs_B', and 'log2_expectedRatio' columns. Returns ------- float Average ROC-AUC score across all changed species, or np.nan if computation is not possible. """ required_cols = ["species", "log2_A_vs_B", "log2_expectedRatio"] if not all(col in df.columns for col in required_cols): return np.nan if len(df) == 0: return np.nan # Detect unchanged species (closest to 1:1 ratio) unchanged_species = _detect_unchanged_species(df) if unchanged_species is None: return np.nan # Get unique species and their expected ratios species_ratios = df[["species", "log2_expectedRatio"]].drop_duplicates() changed_species = species_ratios[species_ratios["species"] != unchanged_species] if len(changed_species) == 0: return np.nan roc_aucs = [] for _, row in changed_species.iterrows(): species_name = row["species"] expected_ratio = row["log2_expectedRatio"] # Filter to unchanged and this changed species df_binary = df[df["species"].isin([unchanged_species, species_name])].copy() if len(df_binary) == 0: continue # Labels: 1 for changed species, 0 for unchanged y_true = (df_binary["species"] == species_name).astype(int) # Score: Use direction based on expected ratio # If expected ratio > 0, changed species should have higher log2_FC # If expected ratio < 0, changed species should have lower log2_FC (so negate) if expected_ratio >= 0: y_score = df_binary["log2_A_vs_B"] else: y_score = -df_binary["log2_A_vs_B"] # Need both classes and valid scores if len(y_true.unique()) < 2 or y_score.isna().all(): continue # Handle NaN scores valid_mask = ~y_score.isna() if valid_mask.sum() < 2: continue try: auc = roc_auc_score(y_true[valid_mask], y_score[valid_mask]) roc_aucs.append(auc) except ValueError: continue if len(roc_aucs) == 0: return np.nan return np.mean(roc_aucs)
[docs] @dataclass class QuantDatapointHYE(DatapointBase): """ A data structure used to store the results of a quantification benchmark run. This class extends DatapointBase to implement quantification-specific metrics and metadata storage for LFQ benchmarking runs. Attributes: id (str): Unique identifier for the benchmark run. software_name (str): Name of the software used in the benchmark. software_version (str): Version of the software. search_engine (str): Name of the search engine used. search_engine_version (str): Version of the search engine. ident_fdr_psm (float): False discovery rate for PSMs. ident_fdr_peptide (float): False discovery rate for peptides. ident_fdr_protein (float): False discovery rate for proteins. enable_match_between_runs (bool): Whether matching between runs is enabled. precursor_mass_tolerance (str): Mass tolerance for precursor ions. fragment_mass_tolerance (str): Mass tolerance for fragment ions. enzyme (str): Enzyme used for digestion. allowed_miscleavages (int): Number of allowed miscleavages. min_peptide_length (int): Minimum peptide length. max_peptide_length (int): Maximum peptide length. is_temporary (bool): Whether the data is temporary. intermediate_hash (str): Hash of the intermediate result. results (dict): A dictionary of metrics for the benchmark run. median_abs_epsilon_global (float): Median absolute epsilon value for the benchmark. mean_abs_epsilon_global (float): Mean absolute epsilon value for the benchmark. median_abs_epsilon_eq_species (float): Median absolute epsilon value for equivalently weighted species. mean_abs_epsilon_eq_species (float): Mean absolute epsilon value for equivalently weighted species. median_abs_epsilon_precision_global (float): Median absolute precision epsilon (deviation from empirical center). mean_abs_epsilon_precision_global (float): Mean absolute precision epsilon (deviation from empirical center). median_abs_epsilon_precision_eq_species (float): Median absolute precision epsilon for equivalently weighted species. mean_abs_epsilon_precision_eq_species (float): Mean absolute precision epsilon for equivalently weighted species. nr_prec (int): Number of precursors identified. comments (str): Any additional comments. proteobench_version (str): Version of the Proteobench tool used. """ id: str = None software_name: str = None software_version: int = 0 search_engine: str = None search_engine_version: int = 0 ident_fdr_psm: int = 0 ident_fdr_peptide: int = 0 ident_fdr_protein: int = 0 enable_match_between_runs: bool = False precursor_mass_tolerance: str = None fragment_mass_tolerance: str = None enzyme: str = None allowed_miscleavages: int = 0 min_peptide_length: int = 0 max_peptide_length: int = 0 is_temporary: bool = True intermediate_hash: str = "" results: dict = None median_abs_epsilon_global: float = 0 mean_abs_epsilon_global: float = 0 median_abs_epsilon_eq_species: float = 0 mean_abs_epsilon_eq_species: float = 0 median_abs_epsilon_precision_global: float = 0 mean_abs_epsilon_precision_global: float = 0 median_abs_epsilon_precision_eq_species: float = 0 mean_abs_epsilon_precision_eq_species: float = 0 nr_prec: int = 0 comments: str = "" proteobench_version: str = ""
[docs] def generate_id(self) -> None: """ Generate a unique ID for the benchmark run by combining the software name and a timestamp. This ID is used to uniquely identify each run of the benchmark. """ time_stamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.id = "_".join([self.software_name, str(time_stamp)]) logging.info(f"Assigned the following ID to this run: {self.id}")
[docs] @staticmethod def generate_datapoint( intermediate: pd.DataFrame, input_format: str, user_input: dict, default_cutoff_min_prec: int = 3 ) -> pd.Series: """ Generate a Datapoint object containing metadata and results from the benchmark run. Parameters ---------- intermediate : pd.DataFrame The intermediate DataFrame containing benchmark results. input_format : str The format of the input data (e.g., file format). user_input : dict User-defined input values for the benchmark. default_cutoff_min_prec : int, optional The default minimum precursor cutoff value. Defaults to 3. Returns ------- pd.Series A Pandas Series containing the Datapoint's attributes as key-value pairs. """ current_datetime = datetime.now() formatted_datetime = current_datetime.strftime("%Y%m%d_%H%M%S_%f") if "comments_for_plotting" not in user_input.keys(): user_input["comments_for_plotting"] = "" try: user_input = defaultdict( user_input.default_factory, # Preserve the default factory {key: ("" if value is None else value) for key, value in user_input.items()}, ) except AttributeError: user_input = {key: ("" if value is None else value) for key, value in user_input.items()} result_datapoint = QuantDatapointHYE( id=input_format + "_" + user_input["software_version"] + "_" + formatted_datetime, software_name=input_format, software_version=user_input["software_version"], search_engine=user_input["search_engine"], search_engine_version=user_input["search_engine_version"], ident_fdr_psm=user_input["ident_fdr_psm"], ident_fdr_peptide=user_input["ident_fdr_peptide"], ident_fdr_protein=user_input["ident_fdr_protein"], enable_match_between_runs=user_input["enable_match_between_runs"], precursor_mass_tolerance=user_input["precursor_mass_tolerance"], fragment_mass_tolerance=user_input["fragment_mass_tolerance"], enzyme=user_input["enzyme"], allowed_miscleavages=user_input["allowed_miscleavages"], min_peptide_length=user_input["min_peptide_length"], max_peptide_length=user_input["max_peptide_length"], intermediate_hash=str(hashlib.sha1(intermediate.to_string().encode("utf-8")).hexdigest()), comments=user_input["comments_for_plotting"], proteobench_version=proteobench.__version__, ) result_datapoint.generate_id() results = dict( ChainMap(*[QuantDatapointHYE.get_metrics(intermediate, nr_observed) for nr_observed in range(1, 7)]) ) result_datapoint.results = results result_datapoint.median_abs_epsilon_global = result_datapoint.results[default_cutoff_min_prec][ "median_abs_epsilon_global" ] result_datapoint.mean_abs_epsilon_global = result_datapoint.results[default_cutoff_min_prec][ "mean_abs_epsilon_global" ] result_datapoint.median_abs_epsilon_eq_species = result_datapoint.results[default_cutoff_min_prec][ "median_abs_epsilon_eq_species" ] result_datapoint.mean_abs_epsilon_eq_species = result_datapoint.results[default_cutoff_min_prec][ "mean_abs_epsilon_eq_species" ] result_datapoint.median_abs_epsilon_precision_global = result_datapoint.results[default_cutoff_min_prec][ "median_abs_epsilon_precision_global" ] result_datapoint.mean_abs_epsilon_precision_global = result_datapoint.results[default_cutoff_min_prec][ "mean_abs_epsilon_precision_global" ] result_datapoint.median_abs_epsilon_precision_eq_species = result_datapoint.results[default_cutoff_min_prec][ "median_abs_epsilon_precision_eq_species" ] result_datapoint.mean_abs_epsilon_precision_eq_species = result_datapoint.results[default_cutoff_min_prec][ "mean_abs_epsilon_precision_eq_species" ] result_datapoint.nr_prec = result_datapoint.results[default_cutoff_min_prec]["nr_prec"] results_series = pd.Series(dataclasses.asdict(result_datapoint)) return results_series
[docs] def get_epsilon_metrics(df: pd.DataFrame, min_nr_observed: int, agg: str = "median") -> dict[str, float]: """ Compute epsilon-based accuracy metrics using specified aggregation. Parameters ---------- df : pd.DataFrame DataFrame with epsilon column (deviation from expected ratio) min_nr_observed : int Filter threshold for minimum observations agg : str Aggregation method: "median" or "mean" Returns ------- dict Accuracy metrics: global, equal-species average, and per-species values """ df_slice = df[df["nr_observed"] >= min_nr_observed] agg_func = (lambda x: x.abs().median()) if agg == "median" else (lambda x: x.abs().mean()) per_species_acc = df_slice.groupby("species")["epsilon"].apply(agg_func) return { f"{agg}_abs_epsilon_global": agg_func(df_slice["epsilon"]), f"{agg}_abs_epsilon_eq_species": per_species_acc.mean(), **{f"{agg}_abs_epsilon_{sp}": v for sp, v in per_species_acc.items()}, }
[docs] def get_precision_metrics(df: pd.DataFrame, min_nr_observed: int, agg: str = "median") -> dict[str, float]: """ Compute precision metrics directly from log2FC (log2_A_vs_B) column. Precision measures deviation from the empirical center (reproducibility), computed independently from expected ratios. Parameters ---------- df : pd.DataFrame DataFrame with log2_A_vs_B and species columns min_nr_observed : int Filter threshold for minimum observations agg : str Aggregation method: "median" or "mean" Returns ------- dict Precision metrics including: - {agg}_log2_empirical_{species}: Center of log2FC distribution per species - {agg}_abs_epsilon_precision_global: Global aggregated precision - {agg}_abs_epsilon_precision_eq_species: Equal-weighted species average - {agg}_abs_epsilon_precision_{species}: Per-species precision values """ df_slice = df[df["nr_observed"] >= min_nr_observed] # Compute empirical center per species center_func = (lambda x: x.median()) if agg == "median" else (lambda x: x.mean()) per_species_center = df_slice.groupby("species")["log2_A_vs_B"].transform(center_func) # Precision = deviation from empirical center epsilon_precision = df_slice["log2_A_vs_B"] - per_species_center # Aggregate precision per species agg_func = (lambda x: x.abs().median()) if agg == "median" else (lambda x: x.abs().mean()) # Create temp df for groupby prec_df = df_slice[["species"]].copy() prec_df["epsilon_precision"] = epsilon_precision per_species_prec = prec_df.groupby("species")["epsilon_precision"].apply(agg_func) # Get the empirical centers per species empirical_centers = df_slice.groupby("species")["log2_A_vs_B"].apply(center_func) return { **{f"{agg}_log2_empirical_{sp}": v for sp, v in empirical_centers.items()}, f"{agg}_abs_epsilon_precision_global": agg_func(epsilon_precision), f"{agg}_abs_epsilon_precision_eq_species": per_species_prec.mean(), **{f"{agg}_abs_epsilon_precision_{sp}": v for sp, v in per_species_prec.items()}, }
[docs] def get_cv_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, float]: """Compute CV quantiles.""" df_slice = df[df["nr_observed"] >= min_nr_observed] cv_q = df_slice[["CV_A", "CV_B"]].quantile([0.5, 0.75, 0.9, 0.95]) cv_avg = cv_q.mean(axis=1) return { "CV_median": cv_avg.loc[0.50], "CV_q75": cv_avg.loc[0.75], "CV_q90": cv_avg.loc[0.90], "CV_q95": cv_avg.loc[0.95], }
[docs] def get_roc_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, float]: """Compute ROC-AUC metrics.""" df_slice = df[df["nr_observed"] >= min_nr_observed] return { "roc_auc": compute_roc_auc(df_slice), "roc_auc_directional": compute_roc_auc_directional(df_slice), }
[docs] def get_count_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, int]: """Compute precursor counts (total and per-species).""" df_slice = df[df["nr_observed"] >= min_nr_observed] species_counts = df_slice["species"].value_counts().to_dict() return { "nr_prec": len(df_slice), **{f"nr_prec_{sp}": count for sp, count in species_counts.items()}, }
[docs] def get_metrics(df: pd.DataFrame, min_nr_observed: int = 1) -> dict[int, dict[str, float]]: """ Compute all benchmark metrics (backward compatible wrapper). Merges: epsilon (accuracy) + precision + cv + roc + counts """ df_slice = df[df["nr_observed"] >= min_nr_observed] return { min_nr_observed: { **QuantDatapointHYE.get_epsilon_metrics(df, min_nr_observed, "median"), **QuantDatapointHYE.get_epsilon_metrics(df, min_nr_observed, "mean"), **QuantDatapointHYE.get_precision_metrics(df, min_nr_observed, "median"), **QuantDatapointHYE.get_precision_metrics(df, min_nr_observed, "mean"), **QuantDatapointHYE.get_cv_metrics(df, min_nr_observed), **QuantDatapointHYE.get_roc_metrics(df, min_nr_observed), **QuantDatapointHYE.get_count_metrics(df, min_nr_observed), "variance_epsilon_global": df_slice["epsilon"].var(), } }
[docs] @staticmethod def get_metrics_old(df: pd.DataFrame, min_nr_observed: int = 1) -> Dict[int, Dict[str, float]]: """ Compute various statistical metrics from the provided DataFrame for the benchmark. Parameters ---------- df : pd.DataFrame The DataFrame containing the benchmark results. min_nr_observed : int, optional The minimum number of observed values for a valid computation. Defaults to 1. Returns ------- dict A dictionary containing computed metrics such as 'median_abs_epsilon', 'variance_epsilon', etc. """ # Filter DataFrame by the minimum number of observations df_slice = df[df["nr_observed"] >= min_nr_observed] nr_prec = len(df_slice) # Calculate the median absolute epsilon (insensitive to outliers) median_abs_epsilon = df_slice["epsilon"].abs().median() # Calculate the mean absolute epsilon (sensitive to outliers) mean_abs_epsilon = df_slice["epsilon"].abs().mean() # Calculate the variance of epsilon (sensitive to outliers) variance_epsilon = df_slice["epsilon"].var() # Compute the median of the coefficient of variation (CV) for both 'CV_A' and 'CV_B' cv_median = (df_slice["CV_A"].median() + df_slice["CV_B"].median()) / 2 cv_q75 = (df_slice["CV_A"].quantile(0.75) + df_slice["CV_B"].quantile(0.75)) / 2 cv_q90 = (df_slice["CV_A"].quantile(0.9) + df_slice["CV_B"].quantile(0.9)) / 2 cv_q95 = (df_slice["CV_A"].quantile(0.95) + df_slice["CV_B"].quantile(0.95)) / 2 return { min_nr_observed: { "median_abs_epsilon": median_abs_epsilon, "mean_abs_epsilon": mean_abs_epsilon, "variance_epsilon": variance_epsilon, "nr_prec": nr_prec, "CV_median": cv_median, "CV_q90": cv_q90, "CV_q75": cv_q75, "CV_q95": cv_q95, } }