|
|
import os |
|
|
import gradio as gr |
|
|
import requests |
|
|
import pandas as pd |
|
|
from smolagents import ToolCallingAgent, tool |
|
|
from duckduckgo_search import DDGS |
|
|
import math |
|
|
import openai |
|
|
import re |
|
|
import json |
|
|
from datetime import datetime, timedelta |
|
|
import time |
|
|
|
|
|
|
|
|
@tool |
|
|
def duck_search(query: str) -> str:
    """
    Searches the web using DuckDuckGo and returns detailed information.

    Args:
        query: The search query string.

    Returns:
        A string with comprehensive search results including titles, snippets, and URLs.
    """
    try:
        with DDGS() as ddgs:
            # Materialise the hits so the emptiness check below is meaningful
            # even if the client returns a lazy iterator (or None).
            results = list(ddgs.text(query, max_results=5) or [])

        if not results:
            return "No results found."

        # .get() keeps one incomplete hit from discarding the whole result set.
        formatted_results = [
            f"Result {i}:\n"
            f"Title: {r.get('title', 'N/A')}\n"
            f"Content: {r.get('body', 'N/A')}\n"
            f"URL: {r.get('href', 'N/A')}\n"
            f"---"
            for i, r in enumerate(results, 1)
        ]
        return "\n".join(formatted_results)
    except Exception as e:
        # Tools report failures as text so the calling agent can react to them.
        return f"Search error: {e}"
|
|
|
|
|
@tool |
|
|
def focused_search(query: str, topic: str = "") -> str:
    """
    Performs a more focused search with specific keywords for better results.

    Args:
        query: The main search query
        topic: Additional topic context to improve search accuracy

    Returns:
        Focused search results
    """
    try:
        plain_query = query.strip()
        enhanced_query = f"{query} {topic}".strip()

        with DDGS() as ddgs:
            # list(...) so the emptiness checks work even for a lazy iterator.
            results = list(ddgs.text(enhanced_query, max_results=3) or [])

            # Retry without the topic only when the topic actually changed the
            # query; otherwise the retry would repeat the identical request.
            if not results and enhanced_query != plain_query:
                results = list(ddgs.text(query, max_results=3) or [])

        if not results:
            return "No results found for focused search."

        # .get() keeps one incomplete hit from discarding the whole result set.
        summaries = [
            f"**{r.get('title', 'N/A')}**\n{r.get('body', 'N/A')}\nSource: {r.get('href', 'N/A')}"
            for r in results
        ]
        return "\n\n".join(summaries)
    except Exception as e:
        # Tools report failures as text so the calling agent can react to them.
        return f"Focused search error: {e}"
|
|
|
|
|
@tool |
|
|
def advanced_calculator(expression: str) -> str:
    """
    Enhanced calculator with support for complex mathematical operations.

    Args:
        expression: A mathematical expression or calculation

    Returns:
        The calculated result with detailed steps when possible
    """
    if not isinstance(expression, str):
        return (
            "Calculation error: expression must be a string. "
            "Please check the mathematical expression."
        )

    expression = expression.strip()

    # Expose only the public names of `math`; its dunder entries (e.g.
    # __loader__) must not be reachable from evaluated expressions.
    safe_dict = {
        name: value for name, value in vars(math).items() if not name.startswith("_")
    }
    safe_dict.update(
        {
            "__builtins__": {},
            "abs": abs,
            "round": round,
            "min": min,
            "max": max,
            "sum": sum,
            "pow": pow,
        }
    )

    # First try the expression as written; if that fails and it contains "%",
    # retry with "%" read as "per cent" (e.g. "50%" -> "50/100").
    candidates = [expression]
    if "%" in expression:
        candidates.append(expression.replace("%", "/100"))

    first_error = None
    for candidate in candidates:
        try:
            # SECURITY NOTE(review): eval() with emptied builtins is NOT a real
            # sandbox (attribute-walking escapes and huge-exponent DoS remain).
            # The expression comes from an LLM; consider an ast-based evaluator.
            result = eval(candidate, safe_dict)

            if isinstance(result, float):
                if result.is_integer():
                    return str(int(result))
                return f"{result:.10g}"
            return str(result)
        except Exception as e:
            # Report the error of the expression as written, not of the retry.
            if first_error is None:
                first_error = e

    return f"Calculation error: {first_error}. Please check the mathematical expression."
|
|
|
|
|
@tool |
|
|
def date_calculator(date_expression: str) -> str:
    """
    Calculates dates, time differences, and handles date-related queries.

    Args:
        date_expression: A date calculation or query, e.g. "3 days ago", "2 weeks from now", "yesterday", "today"

    Returns:
        The calculated date or time difference
    """
    try:
        current_date = datetime.now()
        text = date_expression.lower()
        date_format = "%Y-%m-%d (%A)"

        # One pattern covers day/week offsets in both directions, singular or
        # plural, so "1 day ago" is handled the same way as "2 days ago".
        offset = re.search(r'(\d+)\s*(day|week)s?\s*(ago|from\s*now)', text)
        if offset:
            amount = int(offset.group(1))
            delta = timedelta(days=amount) if offset.group(2) == "day" else timedelta(weeks=amount)
            if offset.group(3) == "ago":
                delta = -delta
            return (current_date + delta).strftime(date_format)

        if "yesterday" in text:
            return (current_date - timedelta(days=1)).strftime(date_format)

        if "tomorrow" in text:
            return (current_date + timedelta(days=1)).strftime(date_format)

        if "today" in text or "current date" in text:
            return current_date.strftime(date_format)

        # Unrecognised query: fall back to reporting the current date.
        return f"Current date: {current_date.strftime(date_format)}"
    except Exception as e:
        # Covers non-string input and offsets too large for datetime arithmetic.
        return f"Date calculation error: {e}"
|
|
|
|
|
@tool |
|
|
def text_analyzer(text: str) -> str:
    """
    Analyzes text for patterns, extracts information, and provides insights.

    Args:
        text: The text to analyze

    Returns:
        Analysis results including word count, patterns, and extracted information
    """
    try:
        if not text:
            return "No text provided for analysis."

        word_count = len(text.split())
        char_count = len(text)

        # A sentence ends at ., ! or ? followed by whitespace or end of text.
        # Requiring the whitespace keeps decimals such as "3.14" in one piece.
        # (Heuristic: abbreviations like "e.g. " still count as a boundary.)
        sentence_count = len(
            [s for s in re.split(r'[.!?]+(?:\s+|$)', text) if s.strip()]
        )

        numbers = re.findall(r'-?\d+(?:\.\d+)?', text)

        # Numeric dates: D/M/Y-style or Y/M/D-style, with "/" or "-" separators.
        date_patterns = re.findall(
            r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b', text
        )

        # TLD class is [A-Za-z]; a "|" inside the class would match a literal pipe.
        emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text)

        lines = [
            "Text Analysis:",
            f"- Words: {word_count}",
            f"- Characters: {char_count}",
            f"- Sentences: {sentence_count}",
        ]

        if numbers:
            # Show at most ten numbers; "..." signals that more were found.
            lines.append(
                f"- Numbers found: {', '.join(numbers[:10])}{'...' if len(numbers) > 10 else ''}"
            )

        if date_patterns:
            lines.append(f"- Dates found: {', '.join(date_patterns)}")

        if emails:
            lines.append(f"- Emails found: {', '.join(emails)}")

        # Every line, including the last, is newline-terminated.
        return "\n".join(lines) + "\n"
    except Exception as e:
        return f"Text analysis error: {e}"
|
|
|
|
|
|
|
|
class ImprovedWebSearchAgent:
    """Wrapper around a smolagents ``ToolCallingAgent``.

    Adds three things around ``agent.run``: an instruction preamble for the
    question, light clean-up of the returned answer, and a canned fallback
    message when the run raises.
    """

    _PRIMARY_MODEL = "gpt-4o-mini"
    _FALLBACK_MODEL = "gpt-3.5-turbo"

    _SYSTEM_PROMPT = (
        "You are an advanced AI assistant designed to solve complex problems "
        "by breaking them down systematically.\n"
        "\n"
        "Key capabilities:\n"
        "1. **Multi-step Reasoning**: Break complex problems into smaller, manageable steps\n"
        "2. **Information Synthesis**: Combine information from multiple sources\n"
        "3. **Verification**: Double-check calculations and facts\n"
        "4. **Context Awareness**: Understand the broader context of questions\n"
        "\n"
        "Problem-solving approach:\n"
        "1. Analyze the question carefully to understand what's being asked\n"
        "2. Identify what information you need to find\n"
        "3. Use available tools strategically (search, calculate, analyze)\n"
        "4. Verify your findings and reasoning\n"
        "5. Provide a clear, accurate answer\n"
        "\n"
        "When using tools:\n"
        "- Use focused_search for specific factual information\n"
        "- Use duck_search for broader context\n"
        "- Use advanced_calculator for any mathematical operations\n"
        "- Use date_calculator for time-related queries\n"
        "- Use text_analyzer when you need to extract information from text\n"
        "\n"
        "Always think step-by-step and explain your reasoning process."
    )

    def __init__(self):
        """Initialize the enhanced agent with better reasoning capabilities.

        Tries the primary model first and the fallback model second.

        Raises:
            Exception: Whatever the fallback construction raised, if both fail.
        """
        try:
            self.agent = self._build_agent(self._PRIMARY_MODEL)
            print(f"✅ Enhanced agent initialized with {self._PRIMARY_MODEL}")
        except Exception as e:
            print(
                f"⚠️ Error initializing with {self._PRIMARY_MODEL}, "
                f"trying fallback... ({e})"
            )
            try:
                self.agent = self._build_agent(self._FALLBACK_MODEL)
                print(f"✅ Enhanced agent initialized with {self._FALLBACK_MODEL}")
            except Exception as e2:
                print(f"❌ Agent initialization failed: {e2}")
                raise  # bare raise keeps the original traceback

    def _build_agent(self, model_id: str):
        """Create the tool-calling agent backed by the given OpenAI model id."""
        # The agent needs a model *object*; a bare model-name string is not
        # callable and would make every run fail. Imported locally so this
        # class does not depend on an edit to the file's import block.
        # NOTE(review): presumably the API key is picked up from the
        # OPENAI_API_KEY environment variable by the OpenAI client — confirm.
        from smolagents import OpenAIServerModel

        # NOTE(review): the prompt is passed as `description`, as before; this
        # looks like agent metadata rather than a system prompt — confirm
        # against the installed smolagents version.
        return ToolCallingAgent(
            name="ImprovedGAIAAgent",
            description=self._SYSTEM_PROMPT,
            tools=[duck_search, focused_search, advanced_calculator, date_calculator, text_analyzer],
            model=OpenAIServerModel(model_id=model_id),
            planning_interval=3,
        )

    def __call__(self, question: str) -> str:
        """
        Process a question with enhanced reasoning and error handling.

        Args:
            question: The question to answer

        Returns:
            The post-processed agent answer, or a fallback message describing
            the error if the run raised. Never raises.
        """
        print(f"🔍 Processing question: {question}")

        try:
            enhanced_question = self._enhance_question(question)

            start_time = time.monotonic()
            result = self.agent.run(enhanced_question)
            elapsed_time = time.monotonic() - start_time
            print(f"⏱️ Question processed in {elapsed_time:.1f} seconds")

            return self._post_process_answer(result, question)
        except Exception as e:
            print(f"❌ Agent error: {e}")
            return self._fallback_answer(question, str(e))

    def _enhance_question(self, question: str) -> str:
        """Add context and instructions to improve question processing."""
        return (
            "Please solve this step by step:\n"
            "\n"
            f"Question: {question}\n"
            "\n"
            "Instructions:\n"
            "1. Read the question carefully and identify what type of answer is needed\n"
            "2. Break down complex problems into steps\n"
            "3. Use the available tools to gather information or perform calculations\n"
            "4. Verify your answer makes sense\n"
            "5. Provide a clear, concise final answer\n"
            "\n"
            "If this is a factual question, search for current information.\n"
            "If this involves calculations, show your work.\n"
            "If this requires multiple steps, explain each step clearly."
        )

    def _post_process_answer(self, result: str, original_question: str) -> str:
        """Clean and improve the agent's response.

        Args:
            result: Raw value returned by the agent run; non-string values
                (e.g. numbers) are converted with ``str``.
            original_question: The user's question, echoed when there is no answer.

        Returns:
            The stripped answer. A multi-line answer without an explicit
            "final answer"/"answer:" marker gets its longest line (if over 20
            characters) appended as "Final Answer: ...". An empty answer
            yields a "need more information" message.
        """
        text = "" if result is None else str(result).strip()

        # Only a genuinely empty result counts as "no answer": short answers
        # such as "42" or "Paris" are valid and must pass through untouched.
        if not text:
            return f"I need more information to properly answer: {original_question}"

        lowered = text.lower()
        if "final answer" in lowered or "answer:" in lowered:
            return text

        lines = text.split('\n')
        # A single-line answer is already its own final answer; appending it
        # again would only duplicate the text.
        if len(lines) > 1:
            best_line = max(lines, key=len)
            if len(best_line) > 20:
                return f"{text}\n\nFinal Answer: {best_line}"

        return text

    def _fallback_answer(self, question: str, error: str) -> str:
        """Provide a fallback response when the main agent fails.

        Classifies the question with whole-word keyword matches (and, for
        maths, an operator between two digits) and returns the matching
        canned message containing ``error``.
        """
        question_lower = question.lower()

        # Whole-word matching avoids false hits such as "how" inside "show";
        # operators only count between digits so hyphens and slashes in
        # ordinary prose are ignored.
        looks_mathematical = (
            re.search(r"\b(?:calculat\w*|math\w*|equals?)\b", question_lower)
            or re.search(r"\d\s*[-+*/]\s*\d", question_lower)
        )

        if looks_mathematical:
            return f"This appears to be a mathematical question. Error occurred: {error}. Please verify the calculation manually."

        if re.search(r"\b(?:when|dates?|years?|times?)\b", question_lower):
            return f"This appears to be a date/time related question. Error occurred: {error}. Please search for current information."

        if re.search(r"\b(?:who|what|where|how)\b", question_lower):
            return f"This appears to be a factual question. Error occurred: {error}. Please search for current information."

        return f"I encountered an error while processing your question: {error}. Please try rephrasing your question."
|
|
|
|
|
|
|
|
# Base URL of the scoring service; "/questions" and "/submit" are appended below.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def _truncate(text: str, limit: int) -> str:
    """Return ``text`` cut to ``limit`` characters, with "..." appended if shortened."""
    return text[:limit] + "..." if len(text) > limit else text


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch the questions, answer each with the agent, and submit the answers.

    Args:
        profile: The logged-in Hugging Face profile, or None when not logged in.

    Returns:
        A ``(status, table)`` pair: a status message and a DataFrame logging
        each task's (truncated) question and answer. ``table`` is None when
        the run stops before any question is processed.
    """
    if not profile:
        return "Please login to Hugging Face.", None

    username = profile.username
    print(f"👤 User: {username}")

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    try:
        agent = ImprovedWebSearchAgent()
    except Exception as e:
        return f"Agent initialization error: {e}", None

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions = response.json()
        if not questions:
            return "No questions received.", None

        print(f"📝 Received {len(questions)} questions")
    except Exception as e:
        return f"Failed to fetch questions: {e}", None

    results_log = []
    answers_payload = []

    for i, item in enumerate(questions, 1):
        task_id = item.get("task_id")
        question = item.get("question")

        # Skip malformed entries rather than submitting an answer with no id.
        if not task_id or not question:
            continue

        print(f"\n🔄 Processing question {i}/{len(questions)}: {task_id}")

        try:
            answer = agent(question)
            answer = "" if answer is None else str(answer)

            # Only an empty answer is replaced; one-character answers such as
            # "3" are legitimate and must be submitted unchanged.
            if not answer.strip():
                answer = "Unable to determine answer from available information."

            results_log.append({
                "Task ID": task_id,
                "Question": _truncate(question, 100),
                "Submitted Answer": _truncate(answer, 200),
            })
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": answer,
            })

            print(f"✅ Answer generated for {task_id}")
        except Exception as e:
            error_msg = f"Agent error: {str(e)[:100]}"
            print(f"❌ Error for {task_id}: {error_msg}")

            # The log keeps the error detail; the submission gets a neutral
            # placeholder so the task is still counted as attempted.
            results_log.append({
                "Task ID": task_id,
                "Question": _truncate(question, 100),
                "Submitted Answer": error_msg,
            })
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": "Error processing question",
            })

    if not answers_payload:
        return "No answers were generated.", pd.DataFrame(results_log)

    print(f"\n🚀 Submitting {len(answers_payload)} answers...")

    try:
        response = requests.post(
            submit_url,
            json={
                "username": username.strip(),
                "agent_code": agent_code,
                "answers": answers_payload,
            },
            timeout=120,
        )
        response.raise_for_status()
        result = response.json()

        score = result.get('score', 0)
        correct_count = result.get('correct_count', 0)
        total_attempted = result.get('total_attempted', len(answers_payload))

        status = (
            f"✅ Submission Successful!\n"
            f"User: {result.get('username')}\n"
            f"Score: {score}% ({correct_count}/{total_attempted} correct)\n"
            f"Message: {result.get('message', 'No message')}\n"
            f"Total questions processed: {len(questions)}"
        )

        print(f"🎯 Final Score: {score}%")

        return status, pd.DataFrame(results_log)
    except Exception as e:
        error_msg = f"❌ Submission failed: {e}"
        print(error_msg)
        return error_msg, pd.DataFrame(results_log)
|
|
|
|
|
|
|
|
# Gradio UI: a login button, one button that runs the full evaluation, and two
# read-only outputs (status text and the per-question answer log).
with gr.Blocks(title="Enhanced GAIA Agent") as demo:
    gr.Markdown("# 🤖 Enhanced GAIA Agent with Advanced Reasoning")
    gr.Markdown(
        "\n"
        "**Improvements in this version:**\n"
        "- 🧠 Enhanced multi-step reasoning capabilities\n"
        "- 🔍 Multiple specialized search tools\n"
        "- 🧮 Advanced calculator with better math support\n"
        "- 📅 Date and time calculation tools\n"
        "- 📝 Text analysis capabilities\n"
        "- ⚡ Better error handling and fallback mechanisms\n"
        "- 🎯 Optimized for GAIA benchmark performance\n"
    )

    gr.LoginButton()

    with gr.Row():
        run_btn = gr.Button("🚀 Run Enhanced Evaluation & Submit", variant="primary", scale=2)

    status_box = gr.Textbox(label="📊 Status & Results", lines=8, interactive=False)
    result_table = gr.DataFrame(label="📋 Agent Answers Log", interactive=False)

    # No `inputs` are wired here; NOTE(review): presumably Gradio supplies the
    # OAuth profile from the handler's type annotation — confirm.
    run_btn.click(
        fn=run_and_submit_all,
        outputs=[status_box, result_table],
        show_progress=True
    )


if __name__ == "__main__":
    demo.launch(debug=True, share=False)