# Source code for proteobench.utils.server_io

import io
import json
import os
import tempfile
import zipfile
from collections import defaultdict
from urllib.parse import urljoin

import pandas as pd
import requests
import toml
from bs4 import BeautifulSoup

from proteobench.modules.quant.quant_lfq_ion_DDA_QExactive import (
    DDAQuantIonModuleQExactive,
)
from proteobench.modules.quant.quant_lfq_ion_DIA_AIF import DIAQuantIonModuleAIF
from proteobench.modules.quant.quant_lfq_ion_DIA_Astral import DIAQuantIonModuleAstral
from proteobench.modules.quant.quant_lfq_ion_DIA_diaPASEF import (
    DIAQuantIonModulediaPASEF,
)
from proteobench.modules.quant.quant_lfq_ion_DIA_singlecell import (
    DIAQuantIonModulediaSC,
)
from proteobench.modules.quant.quant_lfq_peptidoform_DDA import (
    DDAQuantPeptidoformModule,
)
from proteobench.modules.quant.quant_lfq_peptidoform_DIA import (
    DIAQuantPeptidoformModule,
)

# Registry of the benchmarking module classes imported above, keyed by the
# class name as a string. `make_submission` looks its `module_name` argument
# up in this mapping and raises `ValueError` for names that are not listed.
MODULE_CLASSES = {
    "DDAQuantIonModuleQExactive": DDAQuantIonModuleQExactive,
    "DIAQuantIonModuleAIF": DIAQuantIonModuleAIF,
    "DIAQuantIonModuleAstral": DIAQuantIonModuleAstral,
    "DIAQuantIonModulediaPASEF": DIAQuantIonModulediaPASEF,
    "DIAQuantIonModulediaSC": DIAQuantIonModulediaSC,
    "DDAQuantPeptidoformModule": DDAQuantPeptidoformModule,
    "DIAQuantPeptidoformModule": DIAQuantPeptidoformModule,
}

# Root URL of the public dataset listing; default `base_url` of
# `dataset_folder_exists` (the same literal is the default in `get_raw_data`).
DATASETS_BASE_URL = "https://proteobench.cubimed.rub.de/datasets/"


def download_file(url: str, local_path: str, chunk_size: int = 8192, timeout: float = 30.0) -> str:
    """
    Download a file from URL to local path.

    The response body is streamed to disk in chunks, so the file is never held
    in memory as a whole.

    Parameters
    ----------
    url : str
        URL to download from
    local_path : str
        Local path to save file. Missing parent directories are created; a bare
        file name (no directory component) is written to the current directory.
    chunk_size : int
        Size of chunks for streaming download (default: 8192)
    timeout : float
        Seconds to wait for the connection and for each read from the server
        before giving up (default: 30). Pass ``None`` to wait indefinitely.

    Returns
    -------
    str
        Path to downloaded file

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status.
    requests.RequestException
        For connection problems or timeouts.
    """
    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when the path actually has one.
    parent_dir = os.path.dirname(local_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    print(f"Downloading file from {url}...")
    # The context manager releases the connection even if writing fails midway.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(local_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    print(f"File downloaded to {local_path}")
    return local_path
def dataset_folder_exists(intermediate_hash: str, base_url: str = DATASETS_BASE_URL) -> bool:
    """
    Check if a dataset folder already exists on the public server for a given intermediate hash.

    First tries a direct HEAD to the folder URL, then falls back to parsing the index page.
    Every network or parsing failure is treated as "not found"; this function never raises
    for server-side problems.

    Args:
        intermediate_hash: The hash to check for. Leading/trailing slashes are ignored.
        base_url: Base URL of the datasets server

    Returns:
        True if the dataset folder exists, False otherwise (including when the hash is
        empty or consists only of slashes).
    """
    if not intermediate_hash:
        return False

    folder_name = intermediate_hash.strip("/")
    if not folder_name:
        # A slash-only hash would make the probe below hit the base URL itself
        # and wrongly report an existing folder.
        return False

    folder_url = f"{base_url.rstrip('/')}/{folder_name}/"
    try:
        resp = requests.head(folder_url, allow_redirects=True, timeout=5)
        if resp.status_code == 200:
            return True
        # Some servers may redirect. If it ends in the folder, treat as exists.
        redirect_target = resp.headers.get("Location", "").rstrip("/")
        if resp.status_code in (301, 302, 303, 307, 308) and redirect_target.endswith(f"/{folder_name}"):
            return True
    except Exception:
        # Deliberate best effort: a failed probe only means we fall through to
        # the directory-listing check below.
        pass

    # Fallback: parse directory listing
    try:
        resp = requests.get(base_url, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Keep the first path component of every link that points to a folder.
        folder_links = {
            a.get("href", "").strip("/").split("/")[0] for a in soup.find_all("a") if a.get("href", "").endswith("/")
        }
        return folder_name in folder_links
    except Exception:
        # Listing unavailable or unparsable: report "not found" rather than raise.
        return False
def get_merged_json(
    repo_url="https://github.com/Proteobench/Results_quant_ion_DDA/archive/refs/heads/main.zip",
    write_to_file=False,
    outfile_name="combined_results.json",
    timeout=60,
):
    """
    Download a results repository archive and merge all of its JSON files.

    The ZIP archive at ``repo_url`` is extracted into a local folder named after
    the fifth-from-last path segment of the URL (the repository name for GitHub
    ``.../<repo>/archive/refs/heads/<branch>.zip`` URLs). Every ``*.json`` file
    found below the archive's top-level folder(s) is parsed and collected.

    Parameters
    ----------
    repo_url : str
        URL of the ZIP archive to download.
    write_to_file : bool
        If True, additionally dump the combined list of JSON documents to
        ``outfile_name``.
    outfile_name : str
        Output path used when ``write_to_file`` is True.
    timeout : float
        Seconds to wait for the server before giving up (default: 60).

    Returns
    -------
    pandas.DataFrame
        ``pd.json_normalize`` of the collected JSON documents.

    Raises
    ------
    requests.HTTPError
        If the archive cannot be downloaded (4xx/5xx response).
    zipfile.BadZipFile
        If the downloaded content is not a ZIP archive.
    """
    # Download ZIP archive; fail early on an HTTP error instead of handing an
    # error page to zipfile.
    response = requests.get(repo_url, timeout=timeout)
    response.raise_for_status()

    extract_root = repo_url.split("/")[-5]

    # Extract ZIP contents to a local folder and remember the archive's own
    # top-level entries, so the walk below works for any branch or tag name
    # (not only "<repo>-main") and ignores leftovers from earlier extractions.
    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
        zip_ref.extractall(extract_root)
        archive_roots = sorted({name.split("/")[0] for name in zip_ref.namelist()} - {""})

    # Walk through the extracted tree(s) and read all JSON files
    combined_json = []
    for archive_root in archive_roots:
        base_path = f"{extract_root}/{archive_root}"
        for root, _dirs, files in os.walk(base_path):
            for file in files:
                if not file.endswith(".json"):
                    continue
                file_path = os.path.join(root, file)
                with open(file_path, "r", encoding="utf-8") as f:
                    try:
                        combined_json.append(json.load(f))
                    except json.JSONDecodeError as e:
                        # A single malformed file should not abort the merge.
                        print(f"Error reading {file_path}: {e}")

    if write_to_file:
        # Write combined JSON to a single output file (optional)
        with open(outfile_name, "w", encoding="utf-8") as out_file:
            json.dump(combined_json, out_file, ensure_ascii=False, indent=2)
        print(f"Combined {len(combined_json)} JSON files into '{outfile_name}'.")

    return pd.json_normalize(combined_json)
def _download_and_extract_zip(zip_url, extract_dir, timeout, block_size=8192):
    """Stream the ZIP at ``zip_url`` into a temporary file and unpack it into ``extract_dir``."""
    with requests.get(zip_url, stream=True, timeout=timeout) as zip_response:
        zip_response.raise_for_status()
        # An anonymous temporary file keeps the download out of the working
        # directory (no clobbering of same-named files) and is removed
        # automatically, even when extraction fails.
        with tempfile.TemporaryFile() as archive:
            for chunk in zip_response.iter_content(block_size):
                archive.write(chunk)
            archive.seek(0)

            os.makedirs(extract_dir, exist_ok=True)
            with zipfile.ZipFile(archive) as zip_ref:
                zip_ref.extractall(extract_dir)


def get_raw_data(
    df,
    base_url="https://proteobench.cubimed.rub.de/datasets/",
    output_directory="extracted_files",
    timeout=30,
):
    """
    Download and unpack the raw dataset archives referenced by a results table.

    The directory listing at ``base_url`` is scanned for folders whose name
    appears in ``df["intermediate_hash"]``. For each such folder, every ``.zip``
    file linked from the folder page is downloaded and extracted into
    ``<output_directory>/<folder>``. Folders whose extraction directory already
    exists and is non-empty are not downloaded again.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with an ``intermediate_hash`` column.
    base_url : str
        URL of the dataset directory listing (with or without trailing slash).
    output_directory : str
        Local directory under which one sub-folder per hash is created.
    timeout : float
        Seconds to wait for the server on each request (default: 30).

    Returns
    -------
    dict
        Mapping ``folder name -> extraction directory`` for every matching
        folder, whether freshly downloaded or already present.

    Raises
    ------
    KeyError
        If ``df`` has no ``intermediate_hash`` column.
    requests.HTTPError
        If a listing or archive request returns a 4xx/5xx status.
    """
    hash_vis_dir = {}

    # Set membership keeps the folder filter linear in the size of the listing.
    wanted_hashes = set(df["intermediate_hash"].tolist())

    # Normalise so folder URLs are well-formed even without a trailing slash.
    root_url = f"{base_url.rstrip('/')}/"

    # Fetch folder names from the webpage
    response = requests.get(root_url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    # Anchors without an href are skipped instead of raising KeyError.
    hrefs = [link.get("href", "") for link in soup.find_all("a")]
    folder_links = [href.strip("/") for href in hrefs if href.endswith("/")]

    # Filter folder links based on the hash list
    matching_folders = [folder for folder in folder_links if folder in wanted_hashes]

    # Download and extract zip files from matching folders
    for folder in matching_folders:
        extract_dir = f"{output_directory}/{folder}"

        if os.path.exists(extract_dir) and os.listdir(extract_dir):
            print(f"Folder already exists and is not empty, skipping download: {extract_dir}")
            hash_vis_dir[folder] = extract_dir
            continue

        folder_url = f"{root_url}{folder}/"
        print(f"Processing folder: {folder_url}")

        # Fetch the folder page and collect its .zip links
        folder_response = requests.get(folder_url, timeout=timeout)
        folder_response.raise_for_status()
        folder_soup = BeautifulSoup(folder_response.text, "html.parser")
        zip_files = [
            href for href in (link.get("href", "") for link in folder_soup.find_all("a")) if href.endswith(".zip")
        ]

        for zip_file in zip_files:
            # urljoin resolves relative and absolute hrefs alike.
            zip_url = urljoin(folder_url, zip_file)
            print(f"Downloading: {zip_url}")
            _download_and_extract_zip(zip_url, extract_dir, timeout)
            print(f"Extracted contents to: {extract_dir}")

        # NOTE(review): the folder is registered even when its page listed no
        # .zip files, in which case extract_dir may not exist — confirm intent.
        hash_vis_dir[folder] = extract_dir

    return hash_vis_dir
def make_submission(submission_files=None, token="", module_name=""):
    """
    Benchmark each submission and open a pull request for it.

    For every entry of ``submission_files`` a fresh module object is created,
    the existing data points are fetched, the input file is benchmarked, the
    parameter file is parsed and ``clone_pr`` is called with the new results.
    Entries whose parameter file cannot be loaded are reported and skipped.

    Parameters
    ----------
    submission_files : list of dict, optional
        One dict per submission with the keys ``"param_file"``,
        ``"input_file"``, ``"input_type"``, ``"default_cutoff_min_prec"`` and
        ``"user_comments"``. ``None`` or an empty list means nothing to do.
    token : str
        Currently unused; the module object is always created with an empty
        token. NOTE(review): confirm whether this should be forwarded.
    module_name : str
        Key into ``MODULE_CLASSES`` selecting the benchmarking module.

    Returns
    -------
    object or None
        Whatever ``clone_pr`` returned for the last successfully processed
        submission, or ``None`` if there was none.

    Raises
    ------
    ValueError
        If there is at least one submission and ``module_name`` is not a key of
        ``MODULE_CLASSES``.
    KeyError
        If a submission dict lacks one of the required keys.
    """
    if not submission_files:
        return None

    # The module is the same for every submission, so resolve it once.
    if module_name not in MODULE_CLASSES:
        raise ValueError(f"Module {module_name} not recognized. Available modules: {list(MODULE_CLASSES.keys())}")
    module_class = MODULE_CLASSES[module_name]

    pr_url = None
    for submission_settings in submission_files:
        # TODO change to the correct module
        module_obj = module_class(token="")
        results_df = module_obj.obtain_all_data_points(all_datapoints=None)

        param_file = submission_settings["param_file"]
        input_file = submission_settings["input_file"]
        input_type = submission_settings["input_type"]
        default_cutoff_min_prec = submission_settings["default_cutoff_min_prec"]
        user_comments = submission_settings["user_comments"]

        # Any user-config key that is looked up resolves to an empty string.
        user_config = defaultdict(lambda: "")

        _intermediates, results_df_new, _parsed_input = module_obj.benchmarking(
            input_file,
            input_type,
            user_config,
            results_df,
            default_cutoff_min_prec=default_cutoff_min_prec,
        )

        try:
            param_obj = module_obj.load_params_file([param_file], input_type)
        except Exception as exc:
            # Skip this entry but say so; KeyboardInterrupt/SystemExit still propagate.
            print(f"Skipping submission for {input_file}: could not load parameter file {param_file}: {exc}")
            continue

        pr_url = module_obj.clone_pr(
            results_df_new,
            param_obj,
            remote_git="",
            submission_comments=user_comments,
        )

    # NOTE(review): the flattened source does not show whether the original
    # `return` sat inside the loop; returning here processes every entry —
    # confirm against the upstream file.
    return pr_url