File size: 6,908 Bytes
10e9b7d 4c934c3 1968b68 43ab812 91f5922 012ef3f 30b3077 4c934c3 30b3077 9925072 012ef3f 0dd84e4 91f5922 012ef3f 30b3077 012ef3f 91f5922 012ef3f 0dd84e4 012ef3f 30b3077 012ef3f 30b3077 012ef3f 0c36fa7 30b3077 012ef3f 0dd84e4 012ef3f 94d642e 30b3077 012ef3f 0dd84e4 012ef3f 30b3077 012ef3f 0dd84e4 012ef3f 94d642e 012ef3f 30b3077 16da5cd 012ef3f 16da5cd 012ef3f 30b3077 012ef3f 326bc46 012ef3f 94d642e 012ef3f 94d642e 012ef3f 91f5922 012ef3f 94d642e 012ef3f 0c36fa7 012ef3f 94d642e 0c36fa7 94d642e 30b3077 012ef3f 94d642e 012ef3f 94d642e 012ef3f d7c91b6 94d642e 012ef3f d7c91b6 012ef3f 30b3077 012ef3f 30b3077 012ef3f 30b3077 012ef3f 30b3077 94d642e 012ef3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import os
import gradio as gr
import requests
import pandas as pd
from smolagents import ToolCallingAgent, tool
from duckduckgo_search import DDGS
import math
from datetime import datetime
import re
# --- Enhanced Tools ---
@tool
def enhanced_search(query: str, num_results: int = 3) -> str:
    """Performs web search with result filtering and quality checks.

    Results are dropped when their snippet is 30 characters or shorter, or
    when their title contains a promotional keyword as a whole word.

    Args:
        query: The search query string to look up.
        num_results: Number of results to return (default 3).

    Returns:
        A formatted string containing the search results or error message.
    """
    # Whole-word match on purpose: a plain substring test for 'ad' would also
    # reject harmless titles such as "Canada", "Download" or "Madrid".
    promo_pattern = re.compile(
        r"\b(?:advertisement|sponsored|ad|buy)\b", re.IGNORECASE
    )
    try:
        with DDGS() as ddgs:
            hits = ddgs.text(query, max_results=num_results) or []
            filtered = []
            for hit in hits:
                # Tolerate results that lack a field instead of failing the
                # whole search on a single malformed entry.
                title = hit.get("title") or ""
                body = hit.get("body") or ""
                href = hit.get("href") or ""
                if len(body) <= 30 or promo_pattern.search(title):
                    continue
                filtered.append(f"## {title}\n{body}\nURL: {href}")
        return "\n\n".join(filtered) if filtered else "No quality results found."
    except Exception as e:
        # Tool boundary: report the failure as text so the agent can react to it.
        return f"Search error: {e}"
@tool
def scientific_calculator(expression: str) -> str:
    """Evaluates mathematical expressions with scientific functions.

    Every public name of the ``math`` module (sqrt, sin, pi, ...) is available,
    together with the numeric builtins abs, round, min, max and sum.

    Args:
        expression: The mathematical expression to evaluate.

    Returns:
        The result as a string or error message.
    """
    allowed_names = {
        name: value for name, value in vars(math).items() if not name.startswith("_")
    }
    # Builtins are disabled in the eval() call below, so the harmless numeric
    # helpers have to be re-exposed explicitly or "abs(-3)" fails with NameError.
    allowed_names.update(abs=abs, round=round, min=min, max=max, sum=sum)
    try:
        # SECURITY: eval() with an empty __builtins__ is NOT a sandbox; attribute
        # access on literals can still reach arbitrary objects. The expression
        # comes from the model, so treat it as untrusted and consider replacing
        # this with an AST-based evaluator.
        result = eval(expression, {"__builtins__": {}}, allowed_names)
        # Round floats to hide binary representation noise (0.1 + 0.2 -> 0.3).
        return str(round(result, 6)) if isinstance(result, float) else str(result)
    except Exception as e:
        return f"Calculation error: {e}"
@tool
def get_current_date() -> str:
    """Gets the current date and time.

    Returns:
        Current datetime in YYYY-MM-DD HH:MM:SS format.
    """
    # datetime.now() is called without a tz argument, so the value is naive.
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
@tool
def unit_converter(amount: float, from_unit: str, to_unit: str) -> str:
    """Converts between common measurement units.

    Supported pairs, in either direction: miles/kilometers, pounds/kilograms
    and fahrenheit/celsius. Unit names are matched case-insensitively and
    surrounding whitespace is ignored.

    Args:
        amount: The numerical value to convert.
        from_unit: The source unit (e.g., 'miles').
        to_unit: The target unit (e.g., 'kilometers').

    Returns:
        The converted value with unit or error message.
    """
    conversions = {
        ('miles', 'kilometers'): lambda x: x * 1.60934,
        ('kilometers', 'miles'): lambda x: x / 1.60934,
        ('pounds', 'kilograms'): lambda x: x * 0.453592,
        ('kilograms', 'pounds'): lambda x: x / 0.453592,
        ('fahrenheit', 'celsius'): lambda x: (x - 32) * 5 / 9,
        ('celsius', 'fahrenheit'): lambda x: x * 9 / 5 + 32,
    }
    key = (from_unit.strip().lower(), to_unit.strip().lower())
    convert = conversions.get(key)
    if convert is None:
        return f"Unsupported conversion: {from_unit} → {to_unit}"
    try:
        result = convert(float(amount))
    except (TypeError, ValueError, OverflowError):
        # Only a non-numeric or out-of-range amount is expected to fail here;
        # anything else is a real bug and should not be hidden.
        return "Invalid amount"
    return f"{round(result, 4)} {to_unit}"
# --- Agent Core ---
class GAIAAgent:
    """Callable wrapper around a ToolCallingAgent for GAIA questions.

    Calling an instance cleans the question, runs the underlying agent and
    reduces the agent's answer to a short string. Every (question, answer)
    pair that completes without an exception is appended to
    ``session_history``.
    """

    # Bracketed numeric citation markers such as "[12]".
    _CITATION_RE = re.compile(r'\[\d+\]')
    # ISO-style dates (YYYY-MM-DD).
    _DATE_RE = re.compile(r'\d{4}-\d{2}-\d{2}')
    # Integer or decimal. A '-' counts as a sign only when it does not directly
    # follow a word character or ')', so "10-20" yields "10" and "20", not "-20".
    # The fractional part needs at least one digit, which keeps a sentence-ending
    # period out of the match ("42." -> "42").
    _NUMBER_RE = re.compile(r'(?:(?<![\w)])-)?\d+(?:\.\d+)?')

    def __init__(self):
        # NOTE(review): smolagents agents appear to expect a model *object*
        # rather than a model-name string, `max_steps` rather than
        # `max_iterations`, and an identifier-like `name` - confirm against the
        # installed smolagents version before relying on this constructor call.
        self.agent = ToolCallingAgent(
            name="GAIA-HF-Agent",
            description="Specialized agent for GAIA tasks",
            tools=[enhanced_search, scientific_calculator, get_current_date, unit_converter],
            model="gpt-4-turbo",  # or "gpt-3.5-turbo" if unavailable
            planning_interval=5,
            max_iterations=10
        )
        self.session_history = []

    def preprocess_question(self, question: str) -> str:
        """Strip citation markers and "(a)"/"(b)" option labels from a question.

        Args:
            question: Raw question text.

        Returns:
            The cleaned question without leading or trailing whitespace.
        """
        question = self._CITATION_RE.sub('', question)  # Remove citations
        question = question.replace("(a)", "").replace("(b)", "")  # Remove options
        return question.strip()

    def postprocess_answer(self, answer: str) -> str:
        """Reduce an agent answer to its most specific value.

        Preference order: the last YYYY-MM-DD date, then the last number,
        otherwise the answer text truncated to 500 characters.

        Args:
            answer: The agent's answer. Non-string values are converted with
                ``str()`` and ``None`` is treated as an empty answer.

        Returns:
            The extracted date or number, or the truncated answer text.
        """
        # The agent may hand back a number or other object; the regexes below
        # need text, so normalise first instead of raising TypeError.
        text = "" if answer is None else str(answer)

        dates = self._DATE_RE.findall(text)
        if dates:
            return dates[-1]

        numbers = self._NUMBER_RE.findall(text)
        if numbers:
            return numbers[-1]

        return text[:500]  # Limit length

    def __call__(self, question: str) -> str:
        """Answer one question.

        Args:
            question: Raw question text.

        Returns:
            The post-processed answer, or "Agent error: ..." when the agent
            run or the post-processing raises.
        """
        clean_q = self.preprocess_question(question)
        print(f"Processing: {clean_q}")
        try:
            answer = self.agent.run(clean_q)
            processed = self.postprocess_answer(answer)
            # History keeps the original question, not the cleaned one.
            self.session_history.append((question, processed))
            return processed
        except Exception as e:
            # Boundary for a single question: report the failure as the answer
            # so the caller's loop can continue with the next question.
            return f"Agent error: {str(e)}"
# --- HF Space Integration ---
# Base URL of the scoring service; run_and_submit appends "/questions" and
# "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
def run_and_submit(profile: gr.OAuthProfile | None):
    """Fetch the questions, answer each one with GAIAAgent and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame with "Task ID", "Question" and "Answer" columns once answers
        have been produced, and None when the run stops before that point.
    """
    if not profile:
        return "Please log in to submit", None

    # SPACE_ID is read from the environment; when it is unset the agent_code
    # link below will contain the text "None".
    space_id = os.getenv("SPACE_ID")
    agent = GAIAAgent()

    # Fetch questions
    try:
        response = requests.get(f"{DEFAULT_API_URL}/questions", timeout=20)
        # Surface HTTP errors directly instead of a confusing JSON decode error.
        response.raise_for_status()
        questions = response.json()
    except Exception as e:
        return f"Failed to get questions: {e}", None

    if not questions:
        return "No questions received", None
    if not isinstance(questions, list):
        # The loop below slices and iterates, so anything else would crash.
        return f"Unexpected questions payload: {type(questions).__name__}", None

    # Process questions
    results = []
    answers = []
    for item in questions[:20]:  # Limit to 20 for testing
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question")
        if not task_id or not question:
            continue
        answer = agent(question)
        results.append({
            "Task ID": task_id,
            "Question": question,
            "Answer": answer
        })
        answers.append({
            "task_id": task_id,
            "submitted_answer": answer
        })

    if not answers:
        # Nothing usable was received; do not post an empty submission.
        return "No valid questions to answer", None

    # Submit answers
    try:
        response = requests.post(
            f"{DEFAULT_API_URL}/submit",
            json={
                "username": profile.username,
                "agent_code": f"https://huggingface.co/spaces/{space_id}",
                "answers": answers
            },
            timeout=60
        )
        # Without this a rejected submission would still be reported as "✅".
        response.raise_for_status()
        data = response.json()
        return (
            f"✅ Submitted {len(answers)} answers\n"
            f"Score: {data.get('score', 'N/A')}%\n"
            f"Correct: {data.get('correct_count', '?')}/{data.get('total_attempted', '?')}\n"
            f"Message: {data.get('message', '')}",
            pd.DataFrame(results))
    except Exception as e:
        # Keep the computed answers visible even when the submission fails.
        return f"Submission failed: {e}", pd.DataFrame(results)
# --- Gradio UI ---
with gr.Blocks(title="GAIA Agent") as demo:
    # Page header.
    gr.Markdown("## 🚀 GAIA Task Agent")
    gr.Markdown("Login and click submit to run evaluation")

    # Controls and outputs, created in the order they are declared.
    login = gr.LoginButton()
    submit_btn = gr.Button("Run & Submit Answers", variant="primary")
    status = gr.Textbox(label="Submission Status", interactive=False)
    results = gr.DataFrame(label="Processed Answers")

    # No component values are passed as inputs; the `profile` argument of
    # run_and_submit is presumably injected by Gradio from its
    # gr.OAuthProfile annotation - confirm against the Gradio OAuth docs.
    submit_btn.click(fn=run_and_submit, inputs=None, outputs=[status, results])
if __name__ == "__main__":
    # Launch the Gradio app only when this file is executed as a script.
    demo.launch(debug=True)