| """ | |
| Model evaluation queue system for Dynamic Highscores. | |
| This module handles the evaluation queue, CPU-only processing, | |
| and enforces daily submission limits for users. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import threading | |
| import queue as queue_module | |
| from datetime import datetime, timedelta | |
| import gradio as gr | |
| from huggingface_hub import HfApi, hf_hub_download, snapshot_download | |
| from datasets import load_dataset | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
| import sqlite3 | |

class EvaluationQueue:
    """Manages the evaluation queue for model benchmarking."""

    def __init__(self, db_manager, auth_manager):
        """Initialize the evaluation queue manager.

        Args:
            db_manager: Database manager instance
            auth_manager: Authentication manager instance
        """
        self.db_manager = db_manager
        self.auth_manager = auth_manager
        self.hf_api = HfApi()
        self.queue = queue_module.Queue()
        self.is_processing = False
        self.worker_thread = None
        self.model_tags = ["Merge", "Agent", "Reasoning", "Coding", "General", "Specialized", "Instruction", "Chat"]
        self.current_evaluation = None
        self.progress = 0
        self.progress_lock = threading.Lock()
        # Memory limit for models in GB (leave 2GB for the system)
        self.memory_limit_gb = 14.0
    def start_worker(self):
        """Start the worker thread for processing the evaluation queue."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.is_processing = True
            self.worker_thread = threading.Thread(target=self._process_queue)
            self.worker_thread.daemon = True
            self.worker_thread.start()

    def stop_worker(self):
        """Stop the worker thread."""
        self.is_processing = False
        if self.worker_thread and self.worker_thread.is_alive():
            self.worker_thread.join(timeout=1.0)
    def check_model_size(self, model_id):
        """Check if a model will fit within RAM limitations.

        Args:
            model_id: HuggingFace model ID

        Returns:
            tuple: (will_fit, message)
        """
        try:
            # Query model info from the HuggingFace API
            model_info_obj = self.hf_api.model_info(model_id)

            # Initialize total size
            total_size_gb = 0

            # Try different approaches to get the model size based on the API response structure
            if hasattr(model_info_obj, 'safetensors') and model_info_obj.safetensors:
                # Newer API format with a safetensors dict
                for file_info in model_info_obj.safetensors.values():
                    if hasattr(file_info, 'size'):
                        total_size_gb += file_info.size / (1024 * 1024 * 1024)
                    elif isinstance(file_info, dict) and 'size' in file_info:
                        total_size_gb += file_info['size'] / (1024 * 1024 * 1024)

            # Fallback: sum the sizes of weight files listed in siblings
            if total_size_gb == 0 and hasattr(model_info_obj, 'siblings'):
                for sibling in model_info_obj.siblings:
                    if hasattr(sibling, 'size') and sibling.size:
                        if sibling.rfilename.endswith(('.bin', '.safetensors', '.pt')):
                            total_size_gb += sibling.size / (1024 * 1024 * 1024)
                    elif isinstance(sibling, dict) and 'size' in sibling:
                        if sibling.get('rfilename', '').endswith(('.bin', '.safetensors', '.pt')):
                            total_size_gb += sibling['size'] / (1024 * 1024 * 1024)

            # If we still couldn't determine the size, guess from the model name
            if total_size_gb == 0:
                # e.g. a name containing "7b" suggests roughly 7B parameters
                model_name = model_id.lower()
                size_indicators = {
                    "1b": 1, "2b": 2, "3b": 3, "5b": 5, "7b": 7, "8b": 8,
                    "10b": 10, "13b": 13, "20b": 20, "30b": 30, "65b": 65, "70b": 70
                }
                for indicator, size in size_indicators.items():
                    if indicator in model_name.replace("-", "").replace("_", ""):
                        total_size_gb = size * 2  # Rough estimate: ~2GB per billion parameters (fp16 weights)
                        break

            # If the size is still unknown, fall back to a conservative default
            if total_size_gb == 0:
                try:
                    print(f"Checking model size with direct method for {model_id}")
                    # Print the full structure for debugging
                    print(f"Model info: {model_info_obj.__dict__}")
                    # Default to a conservative estimate
                    total_size_gb = 5  # Assume a 5GB model by default
                except Exception as e:
                    print(f"Direct size check failed: {e}")
                    return True, "Unable to determine model size accurately, but allowing submission with caution"

            # Account for memory overhead
            estimated_ram_needed = total_size_gb * 1.3  # 30% overhead

            # Check against the limit
            if estimated_ram_needed > self.memory_limit_gb:
                return False, f"Model is too large (approximately {total_size_gb:.1f}GB, needs {estimated_ram_needed:.1f}GB RAM). Maximum allowed is {self.memory_limit_gb}GB."

            return True, f"Model size check passed ({total_size_gb:.1f}GB, estimated {estimated_ram_needed:.1f}GB RAM usage)"
        except Exception as e:
            print(f"Model size check error: {e}")
            # Log more details for debugging
            import traceback
            traceback.print_exc()
            # Allow submission with a warning
            return True, f"Warning: Could not verify model size ({str(e)}). Please ensure your model is under {self.memory_limit_gb}GB."
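    # Worked example of the estimate above (hypothetical model, for illustration only):
    # a repo named "my-org/llama-7b-chat" with no file metadata matches the "7b" indicator,
    # so total_size_gb = 7 * 2 = 14 GB and estimated_ram_needed = 14 * 1.3 = 18.2 GB,
    # which exceeds the 14 GB limit, so the submission is rejected.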
    def _process_queue(self):
        """Process the evaluation queue in a separate thread."""
        while self.is_processing:
            try:
                # Get the pending evaluations from the database
                pending_evals = self.db_manager.get_evaluation_results(status="pending")

                if pending_evals:
                    # Take the first pending evaluation (assumed to be ordered by priority and added_at)
                    next_eval = pending_evals[0]

                    # Update status to running
                    self.db_manager.update_evaluation_status(next_eval['id'], 'running')

                    # Set current evaluation and reset progress
                    with self.progress_lock:
                        self.current_evaluation = next_eval
                        self.progress = 0

                    try:
                        # Get model and benchmark details
                        model_info = self.db_manager.get_model(next_eval['model_id'])
                        benchmark_info = self.db_manager.get_benchmark(next_eval['benchmark_id'])

                        if model_info and benchmark_info:
                            # Check whether the model will fit in memory
                            will_fit, message = self.check_model_size(model_info['hf_model_id'])
                            if not will_fit:
                                raise Exception(f"Model too large for evaluation: {message}")

                            # Run the evaluation
                            results = self._run_evaluation(
                                model_info['hf_model_id'],
                                benchmark_info['dataset_id']
                            )

                            # Calculate the overall score
                            score = self._calculate_overall_score(results)

                            # Update status to completed with results
                            self.db_manager.update_evaluation_status(
                                next_eval['id'],
                                'completed',
                                results=results,
                                score=score
                            )
                        else:
                            raise Exception("Model or benchmark not found")
                    except Exception as e:
                        print(f"Evaluation error: {e}")

                        # Update status to failed with an error message
                        error_results = {"error": str(e)}
                        self.db_manager.update_evaluation_status(
                            next_eval['id'],
                            'failed',
                            results=error_results
                        )

                    # Clear the current evaluation
                    with self.progress_lock:
                        self.current_evaluation = None
                        self.progress = 0
                else:
                    # No evaluations in the queue; sleep for a bit
                    time.sleep(5)
            except Exception as e:
                print(f"Queue processing error: {e}")
                time.sleep(5)
    def _run_evaluation(self, model_id, dataset_id):
        """Run an evaluation for a model on a benchmark.

        Args:
            model_id: HuggingFace model ID
            dataset_id: HuggingFace dataset ID (with optional config)

        Returns:
            dict: Evaluation results
        """
        # Update progress
        with self.progress_lock:
            self.progress = 5  # Starting evaluation

        # Parse the dataset ID and optional config ("dataset_id:config")
        if ":" in dataset_id:
            dataset_id, config = dataset_id.split(":", 1)
        else:
            config = None

        # Update progress
        with self.progress_lock:
            self.progress = 10  # Loading dataset

        # Load the dataset
        try:
            if config:
                dataset = load_dataset(dataset_id, config, split="test")
            else:
                dataset = load_dataset(dataset_id, split="test")
        except Exception as e:
            return {"error": f"Failed to load dataset: {str(e)}"}

        # Update progress
        with self.progress_lock:
            self.progress = 20  # Loading model

        try:
            # Load the model with memory optimization settings
            device = "cpu"
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                device_map=device,
                torch_dtype=torch.float32,   # Use float32 for CPU
                low_cpu_mem_usage=True,      # Enable memory optimization
                offload_folder="offload",    # Enable offloading if needed
                offload_state_dict=True,     # Offload the state dict for memory saving
                max_memory={"cpu": f"{self.memory_limit_gb}GB"}  # Cap host (CPU) memory usage
            )
            tokenizer = AutoTokenizer.from_pretrained(model_id)
        except Exception as e:
            print(f"Model loading error: {e}")
            return {"error": f"Failed to load model: {str(e)}"}

        # Update progress
        with self.progress_lock:
            self.progress = 30  # Determining task type

        # Determine the task type based on dataset features
        task_type = self._determine_task_type(dataset)

        # Update progress
        with self.progress_lock:
            self.progress = 40  # Starting evaluation

        try:
            # Run the appropriate evaluation based on the task type
            if task_type == "text-generation":
                results = self._evaluate_text_generation(model, tokenizer, dataset)
            elif task_type == "question-answering":
                results = self._evaluate_question_answering(model, tokenizer, dataset)
            elif task_type == "classification":
                results = self._evaluate_classification(model, tokenizer, dataset)
            elif task_type == "code-generation":
                results = self._evaluate_code_generation(model, tokenizer, dataset)
            else:
                # Default to general evaluation
                results = self._evaluate_general(model, tokenizer, dataset)
        except Exception as e:
            print(f"Evaluation task error: {e}")
            return {"error": f"Evaluation failed: {str(e)}"}

        # Update progress
        with self.progress_lock:
            self.progress = 95  # Cleaning up

        # Clean up to free memory
        del model
        del tokenizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Update progress
        with self.progress_lock:
            self.progress = 100  # Completed

        return results
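    # Illustrative note on the "dataset_id:config" convention parsed above: a benchmark stored
    # as "glue:sst2" would load as load_dataset("glue", "sst2", split="test"), while a plain
    # "ag_news" loads as load_dataset("ag_news", split="test"). These dataset names are only
    # examples, not datasets this project necessarily uses.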
    def get_current_progress(self):
        """Get the current evaluation progress.

        Returns:
            tuple: (current_evaluation, progress_percentage)
        """
        with self.progress_lock:
            return self.current_evaluation, self.progress

    def _determine_task_type(self, dataset):
        """Determine the task type based on dataset features.

        Args:
            dataset: HuggingFace dataset

        Returns:
            str: Task type
        """
        features = dataset.features

        # Check for common feature patterns
        if "question" in features and "answer" in features:
            return "question-answering"
        elif "code" in features or "solution" in features:
            return "code-generation"
        elif "label" in features or "class" in features:
            return "classification"
        elif "input" in features and "output" in features:
            return "text-generation"
        else:
            return "general"
    def _evaluate_text_generation(self, model, tokenizer, dataset):
        """Evaluate a model on text generation tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation (to keep runtime reasonable)
        if len(dataset) > 100:
            dataset = dataset.select(range(100))

        # Track metrics
        correct = 0
        total = 0
        generated_texts = []

        # Process each example
        for i, example in enumerate(dataset):
            # Update progress based on completion percentage
            with self.progress_lock:
                self.progress = 40 + int((i / len(dataset)) * 50)

            input_text = example.get("input", example.get("prompt", ""))
            expected_output = example.get("output", example.get("target", ""))

            if not input_text or not expected_output:
                continue

            # Generate text
            generated = generator(
                input_text,
                max_length=100,
                num_return_sequences=1
            )
            generated_text = generated[0]["generated_text"]
            generated_texts.append(generated_text)

            # Simple exact match check
            if expected_output.strip() in generated_text:
                correct += 1
            total += 1

        # Calculate metrics
        accuracy = correct / total if total > 0 else 0

        return {
            "accuracy": accuracy,
            "samples_evaluated": total,
            "generated_samples": generated_texts[:5]  # Include a few samples
        }
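    # Illustrative scoring behavior (hypothetical example): for {"input": "2+2=", "output": "4"},
    # the sample counts as correct only if the string "4" appears anywhere in the generated text,
    # i.e. a substring match rather than a true exact-match or similarity metric.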
    def _evaluate_question_answering(self, model, tokenizer, dataset):
        """Evaluate a model on question answering tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up QA pipeline
        qa_pipeline = pipeline(
            "question-answering",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        if len(dataset) > 100:
            dataset = dataset.select(range(100))

        # Track metrics
        exact_matches = 0
        f1_scores = []
        total = 0

        # Process each example
        for i, example in enumerate(dataset):
            # Update progress based on completion percentage
            with self.progress_lock:
                self.progress = 40 + int((i / len(dataset)) * 50)

            question = example.get("question", "")
            context = example.get("context", "")
            answer = example.get("answer", "")

            if not question or not answer:
                continue

            # Get model prediction
            if context:
                result = qa_pipeline(question=question, context=context)
            else:
                # If no context is provided, use the question as the context
                result = qa_pipeline(question=question, context=question)

            predicted_answer = result["answer"]

            # Calculate exact match
            if predicted_answer.strip() == answer.strip():
                exact_matches += 1

            # Calculate F1 score
            f1 = self._calculate_f1(answer, predicted_answer)
            f1_scores.append(f1)

            total += 1

        # Calculate metrics
        exact_match_accuracy = exact_matches / total if total > 0 else 0
        avg_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0

        return {
            "exact_match": exact_match_accuracy,
            "f1": avg_f1,
            "samples_evaluated": total
        }
    def _evaluate_classification(self, model, tokenizer, dataset):
        """Evaluate a model on classification tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up classification pipeline
        classifier = pipeline(
            "text-classification",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        if len(dataset) > 100:
            dataset = dataset.select(range(100))

        # Track metrics
        correct = 0
        total = 0

        # Process each example
        for i, example in enumerate(dataset):
            # Update progress based on completion percentage
            with self.progress_lock:
                self.progress = 40 + int((i / len(dataset)) * 50)

            text = example.get("text", example.get("sentence", ""))
            label = str(example.get("label", example.get("class", "")))

            if not text or not label:
                continue

            # Get model prediction
            result = classifier(text)
            predicted_label = result[0]["label"]

            # Check if correct
            if str(predicted_label) == label:
                correct += 1
            total += 1

        # Calculate metrics
        accuracy = correct / total if total > 0 else 0

        return {
            "accuracy": accuracy,
            "samples_evaluated": total
        }
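    # Note on label formats (general transformers behavior, not specific to this project):
    # the text-classification pipeline returns the model's configured label names, e.g.
    # "LABEL_1" or "positive", while datasets often store integer labels such as 1, so the
    # string comparison above only counts a match when the two conventions happen to align.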
    def _evaluate_code_generation(self, model, tokenizer, dataset):
        """Evaluate a model on code generation tasks.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation (smaller sample for code tasks)
        if len(dataset) > 50:
            dataset = dataset.select(range(50))

        # Track metrics
        exact_matches = 0
        functional_matches = 0
        total = 0

        # Process each example
        for i, example in enumerate(dataset):
            # Update progress based on completion percentage
            with self.progress_lock:
                self.progress = 40 + int((i / len(dataset)) * 50)

            prompt = example.get("prompt", example.get("input", ""))
            solution = example.get("solution", example.get("output", ""))

            if not prompt or not solution:
                continue

            # Generate code
            generated = generator(
                prompt,
                max_length=200,
                num_return_sequences=1
            )
            generated_code = generated[0]["generated_text"]

            # Extract code from generated text (remove the prompt)
            if prompt in generated_code:
                generated_code = generated_code[len(prompt):].strip()

            # Check exact match
            if generated_code.strip() == solution.strip():
                exact_matches += 1
                functional_matches += 1
            else:
                # Ideally we would check functional correctness here, but that requires
                # executing code, which is complex and potentially unsafe.
                # For now, use a simple keyword heuristic.
                if len(generated_code) > 0 and any(keyword in generated_code for keyword in ["def ", "function", "return", "class"]):
                    functional_matches += 0.5  # Partial credit

            total += 1

        # Calculate metrics
        exact_match_rate = exact_matches / total if total > 0 else 0
        functional_correctness = functional_matches / total if total > 0 else 0

        return {
            "exact_match": exact_match_rate,
            "functional_correctness": functional_correctness,
            "samples_evaluated": total
        }
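    # Worked example of the partial-credit heuristic (hypothetical numbers): with 50 samples,
    # 10 exact matches and 20 non-matching generations that still contain a code keyword give
    # exact_match = 10 / 50 = 0.2 and functional_correctness = (10 + 20 * 0.5) / 50 = 0.4.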
    def _evaluate_general(self, model, tokenizer, dataset):
        """General evaluation for any dataset type.

        Args:
            model: HuggingFace model
            tokenizer: HuggingFace tokenizer
            dataset: HuggingFace dataset

        Returns:
            dict: Evaluation results
        """
        # Set up generation pipeline
        generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device="cpu"
        )

        # Sample a subset for evaluation
        if len(dataset) > 50:
            dataset = dataset.select(range(50))

        # Find input and output fields
        features = dataset.features
        input_field = None
        output_field = None

        for field in features:
            if field.lower() in ["input", "prompt", "question", "text"]:
                input_field = field
            elif field.lower() in ["output", "target", "answer", "response"]:
                output_field = field

        if not input_field:
            # Fall back to the first string-valued field as input
            for field in features:
                if getattr(features[field], "dtype", None) == "string":
                    input_field = field
                    break

        # Track metrics
        total = 0
        generated_texts = []

        # Process each example
        for i, example in enumerate(dataset):
            # Update progress based on completion percentage
            with self.progress_lock:
                self.progress = 40 + int((i / len(dataset)) * 50)

            if input_field and input_field in example:
                input_text = str(example[input_field])

                # Generate text
                generated = generator(
                    input_text,
                    max_length=100,
                    num_return_sequences=1
                )
                generated_text = generated[0]["generated_text"]

                generated_texts.append({
                    "input": input_text,
                    "output": generated_text,
                    "expected": str(example[output_field]) if output_field and output_field in example else "N/A"
                })
                total += 1

        return {
            "samples_evaluated": total,
            "generated_samples": generated_texts[:5]  # Include a few samples
        }
    def _calculate_f1(self, answer, prediction):
        """Calculate F1 score between answer and prediction.

        Args:
            answer: Ground truth answer
            prediction: Model prediction

        Returns:
            float: F1 score
        """
        # Tokenize
        answer_tokens = answer.lower().split()
        prediction_tokens = prediction.lower().split()

        # Calculate precision and recall
        common_tokens = set(answer_tokens) & set(prediction_tokens)
        if not common_tokens:
            return 0.0

        precision = len(common_tokens) / len(prediction_tokens)
        recall = len(common_tokens) / len(answer_tokens)

        # Calculate F1
        if precision + recall == 0:
            return 0.0

        f1 = 2 * precision * recall / (precision + recall)
        return f1
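    # Worked example (hypothetical strings): answer "the cat sat" vs. prediction "a cat sat here"
    # share the tokens {"cat", "sat"}, so precision = 2/4 = 0.5, recall = 2/3 ≈ 0.667, and
    # F1 = 2 * 0.5 * 0.667 / (0.5 + 0.667) ≈ 0.571. Note this is a bag-of-unique-tokens
    # approximation rather than the count-based SQuAD-style token F1.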
    def _calculate_overall_score(self, results):
        """Calculate an overall score from evaluation results.

        Args:
            results: Evaluation results dictionary

        Returns:
            float: Overall score between 0 and 100
        """
        # If there was an error, return a low score
        if "error" in results:
            return 0.0

        score = 0.0

        # Check for common metrics and weight them
        if "accuracy" in results:
            score += results["accuracy"] * 100
        if "exact_match" in results:
            score += results["exact_match"] * 100
        if "f1" in results:
            score += results["f1"] * 100
        if "functional_correctness" in results:
            score += results["functional_correctness"] * 100

        # If multiple metrics were found, average them
        num_metrics = sum(1 for metric in ["accuracy", "exact_match", "f1", "functional_correctness"] if metric in results)
        if num_metrics > 0:
            score /= num_metrics
        else:
            # Default score if no metrics are available
            score = 50.0

        return score
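    # Worked example (hypothetical results): {"exact_match": 0.6, "f1": 0.75} yields
    # (0.6 * 100 + 0.75 * 100) / 2 = 67.5, while an {"error": ...} result scores 0.0 and a
    # result with none of the recognized metrics falls back to the default of 50.0.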
    def submit_evaluation(self, model_id, benchmark_id, user_id, priority=0):
        """Submit a model for evaluation on a benchmark.

        Args:
            model_id: Model ID in the database
            benchmark_id: Benchmark ID in the database
            user_id: User ID submitting the evaluation
            priority: Queue priority (higher = higher priority)

        Returns:
            tuple: (evaluation_id, message)
        """
        # Check if the user can submit today
        if not self.auth_manager.can_submit_benchmark(user_id):
            return None, "Daily submission limit reached. Try again tomorrow."

        try:
            # Get the model's HuggingFace ID to check its size
            model_info = self.db_manager.get_model(model_id)
            if not model_info:
                return None, "Model not found in database."

            # Check if the model will fit in memory
            will_fit, message = self.check_model_size(model_info['hf_model_id'])
            if not will_fit:
                return None, message

            # Add the evaluation to the database and queue
            evaluation_id = self.db_manager.add_evaluation(
                model_id=model_id,
                benchmark_id=benchmark_id,
                priority=priority
            )

            # Update the user's last submission date
            self.auth_manager.update_submission_date(user_id)

            # Make sure the worker is running
            self.start_worker()

            return evaluation_id, f"Evaluation submitted successfully. {message}"
        except Exception as e:
            print(f"Submit evaluation error: {e}")
            return None, f"Failed to submit evaluation: {str(e)}"

    def get_queue_status(self):
        """Get the current status of the evaluation queue.

        Returns:
            dict: Queue status information
        """
        try:
            # Get evaluations from the database
            pending_evals = self.db_manager.get_evaluation_results(status="pending")
            running_evals = self.db_manager.get_evaluation_results(status="running")
            completed_evals = self.db_manager.get_evaluation_results(status="completed")
            failed_evals = self.db_manager.get_evaluation_results(status="failed")

            # Get current evaluation progress
            current_eval, progress = self.get_current_progress()

            return {
                "pending": len(pending_evals),
                "running": len(running_evals),
                "completed": len(completed_evals),
                "failed": len(failed_evals),
                "is_processing": self.is_processing,
                "current_evaluation": current_eval,
                "progress": progress,
                "memory_limit_gb": self.memory_limit_gb
            }
        except Exception as e:
            print(f"Queue status error: {e}")
            return {
                "pending": 0,
                "running": 0,
                "completed": 0,
                "failed": 0,
                "is_processing": self.is_processing,
                "current_evaluation": None,
                "progress": 0,
                "memory_limit_gb": self.memory_limit_gb,
                "error": str(e)
            }
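    # Example return value from get_queue_status() while one evaluation is running
    # (illustrative values only):
    # {"pending": 2, "running": 1, "completed": 14, "failed": 3, "is_processing": True,
    #  "current_evaluation": {...}, "progress": 45, "memory_limit_gb": 14.0}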

# Model submission UI components
def create_model_submission_ui(evaluation_queue, auth_manager, db_manager):
    """Create the model submission UI components.

    Args:
        evaluation_queue: Evaluation queue instance
        auth_manager: Authentication manager instance
        db_manager: Database manager instance

    Returns:
        gr.Blocks: Gradio Blocks component with the model submission UI
    """
    with gr.Blocks() as submission_ui:
        # Store user authentication state
        user_state = gr.State(None)

        # Check authentication on load
        def check_auth_on_load(request: gr.Request):
            if request:
                # Special handling for HF Spaces OAuth
                if 'SPACE_ID' in os.environ:
                    username = request.headers.get("HF-User")
                    if username:
                        user = db_manager.get_user_by_username(username)
                        if user:
                            print(f"User authenticated via HF Spaces OAuth: {username}")
                            return user
                else:
                    # Standard token-based auth
                    user = auth_manager.check_login(request)
                    if user:
                        return user
            return None

        with gr.Tab("Submit Model"):
            gr.Markdown(f"""
            ### Model Size Restrictions

            Models must fit within {evaluation_queue.memory_limit_gb}GB of RAM for evaluation.
            Larger models will be rejected to ensure all evaluations can complete successfully.
            """, elem_classes=["info-text"])

            with gr.Row():
                with gr.Column(scale=2):
                    model_id_input = gr.Textbox(
                        placeholder="HuggingFace model ID (e.g., 'gpt2', 'facebook/opt-350m')",
                        label="Model ID"
                    )

                    check_size_button = gr.Button("Check Model Size")
                    size_check_result = gr.Markdown("")

                    model_name_input = gr.Textbox(
                        placeholder="Display name for your model",
                        label="Model Name"
                    )

                    model_description_input = gr.Textbox(
                        placeholder="Brief description of your model",
                        label="Description",
                        lines=3
                    )

                    model_parameters_input = gr.Number(
                        label="Number of Parameters (billions)",
                        precision=2
                    )

                with gr.Column(scale=1):
                    model_tag_input = gr.Dropdown(
                        choices=evaluation_queue.model_tags,
                        label="Model Tag",
                        info="Select one category that best describes your model"
                    )

                    # Benchmark dropdown: choices are (display name, value) tuples so the
                    # benchmark name is shown while the database ID is passed to handlers
                    benchmark_dropdown = gr.Dropdown(
                        label="Benchmark",
                        info="Select a benchmark to evaluate your model on",
                        choices=[("Loading benchmarks...", "none")],
                        value=None
                    )

                    refresh_benchmarks_button = gr.Button("Refresh Benchmarks")

            submit_model_button = gr.Button("Submit for Evaluation")
            submission_status = gr.Markdown("")
            auth_message = gr.Markdown("")

        with gr.Tab("Evaluation Queue"):
            refresh_queue_button = gr.Button("Refresh Queue")

            with gr.Row():
                with gr.Column(scale=1):
                    queue_stats = gr.JSON(
                        label="Queue Statistics"
                    )

                with gr.Column(scale=2):
                    queue_status = gr.Dataframe(
                        headers=["ID", "Model", "Benchmark", "Status", "Submitted"],
                        label="Recent Evaluations"
                    )

            with gr.Row(visible=True) as progress_container:
                with gr.Column():
                    current_eval_info = gr.Markdown("No evaluation currently running")
                    # Use a simple text display for progress instead of a Progress component
                    progress_display = gr.Markdown("Progress: 0%")

        # Event handlers
        def check_model_size_handler(model_id):
            if not model_id:
                return "Please enter a HuggingFace model ID."

            try:
                will_fit, message = evaluation_queue.check_model_size(model_id)

                if will_fit:
                    return f"✅ {message}"
                else:
                    return f"❌ {message}"
            except Exception as e:
                print(f"Model size check error: {e}")
                import traceback
                traceback.print_exc()
                return f"Error checking model size: {str(e)}"

        def refresh_benchmarks_handler():
            benchmarks = db_manager.get_benchmarks()

            # Format for the dropdown as (display name, value) tuples so names are shown
            choices = []
            for b in benchmarks:
                choices.append((b["name"], str(b["id"])))

            if not choices:
                choices = [("No benchmarks available - add some first", "none")]

            return gr.update(choices=choices)

        def submit_model_handler(model_id, model_name, model_description, model_parameters, model_tag, benchmark_id, user):
            # Check if the user is logged in
            if not user:
                return "Please log in to submit a model."

            if not model_id or not model_name or not model_tag or not benchmark_id:
                return "Please fill in all required fields."

            if benchmark_id == "none":
                return "Please select a valid benchmark."

            try:
                # Check if the model will fit in RAM
                will_fit, size_message = evaluation_queue.check_model_size(model_id)

                if not will_fit:
                    return f"❌ {size_message}"

                # Add the model to the database
                model_db_id = db_manager.add_model(
                    name=model_name,
                    hf_model_id=model_id,
                    user_id=user["id"],
                    tag=model_tag,
                    parameters=str(model_parameters) if model_parameters else None,
                    description=model_description
                )

                if not model_db_id:
                    return "Failed to add model to database."

                # Submit for evaluation
                eval_id, message = evaluation_queue.submit_evaluation(
                    model_id=model_db_id,
                    benchmark_id=benchmark_id,
                    user_id=user["id"]
                )

                if eval_id:
                    return f"✅ Model submitted successfully. {size_message}\nEvaluation ID: {eval_id}"
                else:
                    return message
            except Exception as e:
                print(f"Error submitting model: {str(e)}")
                import traceback
                traceback.print_exc()
                return f"Error submitting model: {str(e)}"

        def refresh_queue_handler():
            # Get queue statistics
            stats = evaluation_queue.get_queue_status()

            # Get recent evaluations (all statuses, limited to 20)
            evals = db_manager.get_evaluation_results(limit=20)

            # Format for the dataframe
            eval_data = []
            for evaluation in evals:
                eval_data.append([
                    evaluation["id"],
                    evaluation["model_name"],
                    evaluation["benchmark_name"],
                    evaluation["status"],
                    evaluation["submitted_at"]
                ])

            # Also update the progress display
            current_eval, progress = evaluation_queue.get_current_progress()
            if current_eval:
                model_info = db_manager.get_model(current_eval['model_id'])
                benchmark_info = db_manager.get_benchmark(current_eval['benchmark_id'])

                if model_info and benchmark_info:
                    eval_info = f"**Currently Evaluating:** {model_info['name']} on {benchmark_info['name']}"
                    progress_text = f"Progress: {progress}%"
                    return stats, eval_data, eval_info, progress_text

            return stats, eval_data, "No evaluation currently running", "Progress: 0%"

        # Update authentication status
        def update_auth_message(user):
            if user:
                return f"Logged in as {user['username']}"
            else:
                return "Please log in to submit a model."

        # Connect event handlers
        check_size_button.click(
            fn=check_model_size_handler,
            inputs=[model_id_input],
            outputs=[size_check_result]
        )

        refresh_benchmarks_button.click(
            fn=refresh_benchmarks_handler,
            inputs=[],
            outputs=[benchmark_dropdown]
        )

        submit_model_button.click(
            fn=submit_model_handler,
            inputs=[
                model_id_input,
                model_name_input,
                model_description_input,
                model_parameters_input,
                model_tag_input,
                benchmark_dropdown,
                user_state
            ],
            outputs=[submission_status]
        )

        refresh_queue_button.click(
            fn=refresh_queue_handler,
            inputs=[],
            outputs=[queue_stats, queue_status, current_eval_info, progress_display]
        )

        # Initialize on load
        submission_ui.load(
            fn=check_auth_on_load,
            inputs=[],
            outputs=[user_state]
        )

        submission_ui.load(
            fn=update_auth_message,
            inputs=[user_state],
            outputs=[auth_message]
        )

        submission_ui.load(
            fn=refresh_benchmarks_handler,
            inputs=[],
            outputs=[benchmark_dropdown]
        )

        submission_ui.load(
            fn=refresh_queue_handler,
            inputs=[],
            outputs=[queue_stats, queue_status, current_eval_info, progress_display]
        )

    return submission_ui
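

# --- Usage sketch (illustrative only) ---
# A minimal example of driving the queue outside the Gradio app. The real application passes
# DatabaseManager and AuthManager instances from its own database/auth modules (not part of
# this file); the stub classes below are hypothetical stand-ins so the sketch is self-contained.
if __name__ == "__main__":
    class _StubDBManager:
        """Hypothetical stand-in for the project's database manager."""
        def get_evaluation_results(self, status=None, limit=None):
            return []
        def get_benchmarks(self):
            return []
        def get_model(self, model_id):
            return None
        def get_benchmark(self, benchmark_id):
            return None

    class _StubAuthManager:
        """Hypothetical stand-in for the project's authentication manager."""
        def can_submit_benchmark(self, user_id):
            return True
        def update_submission_date(self, user_id):
            pass

    demo_queue = EvaluationQueue(_StubDBManager(), _StubAuthManager())
    demo_queue.start_worker()                    # Background worker polls for pending evaluations
    print(demo_queue.check_model_size("gpt2"))   # Size check against the RAM limit (needs network)
    print(demo_queue.get_queue_status())         # Snapshot of queue counters and progress
    demo_queue.stop_worker()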