"""Streamlit-based web interface for ProteoBench."""
import copy
import glob
import json
import logging
import os
import tempfile
import uuid
import zipfile
from datetime import datetime
from pprint import pformat
from typing import Any, Dict, Optional
import pages.texts.proteobench_builder as pbb
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import streamlit_utils
from pages.pages_variables.Quant.lfq_DDA_ion_QExactive_variables import (
VariablesDDAQuant,
)
from streamlit_extras.let_it_rain import rain
from proteobench.io.params import ProteoBenchParameters
from proteobench.io.parsing.parse_settings import ParseSettingsBuilder
from proteobench.io.parsing.utils import add_maxquant_fixed_modifications
from proteobench.modules.quant.quant_lfq_ion_DDA_QExactive import (
DDAQuantIonModuleQExactive as IonModule,
)
from proteobench.plotting.plot_quant import PlotDataPoint
logger: logging.Logger = logging.getLogger(__name__)
[docs]
def compare_dictionaries(old_dict, new_dict):
"""
Generate a human-readable string describing differences between two dictionaries.
Parameters
----------
old_dict : dict
The old dictionary.
new_dict : dict
The new dictionary.
Returns
-------
str
The human-readable string describing the differences between the two dictionaries.
"""
changes = []
# Get all unique keys across both dictionaries
all_keys = set(old_dict.keys()).union(set(new_dict.keys()))
for key in all_keys:
old_value = old_dict.get(key, "[MISSING]")
new_value = new_dict.get(key, "[MISSING]")
if str(old_value) != str(new_value) and not (old_value == None and new_value == "[MISSING]"):
changes.append(f"- **{key}**: `{old_value}` → `{new_value}`")
if changes:
return "\n ### Parameter changes detected:\n" + "\n".join(changes)
else:
return "\n ### No parameter changes detected. \n"
[docs]
class QuantUIObjects:
"""
Main class for the Streamlit interface of ProteoBench quantification.
This class handles the creation of the Streamlit UI elements, including the main page layout,
input forms, results display, and data submission elements.
Parameters
----------
variables_quant : VariablesDDAQuant
The variables for the quantification module.
ionmodule : IonModule
The quantification module.
parsesettingsbuilder : ParseSettingsBuilder
The parse settings builder.
"""
def __init__(
self, variables_quant: VariablesDDAQuant, ionmodule: IonModule, parsesettingsbuilder: ParseSettingsBuilder
) -> None:
"""
Initialize the Streamlit UI objects for the quantification modules.
Parameters
----------
variables_quant : VariablesDDAQuant
The variables for the quantification module.
ionmodule : IonModule
The quantification module.
parsesettingsbuilder : ParseSettingsBuilder
The parse settings builder.
"""
self.variables_quant: VariablesDDAQuant = variables_quant
self.ionmodule: IonModule = ionmodule
self.parsesettingsbuilder: ParseSettingsBuilder = parsesettingsbuilder
self.user_input: Dict[str, Any] = dict()
# Create page config and sidebar
pbb.proteobench_page_config()
pbb.proteobench_sidebar()
self.first_point_plotted = False
st.session_state[self.variables_quant.submit] = False
self.stop_duplicating = False
if self.variables_quant.params_file_dict not in st.session_state.keys():
st.session_state[self.variables_quant.params_file_dict] = dict()
if self.variables_quant.slider_id_submitted_uuid not in st.session_state.keys():
st.session_state[self.variables_quant.slider_id_submitted_uuid] = str()
def _generate_text_area(self, input_format: str, content: dict, key: str = "") -> Any:
"""
Generate a text area input field.
Parameters
----------
input_format : str
The input format.
content : dict
The content of the text area.
key : str
The key of the text area.
Returns
-------
Any
The text area input field.
"""
placeholder = content.get("placeholder")
if key in st.session_state[self.variables_quant.params_file_dict].keys():
value = st.session_state[self.variables_quant.params_file_dict].get(key) # Get parsed value if available
else:
value = content.get("value", {}).get(input_format)
height = content.get("height", 200) # Default height if not specified
return st.text_area(
content["label"],
placeholder=placeholder,
key=self.variables_quant.prefix_params + key,
value=value,
height=height,
on_change=self.update_parameters_submission_form(
key, st.session_state.get(self.variables_quant.prefix_params + key, 0)
),
)
# Function to update session state dictionary
def _generate_text_input(self, input_format: str, content: dict, key: str = "", editable: bool = True) -> Any:
"""
Generate a text input field.
Parameters
----------
input_format : str
The input format.
content : dict
The content of the text input field.
key : str
The key of the text input field.
Returns
-------
Any
The text input field.
"""
placeholder = content.get("placeholder")
if key in st.session_state[self.variables_quant.params_file_dict].keys():
value = st.session_state[self.variables_quant.params_file_dict].get(key) # Get parsed value if available
else:
value = content.get("value", {}).get(input_format)
return st.text_input(
content["label"],
placeholder=placeholder,
key=self.variables_quant.prefix_params + key,
value=value,
on_change=self.update_parameters_submission_form(
key, st.session_state.get(self.variables_quant.prefix_params + key, 0)
),
disabled=not editable,
)
def _generate_number_input(self, content: dict, key: str = "", editable: bool = True) -> Any:
"""
Generate a number input field.
Parameters
----------
content : dict
The content of the number input field.
key : str
The key of the number input field.
editable : bool
Whether the number input field is editable.
Returns
-------
Any
The number input field.
"""
if key in st.session_state[self.variables_quant.params_file_dict].keys():
value = st.session_state[self.variables_quant.params_file_dict].get(key) # Get parsed value if available
else:
value = content.get("value", {}).get("min_value")
return st.number_input(
content["label"],
value=value,
key=self.variables_quant.prefix_params + key,
format=content["format"],
min_value=content["min_value"],
max_value=content["max_value"],
on_change=self.update_parameters_submission_form(
key, st.session_state.get(self.variables_quant.prefix_params + key, 0)
),
disabled=not editable,
)
def _generate_selectbox(self, input_format: str, content: dict, key: str = "", editable: bool = True) -> Any:
"""
Generate a selectbox input field.
Parameters
----------
input_format : str
The input format.
content : dict
The content of the selectbox.
key : str
The key of the selectbox.
editable : bool
Whether the selectbox is editable.
Returns
-------
Any
The selectbox input field.
"""
options = content.get("options", [])
if key in st.session_state[self.variables_quant.params_file_dict].keys():
value = st.session_state[self.variables_quant.params_file_dict].get(key) # Get parsed value if available
else:
value = content.get("value", {}).get(input_format)
index = options.index(value) if value in options else 0
return st.selectbox(
content["label"],
options,
key=self.variables_quant.prefix_params + key,
index=index,
on_change=self.update_parameters_submission_form(
key, st.session_state.get(self.variables_quant.prefix_params + key, 0)
),
disabled=not editable,
)
def _generate_checkbox(self, content: dict, key: str = "", editable: bool = True) -> Any:
"""
Generate a checkbox input field.
Parameters
----------
content : dict
The content of the checkbox.
key : str
The key of the checkbox.
editable : bool
Whether the checkbox is editable.
Returns
-------
Any
The checkbox input field.
"""
value = content.get("value", {})
if key in st.session_state[self.variables_quant.params_file_dict].keys():
value = st.session_state[self.variables_quant.params_file_dict].get(key)
return st.checkbox(
content["label"],
key=self.variables_quant.prefix_params + key,
value=value,
on_change=self.update_parameters_submission_form(
key, st.session_state.get(self.variables_quant.prefix_params + key, 0)
),
disabled=not editable,
)
[docs]
def initialize_main_slider(self) -> None:
"""
Initialize the slider for the main data.
"""
if self.variables_quant.slider_id_uuid not in st.session_state.keys():
st.session_state[self.variables_quant.slider_id_uuid] = uuid.uuid4()
if st.session_state[self.variables_quant.slider_id_uuid] not in st.session_state.keys():
st.session_state[st.session_state[self.variables_quant.slider_id_uuid]] = (
self.variables_quant.default_val_slider
)
[docs]
def generate_main_selectbox(self) -> None:
"""
Create the selectbox for the Streamlit UI.
"""
if self.variables_quant.selectbox_id_uuid not in st.session_state.keys():
st.session_state[self.variables_quant.selectbox_id_uuid] = uuid.uuid4()
try:
# TODO: Other labels based on different modules, e.g. mass tolerances are less relevant for DIA
st.selectbox(
"Select label to plot",
["None", "precursor_mass_tolerance", "fragment_mass_tolerance", "enable_match_between_runs"],
key=st.session_state[self.variables_quant.selectbox_id_uuid],
)
except Exception as e:
st.error(f"Unable to create the selectbox: {e}", icon="🚨")
[docs]
def generate_submitted_selectbox(self) -> None:
"""
Create the selectbox for the Streamlit UI.
"""
if self.variables_quant.selectbox_id_submitted_uuid not in st.session_state.keys():
st.session_state[self.variables_quant.selectbox_id_submitted_uuid] = uuid.uuid4()
try:
st.selectbox(
"Select label to plot",
["None", "precursor_mass_tolerance", "fragment_mass_tolerance"],
key=st.session_state[self.variables_quant.selectbox_id_submitted_uuid],
)
except Exception as e:
st.error(f"Unable to create the selectbox: {e}", icon="🚨")
[docs]
def initialize_submitted_slider(self) -> None:
"""
Initialize the slider for the submitted data.
"""
if self.variables_quant.slider_id_submitted_uuid not in st.session_state.keys():
st.session_state[self.variables_quant.slider_id_submitted_uuid] = uuid.uuid4()
if st.session_state[self.variables_quant.slider_id_submitted_uuid] not in st.session_state.keys():
st.session_state[st.session_state[self.variables_quant.slider_id_submitted_uuid]] = (
self.variables_quant.default_val_slider
)
[docs]
def display_submitted_results(self) -> None:
"""
Display the results section of the page for submitted data.
"""
# handled_submission = self.process_submission_form()
# if handled_submission == False:
# return
self.initialize_submitted_data_points()
data_points_filtered = self.filter_data_submitted_slider()
metric = st.radio(
"Select metric to plot",
options=["Median", "Mean"],
help="Toggle between median and mean absolute difference metrics.",
key="placeholder2", # TODO: add to variables
)
if len(data_points_filtered) == 0:
st.error("No datapoints available for plotting", icon="🚨")
try:
fig_metric = PlotDataPoint.plot_metric(
data_points_filtered,
metric=metric,
label=st.session_state[st.session_state[self.variables_quant.selectbox_id_submitted_uuid]],
)
st.plotly_chart(fig_metric, use_container_width=True)
except Exception as e:
st.error(f"Unable to plot the datapoints: {e}", icon="🚨")
st.session_state[self.variables_quant.table_id_uuid] = uuid.uuid4()
st.data_editor(
st.session_state[self.variables_quant.all_datapoints_submitted],
key=st.session_state[self.variables_quant.table_id_uuid],
on_change=self.handle_submitted_table_edits,
)
st.title("Public submission")
st.markdown(
"If you want to make this point — and the associated data — publicly available, please go to “Public Submission"
)
[docs]
def display_existing_results(self) -> None:
"""
Display the results section of the page for existing data.
"""
self.initialize_main_data_points()
data_points_filtered = self.filter_data_main_slider()
metric = st.radio(
"Select metric to plot",
options=["Median", "Mean"],
help="Toggle between median and mean absolute difference metrics.",
)
if len(data_points_filtered) == 0:
st.error("No datapoints available for plotting", icon="🚨")
try:
fig_metric = PlotDataPoint.plot_metric(
data_points_filtered,
label=st.session_state[st.session_state[self.variables_quant.selectbox_id_uuid]],
metric=metric,
)
st.plotly_chart(fig_metric, use_container_width=True)
except Exception as e:
st.error(f"Unable to plot the datapoints: {e}", icon="🚨")
st.dataframe(data_points_filtered)
self.display_download_section()
[docs]
def submit_to_repository(self, params) -> Optional[str]:
"""
Handle the submission process of the benchmark results to the ProteoBench repository.
Parameters
----------
params : ProteoBenchParameters
The parameters for the submission.
Returns
-------
str, optional
The URL of the pull request if the submission was successful.
"""
st.session_state[self.variables_quant.submit] = True
button_pressed = self.generate_submission_button() # None or 'button_pressed'
if not button_pressed: # if button_pressed is None
return None
# MaxQuant fixed modification handling
if self.user_input["input_format"] == "MaxQuant":
st.session_state[self.variables_quant.result_perf] = add_maxquant_fixed_modifications(
params, st.session_state[self.variables_quant.result_perf]
)
# Overwrite the dataframes for submission
self.copy_dataframes_for_submission()
self.clear_highlight_column()
pr_url = self.create_pull_request(params)
if pr_url:
self.save_intermediate_submission_data()
return pr_url
[docs]
def show_submission_success_message(self, pr_url) -> None:
"""
Handle the UI updates and notifications after a successful submission.
Parameters
----------
pr_url : str
The URL of the pull request.
"""
if st.session_state[self.variables_quant.submit]:
st.subheader("SUCCESS")
st.markdown(self.variables_quant.texts.ShortMessages.submission_processing_warning)
try:
st.write(f"Follow your submission approval here: [{pr_url}]({pr_url})")
except UnboundLocalError:
pass
st.session_state[self.variables_quant.submit] = False
rain(emoji="🎈", font_size=54, falling_speed=5, animation_length=1)
[docs]
def generate_submission_ui_elements(self) -> None:
"""
Create the UI elements necessary for data submission, including metadata uploader and comments section.
"""
try:
self.copy_dataframes_for_submission()
self.submission_ready = True
except:
self.submission_ready = False
st.error(":x: Please provide a result file", icon="🚨")
self.generate_metadata_uploader()
# TODO: change additional_params_json for other modules, to capture relevant parameters
[docs]
def generate_additional_parameters_fields(self) -> None:
"""
Create the additional parameters section of the form and initializes the parameter fields.
"""
with open(self.variables_quant.additional_params_json) as file:
config = json.load(file)
for key, value in config.items():
if key.lower() == "software_name":
editable = False
else:
editable = True
if key == "comments_for_plotting":
self.user_input[key] = self.generate_input_widget(self.user_input["input_format"], value, editable)
else:
self.user_input[key] = None
[docs]
def generate_text_area_widget(self, input_format: str, content: dict, editable: bool = True) -> Any:
"""
Generate a text area input field.
Parameters
----------
input_format : str
The input format.
content : dict
The content of the text area.
editable : bool
Whether the text area is editable.
Returns
-------
Any
The text area input field.
"""
placeholder = content.get("placeholder")
value = content.get("value", {}).get(input_format)
height = content.get("height", 200)
return st.text_area(
content["label"], placeholder=placeholder, value=value, height=height, disabled=not editable
)
[docs]
def initialize_main_data_points(self) -> None:
"""
Initialize the all_datapoints variable in the session state.
"""
if self.variables_quant.all_datapoints not in st.session_state.keys():
st.session_state[self.variables_quant.all_datapoints] = None
st.session_state[self.variables_quant.all_datapoints] = self.ionmodule.obtain_all_data_points(
all_datapoints=st.session_state[self.variables_quant.all_datapoints]
)
[docs]
def initialize_submitted_data_points(self) -> None:
"""
Initialize the all_datapoints variable in the session state.
"""
if self.variables_quant.all_datapoints_submitted not in st.session_state.keys():
st.session_state[self.variables_quant.all_datapoints_submitted] = None
st.session_state[self.variables_quant.all_datapoints_submitted] = self.ionmodule.obtain_all_data_points(
all_datapoints=st.session_state[self.variables_quant.all_datapoints_submitted]
)
[docs]
def filter_data_main_slider(self) -> pd.DataFrame:
"""
Filter the data points based on the slider value.
Returns
-------
pandas.DataFrame
The filtered data points.
"""
if self.variables_quant.slider_id_uuid in st.session_state.keys():
return self.ionmodule.filter_data_point(
st.session_state[self.variables_quant.all_datapoints],
st.session_state[st.session_state[self.variables_quant.slider_id_uuid]],
)
[docs]
def filter_data_submitted_slider(self) -> pd.DataFrame:
"""
Filter the data points based on the slider value.
Returns
-------
pd.DataFrame
The filtered data points.
"""
if (
self.variables_quant.slider_id_submitted_uuid in st.session_state.keys()
and self.variables_quant.all_datapoints_submitted in st.session_state.keys()
):
return self.ionmodule.filter_data_point(
st.session_state[self.variables_quant.all_datapoints_submitted],
st.session_state[st.session_state[self.variables_quant.slider_id_submitted_uuid]],
)
[docs]
def set_highlight_column_in_submitted_data(self) -> None:
"""
Initialize the highlight column in the data points.
"""
df = st.session_state[self.variables_quant.all_datapoints_submitted]
if (
self.variables_quant.highlight_list_submitted not in st.session_state.keys()
and "Highlight" not in df.columns
):
df.insert(0, "Highlight", [False] * len(df.index))
elif "Highlight" not in df.columns:
df.insert(0, "Highlight", st.session_state[self.variables_quant.highlight_list_submitted])
elif "Highlight" in df.columns:
# Not sure how 'Highlight' column became object dtype
df["Highlight"] = df["Highlight"].astype(bool).fillna(False)
# only needed for last elif, but to be sure apply always:
st.session_state[self.variables_quant.all_datapoints_submitted] = df
[docs]
def generate_main_slider(self) -> None:
"""
Create a slider input.
"""
if self.variables_quant.slider_id_uuid not in st.session_state:
st.session_state[self.variables_quant.slider_id_uuid] = uuid.uuid4()
slider_key = st.session_state[self.variables_quant.slider_id_uuid]
st.markdown(open(self.variables_quant.description_slider_md, "r").read())
st.select_slider(
label="Minimal precursor quantifications (# samples)",
options=[1, 2, 3, 4, 5, 6],
value=st.session_state.get(slider_key, self.variables_quant.default_val_slider),
key=slider_key,
)
[docs]
def generate_submitted_slider(self) -> None:
"""
Create a slider input.
"""
if self.variables_quant.slider_id_submitted_uuid not in st.session_state:
st.session_state[self.variables_quant.slider_id_submitted_uuid] = uuid.uuid4()
slider_key = st.session_state[self.variables_quant.slider_id_submitted_uuid]
st.markdown(open(self.variables_quant.description_slider_md, "r").read())
st.select_slider(
label="Minimal precursor quantifications (# samples)",
options=[1, 2, 3, 4, 5, 6],
value=st.session_state.get(slider_key, self.variables_quant.default_val_slider),
key=slider_key,
)
[docs]
def display_download_section(self, reset_uuid=False) -> None:
"""
Render the selector and area for raw data download.
Parameters
----------
reset_uuid : bool, optional
Whether to reset the UUID, by default False.
"""
if len(st.session_state[self.variables_quant.all_datapoints]) == 0:
st.error("No data available for download.", icon="🚨")
return
downloads_df = st.session_state[self.variables_quant.all_datapoints][["id", "intermediate_hash"]]
downloads_df.set_index("intermediate_hash", drop=False, inplace=True)
if self.variables_quant.placeholder_downloads_container not in st.session_state.keys() or reset_uuid:
st.session_state[self.variables_quant.placeholder_downloads_container] = st.empty()
st.session_state[self.variables_quant.download_selector_id_uuid] = uuid.uuid4()
with st.session_state[self.variables_quant.placeholder_downloads_container].container(border=True):
st.subheader("Download raw datasets")
st.selectbox(
"Select dataset",
downloads_df["intermediate_hash"],
index=None,
key=st.session_state[self.variables_quant.download_selector_id_uuid],
format_func=lambda x: downloads_df["id"][x],
)
if (
st.session_state[st.session_state[self.variables_quant.download_selector_id_uuid]] != None
and st.secrets["storage"]["dir"] != None
):
dataset_path = (
st.secrets["storage"]["dir"]
+ "/"
+ st.session_state[st.session_state[self.variables_quant.download_selector_id_uuid]]
)
if os.path.isdir(dataset_path):
files = os.listdir(dataset_path)
for file_name in files:
path_to_file = dataset_path + "/" + file_name
with open(path_to_file, "rb") as file:
st.download_button(file_name, file, file_name=file_name)
else:
st.write("Directory for this dataset does not exist, this should not happen.")
[docs]
def display_indepth_plots(self) -> None:
"""
Display the dataset selection dropdown and plot the selected dataset.
"""
if self.variables_quant.all_datapoints_submitted not in st.session_state.keys():
st.error("No data available for plotting.", icon="🚨")
return
if st.session_state[self.variables_quant.all_datapoints_submitted].empty:
st.error("No data available for plotting.", icon="🚨")
return
downloads_df = st.session_state[self.variables_quant.all_datapoints_submitted][["id", "intermediate_hash"]]
downloads_df.set_index("intermediate_hash", drop=False, inplace=True)
if self.variables_quant.placeholder_dataset_selection_container not in st.session_state.keys():
st.session_state[self.variables_quant.placeholder_dataset_selection_container] = st.empty()
st.session_state[self.variables_quant.dataset_selector_id_uuid] = uuid.uuid4()
st.subheader("Select dataset to plot")
dataset_options = [("Uploaded dataset", None)] + list(
zip(downloads_df["id"], downloads_df["intermediate_hash"])
)
dataset_selection = st.selectbox(
"Select dataset",
dataset_options,
index=0,
key=st.session_state[self.variables_quant.dataset_selector_id_uuid],
format_func=lambda x: x[0],
)
public_id, selected_hash = dataset_selection
self.generate_indepth_plots(True, public_id=public_id, public_hash=selected_hash)
[docs]
def clear_highlight_column(self) -> None:
"""
Remove the highlight column from the submission data if it exists.
"""
if "Highlight" in st.session_state[self.variables_quant.all_datapoints_submission].columns:
st.session_state[self.variables_quant.all_datapoints_submission].drop("Highlight", inplace=True, axis=1)
[docs]
def create_pull_request(self, params: Any) -> Optional[str]:
"""
Submit the pull request with the benchmark results and returns the PR URL.
Parameters
----------
params : Any
The parameters object.
Returns
-------
Optional[str]
The URL of the pull request.
"""
user_comments = self.user_input["comments_for_submission"]
changed_params_str = compare_dictionaries(self.params_file_dict_copy, params.__dict__)
try:
pr_url = self.ionmodule.clone_pr(
st.session_state[self.variables_quant.all_datapoints_submission],
params,
remote_git=self.variables_quant.github_link_pr,
submission_comments=user_comments + "\n" + changed_params_str,
)
except Exception as e:
st.error(f"Unable to create the pull request: {e}", icon="🚨")
pr_url = None
if not pr_url:
del st.session_state[self.variables_quant.submit]
return pr_url
[docs]
def copy_dataframes_for_submission(self) -> None:
"""
Create copies of the dataframes before submission.
"""
if st.session_state[self.variables_quant.all_datapoints_submitted] is not None:
st.session_state[self.variables_quant.all_datapoints_submission] = st.session_state[
self.variables_quant.all_datapoints_submitted
].copy()
if st.session_state[self.variables_quant.input_df] is not None:
st.session_state[self.variables_quant.input_df_submission] = st.session_state[
self.variables_quant.input_df
].copy()
if st.session_state[self.variables_quant.result_perf] is not None:
st.session_state[self.variables_quant.result_performance_submission] = st.session_state[
self.variables_quant.result_perf
].copy()
[docs]
def generate_confirmation_checkbox(self) -> None:
"""
Create the confirmation checkbox for metadata correctness.
"""
st.session_state[self.variables_quant.check_submission] = st.checkbox(
"I confirm that the metadata is correct",
)
self.stop_duplicating = True
[docs]
def execute_proteobench(self) -> None:
"""
Execute the ProteoBench benchmarking process.
"""
if self.variables_quant.all_datapoints_submitted not in st.session_state:
self.initialize_main_data_points()
result_performance, all_datapoints, input_df = self.run_benchmarking_process()
st.session_state[self.variables_quant.all_datapoints_submitted] = all_datapoints
self.set_highlight_column_in_submitted_data()
st.session_state[self.variables_quant.result_perf] = result_performance
st.session_state[self.variables_quant.input_df] = input_df
[docs]
def run_benchmarking_process(self):
"""
Execute the benchmarking process and returns the results.
Returns
-------
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
The benchmarking results, all data points, and the input data frame.
"""
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(self.user_input["input_csv"].getbuffer())
# reload buffer: https://stackoverflow.com/a/64478151/9684872
self.user_input["input_csv"].seek(0)
if st.session_state[self.variables_quant.slider_id_submitted_uuid] in st.session_state.keys():
set_slider_val = st.session_state[st.session_state[self.variables_quant.slider_id_submitted_uuid]]
else:
set_slider_val = self.variables_quant.default_val_slider
if self.variables_quant.all_datapoints_submitted in st.session_state.keys():
all_datapoints = st.session_state[self.variables_quant.all_datapoints_submitted]
else:
all_datapoints = st.session_state[self.variables_quant.all_datapoints]
return self.ionmodule.benchmarking(
self.user_input["input_csv"],
self.user_input["input_format"],
self.user_input,
all_datapoints,
default_cutoff_min_prec=set_slider_val,
)
[docs]
def handle_submitted_table_edits(self) -> None:
"""Callback function for handling edits made to the data table in the UI."""
edits = st.session_state[st.session_state[self.variables_quant.table_id_uuid]]["edited_rows"].items()
for k, v in edits:
try:
st.session_state[self.variables_quant.all_datapoints_submitted][list(v.keys())[0]].iloc[k] = list(
v.values()
)[0]
except TypeError:
return
st.session_state[self.variables_quant.highlight_list_submitted] = list(
st.session_state[self.variables_quant.all_datapoints_submitted]["Highlight"]
)
st.session_state[self.variables_quant.placeholder_table] = st.session_state[
self.variables_quant.all_datapoints_submitted
]
if len(st.session_state[self.variables_quant.all_datapoints]) == 0:
st.error("No datapoints available for plotting", icon="🚨")
try:
fig_metric = PlotDataPoint.plot_metric(
st.session_state[self.variables_quant.all_datapoints],
label=st.session_state[st.session_state[self.variables_quant.selectbox_id_uuid]],
)
except Exception as e:
st.error(f"Unable to plot the datapoints: {e}", icon="🚨")
st.session_state[self.variables_quant.fig_metric] = fig_metric
[docs]
def load_user_parameters(self) -> Any:
"""
Read and process the parameter files provided by the user.
Returns
-------
Any
The parsed parameters.
"""
params = None
try:
params = self.ionmodule.load_params_file(
self.user_input[self.variables_quant.meta_data], self.user_input["input_format"]
)
st.session_state[self.variables_quant.params_json_dict] = (
params.__dict__ if hasattr(params, "__dict__") else params
)
except KeyError:
st.error("Parsing of meta parameters file for this software is not supported yet.", icon="🚨")
except Exception as e:
input_f = self.user_input["input_format"]
st.error(
f"Unexpected error while parsing file. Make sure you provided a meta parameters file produced by {input_f}: {e}",
icon="🚨",
)
return params
[docs]
def generate_additional_parameters_fields_submission(self) -> None:
"""
Create the additional parameters section of the form and initializes the parameter fields.
"""
st.markdown(self.variables_quant.texts.ShortMessages.initial_parameters)
# Load JSON config
with open(self.variables_quant.additional_params_json) as file:
config = json.load(file)
# Check if parsed values exist in session state
parsed_params = st.session_state.get(self.variables_quant.params_json_dict, {})
st_col1, st_col2, st_col3 = st.columns(3)
input_param_len = int(len(config.items()) / 3)
for idx, (key, value) in enumerate(config.items()):
if key.lower() == "software_name":
editable = False
else:
editable = True
if idx < input_param_len:
with st_col1:
self.user_input[key] = self.generate_input_widget(
self.user_input["input_format"], value, key, editable=editable
)
elif idx < input_param_len * 2:
with st_col2:
self.user_input[key] = self.generate_input_widget(
self.user_input["input_format"], value, key, editable=editable
)
else:
with st_col3:
self.user_input[key] = self.generate_input_widget(
self.user_input["input_format"], value, key, editable=editable
)
[docs]
def generate_sample_name(self) -> str:
"""
Generate a unique sample name based on the input format, software version, and the current timestamp.
Returns
-------
str
The generated sample name.
"""
time_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
sample_name = "-".join(
[
self.user_input["input_format"],
time_stamp,
]
)
return sample_name
[docs]
def display_public_submission_ui(self) -> None:
"""
Display the public submission section of the page in Tab 4.
"""
# Initialize Unchecked submission box variable
if self.variables_quant.check_submission not in st.session_state:
st.session_state[self.variables_quant.check_submission] = False
if self.variables_quant.first_new_plot:
self.generate_submission_ui_elements()
if self.user_input[self.variables_quant.meta_data]:
params = self.load_user_parameters()
st.session_state[self.variables_quant.params_file_dict] = params.__dict__
self.params_file_dict_copy = copy.deepcopy(params.__dict__)
print(self.params_file_dict_copy)
self.generate_additional_parameters_fields_submission()
self.generate_comments_section()
self.generate_confirmation_checkbox()
else:
params = None
if st.session_state[self.variables_quant.check_submission] and params != None:
get_form_values = self.get_form_values()
params = ProteoBenchParameters(**get_form_values)
pr_url = self.submit_to_repository(params)
if self.submission_ready == False:
return
if (
st.session_state[self.variables_quant.check_submission]
and params != None
and self.variables_quant.submit in st.session_state
and pr_url != None
):
self.show_submission_success_message(pr_url)
[docs]
def generate_indepth_plots(
self, recalculate: bool, public_id: Optional[str], public_hash: Optional[str]
) -> go.Figure:
"""
Generate and return plots based on the current benchmark data in Tab 2.5.
Parameters
----------
recalculate : bool
Whether to recalculate the plots.
public_id : Optional[str], optional
The dataset to plot, either "Uploaded dataset" or name of public run.
public_hash : Optional[str], optional
The hash of the selected public dataset. If None, the uploaded dataset is displayed.
Returns
-------
go.Figure
The generated plots for the selected dataset.
"""
# no uploaded dataset and no public dataset selected? nothing to plot!
if self.variables_quant.result_perf not in st.session_state.keys():
if public_hash is None:
st.error(":x: Please submit a result file or select a public run for display", icon="🚨")
return False
elif public_id == "Uploaded dataset":
st.error(":x: Please submit a result file in the Submit New Data Tab", icon="🚨")
return False
if public_id == "Uploaded dataset":
performance_data = st.session_state[self.variables_quant.result_perf]
else:
# Downloading the public performance data
performance_data = None
if st.secrets["storage"]["dir"] != None:
dataset_path = os.path.join(st.secrets["storage"]["dir"], public_hash)
# Define the path and the pattern
pattern = os.path.join(dataset_path, "*_data.zip")
# Use glob to find files matching the pattern
zip_files = glob.glob(pattern)
# Check that at least one match was found
if not zip_files:
st.error(":x: Could not find the files on the server", icon="🚨")
return
# (Optional) handle multiple matches if necessary
zip_path = zip_files[0] # Assumes first match is the desired one
# Open the ZIP file and extract the desired CSV
with zipfile.ZipFile(zip_path) as z:
with z.open("result_performance.csv.csv") as f:
performance_data = pd.read_csv(f)
# Filter the data based on the slider condition (as before)
performance_data = performance_data[
performance_data["nr_observed"] >= st.session_state[st.session_state[self.variables_quant.slider_id_uuid]]
]
if recalculate:
parse_settings = self.parsesettingsbuilder.build_parser(self.user_input["input_format"])
fig_logfc = PlotDataPoint.plot_fold_change_histogram(
performance_data, parse_settings.species_expected_ratio()
)
fig_CV = PlotDataPoint.plot_CV_violinplot(performance_data)
fig_MA = PlotDataPoint.plot_ma_plot(
performance_data,
parse_settings.species_expected_ratio(),
)
st.session_state[self.variables_quant.fig_cv] = fig_CV
st.session_state[self.variables_quant.fig_logfc] = fig_logfc
else:
fig_logfc = st.session_state[self.variables_quant.fig_logfc]
fig_CV = st.session_state[self.variables_quant.fig_cv]
fig_MA = st.session_state[self.variables_quant.fig_ma_plot]
if self.variables_quant.first_new_plot:
col1, col2 = st.columns(2)
col1.subheader("Log2 Fold Change distributions by species.")
col1.markdown(
"""
log2 fold changes calculated from {}
""".format(
public_id
)
)
col1.plotly_chart(fig_logfc, use_container_width=True)
col2.subheader("Coefficient of variation distribution in Condition A and B.")
col2.markdown(
"""
CVs calculated from {}
""".format(
public_id
)
)
col2.plotly_chart(fig_CV, use_container_width=True)
col1.markdown("---") # optional horizontal separator
col1.subheader("MA plot")
col1.markdown(
"""
MA plot calculated from {}
""".format(
public_id
)
)
# Example: plot another figure or add any other Streamlit element
# st.plotly_chart(fig_additional, use_container_width=True)
col1.plotly_chart(fig_MA, use_container_width=True)
else:
pass
st.subheader("Sample of the processed file for {}".format(public_id))
st.markdown(open(self.variables_quant.description_table_md, "r").read())
st.session_state[self.variables_quant.df_head] = st.dataframe(performance_data.head(100))
st.subheader("Download table")
random_uuid = uuid.uuid4()
if public_id == "Uploaded dataset":
# user uploaded data does not have sample name yet
sample_name = self.generate_sample_name()
else:
# use public run name as sample name
sample_name = public_id
st.download_button(
label="Download",
data=streamlit_utils.save_dataframe(performance_data),
file_name=f"{sample_name}.csv",
mime="text/csv",
key=f"{random_uuid}",
)
return fig_logfc
[docs]
def display_all_data_results_main(self) -> None:
"""Display the results for all data in Tab 1."""
st.title("Results (All Data)")
self.initialize_main_slider()
self.generate_main_slider()
self.generate_main_selectbox()
self.display_existing_results()
[docs]
def display_all_data_results_submitted(self) -> None:
"""Display the results for all data in Tab 3."""
st.title("Results (All Data)")
self.initialize_submitted_slider()
self.generate_submitted_slider()
self.generate_submitted_selectbox()
self.display_submitted_results()
if __name__ == "__main__":
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
QuantUIObjects()