import gradio as gr
import pandas as pd
from datasets import load_dataset
import time
from typing import Dict, Tuple

from config import ModelManager
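# ModelManager comes from the Space's local config.py (not shown in this
# file); based on its use below, it is assumed to expose setup_provider,
# get_configured_providers, get_flat_model_list, and generate_response.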

class MathsBenchmarkApp:
    def __init__(self):
        """Initialise the Mathematics Benchmark application."""
        self.dataset = None
        self.df = None
        self.model_manager = ModelManager()
        self.load_dataset()
    def load_dataset(self) -> None:
        """Load the MathsBench dataset from HuggingFace."""
        try:
            self.dataset = load_dataset("0xnu/maths_bench", split="train")
            self.df = pd.DataFrame(self.dataset)
            print(f"Dataset loaded successfully: {len(self.df)} questions")
        except Exception as e:
            print(f"Error loading dataset: {e}")
            self.df = pd.DataFrame()
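    # Column schema assumed by the methods below (inferred from the accessors
    # in this file; the upstream dataset may carry more columns): question_id,
    # category, difficulty, question, option_a, option_b, option_c, option_d,
    # correct_answer.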
    def setup_api_provider(self, provider_name: str, api_key: str) -> Tuple[bool, str]:
        """Set up an API provider with its key."""
        return self.model_manager.setup_provider(provider_name, api_key)
    def get_filtered_data(self, category: str = "All", difficulty: str = "All") -> pd.DataFrame:
        """Filter the dataset based on category and difficulty."""
        if self.df.empty:
            return pd.DataFrame()
        filtered_df = self.df.copy()
        if category != "All":
            filtered_df = filtered_df[filtered_df['category'] == category]
        if difficulty != "All":
            filtered_df = filtered_df[filtered_df['difficulty'] == difficulty]
        return filtered_df
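    # Usage sketch (the "Algebra"/"Easy" labels are illustrative, not known
    # values from the dataset):
    #   subset = app.get_filtered_data(category="Algebra", difficulty="Easy")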
    def create_prompt_for_question(self, question_data: Dict) -> str:
        """Create a structured prompt for the model."""
        prompt = f"""You are an expert mathematician. Solve this question and select the correct answer from the given options.
Question: {question_data['question']}
Available options:
A) {question_data['option_a']}
B) {question_data['option_b']}
C) {question_data['option_c']}
D) {question_data['option_d']}
Instructions:
1. Work through the problem step by step
2. Compare your result with each option
3. Select the option that matches your calculated answer
4. Respond with only the letter of your chosen answer
Your response must end with: "My final answer is: [LETTER]"
Example format:
First I'll solve... [working]
Checking the options...
My final answer is: B"""
        return prompt
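    # The fixed closing phrase ("My final answer is: [LETTER]") gives
    # extract_answer_from_response a deterministic anchor to parse; the
    # looser patterns in that method only act as fallbacks.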
    def evaluate_single_question(self, question_id: int, model: str) -> Dict:
        """Evaluate a single question using the specified model."""
        if not self.model_manager.get_configured_providers():
            return {"error": "No API providers configured"}
        # Guard against unknown IDs rather than raising IndexError on .iloc[0]
        matches = self.df[self.df['question_id'] == question_id]
        if matches.empty:
            return {"error": f"No question found with ID {question_id}"}
        question_data = matches.iloc[0].to_dict()
        prompt = self.create_prompt_for_question(question_data)
        try:
            ai_response = self.model_manager.generate_response(prompt, model, max_tokens=800)
            # Parse the response to extract the answer
            ai_answer = self.extract_answer_from_response(ai_response)
            # Convert the correct answer to letter format if needed
            correct_answer_letter = self.convert_answer_to_letter(question_data)
            is_correct = ai_answer == correct_answer_letter
            return {
                "question_id": question_id,
                "question": question_data['question'],
                "category": question_data['category'],
                "difficulty": question_data['difficulty'],
                "correct_answer": question_data['correct_answer'],
                "correct_answer_letter": correct_answer_letter,
                "ai_answer": ai_answer,
                "is_correct": is_correct,
                "ai_response": ai_response,
                "model": model,
                "options": {
                    "A": question_data['option_a'],
                    "B": question_data['option_b'],
                    "C": question_data['option_c'],
                    "D": question_data['option_d']
                }
            }
        except Exception as e:
            return {"error": f"API call failed: {str(e)}"}
    def convert_answer_to_letter(self, question_data: Dict) -> str:
        """Convert the correct answer to its corresponding letter option."""
        correct_answer = str(question_data['correct_answer']).strip()
        options = {
            'A': str(question_data['option_a']).strip(),
            'B': str(question_data['option_b']).strip(),
            'C': str(question_data['option_c']).strip(),
            'D': str(question_data['option_d']).strip()
        }
        # Find which option matches the correct answer
        for letter, option_value in options.items():
            if correct_answer == option_value:
                return letter
        # If no exact match, try case-insensitive comparison
        correct_lower = correct_answer.lower()
        for letter, option_value in options.items():
            if correct_lower == option_value.lower():
                return letter
        # If still no match, return the first option as fallback
        return 'A'
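    # Example: with correct_answer "42" and option_b "42", this returns "B".
    # Note the 'A' fallback: a ground-truth answer matching no option silently
    # scores against option A, so persistent mismatches are worth auditing in
    # the source data.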
    def extract_answer_from_response(self, response: str) -> str:
        """Extract the letter answer from the AI response."""
        response_upper = response.upper()
        # Primary method: Look for "MY FINAL ANSWER IS: X" pattern
        if "MY FINAL ANSWER IS:" in response_upper:
            answer_part = response_upper.split("MY FINAL ANSWER IS:")[1].strip()
            for letter in ['A', 'B', 'C', 'D']:
                if letter in answer_part[:3]:  # Check first 3 chars after the phrase
                    return letter
        # Secondary method: Look for "ANSWER:" pattern
        if "ANSWER:" in response_upper:
            answer_part = response_upper.split("ANSWER:")[1].strip()
            for letter in ['A', 'B', 'C', 'D']:
                if letter in answer_part[:10]:
                    return letter
        # Tertiary method: Look for explicit statements like "THE ANSWER IS A"
        for letter in ['A', 'B', 'C', 'D']:
            patterns = [
                f"THE ANSWER IS {letter}",
                f"ANSWER IS {letter}",
                f"I CHOOSE {letter}",
                f"SELECT {letter}",
                f"OPTION {letter}"
            ]
            for pattern in patterns:
                if pattern in response_upper:
                    return letter
        # Final fallback: collect letters that appear in option-like contexts
        # (" A", "A)", "(A") and return the alphabetically last match, not the
        # last occurrence in the text
        letters_found = []
        for letter in ['A', 'B', 'C', 'D']:
            if f" {letter}" in response_upper or f"{letter})" in response_upper or f"({letter}" in response_upper:
                letters_found.append(letter)
        if letters_found:
            return letters_found[-1]
        return "Unknown"
    def run_benchmark(self, category: str, difficulty: str, num_questions: int, model: str, progress=gr.Progress()) -> Tuple[pd.DataFrame, str]:
        """Run benchmark evaluation on filtered questions."""
        if not self.model_manager.get_configured_providers():
            return pd.DataFrame(), "Please configure API providers first"
        filtered_df = self.get_filtered_data(category, difficulty)
        if filtered_df.empty:
            return pd.DataFrame(), "No questions found for the selected filters"
        # Sample questions if the requested number is less than available
        if num_questions < len(filtered_df):
            filtered_df = filtered_df.sample(n=num_questions, random_state=42)
        results = []
        correct_count = 0
        progress(0, desc="Starting evaluation...")
        for i, (_, row) in enumerate(filtered_df.iterrows()):
            progress((i + 1) / len(filtered_df), desc=f"Evaluating question {i + 1}/{len(filtered_df)}")
            result = self.evaluate_single_question(row['question_id'], model)
            if "error" not in result:
                results.append(result)
                if result['is_correct']:
                    correct_count += 1
            # Add a small delay to avoid rate limits
            time.sleep(0.5)
        if not results:
            return pd.DataFrame(), "No valid results obtained"
        results_df = pd.DataFrame(results)
        accuracy = (correct_count / len(results)) * 100
        summary = f"""
Benchmark Complete!
Total Questions: {len(results)}
Correct Answers: {correct_count}
Accuracy: {accuracy:.2f}%
Model: {model}
Category: {category}
Difficulty: {difficulty}
"""
        return results_df, summary
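    # Usage sketch (the model name must come from
    # ModelManager.get_flat_model_list(); "gpt-4o" is a placeholder):
    #   results_df, summary = app.run_benchmark("All", "All", 10, "gpt-4o")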

# Global app instance
app = MathsBenchmarkApp()

def create_gradio_interface():
    """Create the Gradio interface for the Mathematics Benchmark."""
    # Get unique categories and difficulties
    categories = (["All"] + sorted(app.df['category'].unique().tolist())) if not app.df.empty else ["All"]
    difficulties = (["All"] + sorted(app.df['difficulty'].unique().tolist())) if not app.df.empty else ["All"]

    with gr.Blocks(title="Mathematics Benchmark", theme=gr.themes.Soft()) as interface:
        gr.HTML("""
        <div style="text-align: center; padding: 20px;">
            <h1>🧮 LLM Mathematics Benchmark</h1>
            <p>Evaluate Large Language Models on mathematical reasoning tasks using a diverse dataset of questions</p>
        </div>
        """)
| with gr.Tab("🔧 Configuration"): | |
| gr.HTML("<h3>API Configuration</h3><p>Configure your API keys for different model providers:</p>") | |
| # OpenAI Configuration | |
| with gr.Group(): | |
| gr.HTML("<h4>🤖 OpenAI Configuration</h4>") | |
| with gr.Row(): | |
| openai_key_input = gr.Textbox( | |
| label="OpenAI API Key", | |
| placeholder="Enter your OpenAI API key", | |
| type="password", | |
| scale=3 | |
| ) | |
| openai_setup_btn = gr.Button("Configure OpenAI", variant="primary", scale=1) | |
| openai_status = gr.Textbox(label="OpenAI Status", interactive=False) | |
| # Claude Configuration | |
| with gr.Group(): | |
| gr.HTML("<h4>🧠 Anthropic Claude Configuration</h4>") | |
| with gr.Row(): | |
| claude_key_input = gr.Textbox( | |
| label="Anthropic API Key", | |
| placeholder="Enter your Anthropic API key", | |
| type="password", | |
| scale=3 | |
| ) | |
| claude_setup_btn = gr.Button("Configure Claude", variant="primary", scale=1) | |
| claude_status = gr.Textbox(label="Claude Status", interactive=False) | |
| # Configuration status | |
| config_summary = gr.Textbox( | |
| label="Configuration Summary", | |
| placeholder="No providers configured", | |
| interactive=False | |
| ) | |
| def setup_openai(api_key): | |
| success, message = app.setup_api_provider("openai", api_key) | |
| update_config_summary() | |
| return message | |
| def setup_claude(api_key): | |
| success, message = app.setup_api_provider("claude", api_key) | |
| update_config_summary() | |
| return message | |
| def update_config_summary(): | |
| configured = app.model_manager.get_configured_providers() | |
| if not configured: | |
| return "No providers configured" | |
| return f"Configured providers: {', '.join(configured)}" | |
| openai_setup_btn.click( | |
| fn=setup_openai, | |
| inputs=[openai_key_input], | |
| outputs=[openai_status] | |
| ) | |
| claude_setup_btn.click( | |
| fn=setup_claude, | |
| inputs=[claude_key_input], | |
| outputs=[claude_status] | |
| ) | |
| with gr.Tab("📊 Dataset Explorer"): | |
| with gr.Row(): | |
| filter_category = gr.Dropdown( | |
| choices=categories, | |
| value="All", | |
| label="Category", | |
| scale=1 | |
| ) | |
| filter_difficulty = gr.Dropdown( | |
| choices=difficulties, | |
| value="All", | |
| label="Difficulty", | |
| scale=1 | |
| ) | |
| refresh_btn = gr.Button("Refresh Data", scale=1) | |
| dataset_table = gr.Dataframe( | |
| headers=["question_id", "category", "difficulty", "question", "correct_answer"], | |
| label="Filtered Dataset" | |
| ) | |
| def update_table(category, difficulty): | |
| filtered_df = app.get_filtered_data(category, difficulty) | |
| if filtered_df.empty: | |
| return pd.DataFrame() | |
| return filtered_df[['question_id', 'category', 'difficulty', 'question', 'correct_answer']] | |
| refresh_btn.click( | |
| fn=update_table, | |
| inputs=[filter_category, filter_difficulty], | |
| outputs=[dataset_table] | |
| ) | |
| # Initial load | |
| interface.load( | |
| fn=update_table, | |
| inputs=[filter_category, filter_difficulty], | |
| outputs=[dataset_table] | |
| ) | |
| with gr.Tab("🧪 Run Benchmark"): | |
| with gr.Row(): | |
| bench_category = gr.Dropdown( | |
| choices=categories, | |
| value="All", | |
| label="Category Filter" | |
| ) | |
| bench_difficulty = gr.Dropdown( | |
| choices=difficulties, | |
| value="All", | |
| label="Difficulty Filter" | |
| ) | |
| with gr.Row(): | |
| num_questions = gr.Slider( | |
| minimum=1, | |
| maximum=100, | |
| value=10, | |
| step=1, | |
| label="Number of Questions" | |
| ) | |
| model_choice = gr.Dropdown( | |
| choices=app.model_manager.get_flat_model_list(), | |
| value=app.model_manager.get_flat_model_list()[0] if app.model_manager.get_flat_model_list() else None, | |
| label="Model" | |
| ) | |
| run_benchmark_btn = gr.Button("Run Benchmark", variant="primary", size="lg") | |
| benchmark_summary = gr.Textbox( | |
| label="Benchmark Results Summary", | |
| lines=8, | |
| interactive=False | |
| ) | |
| results_table = gr.Dataframe( | |
| label="Detailed Results", | |
| headers=["question_id", "question", "category", "difficulty", "correct_answer", "correct_letter", "ai_answer", "ai_choice", "is_correct"] | |
| ) | |
| def run_benchmark_wrapper(category, difficulty, num_q, model): | |
| results_df, summary = app.run_benchmark(category, difficulty, num_q, model) | |
| if results_df.empty: | |
| return summary, pd.DataFrame() | |
| # Prepare display dataframe | |
| display_df = results_df[['question_id', 'question', 'category', 'difficulty', 'correct_answer', 'correct_answer_letter', 'ai_answer', 'is_correct']].copy() | |
| # Add the actual AI choice text | |
| display_df['ai_choice'] = display_df.apply( | |
| lambda row: results_df[results_df['question_id'] == row['question_id']]['options'].iloc[0].get(row['ai_answer'], 'Unknown') | |
| if row['ai_answer'] in ['A', 'B', 'C', 'D'] else 'Invalid', axis=1 | |
| ) | |
| # Reorder columns for better display | |
| display_df = display_df[['question_id', 'question', 'category', 'difficulty', 'correct_answer', 'correct_answer_letter', 'ai_answer', 'ai_choice', 'is_correct']] | |
| return summary, display_df | |
| run_benchmark_btn.click( | |
| fn=run_benchmark_wrapper, | |
| inputs=[bench_category, bench_difficulty, num_questions, model_choice], | |
| outputs=[benchmark_summary, results_table] | |
| ) | |
| with gr.Tab("🔍 Debug Single Question"): | |
| with gr.Row(): | |
| debug_question_id = gr.Number( | |
| label="Question ID", | |
| value=450, | |
| precision=0 | |
| ) | |
| debug_model = gr.Dropdown( | |
| choices=app.model_manager.get_flat_model_list(), | |
| value=app.model_manager.get_flat_model_list()[0] if app.model_manager.get_flat_model_list() else None, | |
| label="Model" | |
| ) | |
| debug_btn = gr.Button("Test Single Question", variant="primary") | |
| debug_question_display = gr.Textbox( | |
| label="Question Details", | |
| lines=4, | |
| interactive=False | |
| ) | |
| debug_ai_response = gr.Textbox( | |
| label="Full AI Response", | |
| lines=8, | |
| interactive=False | |
| ) | |
| debug_result = gr.Textbox( | |
| label="Parsed Result", | |
| lines=3, | |
| interactive=False | |
| ) | |
| def debug_single_question(question_id, model): | |
| if not app.model_manager.get_configured_providers(): | |
| return "Please configure API providers first", "", "" | |
| try: | |
| question_id = int(question_id) | |
| matching_questions = app.df[app.df['question_id'] == question_id] | |
| if matching_questions.empty: | |
| return f"No question found with ID {question_id}", "", "" | |
| question_data = matching_questions.iloc[0].to_dict() | |
| question_info = f"""Question ID: {question_id} | |
| Category: {question_data['category']} | |
| Difficulty: {question_data['difficulty']} | |
| Question: {question_data['question']} | |
| Options: | |
| A) {question_data['option_a']} | |
| B) {question_data['option_b']} | |
| C) {question_data['option_c']} | |
| D) {question_data['option_d']} | |
| Correct Answer: {question_data['correct_answer']}""" | |
| result = app.evaluate_single_question(question_id, model) | |
| if "error" in result: | |
| return question_info, "", f"Error: {result['error']}" | |
| ai_response = result.get('ai_response', 'No response') | |
| parsed_result = f"""Extracted Answer: {result.get('ai_answer', 'Unknown')} | |
| Correct Letter: {result.get('correct_answer_letter', 'Unknown')} | |
| Is Correct: {result.get('is_correct', False)} | |
| AI Choice Text: {result.get('options', {}).get(result.get('ai_answer', ''), 'Unknown')}""" | |
| return question_info, ai_response, parsed_result | |
| except Exception as e: | |
| return f"Error processing question: {str(e)}", "", "" | |
| debug_btn.click( | |
| fn=debug_single_question, | |
| inputs=[debug_question_id, debug_model], | |
| outputs=[debug_question_display, debug_ai_response, debug_result] | |
| ) | |
| with gr.Tab("📈 Analytics"): | |
| gr.HTML(""" | |
| <div style="padding: 20px;"> | |
| <h3>Dataset Statistics</h3> | |
| </div> | |
| """) | |
| # Dataset statistics | |
| if not app.df.empty: | |
| stats_html = f""" | |
| <div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; padding: 20px;"> | |
| <div style="background: #f0f0f0; padding: 15px; border-radius: 8px;"> | |
| <h4 style="color: #101010;">Total Questions</h4> | |
| <p style="font-size: 24px; color: #101010; font-weight: bold;">{len(app.df)}</p> | |
| </div> | |
| <div style="background: #f0f0f0; padding: 15px; border-radius: 8px;"> | |
| <h4 style="color: #101010;">Categories</h4> | |
| <p style="font-size: 24px; color: #101010; font-weight: bold;">{len(app.df['category'].unique())}</p> | |
| </div> | |
| <div style="background: #f0f0f0; padding: 15px; border-radius: 8px;"> | |
| <h4 style="color: #101010;">Difficulty Levels</h4> | |
| <p style="font-size: 24px; color: #101010; font-weight: bold;">{len(app.df['difficulty'].unique())}</p> | |
| </div> | |
| </div> | |
| <div style="padding: 20px;"> | |
| <h4>Categories Distribution:</h4> | |
| <ul> | |
| """ | |
| for category, count in app.df['category'].value_counts().items(): | |
| stats_html += f"<li>{category}: {count} questions</li>" | |
| stats_html += """ | |
| </ul> | |
| <h4>Difficulty Distribution:</h4> | |
| <ul> | |
| """ | |
| for difficulty, count in app.df['difficulty'].value_counts().items(): | |
| stats_html += f"<li>{difficulty}: {count} questions</li>" | |
| stats_html += "</ul></div>" | |
| gr.HTML(stats_html) | |
| return interface | |

# Create and launch the interface
if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False
    )
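# Headless usage sketch (bypasses the UI; the key below is a placeholder,
# never a real credential):
#   app.setup_api_provider("openai", "<your-api-key>")
#   models = app.model_manager.get_flat_model_list()
#   results_df, summary = app.run_benchmark("All", "All", 5, models[0])
#   print(summary)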