Spaces:
Sleeping
Sleeping
| """ | |
| Process and transform GuardBench leaderboard data. | |
| """ | |
| import json | |
| import os | |
| import pandas as pd | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Tuple | |
| import numpy as np | |
| from src.display.utils import CATEGORIES, TEST_TYPES, METRICS | |
| # Constants for Integral Score calculation (mirrors guardbench library) | |
| MAX_PUNISHABLE_RUNTIME_MS = 6000.0 | |
| MIN_PUNISHABLE_RUNTIME_MS = 200.0 | |
| MAX_RUNTIME_PENALTY = 0.75 # Corresponds to 1.0 - MIN_TIME_FACTOR, library used 0.75 | |
| def calculate_integral_score(row: pd.Series) -> float: | |
| """ | |
| Calculate the integral score for a given model entry row. | |
| Uses accuracy as the primary metric, micro error ratio, and micro runtime penalty. | |
| Falls back to macro accuracy and averaged per-test-type errors/runtimes if micro values are missing. | |
| """ | |
| integral_score = 1.0 | |
| metric_count = 0 | |
| # Primary metric (using accuracy) | |
| for test_type in TEST_TYPES: | |
| metric_col = f"{test_type}_accuracy" | |
| if metric_col in row and pd.notna(row[metric_col]): | |
| # print(f"Found accuracy metric for {test_type}: {row[metric_col]}") | |
| integral_score *= row[metric_col] | |
| metric_count += 1 | |
| # print(f"Metric count: {metric_count}") | |
| # If no accuracy metrics were found at all, the score remains 1.0 before penalties. | |
| # The library returns 0.0 in this case (`return integral_score if count > 0 else 0.0`) | |
| # Let's add that check back before applying penalties. | |
| if metric_count == 0: | |
| return 0.0 | |
| # Error Penalty | |
| micro_error_col = "micro_avg_error_ratio" | |
| if micro_error_col in row and pd.notna(row[micro_error_col]): | |
| # Micro error is stored as %, convert back to ratio | |
| micro_error_ratio = row[micro_error_col] / 100.0 | |
| integral_score *= (1.0 - micro_error_ratio) | |
| # Runtime Penalty | |
| avg_runtime_ms = None # Initialize | |
| micro_runtime_col = "micro_avg_runtime_ms" | |
| if micro_runtime_col in row and pd.notna(row[micro_runtime_col]): | |
| avg_runtime_ms = row[micro_runtime_col] | |
| if avg_runtime_ms is not None: | |
| # Apply penalty based on runtime (only if micro avg runtime was found) | |
| runtime = max( | |
| min(avg_runtime_ms, MAX_PUNISHABLE_RUNTIME_MS), | |
| MIN_PUNISHABLE_RUNTIME_MS, | |
| ) | |
| if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS: | |
| normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / ( | |
| MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS | |
| ) | |
| # Match reference library formula 1 | |
| time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time | |
| else: | |
| # Match reference library formula (though less critical when max==min) | |
| time_factor = 1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY) | |
| # Match reference library formula 2 (enforce minimum factor) | |
| time_factor = max(MAX_RUNTIME_PENALTY, time_factor) | |
| integral_score *= time_factor | |
| # Rooting is not done in the reference library's summary table calculation | |
| return integral_score | |
| def load_leaderboard_data(file_path: str) -> Dict: | |
| """ | |
| Load the leaderboard data from a JSON file. | |
| """ | |
| if not os.path.exists(file_path): | |
| version = "v0" | |
| if "_v" in file_path: | |
| version = file_path.split("_")[-1].split(".")[0] | |
| return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version} | |
| with open(file_path, 'r') as f: | |
| data = json.load(f) | |
| # Ensure version field exists | |
| if "version" not in data: | |
| version = "v0" | |
| if "_v" in file_path: | |
| version = file_path.split("_")[-1].split(".")[0] | |
| data["version"] = version | |
| return data | |
| def save_leaderboard_data(data: Dict, file_path: str) -> None: | |
| """ | |
| Save the leaderboard data to a JSON file. | |
| """ | |
| # Ensure the directory exists | |
| os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
| # Update the last_updated timestamp | |
| data["last_updated"] = datetime.now().isoformat() | |
| # Ensure version is set | |
| if "version" not in data: | |
| version = "v0" | |
| if "_v" in file_path: | |
| version = file_path.split("_")[-1].split(".")[0] | |
| data["version"] = version | |
| with open(file_path, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| def process_submission(submission_data: List[Dict]) -> List[Dict]: | |
| """ | |
| Process submission data and convert it to leaderboard entries. | |
| """ | |
| entries = [] | |
| for item in submission_data: | |
| # Create a new entry for the leaderboard | |
| entry = { | |
| "model_name": item.get("model_name", "Unknown Model"), | |
| "per_category_metrics": {}, | |
| "avg_metrics": {}, | |
| "submission_date": datetime.now().isoformat(), | |
| "version": item.get("version", "v0") | |
| } | |
| # Copy model metadata | |
| for key in ["model_type", "base_model", "revision", "precision", "weight_type"]: | |
| if key in item: | |
| entry[key] = item[key] | |
| # Process per-category metrics | |
| if "per_category_metrics" in item: | |
| entry["per_category_metrics"] = item["per_category_metrics"] | |
| # Process average metrics | |
| if "avg_metrics" in item: | |
| entry["avg_metrics"] = item["avg_metrics"] | |
| entries.append(entry) | |
| return entries | |
| def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame: | |
| """ | |
| Convert leaderboard data to a pandas DataFrame for display. | |
| """ | |
| rows = [] | |
| for entry in leaderboard_data.get("entries", []): | |
| model_name = entry.get("model_name", "Unknown Model") | |
| # Extract average metrics for main display | |
| row = { | |
| "model_name": model_name, | |
| "model_type": entry.get("model_type", "Unknown"), | |
| "mode": entry.get("mode", "Strict"), | |
| "submission_date": entry.get("submission_date", ""), | |
| "version": entry.get("version", "v0"), | |
| "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower() | |
| } | |
| # Add additional metadata fields if present | |
| for key in ["base_model", "revision", "precision", "weight_type"]: | |
| if key in entry: | |
| row[key] = entry[key] | |
| # CASE 1: Metrics are flat in the root | |
| for key, value in entry.items(): | |
| if any(test_type in key for test_type in TEST_TYPES) or \ | |
| key in ["average_f1", "average_recall", "average_precision", | |
| "macro_accuracy", "macro_recall", "total_evals_count"]: | |
| row[key] = value | |
| # CASE 2: Metrics are in avg_metrics structure | |
| avg_metrics = entry.get("avg_metrics", {}) | |
| if avg_metrics: | |
| for test_type in TEST_TYPES: | |
| if test_type in avg_metrics: | |
| metrics = avg_metrics[test_type] | |
| for metric in METRICS: | |
| if metric in metrics: | |
| col_name = f"{test_type}_{metric}" | |
| row[col_name] = metrics[metric] | |
| # Also add non-binary version for F1 scores | |
| if metric == "f1_binary": | |
| row[f"{test_type}_f1"] = metrics[metric] | |
| # Calculate averages if not present | |
| # Use accuracy for macro_accuracy | |
| if "macro_accuracy" not in row: | |
| accuracy_values = [] | |
| for test_type in TEST_TYPES: | |
| # Check avg_metrics structure first | |
| accuracy_val = None | |
| if test_type in avg_metrics and "accuracy" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["accuracy"]): | |
| accuracy_val = avg_metrics[test_type]["accuracy"] | |
| # Check flat structure as fallback (might be redundant but safer) | |
| elif f"{test_type}_accuracy" in row and pd.notna(row[f"{test_type}_accuracy"]): | |
| accuracy_val = row[f"{test_type}_accuracy"] | |
| if accuracy_val is not None: | |
| accuracy_values.append(accuracy_val) | |
| if accuracy_values: | |
| row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values) | |
| # Use recall_binary for macro_recall | |
| if "macro_recall" not in row: | |
| recall_values = [] | |
| for test_type in TEST_TYPES: | |
| if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["recall_binary"]): | |
| recall_values.append(avg_metrics[test_type]["recall_binary"]) | |
| if recall_values: | |
| row["macro_recall"] = sum(recall_values) / len(recall_values) | |
| if "total_evals_count" not in row: | |
| total_samples = 0 | |
| found_samples = False | |
| for test_type in TEST_TYPES: | |
| if test_type in avg_metrics and "sample_count" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["sample_count"]): | |
| total_samples += avg_metrics[test_type]["sample_count"] | |
| found_samples = True | |
| if found_samples: | |
| row["total_evals_count"] = total_samples | |
| # Extract micro averages directly from entry if they exist (like in guardbench library) | |
| row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA) | |
| row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA) | |
| # Convert error ratio to percentage for consistency with display name | |
| if pd.notna(row["micro_avg_error_ratio"]): | |
| row["micro_avg_error_ratio"] *= 100 | |
| rows.append(row) | |
| # Create DataFrame and sort by average F1 score | |
| df = pd.DataFrame(rows) | |
| # Ensure all expected columns exist | |
| for test_type in TEST_TYPES: | |
| for metric in METRICS: | |
| col_name = f"{test_type}_{metric}" | |
| if col_name not in df.columns: | |
| df[col_name] = pd.NA # Use pd.NA for missing numeric data | |
| # Add non-binary F1 if binary exists and f1 is missing | |
| if metric == "f1_binary" and f"{test_type}_f1" not in df.columns: | |
| # Check if the binary column has data before copying | |
| if col_name in df.columns: | |
| df[f"{test_type}_f1"] = df[col_name] | |
| else: | |
| df[f"{test_type}_f1"] = pd.NA | |
| # Calculate Integral Score | |
| if not df.empty: | |
| df["integral_score"] = df.apply(calculate_integral_score, axis=1) | |
| # Sort by Integral Score instead of average_f1 | |
| df = df.sort_values(by="integral_score", ascending=False, na_position='last') | |
| else: | |
| # Add the column even if empty | |
| df["integral_score"] = pd.NA | |
| # Ensure summary columns exist | |
| summary_cols = ["macro_accuracy", "macro_recall", "micro_avg_error_ratio", "micro_avg_runtime_ms", "total_evals_count"] | |
| for col in summary_cols: | |
| if col not in df.columns: | |
| df[col] = pd.NA | |
| # Remove old average columns if they somehow snuck in | |
| old_avg_cols = ["average_f1", "average_recall", "average_precision"] | |
| for col in old_avg_cols: | |
| if col in df.columns: | |
| df = df.drop(columns=[col]) | |
| # print("--- DataFrame before returning from leaderboard_to_dataframe ---") | |
| # print(df[['model_name', 'macro_accuracy', 'macro_recall', 'total_evals_count']].head()) | |
| # print("-------------------------------------------------------------") | |
| return df | |
| def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict: | |
| """ | |
| Add new entries to the leaderboard, replacing any with the same model name. | |
| """ | |
| # Create a mapping of existing entries by model name and version | |
| existing_entries = { | |
| (entry["model_name"], entry.get("version", "v0")): i | |
| for i, entry in enumerate(leaderboard_data.get("entries", [])) | |
| } | |
| # Process each new entry | |
| for new_entry in new_entries: | |
| model_name = new_entry.get("model_name") | |
| version = new_entry.get("version", "v0") | |
| if (model_name, version) in existing_entries: | |
| # Replace existing entry | |
| leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry | |
| else: | |
| # Add new entry | |
| if "entries" not in leaderboard_data: | |
| leaderboard_data["entries"] = [] | |
| leaderboard_data["entries"].append(new_entry) | |
| # Update the last_updated timestamp | |
| leaderboard_data["last_updated"] = datetime.now().isoformat() | |
| return leaderboard_data | |
| def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]: | |
| """ | |
| Process a JSONL submission file and extract entries. | |
| """ | |
| entries = [] | |
| try: | |
| with open(file_path, 'r') as f: | |
| for line in f: | |
| try: | |
| entry = json.loads(line) | |
| entries.append(entry) | |
| except json.JSONDecodeError as e: | |
| return [], f"Invalid JSON in submission file: {e}" | |
| if not entries: | |
| return [], "Submission file is empty" | |
| return entries, "Successfully processed submission" | |
| except Exception as e: | |
| return [], f"Error processing submission file: {e}" | |