|
|
import os |
|
|
import gradio as gr |
|
|
import requests |
|
|
import pandas as pd |
|
|
from smolagents import ToolCallingAgent, tool |
|
|
from duckduckgo_search import DDGS |
|
|
import math |
|
|
from datetime import datetime |
|
|
import re |
|
|
|
|
|
|
|
|
@tool
def enhanced_search(query: str, num_results: int = 3) -> str:
    """Performs web search with result filtering and quality checks.

    Results whose snippet is 30 characters or shorter, or whose title contains
    a promotional keyword as a whole word, are discarded.

    Args:
        query: The search query string to look up.
        num_results: Number of results to return (default 3).

    Returns:
        A formatted string containing the search results or error message.
    """
    # Promotional keywords are matched as whole words. A plain substring test
    # for "ad" or "buy" would also reject unrelated titles such as "Madrid",
    # "Download" or "Reading list".
    promo_title = re.compile(r"\b(?:advertisement|sponsored|ads?|buy)\b", re.IGNORECASE)

    try:
        # Fall back to the default when no count is given; never ask for < 1.
        limit = 3 if num_results is None else max(1, int(num_results))

        with DDGS() as ddgs:
            # Materialise inside the context manager so a lazily produced
            # result sequence is fully read before the session is closed.
            hits = list(ddgs.text(query, max_results=limit) or [])

        filtered = []
        for hit in hits:
            title = hit.get("title") or ""
            body = hit.get("body") or ""
            # Very short snippets carry too little information to be useful.
            if len(body) <= 30 or promo_title.search(title):
                continue
            filtered.append(f"## {title}\n{body}\nURL: {hit.get('href', '')}")

        return "\n\n".join(filtered) if filtered else "No quality results found."
    except Exception as e:
        # Deliberate catch-all: the tool reports failures as text so the
        # calling agent can read the message instead of crashing.
        return f"Search error: {e}"
|
|
|
|
|
@tool
def scientific_calculator(expression: str) -> str:
    """Evaluates mathematical expressions with scientific functions.

    All public names of the math module (sqrt, sin, pi, ...) are available, plus abs, round, min, max and sum.

    Args:
        expression: The mathematical expression to evaluate.

    Returns:
        The result as a string or error message.
    """
    if not isinstance(expression, str) or not expression.strip():
        return "Calculation error: empty expression"

    # SECURITY NOTE(review): this calls eval() on text produced by the model.
    # Emptying __builtins__ is NOT a sandbox; attribute chains through dunder
    # names can still reach arbitrary objects. Rejecting "__" closes the
    # best-known route, but a dedicated expression parser (e.g. walking the
    # ast module's tree) would be the robust replacement.
    if "__" in expression:
        return "Calculation error: double-underscore names are not allowed"

    # A few harmless builtins that the empty __builtins__ mapping would
    # otherwise hide. math names are applied last so that anything math
    # defines (e.g. math.pow) keeps taking precedence.
    allowed_names = {"abs": abs, "round": round, "min": min, "max": max, "sum": sum}
    allowed_names.update(
        {k: v for k, v in vars(math).items() if not k.startswith("__")}
    )

    try:
        result = eval(expression.strip(), {"__builtins__": {}}, allowed_names)
        # Floats are rounded to 6 decimals to hide binary representation noise.
        return str(round(result, 6)) if isinstance(result, float) else str(result)
    except Exception as e:
        # Errors are returned as text so the calling agent can react to them.
        return f"Calculation error: {e}"
|
|
|
|
|
@tool
def get_current_date() -> str:
    """Gets the current date and time.

    Returns:
        Current datetime in YYYY-MM-DD HH:MM:SS format.
    """
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator is expected to use it as the tool description - confirm
    # before rewording.
    # datetime.now() is called without a tz argument, so this is a naive,
    # local wall-clock timestamp.
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
|
|
|
|
|
@tool
def unit_converter(amount: float, from_unit: str, to_unit: str) -> str:
    """Converts between common measurement units.

    Supported pairs, in either direction: miles/kilometers, pounds/kilograms and fahrenheit/celsius. Unit names are case-insensitive.

    Args:
        amount: The numerical value to convert.
        from_unit: The source unit (e.g., 'miles').
        to_unit: The target unit (e.g., 'kilometers').

    Returns:
        The converted value with unit or error message.
    """
    # Each supported pair is listed in both directions; the reverse entries
    # use the exact inverse of the forward formula.
    conversions = {
        ("miles", "kilometers"): lambda x: x * 1.60934,
        ("kilometers", "miles"): lambda x: x / 1.60934,
        ("pounds", "kilograms"): lambda x: x * 0.453592,
        ("kilograms", "pounds"): lambda x: x / 0.453592,
        ("fahrenheit", "celsius"): lambda x: (x - 32) * 5 / 9,
        ("celsius", "fahrenheit"): lambda x: x * 9 / 5 + 32,
    }

    # Normalise case and surrounding whitespace before the lookup.
    key = (str(from_unit).strip().lower(), str(to_unit).strip().lower())
    convert = conversions.get(key)
    if convert is None:
        return f"Unsupported conversion: {from_unit} → {to_unit}"

    try:
        result = convert(float(amount))
    except (TypeError, ValueError):
        # float() rejects None and non-numeric text; anything else is a real
        # bug and should surface rather than be swallowed by a bare except.
        return "Invalid amount"

    # The caller's own spelling of the target unit is echoed back.
    return f"{round(result, 4)} {to_unit}"
|
|
|
|
|
|
|
|
|
|
|
class GAIAAgent:
    """Callable wrapper around a ToolCallingAgent with question/answer clean-up.

    Calling an instance cleans the question, runs the underlying agent, reduces
    the raw answer to a short form and records the (question, answer) pair in
    ``session_history``.
    """

    # Calendar dates written as YYYY-MM-DD.
    _DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")
    # Optionally signed integer or decimal, with optional thousands separators.
    # The minus sign only counts when it does not follow a word character or a
    # dot, so "2020-2021" and "Covid-19" are not read as negative numbers.
    # A trailing sentence period is never part of the match.
    _NUMBER_RE = re.compile(r"(?:(?<![\w.])-)?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?")

    def __init__(self, model=None):
        """Build the underlying tool-calling agent.

        Args:
            model: Optional smolagents model object to drive the agent. When
                omitted, an OpenAI-compatible model for "gpt-4-turbo" is
                created.
        """
        if model is None:
            # The agent needs a model *object*; a bare model-id string cannot
            # be invoked. NOTE(review): OpenAIServerModel needs the `openai`
            # package and is expected to pick up credentials from the
            # environment - confirm this matches the deployment.
            from smolagents import OpenAIServerModel

            model = OpenAIServerModel(model_id="gpt-4-turbo")

        self.agent = ToolCallingAgent(
            # Agent names are expected to be valid Python identifiers, hence
            # underscores instead of hyphens.
            name="GAIA_HF_Agent",
            description="Specialized agent for GAIA tasks",
            tools=[enhanced_search, scientific_calculator, get_current_date, unit_converter],
            model=model,
            planning_interval=5,
            # smolagents names the step budget `max_steps`.
            max_steps=10,
        )
        # (original question, processed answer) pairs, in call order.
        self.session_history = []

    def preprocess_question(self, question: str) -> str:
        """Clean GAIA questions.

        Removes bracketed numeric markers such as "[12]" and the literal
        option labels "(a)" and "(b)", then trims surrounding whitespace.
        """
        question = re.sub(r'\[\d+\]', '', question)
        question = question.replace("(a)", "").replace("(b)", "")
        return question.strip()

    def postprocess_answer(self, answer: str) -> str:
        """Extract most precise answer.

        Preference order: the last YYYY-MM-DD date in the text, otherwise the
        last number (thousands separators removed), otherwise the trimmed text
        capped at 500 characters.

        Args:
            answer: Raw agent output. Non-string values are converted with
                ``str``; ``None`` yields an empty string.

        Returns:
            The reduced answer string.
        """
        if answer is None:
            return ""
        # The agent may hand back a number or another object, not only text.
        text = answer if isinstance(answer, str) else str(answer)

        dates = self._DATE_RE.findall(text)
        if dates:
            return dates[-1]

        numbers = self._NUMBER_RE.findall(text)
        if numbers:
            # "1,234.5" -> "1234.5". NOTE(review): any answer containing a
            # digit is reduced to its last number, which also truncates
            # textual answers such as "Apollo 11" - confirm this is intended.
            return numbers[-1].replace(",", "")

        return text.strip()[:500]

    def __call__(self, question: str) -> str:
        """Answer one question and record it in the session history.

        Args:
            question: The raw question text.

        Returns:
            The processed answer, or a string starting with "Agent error:"
            when running or post-processing the agent output fails.
        """
        clean_q = self.preprocess_question(question)
        print(f"Processing: {clean_q}")

        try:
            answer = self.agent.run(clean_q)
            processed = self.postprocess_answer(answer)
        except Exception as e:
            # Failures are reported as the answer text so that a batch run
            # over many questions keeps going.
            return f"Agent error: {str(e)}"

        # History stores the question as originally asked, not the cleaned one.
        self.session_history.append((question, processed))
        return processed
|
|
|
|
|
|
|
|
# Base URL of the scoring service; "/questions" and "/submit" are appended.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def run_and_submit(profile: gr.OAuthProfile | None):
    """Fetch the questions, answer them with GAIAAgent and submit the answers.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in. Its ``username`` is sent with the submission.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        "Task ID", "Question" and "Answer" columns once at least one question
        has been answered, and None when the run stops before that.
    """
    if not profile:
        return "Please log in to submit", None

    # NOTE(review): when SPACE_ID is unset the submitted code link ends in
    # "None"; kept as-is so the payload shape does not change.
    space_id = os.getenv("SPACE_ID")

    try:
        agent = GAIAAgent()
    except Exception as e:
        # Report construction problems in the UI instead of crashing the handler.
        return f"Failed to initialize agent: {e}", None

    # --- Fetch questions ---------------------------------------------------
    try:
        response = requests.get(f"{DEFAULT_API_URL}/questions", timeout=20)
        # Turn HTTP error statuses into exceptions rather than parsing an
        # error body as if it were the question list.
        response.raise_for_status()
        questions = response.json()
    except Exception as e:
        return f"Failed to get questions: {e}", None

    if not questions:
        return "No questions received", None
    if not isinstance(questions, list):
        # Slicing below requires a list; anything else is a malformed payload.
        return (
            f"Failed to get questions: unexpected payload type {type(questions).__name__}",
            None,
        )

    # --- Answer questions --------------------------------------------------
    results = []  # rows shown in the UI table
    answers = []  # payload entries for the scoring service
    for item in questions[:20]:
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question")
        if not task_id or not question:
            continue

        answer = agent(question)
        results.append({
            "Task ID": task_id,
            "Question": question,
            "Answer": answer,
        })
        answers.append({
            "task_id": task_id,
            "submitted_answer": answer,
        })

    if not answers:
        # Nothing usable was answered, so there is nothing to submit.
        return "No valid questions to answer", None

    # --- Submit answers ----------------------------------------------------
    try:
        response = requests.post(
            f"{DEFAULT_API_URL}/submit",
            json={
                "username": profile.username,
                "agent_code": f"https://huggingface.co/spaces/{space_id}",
                "answers": answers,
            },
            timeout=60,
        )
        # Without this, a rejected submission would still be reported as a
        # success with "N/A" placeholders.
        response.raise_for_status()
        data = response.json()
        return (
            f"✅ Submitted {len(answers)} answers\n"
            f"Score: {data.get('score', 'N/A')}%\n"
            f"Correct: {data.get('correct_count', '?')}/{data.get('total_attempted', '?')}\n"
            f"Message: {data.get('message', '')}",
            pd.DataFrame(results))
    except Exception as e:
        # The answers are still returned so the user can inspect them.
        return f"Submission failed: {e}", pd.DataFrame(results)
|
|
|
|
|
|
|
|
# Gradio front end: a login button, a run button, and two output widgets.
with gr.Blocks(title="GAIA Agent") as demo:
    # Page heading and a one-line instruction.
    gr.Markdown("## 🚀 GAIA Task Agent")
    gr.Markdown("Login and click submit to run evaluation")

    # Controls.
    login = gr.LoginButton()
    submit_btn = gr.Button("Run & Submit Answers", variant="primary")

    # Outputs: a read-only status line and the table of answers.
    status = gr.Textbox(label="Submission Status", interactive=False)
    results = gr.DataFrame(label="Processed Answers")

    # No input components are wired: run_and_submit only takes the OAuth
    # profile. NOTE(review): Gradio is presumed to supply that argument from
    # the login session based on the handler's type annotation - confirm.
    submit_btn.click(run_and_submit, inputs=None, outputs=[status, results])


# Start the app only when this file is executed directly.
if __name__ == "__main__":
    demo.launch(debug=True)