import json
import gzip
import os
import shutil
import secrets
import gradio as gr
from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns
import pandas as pd
import numpy as np
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from io import StringIO
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from copy import deepcopy
from src.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT_SUBGRAPH,
    EVALUATION_QUEUE_TEXT_CAUSALVARIABLE,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS_MIB_SUBGRAPH,
    COLS,
    COLS_MIB_SUBGRAPH,
    COLS_MULTIMODAL,
    EVAL_COLS,
    EVAL_TYPES,
    AutoEvalColumn,
    AutoEvalColumn_mib_subgraph,
    AutoEvalColumn_mib_causalgraph,
    fields,
)
from src.envs import API, EVAL_REQUESTS_SUBGRAPH, EVAL_REQUESTS_CAUSALGRAPH, QUEUE_REPO_SUBGRAPH, QUEUE_REPO_CAUSALGRAPH, REPO_ID, TOKEN, RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH, RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH
from src.populate import get_evaluation_queue_df, get_leaderboard_df_mib_subgraph, get_leaderboard_df_mib_causalgraph
from src.submission.submit import upload_to_queue, remove_submission
from src.submission.check_validity import verify_circuit_submission, verify_causal_variable_submission, check_rate_limit, parse_huggingface_url
from src.about import TasksMib_Subgraph, TasksMib_Causalgraph
from gradio_leaderboard import SelectColumns, Leaderboard
import pandas as pd
from typing import List, Dict, Optional
# NOTE(review): this rebinds `fields`, replacing the one imported from
# src.display.utils above. The import order is kept as-is on purpose so the
# name resolves exactly as before -- confirm which one is actually intended.
from dataclasses import fields
import math
import re


class SmartSelectColumns(SelectColumns):
    """SelectColumns variant that can group column names by keyword.

    The constructor forwards to ``SelectColumns`` with an empty
    ``cant_deselect`` list and additionally stores two keyword lists used by
    :meth:`get_filtered_groups`.
    """

    def __init__(
        self,
        benchmark_keywords: Optional[List[str]] = None,
        model_keywords: Optional[List[str]] = None,
        initial_selected: Optional[List[str]] = None,
        label: Optional[str] = None,
        show_label: bool = True,
        info: Optional[str] = None,
        allow: bool = True
    ):
        # Match exact parameters from working SelectColumns
        super().__init__(
            default_selection=initial_selected or [],
            cant_deselect=[],
            allow=allow,
            label=label,
            show_label=show_label,
            info=info
        )
        self.benchmark_keywords = benchmark_keywords or []
        self.model_keywords = model_keywords or []
        # Result of the most recent get_filtered_groups() call.
        self._groups = {}

    def get_filtered_groups(self, columns: List[str]) -> Dict[str, List[str]]:
        """Group ``columns`` by the configured benchmark and model keywords.

        A column belongs to a keyword's group when the keyword occurs in the
        lower-cased column name (the keyword itself is used verbatim).
        Keywords without any matching column produce no entry.

        Returns:
            Mapping from a label (``"Benchmark group for <kw>"`` or
            ``"Model group for <kw>"``) to the matching columns. The same
            mapping is also stored on ``self._groups``.
        """
        filtered_groups = {}
        keyword_sets = (
            ("Benchmark", self.benchmark_keywords),
            ("Model", self.model_keywords),
        )
        for kind, keywords in keyword_sets:
            for keyword in keywords:
                matching_cols = [col for col in columns if keyword in col.lower()]
                if matching_cols:
                    filtered_groups[f"{kind} group for {keyword}"] = matching_cols

        self._groups = filtered_groups
        return filtered_groups


@dataclass
class SubstringSelectColumns(SelectColumns):
    """SelectColumns that can select columns through named substring groups.

    ``substring_groups`` maps a group name to a list of glob-style patterns
    (``*`` matches any run of characters). Selecting a group name selects
    every column of ``default_selection`` matched by one of its patterns.
    """

    # group name -> glob-style patterns identifying the group's columns
    substring_groups: Dict[str, List[str]] = field(default_factory=dict)
    # group names that are currently selected
    selected_substrings: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Ensure default_selection is a list
        if self.default_selection is None:
            self.default_selection = []

        # Reverse mapping: column name -> names of the groups it belongs to.
        self.column_to_substrings = {}
        for substring, patterns in self.substring_groups.items():
            for pattern in patterns:
                # Escape everything first so that only `*` acts as a wildcard;
                # otherwise characters such as `(`, `+` or `.` in a column
                # name would be interpreted as regex syntax.
                regex = re.compile(re.escape(pattern).replace(r"\*", ".*"))
                for col in self.default_selection:
                    if regex.search(col):
                        groups = self.column_to_substrings.setdefault(col, [])
                        if substring not in groups:
                            groups.append(substring)

        # NOTE(review): the computed selection is discarded here, exactly as
        # before -- confirm whether it was meant to be stored somewhere.
        if self.selected_substrings:
            self.update_selection_from_substrings()

    def update_selection_from_substrings(self) -> List[str]:
        """Compute the column selection implied by ``selected_substrings``.

        Returns:
            The ``cant_deselect`` columns followed by either every remaining
            ``default_selection`` column (when no group is selected) or the
            columns belonging to at least one selected group.
        """
        # `cant_deselect` may be None; treat that as "no pinned columns".
        selected_columns = list(self.cant_deselect or [])

        # If no substrings selected, show all columns
        if not self.selected_substrings:
            selected_columns.extend(
                col for col in self.default_selection if col not in selected_columns
            )
            return selected_columns

        # Add columns that match any selected substring
        for col, substrings in self.column_to_substrings.items():
            if col in selected_columns:
                continue
            if any(s in self.selected_substrings for s in substrings):
                selected_columns.append(col)

        return selected_columns


def restart_space():
    """Ask the Hub API to restart the space identified by ``REPO_ID``."""
    API.restart_space(repo_id=REPO_ID)


def _refresh_local_snapshot(repo_id, local_dir):
    """Replace ``local_dir`` with a fresh snapshot of dataset ``repo_id``.

    Any failure is reported on stdout and answered with ``restart_space()``;
    the exception is not propagated, so start-up continues as it did before.
    """
    try:
        if os.path.exists(local_dir):
            shutil.rmtree(local_dir)
        snapshot_download(
            repo_id=repo_id,
            local_dir=local_dir,
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
            token=TOKEN,
        )
    except Exception as exc:
        print(f"Could not refresh {local_dir} from {repo_id}: {exc!r}. Restarting space.")
        restart_space()


### Space initialisation - refresh caches
for _repo_id, _local_dir in (
    (QUEUE_REPO_SUBGRAPH, EVAL_REQUESTS_SUBGRAPH),
    (QUEUE_REPO_CAUSALGRAPH, EVAL_REQUESTS_CAUSALGRAPH),
    (RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH),
    (RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH),
):
    _refresh_local_snapshot(_repo_id, _local_dir)


def _sigmoid(x):
    """Return ``1 / (1 + exp(-2 * (x - 1)))``, or ``"-"`` if it cannot be computed.

    ``"-"`` is returned for inputs the arithmetic rejects (e.g. a string)
    and when ``math.exp`` overflows.
    """
    try:
        return 1 / (1 + math.exp(-2 * (x - 1)))
    except (TypeError, ValueError, OverflowError):
        return "-"


LEADERBOARD_DF_MIB_SUBGRAPH_FPL = get_leaderboard_df_mib_subgraph(
    EVAL_RESULTS_MIB_SUBGRAPH_PATH,
    COLS_MIB_SUBGRAPH,
    BENCHMARK_COLS_MIB_SUBGRAPH,
)
LEADERBOARD_DF_MIB_SUBGRAPH_FEQ = get_leaderboard_df_mib_subgraph(
    EVAL_RESULTS_MIB_SUBGRAPH_PATH,
    COLS_MIB_SUBGRAPH,
    BENCHMARK_COLS_MIB_SUBGRAPH,
    metric_type="CMD",
)

LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED, LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED = get_leaderboard_df_mib_causalgraph(
    EVAL_RESULTS_MIB_CAUSALGRAPH_PATH
)

(
    finished_eval_queue_df_subgraph,
    pending_eval_queue_df_subgraph,
) = get_evaluation_queue_df(EVAL_REQUESTS_SUBGRAPH, EVAL_COLS, "Circuit")

(
    finished_eval_queue_df_causalvariable,
    pending_eval_queue_df_causalvariable,
) = get_evaluation_queue_df(EVAL_REQUESTS_CAUSALGRAPH, EVAL_COLS, "Causal Variable")

# Both tracks share one "finished" and one "pending" table in the UI.
finished_eval_queue = pd.concat((finished_eval_queue_df_subgraph, finished_eval_queue_df_causalvariable))
pending_eval_queue = pd.concat((pending_eval_queue_df_subgraph, pending_eval_queue_df_causalvariable))


def init_leaderboard_mib_subgraph(dataframe, track):
    """Build the circuit-localization leaderboard with readable column names.

    Columns named ``<benchmark>_<model>`` are renamed to
    ``"<Benchmark> - <Model>"``. A benchmark or model without a display name
    keeps its raw identifier.

    Args:
        dataframe: Leaderboard results; must be non-empty.
        track: Unused; kept for signature compatibility with callers.

    Returns:
        ``(Leaderboard component, renamed DataFrame)``.

    Raises:
        ValueError: If ``dataframe`` is ``None`` or empty.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")

    print("\nDebugging DataFrame columns:", dataframe.columns.tolist())

    model_name_mapping = {
        "qwen2_5": "Qwen-2.5",
        "gpt2": "GPT-2",
        "gemma2": "Gemma-2",
        "llama3": "Llama-3.1",
    }
    benchmark_mapping = {
        "ioi": "IOI",
        "mcqa": "MCQA",
        "arithmetic_addition": "Arithmetic (+)",
        "arithmetic_subtraction": "Arithmetic (-)",
        "arc_easy": "ARC (Easy)",
        "arc_challenge": "ARC (Challenge)",
    }

    display_mapping = {}
    for task in TasksMib_Subgraph:
        benchmark = task.value.benchmark
        benchmark_label = benchmark_mapping.get(benchmark, benchmark)
        for model in task.value.models:
            model_label = model_name_mapping.get(model, model)
            display_mapping[f"{benchmark}_{model}"] = f"{benchmark_label} - {model_label}"

    # Debug output only: which display columns exist per benchmark / per model.
    for task in TasksMib_Subgraph:
        benchmark = task.value.benchmark
        benchmark_cols = [
            display_mapping[f"{benchmark}_{model}"]
            for model in task.value.models
            if f"{benchmark}_{model}" in dataframe.columns
        ]
        if benchmark_cols:
            print(f"\nBenchmark group for {benchmark}:", benchmark_cols)

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # debug output is stable between runs.
    all_models = list(dict.fromkeys(model for task in TasksMib_Subgraph for model in task.value.models))
    for model in all_models:
        model_cols = [
            display_mapping[f"{task.value.benchmark}_{model}"]
            for task in TasksMib_Subgraph
            if model in task.value.models
            and f"{task.value.benchmark}_{model}" in dataframe.columns
        ]
        if model_cols:
            print(f"\nModel group for {model}:", model_cols)

    renamed_df = dataframe.rename(columns=display_mapping)

    return Leaderboard(
        value=renamed_df,  # Use DataFrame with display names
        datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)],
        search_columns=["Method"],
        hide_columns=["eval_name"],
        interactive=False,
    ), renamed_df


def init_leaderboard_mib_causalgraph(dataframe, track):
    """Build the causal-variable leaderboard with readable column names.

    Columns named ``<model>_<task col_name>_<target variable>`` are renamed
    to ``"<Benchmark> - <Model> - <Target variable>"``. Identifiers without a
    display name are kept verbatim.

    Args:
        dataframe: Leaderboard results.
        track: Unused; kept for signature compatibility with callers.

    Returns:
        ``(Leaderboard component, renamed DataFrame)``.
    """
    model_name_mapping = {
        "Qwen2ForCausalLM": "Qwen-2.5",
        "GPT2ForCausalLM": "GPT-2",
        "GPT2LMHeadModel": "GPT-2",
        "Gemma2ForCausalLM": "Gemma-2",
        "LlamaForCausalLM": "Llama-3.1",
    }
    benchmark_mapping = {
        "ioi_task": "IOI",
        "4_answer_MCQA": "MCQA",
        "arithmetic_addition": "Arithmetic (+)",
        "arithmetic_subtraction": "Arithmetic (-)",
        "ARC_easy": "ARC (Easy)",
        "RAVEL": "RAVEL",
    }
    target_variables_mapping = {
        "output_token": "Output Token",
        "output_position": "Output Position",
        "answer_pointer": "Answer Pointer",
        "answer": "Answer",
        "Continent": "Continent",
        "Language": "Language",
        "Country": "Country",
    }

    display_mapping = {}
    for task in TasksMib_Causalgraph:
        col_name = task.value.col_name
        benchmark_label = benchmark_mapping.get(col_name, col_name)
        for model in task.value.models:
            model_label = model_name_mapping.get(model, model)
            for target_variables in task.value.target_variables:
                target_label = target_variables_mapping.get(target_variables, target_variables)
                field_name = f"{model}_{col_name}_{target_variables}"
                display_mapping[field_name] = f"{benchmark_label} - {model_label} - {target_label}"

    renamed_df = dataframe.rename(columns=display_mapping)

    return Leaderboard(
        value=renamed_df,
        datatype=[c.type for c in fields(AutoEvalColumn_mib_causalgraph)],
        search_columns=["Method"],
        hide_columns=["eval_name"],
        bool_checkboxgroup_label="Hide models",
        interactive=False,
    ), renamed_df


def init_leaderboard(dataframe, track):
    """Build a generic leaderboard restricted to rows whose ``Track`` equals ``track``.

    Raises:
        ValueError: If ``dataframe`` is ``None`` or empty.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")
    # filter for correct track
    dataframe = dataframe.loc[dataframe["Track"] == track]

    # NOTE(review): the attributes read below (`displayed_by_default`,
    # `never_hidden`, `hidden`) must exist on whatever `fields` returns;
    # the import section binds `fields` twice -- confirm the intended one.
    return Leaderboard(
        value=dataframe,
        datatype=[c.type for c in fields(AutoEvalColumn)],
        select_columns=SelectColumns(
            default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default],
            cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden],
            label="Select Columns to Display:",
        ),
        search_columns=[AutoEvalColumn.model.name],
        hide_columns=[c.name for c in fields(AutoEvalColumn) if c.hidden],
        bool_checkboxgroup_label="Hide models",
        interactive=False,
    )


def process_json(temp_file):
    """Load an uploaded JSON file (optionally gzip-compressed).

    Args:
        temp_file: Object with a ``.name`` path attribute, or ``None``.

    Returns:
        The parsed JSON data, or ``{}`` when no file was supplied.

    Raises:
        gr.Error: If the file cannot be read or parsed.
    """
    if temp_file is None:
        return {}

    try:
        file_path = temp_file.name
        if file_path.endswith('.gz'):
            with gzip.open(file_path, 'rt', encoding="utf-8") as f:
                data = json.load(f)
        else:
            with open(file_path, 'r', encoding="utf-8") as f:
                data = json.load(f)
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}") from e

    # Instantiating gr.Markdown inside a callback renders nothing; gr.Info
    # is the component-free way to surface a message to the user.
    gr.Info("Upload successful!")

    return data


def get_hf_username(hf_repo):
    """Return the second-to-last ``/``-separated segment of ``hf_repo``.

    For ``https://huggingface.co/<user>/<repo>`` this is ``<user>``.
    Trailing slashes are ignored. An input with fewer than two segments
    raises ``IndexError``.
    """
    hf_repo = hf_repo.rstrip("/")
    parts = hf_repo.split("/")
    username = parts[-2]
    return username


# Define the preset substrings for filtering
PRESET_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC", "GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]
TASK_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC"]
TASK_CAUSAL_SUBSTRINGS = ["IOI", "MCQA", "ARC (Easy)", "RAVEL"]
MODEL_SUBSTRINGS = ["GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]


def filter_columns_by_substrings(dataframe: pd.DataFrame, selected_task_substrings: List[str],
                                 selected_model_substrings: List[str]) -> pd.DataFrame:
    """Keep the columns that match the selected task and model substrings.

    Matching is a case-insensitive substring test on the column name. When
    both lists are non-empty a column must match one task substring AND one
    model substring; when only one list is non-empty only that list is
    applied. The ``"Method"`` column is always kept.

    Returns:
        ``dataframe`` itself when both selections are empty, otherwise the
        column subset ``dataframe[filtered_columns]``.
    """
    if not selected_task_substrings and not selected_model_substrings:
        return dataframe  # No filtering if no substrings are selected

    def _matches_any(column, substrings):
        lowered = column.lower()
        return any(sub.lower() in lowered for sub in substrings)

    filtered_columns = []
    for col in dataframe.columns:
        if col != "Method":
            # An empty selection places no constraint on that dimension.
            if selected_task_substrings and not _matches_any(col, selected_task_substrings):
                continue
            if selected_model_substrings and not _matches_any(col, selected_model_substrings):
                continue
        filtered_columns.append(col)

    return dataframe[filtered_columns]


def update_leaderboard(dataframe: pd.DataFrame, selected_task_substrings: List[str],
                       selected_model_substrings: List[str], ascending: bool = False):
    """Filter the leaderboard and, where meaningful, recompute Average/Score.

    ``Average`` (row mean) and ``Score`` (row mean after ``_sigmoid``) are
    only added when neither selection consists of exactly one entry. Rows
    are then sorted by ``Average`` in the requested direction with missing
    averages last, and missing values are shown as ``"-"``.

    The input ``dataframe`` is never modified.
    """
    # Work on a copy: with empty selections the filter returns the caller's
    # own frame, and the column assignments below must not leak into it.
    filtered_dataframe = filter_columns_by_substrings(
        dataframe, selected_task_substrings, selected_model_substrings
    ).copy()

    # An average is shown unless exactly one task or exactly one model is chosen.
    show_average = len(selected_task_substrings) != 1 and len(selected_model_substrings) != 1

    def _transform_floats(df):
        df_transformed = df.copy()
        # Apply transformation row by row
        for i, row in df_transformed.iterrows():
            # Apply sigmoid only to numeric values in the row
            df_transformed.loc[i] = row.apply(lambda x: _sigmoid(x) if isinstance(x, (float, int)) else x)
        return df_transformed

    if show_average:
        # Summary columns already present in the input must not feed into
        # the freshly computed mean.
        score_source = filtered_dataframe.drop(columns=["Average", "Score"], errors="ignore")

        # Replace "-" with NaN for calculation, then use skipna=False to get NaN when any value is missing
        numeric_data = score_source.replace("-", np.nan)
        means = numeric_data.mean(axis=1, skipna=False)

        # Apply the same transformation for computing scores
        s_filtered_dataframe = _transform_floats(score_source)
        s_numeric_data = s_filtered_dataframe.replace("-", np.nan)
        s_means = s_numeric_data.mean(axis=1, skipna=False)

        # Keep numeric values as NaN for proper sorting, convert to "-" only for display
        filtered_dataframe.loc[:, "Average"] = means.round(2)
        filtered_dataframe.loc[:, "Score"] = s_means.round(2)

        # Sort by Average with NaN values at the end
        filtered_dataframe = filtered_dataframe.sort_values(by=["Average"], ascending=ascending, na_position='last')

        # After sorting, convert NaN back to "-" for display
        filtered_dataframe.loc[:, "Average"] = filtered_dataframe["Average"].fillna("-")
        filtered_dataframe.loc[:, "Score"] = filtered_dataframe["Score"].fillna("-")

    return filtered_dataframe


def process_url(url):
    """Placeholder URL handler: echoes the URL back in a message."""
    # Add your URL processing logic here
    return f"You entered the URL: {url}"


_FILTER_HELP_MD = """
### Filtering Options
Use the dropdown menus below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
"""

_SUBMISSION_POLICY_MD = """
### Submission Policy
- UPDATE (Oct. 12, 2025): Due to changes to HF storage limits, the maximum file size is now 50MB. If you're running into unexpected errors validating your Llama or Gemma artifacts, this may be why! For circuits, .pt files are *strongly* preferred over .json. For causal variable track artifacts, if you cannot reduce your filesize, please contact us on the MIB Discord.
- Maximum 2 valid submissions per HuggingFace account per week
- Invalid submissions don't count toward your limit
- Rate limit tracked on a rolling basis: a submission no longer counts toward the limit as soon as 7 days have passed since the submission time
- The queues can take up to an hour to update; don't fret if your submission doesn't show up immediately!
"""


def _build_filterable_leaderboard(init_fn, dataframe, track_name, task_choices, *, ascending, show_help=True):
    """Create task/model checkboxes plus a leaderboard and wire them together.

    Must be called inside an active ``gr.Blocks`` context. Changing either
    checkbox group re-renders the leaderboard through ``update_leaderboard``.

    Args:
        init_fn: ``init_leaderboard_mib_*`` function returning
            ``(Leaderboard, DataFrame)``.
        dataframe: Data handed to ``init_fn``.
        track_name: Track label handed to ``init_fn``.
        task_choices: Choices (all pre-selected) for the task checkbox group.
        ascending: Sort direction passed to ``update_leaderboard``.
        show_help: Whether to render the filtering help text above the controls.

    Returns:
        The created Leaderboard component.
    """
    if show_help:
        gr.Markdown(_FILTER_HELP_MD)

    task_checkbox = gr.CheckboxGroup(
        choices=task_choices,
        label="View tasks:",
        value=task_choices,  # Default to all substrings selected
    )
    model_checkbox = gr.CheckboxGroup(
        choices=MODEL_SUBSTRINGS,
        label="View models:",
        value=MODEL_SUBSTRINGS,
    )

    board, data = init_fn(dataframe, track_name)
    original_state = gr.State(value=data)
    ascending_state = gr.State(value=ascending)

    # Update the leaderboard when the user selects/deselects substrings
    for checkbox in (task_checkbox, model_checkbox):
        checkbox.change(
            fn=update_leaderboard,
            inputs=[original_state, task_checkbox, model_checkbox, ascending_state],
            outputs=board,
        )
    return board


demo = gr.Blocks(css=custom_css)
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("Circuit Localization", elem_id="subgraph", id=0):
            with gr.Tabs() as subgraph_tabs:
                with gr.TabItem("CPR", id=0):
                    leaderboard = _build_filterable_leaderboard(
                        init_leaderboard_mib_subgraph,
                        LEADERBOARD_DF_MIB_SUBGRAPH_FPL,
                        "Subgraph",
                        TASK_SUBSTRINGS,
                        ascending=False,
                    )
                    print(f"Leaderboard is {leaderboard}")

                with gr.TabItem("CMD", id=1):
                    # This tab sorts in ascending order.
                    leaderboard = _build_filterable_leaderboard(
                        init_leaderboard_mib_subgraph,
                        LEADERBOARD_DF_MIB_SUBGRAPH_FEQ,
                        "Subgraph",
                        TASK_SUBSTRINGS,
                        ascending=True,
                    )
                    print(f"Leaderboard is {leaderboard}")

        with gr.TabItem("Causal Variable Localization", elem_id="causalgraph", id=1):
            with gr.Tabs() as causalgraph_tabs:
                with gr.TabItem("Highest View", id=0):
                    leaderboard_aggregated = _build_filterable_leaderboard(
                        init_leaderboard_mib_causalgraph,
                        LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED,
                        "Causal Graph",
                        TASK_CAUSAL_SUBSTRINGS,
                        ascending=False,
                    )

                with gr.TabItem("Averaged View", id=1):
                    leaderboard_averaged = _build_filterable_leaderboard(
                        init_leaderboard_mib_causalgraph,
                        LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED,
                        "Causal Graph",
                        TASK_CAUSAL_SUBSTRINGS,
                        ascending=False,
                        show_help=False,
                    )

        with gr.TabItem("Submit", elem_id="llm-benchmark-tab-table", id=2):
            # Track selection
            track = gr.Radio(
                choices=[
                    "Circuit Localization Track",
                    "Causal Variable Localization Track"
                ],
                label="Select Competition Track",
                elem_id="track_selector"
            )

            with gr.Column(visible=False, elem_id="bordered-column") as circuit_ui:
                with gr.Row():
                    gr.Markdown(EVALUATION_QUEUE_TEXT_SUBGRAPH, elem_classes="markdown-text")

                with gr.Row():
                    hf_repo_circ = gr.Textbox(
                        label="HuggingFace Repository URL",
                        placeholder="https://huggingface.co/username/repo/path",
                        info=(
                            "Must be a valid HuggingFace URL pointing to folders containing either 1 importance score file per task/model, or "
                            "9 circuit files per task/model (.json or .pt)."
                        ),
                    )
                    level = gr.Radio(
                        choices=[
                            "Edge",
                            "Node (submodule)",
                            "Node (neuron)"
                        ],
                        label="Level of granularity",
                        info=(
                            "Is your circuit defined by its inclusion/exclusion of certain edges (e.g., MLP1 to H10L12), of certain submodules (e.g., MLP1), or of neurons "
                            "within those submodules (e.g., MLP1 neuron 295)?"
                        ),
                    )

            with gr.Column(visible=False, elem_id="bordered-column") as causal_ui:
                gr.Markdown(EVALUATION_QUEUE_TEXT_CAUSALVARIABLE, elem_classes="markdown-text")
                with gr.Row():
                    hf_repo_cg = gr.Textbox(
                        label="HuggingFace Repository URL",
                        placeholder="https://huggingface.co/username/repo/path",
                        info="Must be a valid HuggingFace URL pointing to a file containing the trained featurizer (.pt). "
                    )

            # Common fields
            with gr.Group():
                gr.Markdown("### Submission Information")
                method_name = gr.Textbox(label="Method Name")
                description = gr.Textbox(label="One-line Description")
                contact_email = gr.Textbox(label="Contact Email")

            # Dynamic UI logic
            def toggle_ui(track):
                """Show the input column of the chosen track and hide the other."""
                circuit = track == "Circuit Localization Track"
                causal = not circuit
                # Both targets are gr.Column components, so update them as such.
                return {
                    circuit_ui: gr.Column(visible=circuit),
                    causal_ui: gr.Column(visible=causal)
                }

            track.change(toggle_ui, track, [circuit_ui, causal_ui])

            # Submission handling
            status = gr.Textbox(label="Submission Status", visible=False)

            def handle_submission(track, hf_repo_circ, hf_repo_cg, level, method_name, description, contact_email):
                """Validate a submission and either report, confirm, or enqueue it.

                Returns four values for ``[status, warning_display,
                pending_submission, warning_modal]``:

                * errors present -> error text shown, warning dialog hidden;
                * only warnings -> warnings shown in the dialog, the submission
                  tuple parked in state until the user confirms;
                * otherwise -> whatever ``upload_to_queue`` returns.
                """
                if not track:
                    # Without a track `"Circuit" in track` would raise TypeError.
                    return [
                        gr.Textbox("❌ Please select a competition track", visible=True),
                        None, None,
                        gr.Column(visible=False),
                    ]

                errors = []
                warnings = []
                # Set when the repo URL / rate limit check fails, in which case
                # the track-specific verification is skipped.
                breaking_error = False

                is_circuit = "Circuit" in track
                hf_repo = hf_repo_circ if is_circuit else hf_repo_cg

                # Validate common fields
                if not method_name.strip():
                    errors.append("Method name is required")
                if "@" not in contact_email or "." not in contact_email:
                    errors.append("Valid email address is required")
                if is_circuit and not level:
                    errors.append("Level of granularity is required")
                if len(description.strip()) > 150:
                    warnings.append("Description longer than 150 characters and will be truncated.")
                    description = description.strip()[:150]
                if not description.strip():
                    errors.append("Description is required")

                if not hf_repo.startswith(("https://huggingface.co/", "http://huggingface.co/")):
                    errors.append("Invalid HuggingFace URL - must start with https://huggingface.co/")
                    breaking_error = True
                else:
                    repo_id, subfolder, revision = parse_huggingface_url(hf_repo)
                    if repo_id is None:
                        errors.append("Could not read username or repo name from HF URL")
                        breaking_error = True
                    else:
                        user_name, repo_name = repo_id.split("/")
                        under_rate_limit, time_left = check_rate_limit(track, user_name, contact_email)
                        if not under_rate_limit:
                            errors.append(
                                f"Rate limit exceeded (max 2 submissions per week). Please try again in {time_left}. "
                                "(If you're trying again after a failed validation, either remove the previous entry below or try again in about 30 minutes."
                            )
                            breaking_error = True

                # Track-specific validation
                if not breaking_error:
                    if is_circuit:
                        submission_errors, submission_warnings = verify_circuit_submission(hf_repo, level)
                    else:
                        submission_errors, submission_warnings = verify_causal_variable_submission(hf_repo)
                    errors.extend(submission_errors)
                    warnings.extend(submission_warnings)

                _id = secrets.token_urlsafe(12)

                if errors:
                    return [
                        gr.Textbox("\n".join(f"❌ {e}" for e in errors), visible=True),
                        None, None,
                        gr.Column(visible=False),
                    ]
                elif warnings:
                    return [
                        gr.Textbox("Warnings:", visible=True),
                        gr.Markdown("\n\n".join(f"• {w}" for w in warnings)),
                        (track, hf_repo_circ, hf_repo_cg, level, method_name, description, contact_email, _id),
                        gr.Column(visible=True)
                    ]
                else:
                    return upload_to_queue(track, hf_repo_circ, hf_repo_cg, level, method_name, description, contact_email, _id)

            # New warning confirmation dialog
            warning_modal = gr.Column(visible=False, variant="panel")
            with warning_modal:
                gr.Markdown("### ⚠️ Submission Warnings")
                warning_display = gr.Markdown()
                proceed_btn = gr.Button("Proceed Anyway", variant="secondary")
                cancel_btn = gr.Button("Cancel Submission", variant="primary")

            # Store submission data between callbacks
            pending_submission = gr.State()

            submit_btn = gr.Button("Submit Entry", variant="primary")
            submit_btn.click(
                handle_submission,
                inputs=[track, hf_repo_circ, hf_repo_cg, level, method_name, description, contact_email],
                outputs=[status, warning_display, pending_submission, warning_modal]
            )

            proceed_btn.click(
                lambda x: upload_to_queue(*x),
                inputs=pending_submission,
                outputs=[status, warning_display, pending_submission, warning_modal]
            )

            cancel_btn.click(
                lambda: [gr.Textbox("Submission canceled.", visible=True), None, None, gr.Column(visible=False)],
                outputs=[status, warning_display, pending_submission, warning_modal]
            )

            with gr.Column():
                with gr.Accordion(
                    f"✅ Finished Evaluations ({len(finished_eval_queue)})",
                    open=False,
                ):
                    with gr.Row():
                        finished_eval_table = gr.components.Dataframe(
                            value=finished_eval_queue,
                            headers=EVAL_COLS,
                            datatype=EVAL_TYPES,
                            row_count=5,
                        )

                with gr.Accordion(
                    f"⏳ Pending Evaluation Queue ({len(pending_eval_queue)})",
                    open=False,
                ):
                    with gr.Row():
                        pending_eval_table = gr.components.Dataframe(
                            value=pending_eval_queue,
                            headers=EVAL_COLS,
                            datatype=EVAL_TYPES,
                            row_count=5,
                        )

            with gr.Group():
                gr.Markdown("### Remove Submission from Queue")
                with gr.Row():
                    name_r = gr.Textbox(label="Method Name")
                    _id_r = gr.Textbox(label="Submission ID")

                status_r = gr.Textbox(label="Removal Status", visible=False)
                remove_button = gr.Button("Remove Entry")
                remove_button.click(
                    remove_submission,
                    inputs=[track, name_r, _id_r],
                    outputs=[status_r]
                )

            # Add info about rate limits
            gr.Markdown(_SUBMISSION_POLICY_MD)

    with gr.Row():
        with gr.Accordion("📙 Citation", open=False):
            citation_button = gr.Textbox(
                value=CITATION_BUTTON_TEXT,
                label=CITATION_BUTTON_LABEL,
                lines=10,
                elem_id="citation-button",
                show_copy_button=True,
            )

# Restart the space every 30 minutes; start-up re-downloads the queue and
# result snapshots, so this is what refreshes the displayed data.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch(share=True, ssr_mode=False)