"""Gradio Space that answers GAIA benchmark questions with a tool-calling agent.

The module defines four smolagents tools (web search, calculator, current
date, unit conversion), wraps a ``ToolCallingAgent`` in ``GAIAAgent``, and
exposes a small Gradio UI whose button fetches the questions from the scoring
API, runs the agent on them and submits the answers.
"""

import math
import os
import re
from datetime import datetime

import gradio as gr
import pandas as pd
import requests
from duckduckgo_search import DDGS
from smolagents import ToolCallingAgent, tool

# Whole-word match: a plain substring test for "ad" would also reject titles
# such as "How to read a CSV" or "Madrid travel guide".
_AD_TITLE_RE = re.compile(r"\b(?:advertisement|sponsored|ad|buy)\b", re.IGNORECASE)

# ISO-style dates (YYYY-MM-DD) take priority over bare numbers.
_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")

# Integer or decimal. The fractional part requires at least one digit so a
# sentence-ending period ("... is 42.") is not captured, and a leading minus
# is kept only when it is not glued to a preceding word ("COVID-19" -> "19").
_NUMBER_RE = re.compile(r"(?:(?<!\w)-)?\d+(?:\.\d+)?")

MIN_BODY_LENGTH = 30  # search results with shorter bodies are discarded
MAX_ANSWER_LENGTH = 500  # cap for free-text answers
MAX_QUESTIONS = 20  # Limit to 20 for testing


# --- Enhanced Tools ---
@tool
def enhanced_search(query: str, num_results: int = 3) -> str:
    """Performs web search with result filtering and quality checks.

    Args:
        query: The search query string to look up.
        num_results: Number of results to return (default 3).

    Returns:
        A formatted string containing the search results or error message.
    """
    try:
        with DDGS() as ddgs:
            results = ddgs.text(query, max_results=num_results)
            filtered = []
            for r in results or []:
                # Tolerate missing keys instead of failing the whole search.
                title = r.get("title") or ""
                body = r.get("body") or ""
                if len(body) <= MIN_BODY_LENGTH or _AD_TITLE_RE.search(title):
                    continue
                filtered.append(f"## {title}\n{body}\nURL: {r.get('href') or ''}")
    except Exception as e:
        # Tools report failures as text so the agent can react to them.
        return f"Search error: {e}"
    return "\n\n".join(filtered) if filtered else "No quality results found."


@tool
def scientific_calculator(expression: str) -> str:
    """Evaluates mathematical expressions with scientific functions.

    Args:
        expression: The mathematical expression to evaluate.

    Returns:
        The result as a string or error message.
    """
    allowed_names = {k: v for k, v in math.__dict__.items() if not k.startswith("__")}
    try:
        # SECURITY NOTE(review): `eval` with emptied builtins is not a real
        # sandbox. The expression comes from the model, which may be steered by
        # untrusted web content; consider an AST-based evaluator instead.
        result = eval(expression, {"__builtins__": {}}, allowed_names)
        return str(round(result, 6)) if isinstance(result, float) else str(result)
    except Exception as e:
        return f"Calculation error: {e}"


@tool
def get_current_date() -> str:
    """Gets the current date and time.

    Returns:
        Current datetime in YYYY-MM-DD HH:MM:SS format.
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


@tool
def unit_converter(amount: float, from_unit: str, to_unit: str) -> str:
    """Converts between common measurement units.

    Args:
        amount: The numerical value to convert.
        from_unit: The source unit (e.g., 'miles').
        to_unit: The target unit (e.g., 'kilometers').

    Returns:
        The converted value with unit or error message.
    """
    conversions = {
        ('miles', 'kilometers'): lambda x: x * 1.60934,
        ('pounds', 'kilograms'): lambda x: x * 0.453592,
        ('fahrenheit', 'celsius'): lambda x: (x - 32) * 5/9,
    }
    key = (from_unit.strip().lower(), to_unit.strip().lower())
    convert = conversions.get(key)
    if convert is None:
        return f"Unsupported conversion: {from_unit} → {to_unit}"
    try:
        result = convert(float(amount))
    except (TypeError, ValueError):
        # Only a non-numeric amount is expected here; anything else should
        # surface instead of being hidden by a bare `except`.
        return "Invalid amount"
    return f"{round(result, 4)} {to_unit}"


# --- Agent Core ---
class GAIAAgent:
    """Callable wrapper around a ``ToolCallingAgent`` for GAIA questions.

    Calling an instance cleans the question, runs the agent, condenses the
    answer and records ``(question, answer)`` in ``session_history``.
    """

    def __init__(self):
        # NOTE(review): smolagents agents usually take a model *object* and a
        # `max_steps` limit; confirm that the string `model` and the
        # `max_iterations` keyword are accepted by the installed version.
        self.agent = ToolCallingAgent(
            name="GAIA-HF-Agent",
            description="Specialized agent for GAIA tasks",
            tools=[enhanced_search, scientific_calculator, get_current_date, unit_converter],
            model="gpt-4-turbo",  # or "gpt-3.5-turbo" if unavailable
            planning_interval=5,
            max_iterations=10
        )
        # (original question, processed answer) pairs, in call order.
        self.session_history = []

    def preprocess_question(self, question: str) -> str:
        """Clean GAIA questions.

        Removes bracketed numeric citations such as ``[12]`` and the literal
        option markers ``(a)`` / ``(b)``, then strips surrounding whitespace.
        """
        question = re.sub(r'\[\d+\]', '', question)  # Remove citations
        question = question.replace("(a)", "").replace("(b)", "")  # Remove options
        return question.strip()

    def postprocess_answer(self, answer: str) -> str:
        """Extract the most precise answer from the agent's output.

        Preference order: the last ``YYYY-MM-DD`` date, then the last number,
        otherwise the stripped text truncated to ``MAX_ANSWER_LENGTH``
        characters.

        Args:
            answer: Raw agent output. Non-string values are converted with
                ``str`` and ``None`` is treated as an empty answer.

        Returns:
            The condensed answer string.
        """
        text = "" if answer is None else str(answer)
        dates = _DATE_RE.findall(text)
        if dates:
            return dates[-1]
        numbers = _NUMBER_RE.findall(text)
        if numbers:
            return numbers[-1]
        return text.strip()[:MAX_ANSWER_LENGTH]  # Limit length

    def __call__(self, question: str) -> str:
        """Answer ``question``; failures are returned as an error string."""
        clean_q = self.preprocess_question(question)
        print(f"Processing: {clean_q}")
        try:
            answer = self.agent.run(clean_q)
            processed = self.postprocess_answer(answer)
        except Exception as e:
            # Deliberately broad: one failing question must not abort the
            # whole evaluation run.
            return f"Agent error: {e}"
        self.session_history.append((question, processed))
        return processed


# --- HF Space Integration ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def run_and_submit(profile: gr.OAuthProfile | None) -> tuple[str, pd.DataFrame | None]:
    """Fetch the questions, answer them with ``GAIAAgent`` and submit.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status message, results table)`` pair; the table is ``None``
        when no question was processed.
    """
    if not profile:
        return "Please log in to submit", None

    space_id = os.getenv("SPACE_ID")

    try:
        agent = GAIAAgent()
    except Exception as e:
        # Report set-up problems in the status box instead of crashing.
        return f"Failed to initialize agent: {e}", None

    # Fetch questions
    try:
        response = requests.get(f"{DEFAULT_API_URL}/questions", timeout=20)
        response.raise_for_status()
        questions = response.json()
    except Exception as e:
        return f"Failed to get questions: {e}", None
    if not questions:
        return "No questions received", None
    if not isinstance(questions, list):
        # An error payload (e.g. a JSON object) cannot be sliced below.
        return "Failed to get questions: unexpected response format", None

    # Process questions
    results = []
    answers = []
    for item in questions[:MAX_QUESTIONS]:
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question")
        if not task_id or not question:
            continue
        answer = agent(question)
        results.append({
            "Task ID": task_id,
            "Question": question,
            "Answer": answer
        })
        answers.append({
            "task_id": task_id,
            "submitted_answer": answer
        })

    if not answers:
        return "No answerable questions received", pd.DataFrame(results)

    # Submit answers
    try:
        response = requests.post(
            f"{DEFAULT_API_URL}/submit",
            json={
                "username": profile.username,
                "agent_code": f"https://huggingface.co/spaces/{space_id}",
                "answers": answers
            },
            timeout=60
        )
        response.raise_for_status()
        data = response.json()
        return (
            f"✅ Submitted {len(answers)} answers\n"
            f"Score: {data.get('score', 'N/A')}%\n"
            f"Correct: {data.get('correct_count', '?')}/{data.get('total_attempted', '?')}\n"
            f"Message: {data.get('message', '')}",
            pd.DataFrame(results))
    except Exception as e:
        return f"Submission failed: {e}", pd.DataFrame(results)


# --- Gradio UI ---
with gr.Blocks(title="GAIA Agent") as demo:
    gr.Markdown("## 🚀 GAIA Task Agent")
    gr.Markdown("Login and click submit to run evaluation")

    login = gr.LoginButton()
    submit_btn = gr.Button("Run & Submit Answers", variant="primary")

    status = gr.Textbox(label="Submission Status", interactive=False)
    results = gr.DataFrame(label="Processed Answers")

    submit_btn.click(
        fn=run_and_submit,
        inputs=None,
        outputs=[status, results]
    )

if __name__ == "__main__":
    demo.launch(debug=True)