#!/usr/bin/env python3
"""
Summary Report Generator
Master reporting with improvement recommendations and actionable insights.
"""

import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
import statistics


class SummaryReportGenerator:
    """Generator for comprehensive summary reports with actionable insights."""

    def __init__(self):
        """Initialize the summary report generator."""
        self.logger = logging.getLogger("SummaryReportGenerator")

    async def generate_master_report(self,
                                     results: Dict[str, Dict],
                                     session_dir: Path,
                                     classification_report: Dict) -> Dict:
        """
        Generate comprehensive master report with actionable insights.

        Args:
            results: Raw test results
            session_dir: Session directory for output
            classification_report: Classification analysis results

        Returns:
            Master report dictionary
        """
        self.logger.info("Generating master summary report...")

        # Generate all report sections
        executive_summary = self.generate_executive_summary(results, classification_report)
        detailed_metrics = self.generate_detailed_metrics(results, classification_report)
        improvement_roadmap = self.generate_improvement_roadmap(classification_report)
        technical_insights = self.generate_technical_insights(results, classification_report)

        # Compile master report
        master_report = {
            "report_metadata": {
                "generated_at": datetime.now().isoformat(),
                "total_questions": len(results),
                "session_directory": str(session_dir),
                "report_version": "1.0"
            },
            "executive_summary": executive_summary,
            "detailed_metrics": detailed_metrics,
            "improvement_roadmap": improvement_roadmap,
            "technical_insights": technical_insights
        }

        # Save master report
        report_file = session_dir / "master_summary_report.json"
        with open(report_file, 'w') as f:
            json.dump(master_report, f, indent=2)

        # Generate human-readable markdown report
        markdown_report = self.generate_markdown_report(master_report)
        markdown_file = session_dir / "SUMMARY_REPORT.md"
        with open(markdown_file, 'w') as f:
            f.write(markdown_report)

        self.logger.info(f"Master report saved to: {report_file}")
        self.logger.info(f"Markdown report saved to: {markdown_file}")

        return master_report

    def generate_executive_summary(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate executive summary with key metrics and status."""
        performance_metrics = classification_report.get('performance_metrics', {})

        # Calculate overall metrics
        total_questions = len(results)
        total_correct = sum(metrics.get('counts', {}).get('correct', 0)
                            for metrics in performance_metrics.values())
        total_partial = sum(metrics.get('counts', {}).get('partial', 0)
                            for metrics in performance_metrics.values())
        total_errors = sum(metrics.get('counts', {}).get('error', 0) +
                           metrics.get('counts', {}).get('timeout', 0)
                           for metrics in performance_metrics.values())

        overall_accuracy = total_correct / total_questions if total_questions > 0 else 0
        partial_rate = total_partial / total_questions if total_questions > 0 else 0
        error_rate = total_errors / total_questions if total_questions > 0 else 0

        # Best and worst performing classifications
        classification_accuracies = {
            classification: metrics.get('accuracy', 0)
            for classification, metrics in performance_metrics.items()
        }
        best_classification = max(classification_accuracies.items(),
                                  key=lambda x: x[1], default=('none', 0))
        worst_classification = min(classification_accuracies.items(),
                                   key=lambda x: x[1], default=('none', 0))

        # Production readiness assessment
        production_ready = overall_accuracy >= 0.7 and error_rate <= 0.1

        return {
            "overall_performance": {
                "accuracy": overall_accuracy,
                "partial_accuracy": partial_rate,
                "error_rate": error_rate,
                "total_questions": total_questions
            },
            "classification_performance": {
                "best": {
                    "classification": best_classification[0],
                    "accuracy": best_classification[1]
                },
                "worst": {
                    "classification": worst_classification[0],
                    "accuracy": worst_classification[1]
                }
            },
            "production_readiness": {
                "ready": production_ready,
                "accuracy_target": 0.7,
                "current_accuracy": overall_accuracy,
                "gap_to_target": max(0, 0.7 - overall_accuracy)
            },
            "key_findings": self.extract_key_findings(results, classification_report)
        }

    def generate_detailed_metrics(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate detailed performance metrics breakdown."""
        performance_metrics = classification_report.get('performance_metrics', {})
        tool_effectiveness = classification_report.get('tool_effectiveness', {})

        # Processing time analysis
        all_times = []
        for result in results.values():
            time_taken = result.get('total_processing_time', 0)
            if time_taken > 0:
                all_times.append(time_taken)

        time_analysis = {
            "mean": statistics.mean(all_times) if all_times else 0,
            "median": statistics.median(all_times) if all_times else 0,
            "max": max(all_times) if all_times else 0,
            "min": min(all_times) if all_times else 0,
            "total_processing_time": sum(all_times)
        }

        # Tool usage ranking
        tool_ranking = sorted(
            tool_effectiveness.items(),
            key=lambda x: x[1].get('overall_effectiveness', 0),
            reverse=True
        )

        return {
            "by_classification": performance_metrics,
            "processing_time_analysis": time_analysis,
            "tool_effectiveness_ranking": [
                {
                    "tool": tool,
                    "effectiveness": data.get('overall_effectiveness', 0),
                    "total_uses": data.get('total_uses', 0)
                }
                for tool, data in tool_ranking
            ],
            "error_analysis": self.analyze_errors(results)
        }

    def analyze_errors(self, results: Dict) -> Dict:
        """Analyze error patterns and types."""
        error_types = {}
        timeout_questions = []
        error_questions = []

        for question_id, result in results.items():
            solver_result = result.get('solver_result', {})
            status = solver_result.get('status', 'unknown')

            if status == 'timeout':
                timeout_questions.append(question_id)
            elif status == 'error':
                error_questions.append(question_id)
                error_msg = solver_result.get('error', 'Unknown error')
                error_types[error_msg] = error_types.get(error_msg, 0) + 1

        return {
            "timeout_count": len(timeout_questions),
            "error_count": len(error_questions),
            "timeout_questions": timeout_questions,
            "error_questions": error_questions,
            "error_types": error_types
        }

    def generate_improvement_roadmap(self, classification_report: Dict) -> Dict:
        """Generate structured improvement roadmap."""
        improvement_areas = classification_report.get('improvement_areas', {})

        # Prioritize improvements
        high_priority = []
        medium_priority = []
        low_priority = []

        # High priority: Low accuracy classifications
        for item in improvement_areas.get('low_accuracy_classifications', []):
            if item['accuracy'] < 0.3:
                high_priority.append({
                    "type": "critical_accuracy",
                    "target": item['classification'],
                    "current_accuracy": item['accuracy'],
                    "action": f"Redesign {item['classification']} agent logic and prompts",
                    "expected_impact": "High - directly improves success rate"
                })

        # High priority: High error rates
        for item in improvement_areas.get('high_error_rate_classifications', []):
            if item['error_rate'] > 0.4:
                high_priority.append({
                    "type": "stability",
                    "target": item['classification'],
                    "current_error_rate": item['error_rate'],
                    "action": f"Fix timeout and error handling for {item['classification']} questions",
                    "expected_impact": "High - reduces system failures"
                })

        # Medium priority: Tool improvements
        for item in improvement_areas.get('ineffective_tools', []):
            if item['uses'] >= 5:  # Only tools with significant usage
                medium_priority.append({
                    "type": "tool_effectiveness",
                    "target": item['tool'],
                    "current_effectiveness": item['effectiveness'],
                    "action": f"Revise {item['tool']} tool implementation and error handling",
                    "expected_impact": "Medium - improves specific question types"
                })

        # Low priority: Performance optimizations
        for item in improvement_areas.get('slow_processing_classifications', []):
            low_priority.append({
                "type": "performance",
                "target": item['classification'],
                "current_time": item['avg_time'],
                "action": f"Optimize processing pipeline for {item['classification']} questions",
                "expected_impact": "Low - improves user experience"
            })

        return {
            "high_priority": high_priority,
            "medium_priority": medium_priority,
            "low_priority": low_priority,
            "recommended_sequence": self.generate_implementation_sequence(
                high_priority, medium_priority, low_priority
            ),
            "effort_estimates": self.estimate_implementation_effort(
                high_priority, medium_priority, low_priority
            )
        }

    def generate_implementation_sequence(self, high_priority: List, medium_priority: List, low_priority: List) -> List[str]:
        """Generate recommended implementation sequence."""
        sequence = []

        # Start with highest impact accuracy improvements
        critical_accuracy = [item for item in high_priority if item['type'] == 'critical_accuracy']
        if critical_accuracy:
            worst_accuracy = min(critical_accuracy, key=lambda x: x['current_accuracy'])
            sequence.append(f"1. Fix {worst_accuracy['target']} agent (critical accuracy issue)")

        # Then stability issues
        stability_issues = [item for item in high_priority if item['type'] == 'stability']
        if stability_issues:
            sequence.append("2. Address high error rate classifications")

        # Then tool improvements that affect multiple classifications
        if medium_priority:
            sequence.append("3. Improve ineffective tools with high usage")

        # Finally performance optimizations
        if low_priority:
            sequence.append("4. Optimize processing performance")

        return sequence

    def estimate_implementation_effort(self, high_priority: List, medium_priority: List, low_priority: List) -> Dict:
        """Estimate implementation effort for improvements."""
        return {
            "high_priority_items": len(high_priority),
            "estimated_effort": {
                "agent_redesign": f"{len([i for i in high_priority if i['type'] == 'critical_accuracy'])} weeks",
                "stability_fixes": f"{len([i for i in high_priority if i['type'] == 'stability'])} days",
                "tool_improvements": f"{len(medium_priority)} days",
                "performance_optimization": f"{len(low_priority)} days"
            },
            "total_estimated_effort": f"{len(high_priority) * 5 + len(medium_priority) * 2 + len(low_priority)} person-days"
        }

    def generate_technical_insights(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate technical insights and patterns."""
        # Question complexity vs success rate
        complexity_analysis = self.analyze_complexity_patterns(results)

        # Classification accuracy patterns
        classification_patterns = self.analyze_classification_patterns(classification_report)

        # Tool usage patterns
        tool_patterns = self.analyze_tool_patterns(classification_report)

        return {
            "complexity_analysis": complexity_analysis,
            "classification_patterns": classification_patterns,
            "tool_patterns": tool_patterns,
            "system_limitations": self.identify_system_limitations(results, classification_report)
        }

    def analyze_complexity_patterns(self, results: Dict) -> Dict:
        """Analyze how question complexity affects success rate."""
        complexity_buckets = {}

        for result in results.values():
            classification = result.get('classification', {})
            complexity = classification.get('complexity', 0)
            validation = result.get('validation', {})
            success = validation.get('validation_status') == 'correct'

            if complexity not in complexity_buckets:
                complexity_buckets[complexity] = {'total': 0, 'successful': 0}

            complexity_buckets[complexity]['total'] += 1
            if success:
                complexity_buckets[complexity]['successful'] += 1

        # Calculate success rates by complexity
        complexity_success_rates = {}
        for complexity, data in complexity_buckets.items():
            success_rate = data['successful'] / data['total'] if data['total'] > 0 else 0
            complexity_success_rates[complexity] = {
                'success_rate': success_rate,
                'total_questions': data['total']
            }

        return complexity_success_rates

    def analyze_classification_patterns(self, classification_report: Dict) -> Dict:
        """Analyze patterns in classification performance."""
        performance_metrics = classification_report.get('performance_metrics', {})

        patterns = {
            "high_performers": [],
            "low_performers": [],
            "inconsistent_performers": []
        }

        for classification, metrics in performance_metrics.items():
            accuracy = metrics.get('accuracy', 0)
            error_rate = metrics.get('error_rate', 0)
            total_questions = metrics.get('total_questions', 0)

            if accuracy >= 0.8 and total_questions >= 3:
                patterns["high_performers"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "questions": total_questions
                })
            elif accuracy <= 0.3 and total_questions >= 3:
                patterns["low_performers"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "questions": total_questions
                })
            elif error_rate > 0.5:
                patterns["inconsistent_performers"].append({
                    "classification": classification,
                    "error_rate": error_rate,
                    "questions": total_questions
                })

        return patterns

    def analyze_tool_patterns(self, classification_report: Dict) -> Dict:
        """Analyze tool usage and effectiveness patterns."""
        tool_effectiveness = classification_report.get('tool_effectiveness', {})

        # Group tools by effectiveness
        highly_effective = []
        moderately_effective = []
        ineffective = []

        for tool, data in tool_effectiveness.items():
            effectiveness = data.get('overall_effectiveness', 0)
            uses = data.get('total_uses', 0)

            if uses >= 3:  # Only consider tools with meaningful usage
                if effectiveness >= 0.8:
                    highly_effective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })
                elif effectiveness >= 0.5:
                    moderately_effective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })
                else:
                    ineffective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })

        return {
            "highly_effective_tools": highly_effective,
            "moderately_effective_tools": moderately_effective,
            "ineffective_tools": ineffective
        }

    def identify_system_limitations(self, results: Dict, classification_report: Dict) -> List[str]:
        """Identify current system limitations."""
        limitations = []

        # Overall accuracy limitation
        overall_accuracy = sum(
            metrics.get('counts', {}).get('correct', 0)
            for metrics in classification_report.get('performance_metrics', {}).values()
        ) / len(results) if results else 0

        if overall_accuracy < 0.7:
            limitations.append(f"Overall accuracy ({overall_accuracy:.1%}) below production target (70%)")

        # High error rate limitation
        total_errors = sum(
            metrics.get('counts', {}).get('error', 0) + metrics.get('counts', {}).get('timeout', 0)
            for metrics in classification_report.get('performance_metrics', {}).values()
        )
        error_rate = total_errors / len(results) if results else 0

        if error_rate > 0.1:
            limitations.append(f"High error/timeout rate ({error_rate:.1%}) indicates stability issues")

        # Processing time limitation
        slow_classifications = classification_report.get('improvement_areas', {}).get('slow_processing_classifications', [])
        if slow_classifications:
            limitations.append("Slow processing times for some question types may affect user experience")

        # Tool effectiveness limitation
        ineffective_tools = classification_report.get('improvement_areas', {}).get('ineffective_tools', [])
        if len(ineffective_tools) > 3:
            limitations.append("Multiple tools showing low effectiveness, impacting overall system performance")

        return limitations

    def extract_key_findings(self, results: Dict, classification_report: Dict) -> List[str]:
        """Extract key findings from the analysis."""
        findings = []
        performance_metrics = classification_report.get('performance_metrics', {})

        # Best performing classification
        if performance_metrics:
            best_classification = max(performance_metrics.items(), key=lambda x: x[1].get('accuracy', 0))
            findings.append(f"Best performing agent: {best_classification[0]} ({best_classification[1].get('accuracy', 0):.1%} accuracy)")

        # Most problematic classification
        if performance_metrics:
            worst_classification = min(performance_metrics.items(), key=lambda x: x[1].get('accuracy', 0))
            if worst_classification[1].get('accuracy', 0) < 0.5:
                findings.append(f"Critical issue: {worst_classification[0]} agent has {worst_classification[1].get('accuracy', 0):.1%} accuracy")

        # Tool insights
        tool_effectiveness = classification_report.get('tool_effectiveness', {})
        if tool_effectiveness:
            most_effective_tool = max(tool_effectiveness.items(), key=lambda x: x[1].get('overall_effectiveness', 0))
            findings.append(f"Most effective tool: {most_effective_tool[0]} ({most_effective_tool[1].get('overall_effectiveness', 0):.1%} success rate)")

        return findings

    def generate_markdown_report(self, master_report: Dict) -> str:
        """Generate human-readable markdown report."""
        report = []

        # Header
        metadata = master_report.get('report_metadata', {})
        report.append("# GAIA Test System - Master Summary Report")
        report.append(f"**Generated:** {metadata.get('generated_at', 'Unknown')}")
        report.append(f"**Total Questions:** {metadata.get('total_questions', 0)}")
        report.append("")

        # Executive Summary
        exec_summary = master_report.get('executive_summary', {})
        overall_perf = exec_summary.get('overall_performance', {})
        report.append("## Executive Summary")
        report.append(f"- **Overall Accuracy:** {overall_perf.get('accuracy', 0):.1%}")
        report.append(f"- **Error Rate:** {overall_perf.get('error_rate', 0):.1%}")

        production = exec_summary.get('production_readiness', {})
        if production.get('ready', False):
            report.append("- **Status:** ✅ Production Ready")
        else:
            gap = production.get('gap_to_target', 0)
            report.append(f"- **Status:** ❌ Not Production Ready (need {gap:.1%} improvement)")
        report.append("")

        # Key Findings
        findings = exec_summary.get('key_findings', [])
        if findings:
            report.append("### Key Findings")
            for finding in findings:
                report.append(f"- {finding}")
            report.append("")

        # Improvement Roadmap
        roadmap = master_report.get('improvement_roadmap', {})
        high_priority = roadmap.get('high_priority', [])
        if high_priority:
            report.append("## High Priority Improvements")
            for i, item in enumerate(high_priority, 1):
                report.append(f"{i}. **{item.get('target', 'Unknown')}** - {item.get('action', 'No action specified')}")
                report.append(f"   - Current: {item.get('current_accuracy', item.get('current_error_rate', 'Unknown'))}")
                report.append(f"   - Impact: {item.get('expected_impact', 'Unknown')}")
            report.append("")

        # Implementation Sequence
        sequence = roadmap.get('recommended_sequence', [])
        if sequence:
            report.append("## Recommended Implementation Sequence")
            for step in sequence:
                report.append(f"- {step}")
            report.append("")

        return "\n".join(report)