#!/usr/bin/env python3
"""
Level-Specific GAIA Testing with Real-Time Accuracy Tracking

Focus on achieving 30% Level 1 accuracy through strategic testing and by
leveraging breakthrough question categories.
"""
import json
import math
import time
import argparse
import logging
import sys
from datetime import datetime
from typing import Dict, List, Optional
from collections import defaultdict
from pathlib import Path

# Add the parent directory to the import path so sibling modules can be found
sys.path.append(str(Path(__file__).parent.parent))

from gaia_web_loader import GAIAQuestionLoaderWeb
from main import GAIASolver
from question_classifier import QuestionClassifier
class LevelSpecificGAIATester:
    """Enhanced GAIA testing with level-specific focus and real-time accuracy tracking"""

    def __init__(self, target_level: str = "1", target_accuracy: float = 0.30):
        self.target_level = target_level
        self.target_accuracy = target_accuracy
        self.loader = GAIAQuestionLoaderWeb()
        self.classifier = QuestionClassifier()
        self.solver = GAIASolver(use_kluster=True, kluster_model="qwen3-235b")
        self.results = []
        self.breakthrough_categories = ['chess', 'wikipedia', 'video', 'excel', 'research']

        # Create logs directory if it doesn't exist
        Path("logs").mkdir(exist_ok=True)

        # Setup logging to both a timestamped file and the console
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = f"logs/level{target_level}_test_{timestamp}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Load validation metadata for accuracy tracking
        self.validation_data = self.load_validation_metadata()
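    # The validation metadata is expected as newline-delimited JSON, one record per
    # task. Only the fields this script reads are shown here; real records carry
    # additional metadata (illustrative example, not an actual GAIA entry):
    #   {"task_id": "example-task-id", "Level": "1", "Final answer": "example"}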
    def load_validation_metadata(self):
        """Load GAIA validation metadata for answer checking"""
        try:
            validation_data = {}
            with open('gaia_validation_metadata.jsonl', 'r') as f:
                for line in f:
                    if line.strip():
                        entry = json.loads(line)
                        validation_data[entry['task_id']] = entry
            self.logger.info(f"📊 Loaded {len(validation_data)} validation entries")
            return validation_data
        except Exception as e:
            self.logger.error(f"Failed to load validation metadata: {e}")
            return {}
    def get_questions_by_level(self, level: str) -> List[Dict]:
        """Get all questions for a specific level"""
        level_questions = []
        for question in self.loader.questions:
            # Check validation metadata for level information
            task_id = question.get('task_id')
            if task_id in self.validation_data:
                question_level = str(self.validation_data[task_id].get('Level', ''))
                if question_level == level:
                    level_questions.append(question)

        self.logger.info(f"🎯 Found {len(level_questions)} Level {level} questions")
        return level_questions
    def classify_question_type(self, question: Dict) -> str:
        """Classify question to identify breakthrough opportunities"""
        question_text = question.get('question', '').lower()

        # Check for breakthrough categories
        if any(keyword in question_text for keyword in ['chess', 'move', 'position', 'algebraic']):
            return 'chess'
        elif any(keyword in question_text for keyword in ['wikipedia', 'featured article', 'nominated']):
            return 'wikipedia'
        elif any(keyword in question_text for keyword in ['video', 'youtube', 'audio', 'dialogue']):
            return 'video'
        elif any(keyword in question_text for keyword in ['excel', 'spreadsheet', 'sales', 'total']):
            return 'excel'
        elif any(keyword in question_text for keyword in ['research', 'find', 'search', 'who', 'what', 'when']):
            return 'research'
        else:
            return 'general'
    def calculate_real_time_accuracy(self) -> Dict:
        """Calculate real-time accuracy metrics for Level 1 progress"""
        # 53 is the number of Level 1 questions in the GAIA validation set, so the
        # default 30% target needs ceil(53 * 0.30) = ceil(15.9) = 16 correct answers.
        target_needed = math.ceil(53 * self.target_accuracy)

        if not self.results:
            return {
                'total_tested': 0,
                'correct_answers': 0,
                'current_accuracy': 0.0,
                'target_needed': target_needed,
                'remaining_to_target': target_needed,
                'on_target': False
            }

        level_results = [r for r in self.results if r.get('level') == self.target_level]
        correct_count = len([r for r in level_results if r.get('validation_status') == 'CORRECT'])
        total_tested = len(level_results)

        current_accuracy = correct_count / total_tested if total_tested > 0 else 0.0
        remaining_to_target = max(0, target_needed - correct_count)
        on_target = current_accuracy >= self.target_accuracy

        return {
            'total_tested': total_tested,
            'correct_answers': correct_count,
            'current_accuracy': current_accuracy,
            'target_needed': target_needed,
            'remaining_to_target': remaining_to_target,
            'on_target': on_target
        }
    def validate_answer(self, task_id: str, our_answer: str) -> str:
        """Validate answer against GAIA metadata"""
        if task_id not in self.validation_data:
            return 'UNKNOWN'

        expected_answer = self.validation_data[task_id].get('Final answer', '').strip()
        our_answer = str(our_answer).strip()

        # Normalize for comparison: lowercase, trim, standardize comma spacing,
        # and collapse double spaces
        def normalize(text):
            return str(text).lower().strip().replace(',', ', ').replace('  ', ' ')

        expected_normalized = normalize(expected_answer)
        our_normalized = normalize(our_answer)

        if expected_normalized == our_normalized:
            return 'CORRECT'
        elif expected_normalized in our_normalized or our_normalized in expected_normalized:
            return 'PARTIAL'
        else:
            return 'INCORRECT'
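    # Note on the comparison above: under normalize(), answers such as "34,689" and
    # "34, 689" compare equal (both become "34, 689"), while the PARTIAL branch
    # covers cases where one normalized string contains the other.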
    def test_question(self, question: Dict) -> Dict:
        """Test a single question with enhanced validation"""
        task_id = question.get('task_id', 'unknown')
        question_text = question.get('question', '')
        question_type = self.classify_question_type(question)

        # Get level from validation metadata
        level = str(self.validation_data.get(task_id, {}).get('Level', 'unknown'))

        self.logger.info(f"\n🧪 Testing {task_id} (Level {level}, Type: {question_type})")
        self.logger.info(f"📝 Question: {question_text[:100]}...")

        start_time = time.time()

        try:
            # Extended timeout budget for complex questions
            # (note: currently informational only; it is not passed to the solver)
            timeout = 1800 if question_type in self.breakthrough_categories else 900

            answer = self.solver.solve_question(question)
            solve_time = time.time() - start_time

            # Validate answer
            validation_status = self.validate_answer(task_id, answer)
            expected_answer = self.validation_data.get(task_id, {}).get('Final answer', 'Unknown')

            result = {
                'task_id': task_id,
                'level': level,
                'question_type': question_type,
                'question': question_text[:200] + "...",
                'our_answer': answer,
                'expected_answer': expected_answer,
                'validation_status': validation_status,
                'solve_time': solve_time,
                'breakthrough_category': question_type in self.breakthrough_categories,
                'timestamp': datetime.now().isoformat()
            }
            self.results.append(result)

            # Log result with status emoji
            status_emoji = "✅" if validation_status == "CORRECT" else "❌" if validation_status == "INCORRECT" else "🔶"
            self.logger.info(f"{status_emoji} Result: {validation_status}")
            self.logger.info(f"💡 Our Answer: {answer}")
            self.logger.info(f"🎯 Expected: {expected_answer}")
            self.logger.info(f"⏱️ Time: {solve_time:.1f}s")

            # Calculate and display real-time progress
            progress = self.calculate_real_time_accuracy()
            self.logger.info(f"📊 Level {self.target_level} Progress: {progress['correct_answers']}/{progress['target_needed']} target ({progress['current_accuracy']:.1%})")

            if progress['on_target']:
                self.logger.info(f"🏆 TARGET ACHIEVED! {progress['current_accuracy']:.1%} >= {self.target_accuracy:.1%}")

            return result

        except Exception as e:
            error_result = {
                'task_id': task_id,
                'level': level,
                'question_type': question_type,
                'question': question_text[:200] + "...",
                'our_answer': f"ERROR: {str(e)}",
                'expected_answer': self.validation_data.get(task_id, {}).get('Final answer', 'Unknown'),
                'validation_status': 'ERROR',
                'solve_time': time.time() - start_time,
                'breakthrough_category': False,
                'timestamp': datetime.now().isoformat()
            }
            self.results.append(error_result)
            self.logger.error(f"❌ Error testing {task_id}: {e}")
            return error_result
    def run_level_campaign(self, level: str = None, max_questions: int = None) -> Dict:
        """Run strategic testing campaign for specific level"""
        if level is None:
            level = self.target_level

        level_questions = self.get_questions_by_level(level)

        if max_questions:
            level_questions = level_questions[:max_questions]

        self.logger.info(f"\n🚀 Starting Level {level} Campaign")
        self.logger.info(f"🎯 Target: {self.target_accuracy:.1%} accuracy ({int(len(level_questions) * self.target_accuracy)} correct)")
        self.logger.info(f"📋 Questions to test: {len(level_questions)}")

        # Prioritize breakthrough categories
        breakthrough_questions = [q for q in level_questions if self.classify_question_type(q) in self.breakthrough_categories]
        other_questions = [q for q in level_questions if self.classify_question_type(q) not in self.breakthrough_categories]

        self.logger.info(f"📋 Breakthrough questions: {len(breakthrough_questions)}")
        self.logger.info(f"📋 Other questions: {len(other_questions)}")

        # Test breakthrough questions first
        all_questions = breakthrough_questions + other_questions

        for i, question in enumerate(all_questions, 1):
            self.logger.info(f"\n--- Question {i}/{len(all_questions)} ---")
            self.test_question(question)

            # Check if target achieved early
            progress = self.calculate_real_time_accuracy()
            if progress['on_target'] and progress['total_tested'] >= 10:  # Minimum 10 questions for statistical validity
                self.logger.info(f"🏆 EARLY TARGET ACHIEVEMENT! {progress['current_accuracy']:.1%} >= {self.target_accuracy:.1%}")
                break

        return self.generate_final_report()
    def generate_final_report(self) -> Dict:
        """Generate comprehensive test report"""
        progress = self.calculate_real_time_accuracy()

        # Category breakdown
        category_stats = defaultdict(lambda: {'total': 0, 'correct': 0})
        for result in self.results:
            if result.get('level') == self.target_level:
                category = result.get('question_type', 'unknown')
                category_stats[category]['total'] += 1
                if result.get('validation_status') == 'CORRECT':
                    category_stats[category]['correct'] += 1

        # Calculate category accuracy rates
        for category in category_stats:
            total = category_stats[category]['total']
            category_stats[category]['accuracy'] = category_stats[category]['correct'] / total if total > 0 else 0

        report = {
            'campaign_summary': {
                'target_level': self.target_level,
                'target_accuracy': self.target_accuracy,
                'achievement_status': 'ACHIEVED' if progress['on_target'] else 'IN_PROGRESS',
                'final_accuracy': progress['current_accuracy'],
                'correct_answers': progress['correct_answers'],
                'total_tested': progress['total_tested'],
                'target_needed': progress['target_needed']
            },
            'category_breakdown': dict(category_stats),
            'breakthrough_performance': {
                category: stats for category, stats in category_stats.items()
                if category in self.breakthrough_categories
            },
            'detailed_results': self.results,
            'timestamp': datetime.now().isoformat(),
            'log_file': self.log_file
        }

        # Save report
        report_file = f"level{self.target_level}_campaign_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        self.logger.info(f"\n📊 FINAL CAMPAIGN REPORT")
        self.logger.info(f"🎯 Target: {self.target_accuracy:.1%} Level {self.target_level} accuracy")
        self.logger.info(f"📈 Achievement: {progress['current_accuracy']:.1%} ({progress['correct_answers']}/{progress['total_tested']})")
        self.logger.info(f"🏆 Status: {'✅ TARGET ACHIEVED' if progress['on_target'] else '🔄 IN PROGRESS'}")
        self.logger.info(f"💾 Report saved: {report_file}")

        return report
def main():
    """Main function for level-specific GAIA testing"""
    parser = argparse.ArgumentParser(description='Level-Specific GAIA Testing')
    parser.add_argument('--level', type=str, default='1', help='Target level to test (1, 2, 3)')
    parser.add_argument('--target-accuracy', type=float, default=0.30, help='Target accuracy (0.30 = 30%%)')
    parser.add_argument('--max-questions', type=int, help='Maximum questions to test')

    args = parser.parse_args()

    print(f"🚀 Level-Specific GAIA Testing Campaign")
    print(f"🎯 Level: {args.level}")
    print(f"📊 Target Accuracy: {args.target_accuracy:.1%}")
    print("=" * 60)

    tester = LevelSpecificGAIATester(
        target_level=args.level,
        target_accuracy=args.target_accuracy
    )

    try:
        report = tester.run_level_campaign(level=args.level, max_questions=args.max_questions)

        # Print summary
        summary = report['campaign_summary']
        print(f"\n🏁 CAMPAIGN COMPLETE!")
        print(f"🎯 Target: {summary['target_accuracy']:.1%}")
        print(f"📊 Achieved: {summary['final_accuracy']:.1%}")
        print(f"🏆 Status: {summary['achievement_status']}")
        print(f"🎯 Score: {summary['correct_answers']}/{summary['total_tested']}")

    except Exception as e:
        print(f"❌ Campaign failed: {e}")
        return 1

    return 0
if __name__ == "__main__":
    sys.exit(main())