Spaces:
Sleeping
Sleeping
| """ | |
| Process and transform GuardBench leaderboard data. | |
| """ | |
| import json | |
| import os | |
| import pandas as pd | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Tuple | |
| from src.display.utils import CATEGORIES, TEST_TYPES, METRICS | |
def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.

    Args:
        file_path: Path of the JSON leaderboard file.

    Returns:
        The parsed JSON content. When the file does not exist, a fresh
        structure is returned instead, holding an empty ``entries`` list,
        the current local time as ``last_updated`` and a ``version``.
        Loaded data that lacks a ``version`` key gets one filled in.

    The fallback version is derived from the file name: if the name
    contains ``_v`` (e.g. ``leaderboard_v1.json``) it is the text between
    the last underscore and the first following dot; otherwise ``"v0"``.
    """
    # Only the file name is inspected, so a "_v" inside a parent directory
    # (e.g. "results_v2/leaderboard.json") is not mistaken for a version.
    file_name = os.path.basename(file_path)
    fallback_version = "v0"
    if "_v" in file_name:
        fallback_version = file_name.split("_")[-1].split(".")[0]

    if not os.path.exists(file_path):
        return {
            "entries": [],
            "last_updated": datetime.now().isoformat(),
            "version": fallback_version,
        }

    # JSON is read as UTF-8 rather than the locale-dependent default.
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Ensure version field exists
    if "version" not in data:
        data["version"] = fallback_version
    return data
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.

    The passed ``data`` dict is modified in place before writing:
    ``last_updated`` is set to the current local time and, if absent,
    ``version`` is filled in from the file name (text between the last
    underscore and the first following dot when the name contains ``_v``,
    otherwise ``"v0"``).

    Args:
        data: Leaderboard structure to serialise.
        file_path: Destination path; missing parent directories are created.
    """
    # Ensure the directory exists. A bare file name has an empty dirname,
    # and os.makedirs("") raises FileNotFoundError, so skip it in that case.
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Update the last_updated timestamp
    data["last_updated"] = datetime.now().isoformat()

    # Ensure version is set. Only the file name is inspected so that a
    # "_v" in a parent directory is not mistaken for a version suffix.
    if "version" not in data:
        file_name = os.path.basename(file_path)
        version = "v0"
        if "_v" in file_name:
            version = file_name.split("_")[-1].split(".")[0]
        data["version"] = version

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Turn raw submission records into leaderboard entries.

    Each record yields one entry holding the model name (default
    ``"Unknown Model"``), the record's ``per_category_metrics`` and
    ``avg_metrics`` (empty dicts when absent), the current local time as
    ``submission_date`` and the record's ``version`` (default ``"v0"``).
    Model metadata fields are copied over only when the record has them;
    any other keys of the record are dropped.
    """
    metadata_fields = ("model_type", "base_model", "revision", "precision", "weight_type")

    leaderboard_entries = []
    for record in submission_data:
        # Core fields first so the key order of the entry stays stable.
        built = {
            "model_name": record.get("model_name", "Unknown Model"),
            "per_category_metrics": record.get("per_category_metrics", {}),
            "avg_metrics": record.get("avg_metrics", {}),
            "submission_date": datetime.now().isoformat(),
            "version": record.get("version", "v0"),
        }
        # Optional model metadata, in the fixed order listed above.
        built.update(
            {field: record[field] for field in metadata_fields if field in record}
        )
        leaderboard_entries.append(built)
    return leaderboard_entries
def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Build the display DataFrame from leaderboard data.

    One row is produced per item of ``leaderboard_data["entries"]``. A row
    carries the basic model fields, any flat metric keys found on the entry
    itself, the per-test-type metrics found under ``avg_metrics``, and the
    ``average_f1`` / ``average_recall`` / ``average_precision`` columns
    (computed as the mean over test types when the entry does not already
    provide them). The per-test-type F1/recall/precision columns are always
    present in the result, and rows are sorted by ``average_f1`` descending
    when that column exists.
    """
    optional_fields = ("base_model", "revision", "precision", "weight_type")
    # Average column name -> per-test-type metric it is averaged from.
    average_sources = (
        ("average_f1", "f1_binary"),
        ("average_recall", "recall_binary"),
        ("average_precision", "precision_binary"),
    )
    average_columns = [column for column, _ in average_sources]
    required_suffixes = ("f1", "f1_binary", "recall_binary", "precision_binary")

    records = []
    for entry in leaderboard_data.get("entries", []):
        record = {
            "model_name": entry.get("model_name", "Unknown Model"),
            "model_type": entry.get("model_type", "Unknown"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", "v0"),
            "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower(),
        }

        # Optional metadata is copied only when the entry has it.
        record.update(
            {field: entry[field] for field in optional_fields if field in entry}
        )

        # CASE 1: metrics stored flat on the entry itself.
        for field, value in entry.items():
            mentions_test_type = any(test_type in field for test_type in TEST_TYPES)
            if mentions_test_type or field in average_columns:
                record[field] = value

        # CASE 2: metrics nested as avg_metrics[test_type][metric].
        nested = entry.get("avg_metrics", {})
        if nested:
            for test_type in TEST_TYPES:
                if test_type not in nested:
                    continue
                scores = nested[test_type]
                for metric in METRICS:
                    if metric not in scores:
                        continue
                    record[f"{test_type}_{metric}"] = scores[metric]
                    # Binary F1 is mirrored under the plain "<test_type>_f1" name.
                    if metric == "f1_binary":
                        record[f"{test_type}_f1"] = scores[metric]

        # Fill in whichever averages the entry did not supply directly.
        for column, metric in average_sources:
            if column in record:
                continue
            collected = [
                nested[test_type][metric]
                for test_type in TEST_TYPES
                if test_type in nested and metric in nested[test_type]
            ]
            if collected:
                record[column] = sum(collected) / len(collected)

        records.append(record)

    frame = pd.DataFrame(records)

    # Guarantee the per-test-type columns exist even if no entry had them.
    for test_type in TEST_TYPES:
        for suffix in required_suffixes:
            column = f"{test_type}_{suffix}"
            if column not in frame.columns:
                frame[column] = None

    if frame.empty or "average_f1" not in frame.columns:
        return frame
    return frame.sort_values(by="average_f1", ascending=False)
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any with the same model
    name and version.

    Entries are keyed by ``(model_name, version)``, where a missing version
    counts as ``"v0"``. A new entry whose key already exists overwrites the
    stored one in place; otherwise it is appended. This also holds for keys
    repeated within ``new_entries``: the last occurrence wins rather than
    being appended twice.

    Args:
        leaderboard_data: Leaderboard structure; modified in place. An
            ``entries`` list is created if it is missing.
        new_entries: Entries to merge in.

    Returns:
        The same ``leaderboard_data`` object, with ``last_updated`` set to
        the current local time.
    """
    entries = leaderboard_data.setdefault("entries", [])

    # Position of each existing entry, keyed by model name and version.
    # .get() is used for the name too, matching how new entries are read.
    index_by_key = {
        (entry.get("model_name"), entry.get("version", "v0")): position
        for position, entry in enumerate(entries)
    }

    for new_entry in new_entries:
        key = (new_entry.get("model_name"), new_entry.get("version", "v0"))
        position = index_by_key.get(key)
        if position is None:
            # Record the new slot so a later duplicate in this same batch
            # replaces it instead of being appended again.
            index_by_key[key] = len(entries)
            entries.append(new_entry)
        else:
            entries[position] = new_entry

    # Update the last_updated timestamp
    leaderboard_data["last_updated"] = datetime.now().isoformat()
    return leaderboard_data
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.

    Each non-blank line is parsed as one JSON document. Blank or
    whitespace-only lines are skipped instead of being treated as
    invalid JSON.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        A ``(entries, message)`` tuple. On success ``entries`` holds the
        parsed lines in file order. On any failure (unreadable file, a line
        that is not valid JSON, or no entries at all) ``entries`` is empty
        and ``message`` describes the problem; no exception is raised.
    """
    entries = []
    try:
        # Read as UTF-8 rather than the locale-dependent default.
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    # One bad line rejects the whole submission.
                    return [], f"Invalid JSON in submission file: {e}"
        if not entries:
            return [], "Submission file is empty"
        return entries, "Successfully processed submission"
    except Exception as e:
        # Boundary handler: callers receive an error message, never a raise.
        return [], f"Error processing submission file: {e}"