"""
This module provides functionality for handling and processing quantitative datapoints in the ProteoBench framework.
"""
from __future__ import annotations
import dataclasses
import hashlib
import logging
from collections import ChainMap, defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score
import proteobench
[docs]
def filter_df_numquant_epsilon(
row: Dict[str, Any], min_quant: int = 3, metric: str = "median", mode: str = "global"
) -> float | None:
"""
Extract the 'median_abs_epsilon' value from a row (assumed to be a dictionary).
Parameters
----------
row : dict
The row from which to extract the value. Expected to be a dictionary.
min_quant : int or str, optional
The key for the desired value. Defaults to 3.
metric : str
The metric to be calculated. Should be either median or mean, defaults to median.
mode : str, optional
The mode of metric calculation, defaults to "global".
Returns
-------
float or None
The 'median_abs_epsilon' value if found, otherwise None.
"""
if not row: # Handle empty dictionary
return None
if isinstance(list(row.keys())[0], str):
min_quant = str(min_quant)
if isinstance(row, dict) and min_quant in row and isinstance(row[min_quant], dict):
# Try with mode suffix first (new format)
metric_key_with_mode = "{}_abs_epsilon_{}".format(metric, mode)
result = row[min_quant].get(metric_key_with_mode)
# Fallback to legacy format without mode suffix (for old datapoints)
if result is None:
metric_key_legacy = "{}_abs_epsilon".format(metric)
result = row[min_quant].get(metric_key_legacy)
return result
return None
[docs]
def filter_df_numquant_nr_prec(row: pd.Series, min_quant: int = 3) -> int | None:
"""
Extract the 'nr_prec' value from a row (assumed to be a dictionary).
Parameters
----------
row : pd.Series
The row from which to extract the value. Expected to be a dictionary or Series.
min_quant : int or str, optional
The key for the desired value. Defaults to 3.
Returns
-------
int, None
The 'nr_prec' value if found, otherwise None.
"""
if isinstance(list(row.keys())[0], str):
min_quant = str(min_quant)
if isinstance(row, dict) and min_quant in row and isinstance(row[min_quant], dict):
return row[min_quant].get("nr_prec")
return None
[docs]
def compute_roc_auc(df: pd.DataFrame, unchanged_species: str = None) -> float:
"""
Compute ROC-AUC for distinguishing unchanged from changed species.
Uses absolute log2 fold change as the score to separate species that should
show no change (e.g., HUMAN with 1:1 ratio) from species that should show
change (e.g., YEAST, ECOLI with different ratios).
Parameters
----------
df : pd.DataFrame
DataFrame with 'species' and 'log2_A_vs_B' columns.
Optionally 'log2_expectedRatio' for auto-detecting unchanged species.
unchanged_species : str, optional
Species name for the unchanged/control group. If None, auto-detects
from data as the species with smallest absolute expected log2 ratio.
Returns
-------
float
ROC-AUC score, or np.nan if computation is not possible
(e.g., only one class present or all scores are NaN).
"""
if "species" not in df.columns or "log2_A_vs_B" not in df.columns:
return np.nan
if len(df) == 0:
return np.nan
# Auto-detect unchanged species if not provided
if unchanged_species is None:
unchanged_species = _detect_unchanged_species(df)
if unchanged_species is None:
return np.nan
y_true = (df["species"] != unchanged_species).astype(int)
y_score = df["log2_A_vs_B"].abs()
# Need both classes and valid scores
if len(y_true.unique()) < 2 or y_score.isna().all():
return np.nan
# Handle NaN scores by dropping them
valid_mask = ~y_score.isna()
if valid_mask.sum() < 2:
return np.nan
return roc_auc_score(y_true[valid_mask], y_score[valid_mask])
def _detect_unchanged_species(df: pd.DataFrame) -> str | None:
"""
Detect the unchanged species from the data.
The unchanged species is the one with the smallest absolute expected log2 ratio
(i.e., ratio closest to 1:1).
Parameters
----------
df : pd.DataFrame
DataFrame with 'species' and 'log2_expectedRatio' columns.
Returns
-------
str or None
Species name for the unchanged group, or None if detection fails.
"""
if "log2_expectedRatio" not in df.columns or "species" not in df.columns:
return None
if len(df) == 0:
return None
# Get unique species-ratio pairs and find the one closest to 1:1
species_ratios = df[["species", "log2_expectedRatio"]].drop_duplicates()
idx = species_ratios["log2_expectedRatio"].abs().idxmin()
return species_ratios.loc[idx, "species"]
[docs]
def compute_roc_auc_directional(df: pd.DataFrame) -> float:
"""
Compute directional ROC-AUC for distinguishing changed from unchanged species.
Unlike the abs-based ROC-AUC, this method accounts for the expected direction
of fold change for each species:
- For species with positive expected log2 ratio (e.g., YEAST): Uses raw log2_FC
- For species with negative expected log2 ratio (e.g., ECOLI): Uses -log2_FC
This approach is more robust to systematic bias where the unchanged species
may not be centered at zero.
Parameters
----------
df : pd.DataFrame
DataFrame with 'species', 'log2_A_vs_B', and 'log2_expectedRatio' columns.
Returns
-------
float
Average ROC-AUC score across all changed species, or np.nan if computation
is not possible.
"""
required_cols = ["species", "log2_A_vs_B", "log2_expectedRatio"]
if not all(col in df.columns for col in required_cols):
return np.nan
if len(df) == 0:
return np.nan
# Detect unchanged species (closest to 1:1 ratio)
unchanged_species = _detect_unchanged_species(df)
if unchanged_species is None:
return np.nan
# Get unique species and their expected ratios
species_ratios = df[["species", "log2_expectedRatio"]].drop_duplicates()
changed_species = species_ratios[species_ratios["species"] != unchanged_species]
if len(changed_species) == 0:
return np.nan
roc_aucs = []
for _, row in changed_species.iterrows():
species_name = row["species"]
expected_ratio = row["log2_expectedRatio"]
# Filter to unchanged and this changed species
df_binary = df[df["species"].isin([unchanged_species, species_name])].copy()
if len(df_binary) == 0:
continue
# Labels: 1 for changed species, 0 for unchanged
y_true = (df_binary["species"] == species_name).astype(int)
# Score: Use direction based on expected ratio
# If expected ratio > 0, changed species should have higher log2_FC
# If expected ratio < 0, changed species should have lower log2_FC (so negate)
if expected_ratio >= 0:
y_score = df_binary["log2_A_vs_B"]
else:
y_score = -df_binary["log2_A_vs_B"]
# Need both classes and valid scores
if len(y_true.unique()) < 2 or y_score.isna().all():
continue
# Handle NaN scores
valid_mask = ~y_score.isna()
if valid_mask.sum() < 2:
continue
try:
auc = roc_auc_score(y_true[valid_mask], y_score[valid_mask])
roc_aucs.append(auc)
except ValueError:
continue
if len(roc_aucs) == 0:
return np.nan
return np.mean(roc_aucs)
[docs]
@dataclass
class QuantDatapoint:
"""
A data structure used to store the results of a benchmark run.
Attributes:
id (str): Unique identifier for the benchmark run.
software_name (str): Name of the software used in the benchmark.
software_version (str): Version of the software.
search_engine (str): Name of the search engine used.
search_engine_version (str): Version of the search engine.
ident_fdr_psm (float): False discovery rate for PSMs.
ident_fdr_peptide (float): False discovery rate for peptides.
ident_fdr_protein (float): False discovery rate for proteins.
enable_match_between_runs (bool): Whether matching between runs is enabled.
precursor_mass_tolerance (str): Mass tolerance for precursor ions.
fragment_mass_tolerance (str): Mass tolerance for fragment ions.
enzyme (str): Enzyme used for digestion.
allowed_miscleavages (int): Number of allowed miscleavages.
min_peptide_length (int): Minimum peptide length.
max_peptide_length (int): Maximum peptide length.
is_temporary (bool): Whether the data is temporary.
intermediate_hash (str): Hash of the intermediate result.
results (dict): A dictionary of metrics for the benchmark run.
median_abs_epsilon_global (float): Median absolute epsilon value for the benchmark.
mean_abs_epsilon_global (float): Mean absolute epsilon value for the benchmark.
median_abs_epsilon_eq_species (float): Median absolute epsilon value for equivalently weighted species.
mean_abs_epsilon_eq_species (float): Mean absolute epsilon value for equivalently weighted species.
median_abs_epsilon_precision_global (float): Median absolute precision epsilon (deviation from empirical center).
mean_abs_epsilon_precision_global (float): Mean absolute precision epsilon (deviation from empirical center).
median_abs_epsilon_precision_eq_species (float): Median absolute precision epsilon for equivalently weighted species.
mean_abs_epsilon_precision_eq_species (float): Mean absolute precision epsilon for equivalently weighted species.
nr_prec (int): Number of precursors identified.
comments (str): Any additional comments.
proteobench_version (str): Version of the Proteobench tool used.
"""
id: str = None
software_name: str = None
software_version: int = 0
search_engine: str = None
search_engine_version: int = 0
ident_fdr_psm: int = 0
ident_fdr_peptide: int = 0
ident_fdr_protein: int = 0
enable_match_between_runs: bool = False
precursor_mass_tolerance: str = None
fragment_mass_tolerance: str = None
enzyme: str = None
allowed_miscleavages: int = 0
min_peptide_length: int = 0
max_peptide_length: int = 0
is_temporary: bool = True
intermediate_hash: str = ""
results: dict = None
median_abs_epsilon_global: float = 0
mean_abs_epsilon_global: float = 0
median_abs_epsilon_eq_species: float = 0
mean_abs_epsilon_eq_species: float = 0
median_abs_epsilon_precision_global: float = 0
mean_abs_epsilon_precision_global: float = 0
median_abs_epsilon_precision_eq_species: float = 0
mean_abs_epsilon_precision_eq_species: float = 0
nr_prec: int = 0
comments: str = ""
proteobench_version: str = ""
[docs]
def generate_id(self) -> None:
"""
Generate a unique ID for the benchmark run by combining the software name and a timestamp.
This ID is used to uniquely identify each run of the benchmark.
"""
time_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.id = "_".join([self.software_name, str(time_stamp)])
logging.info(f"Assigned the following ID to this run: {self.id}")
[docs]
@staticmethod
def generate_datapoint(
intermediate: pd.DataFrame, input_format: str, user_input: dict, default_cutoff_min_prec: int = 3
) -> pd.Series:
"""
Generate a Datapoint object containing metadata and results from the benchmark run.
Parameters
----------
intermediate : pd.DataFrame
The intermediate DataFrame containing benchmark results.
input_format : str
The format of the input data (e.g., file format).
user_input : dict
User-defined input values for the benchmark.
default_cutoff_min_prec : int, optional
The default minimum precursor cutoff value. Defaults to 3.
Returns
-------
pd.Series
A Pandas Series containing the Datapoint's attributes as key-value pairs.
"""
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y%m%d_%H%M%S_%f")
if "comments_for_plotting" not in user_input.keys():
user_input["comments_for_plotting"] = ""
try:
user_input = defaultdict(
user_input.default_factory, # Preserve the default factory
{key: ("" if value is None else value) for key, value in user_input.items()},
)
except AttributeError:
user_input = {key: ("" if value is None else value) for key, value in user_input.items()}
result_datapoint = QuantDatapoint(
id=input_format + "_" + user_input["software_version"] + "_" + formatted_datetime,
software_name=input_format,
software_version=user_input["software_version"],
search_engine=user_input["search_engine"],
search_engine_version=user_input["search_engine_version"],
ident_fdr_psm=user_input["ident_fdr_psm"],
ident_fdr_peptide=user_input["ident_fdr_peptide"],
ident_fdr_protein=user_input["ident_fdr_protein"],
enable_match_between_runs=user_input["enable_match_between_runs"],
precursor_mass_tolerance=user_input["precursor_mass_tolerance"],
fragment_mass_tolerance=user_input["fragment_mass_tolerance"],
enzyme=user_input["enzyme"],
allowed_miscleavages=user_input["allowed_miscleavages"],
min_peptide_length=user_input["min_peptide_length"],
max_peptide_length=user_input["max_peptide_length"],
intermediate_hash=str(hashlib.sha1(intermediate.to_string().encode("utf-8")).hexdigest()),
comments=user_input["comments_for_plotting"],
proteobench_version=proteobench.__version__,
)
result_datapoint.generate_id()
results = dict(
ChainMap(*[QuantDatapoint.get_metrics(intermediate, nr_observed) for nr_observed in range(1, 7)])
)
result_datapoint.results = results
result_datapoint.median_abs_epsilon_global = result_datapoint.results[default_cutoff_min_prec][
"median_abs_epsilon_global"
]
result_datapoint.mean_abs_epsilon_global = result_datapoint.results[default_cutoff_min_prec][
"mean_abs_epsilon_global"
]
result_datapoint.median_abs_epsilon_eq_species = result_datapoint.results[default_cutoff_min_prec][
"median_abs_epsilon_eq_species"
]
result_datapoint.mean_abs_epsilon_eq_species = result_datapoint.results[default_cutoff_min_prec][
"mean_abs_epsilon_eq_species"
]
result_datapoint.median_abs_epsilon_precision_global = result_datapoint.results[default_cutoff_min_prec][
"median_abs_epsilon_precision_global"
]
result_datapoint.mean_abs_epsilon_precision_global = result_datapoint.results[default_cutoff_min_prec][
"mean_abs_epsilon_precision_global"
]
result_datapoint.median_abs_epsilon_precision_eq_species = result_datapoint.results[default_cutoff_min_prec][
"median_abs_epsilon_precision_eq_species"
]
result_datapoint.mean_abs_epsilon_precision_eq_species = result_datapoint.results[default_cutoff_min_prec][
"mean_abs_epsilon_precision_eq_species"
]
result_datapoint.nr_prec = result_datapoint.results[default_cutoff_min_prec]["nr_prec"]
results_series = pd.Series(dataclasses.asdict(result_datapoint))
return results_series
[docs]
def get_epsilon_metrics(df: pd.DataFrame, min_nr_observed: int, agg: str = "median") -> dict[str, float]:
"""
Compute epsilon-based accuracy metrics using specified aggregation.
Parameters
----------
df : pd.DataFrame
DataFrame with epsilon column (deviation from expected ratio)
min_nr_observed : int
Filter threshold for minimum observations
agg : str
Aggregation method: "median" or "mean"
Returns
-------
dict
Accuracy metrics: global, equal-species average, and per-species values
"""
df_slice = df[df["nr_observed"] >= min_nr_observed]
agg_func = (lambda x: x.abs().median()) if agg == "median" else (lambda x: x.abs().mean())
per_species_acc = df_slice.groupby("species")["epsilon"].apply(agg_func)
return {
f"{agg}_abs_epsilon_global": agg_func(df_slice["epsilon"]),
f"{agg}_abs_epsilon_eq_species": per_species_acc.mean(),
**{f"{agg}_abs_epsilon_{sp}": v for sp, v in per_species_acc.items()},
}
[docs]
def get_precision_metrics(df: pd.DataFrame, min_nr_observed: int, agg: str = "median") -> dict[str, float]:
"""
Compute precision metrics directly from log2FC (log2_A_vs_B) column.
Precision measures deviation from the empirical center (reproducibility),
computed independently from expected ratios.
Parameters
----------
df : pd.DataFrame
DataFrame with log2_A_vs_B and species columns
min_nr_observed : int
Filter threshold for minimum observations
agg : str
Aggregation method: "median" or "mean"
Returns
-------
dict
Precision metrics including:
- {agg}_log2_empirical_{species}: Center of log2FC distribution per species
- {agg}_abs_epsilon_precision_global: Global aggregated precision
- {agg}_abs_epsilon_precision_eq_species: Equal-weighted species average
- {agg}_abs_epsilon_precision_{species}: Per-species precision values
"""
df_slice = df[df["nr_observed"] >= min_nr_observed]
# Compute empirical center per species
center_func = (lambda x: x.median()) if agg == "median" else (lambda x: x.mean())
per_species_center = df_slice.groupby("species")["log2_A_vs_B"].transform(center_func)
# Precision = deviation from empirical center
epsilon_precision = df_slice["log2_A_vs_B"] - per_species_center
# Aggregate precision per species
agg_func = (lambda x: x.abs().median()) if agg == "median" else (lambda x: x.abs().mean())
# Create temp df for groupby
prec_df = df_slice[["species"]].copy()
prec_df["epsilon_precision"] = epsilon_precision
per_species_prec = prec_df.groupby("species")["epsilon_precision"].apply(agg_func)
# Get the empirical centers per species
empirical_centers = df_slice.groupby("species")["log2_A_vs_B"].apply(center_func)
return {
**{f"{agg}_log2_empirical_{sp}": v for sp, v in empirical_centers.items()},
f"{agg}_abs_epsilon_precision_global": agg_func(epsilon_precision),
f"{agg}_abs_epsilon_precision_eq_species": per_species_prec.mean(),
**{f"{agg}_abs_epsilon_precision_{sp}": v for sp, v in per_species_prec.items()},
}
[docs]
def get_cv_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, float]:
"""Compute CV quantiles."""
df_slice = df[df["nr_observed"] >= min_nr_observed]
cv_q = df_slice[["CV_A", "CV_B"]].quantile([0.5, 0.75, 0.9, 0.95])
cv_avg = cv_q.mean(axis=1)
return {
"CV_median": cv_avg.loc[0.50],
"CV_q75": cv_avg.loc[0.75],
"CV_q90": cv_avg.loc[0.90],
"CV_q95": cv_avg.loc[0.95],
}
[docs]
def get_roc_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, float]:
"""Compute ROC-AUC metrics."""
df_slice = df[df["nr_observed"] >= min_nr_observed]
return {
"roc_auc": compute_roc_auc(df_slice),
"roc_auc_directional": compute_roc_auc_directional(df_slice),
}
[docs]
def get_count_metrics(df: pd.DataFrame, min_nr_observed: int) -> dict[str, int]:
"""Compute precursor counts (total and per-species)."""
df_slice = df[df["nr_observed"] >= min_nr_observed]
species_counts = df_slice["species"].value_counts().to_dict()
return {
"nr_prec": len(df_slice),
**{f"nr_prec_{sp}": count for sp, count in species_counts.items()},
}
[docs]
def get_metrics(df: pd.DataFrame, min_nr_observed: int = 1) -> dict[int, dict[str, float]]:
"""
Compute all benchmark metrics (backward compatible wrapper).
Merges: epsilon (accuracy) + precision + cv + roc + counts
"""
df_slice = df[df["nr_observed"] >= min_nr_observed]
return {
min_nr_observed: {
**QuantDatapoint.get_epsilon_metrics(df, min_nr_observed, "median"),
**QuantDatapoint.get_epsilon_metrics(df, min_nr_observed, "mean"),
**QuantDatapoint.get_precision_metrics(df, min_nr_observed, "median"),
**QuantDatapoint.get_precision_metrics(df, min_nr_observed, "mean"),
**QuantDatapoint.get_cv_metrics(df, min_nr_observed),
**QuantDatapoint.get_roc_metrics(df, min_nr_observed),
**QuantDatapoint.get_count_metrics(df, min_nr_observed),
"variance_epsilon_global": df_slice["epsilon"].var(),
}
}
[docs]
@staticmethod
def get_metrics_old(df: pd.DataFrame, min_nr_observed: int = 1) -> Dict[int, Dict[str, float]]:
"""
Compute various statistical metrics from the provided DataFrame for the benchmark.
Parameters
----------
df : pd.DataFrame
The DataFrame containing the benchmark results.
min_nr_observed : int, optional
The minimum number of observed values for a valid computation. Defaults to 1.
Returns
-------
dict
A dictionary containing computed metrics such as 'median_abs_epsilon', 'variance_epsilon', etc.
"""
# Filter DataFrame by the minimum number of observations
df_slice = df[df["nr_observed"] >= min_nr_observed]
nr_prec = len(df_slice)
# Calculate the median absolute epsilon (insensitive to outliers)
median_abs_epsilon = df_slice["epsilon"].abs().median()
# Calculate the mean absolute epsilon (sensitive to outliers)
mean_abs_epsilon = df_slice["epsilon"].abs().mean()
# Calculate the variance of epsilon (sensitive to outliers)
variance_epsilon = df_slice["epsilon"].var()
# Compute the median of the coefficient of variation (CV) for both 'CV_A' and 'CV_B'
cv_median = (df_slice["CV_A"].median() + df_slice["CV_B"].median()) / 2
cv_q75 = (df_slice["CV_A"].quantile(0.75) + df_slice["CV_B"].quantile(0.75)) / 2
cv_q90 = (df_slice["CV_A"].quantile(0.9) + df_slice["CV_B"].quantile(0.9)) / 2
cv_q95 = (df_slice["CV_A"].quantile(0.95) + df_slice["CV_B"].quantile(0.95)) / 2
return {
min_nr_observed: {
"median_abs_epsilon": median_abs_epsilon,
"mean_abs_epsilon": mean_abs_epsilon,
"variance_epsilon": variance_epsilon,
"nr_prec": nr_prec,
"CV_median": cv_median,
"CV_q90": cv_q90,
"CV_q75": cv_q75,
"CV_q95": cv_q95,
}
}