# Source: final_agent_course / agent.py (Hugging Face Space by tuan3335)
# Commit 75849d9 — "🤖 Add LangGraph AI Agent with Qwen3-8B" (19.2 kB)
"""
AI AGENT WITH LANGGRAPH + AI-DRIVEN TOOL CALLING
Flow:
1. AI phân loại câu hỏi và quyết định tool
2. LangGraph nodes thực hiện tools
3. AI quyết định tiếp tục hoặc kết thúc
4. Qwen3-8B làm main reasoning engine
Architecture:
- Qwen3-8B via HuggingFace InferenceClient
- LangGraph workflow với dynamic routing
- AI-powered decision making (không hardcode)
"""
import ast
import json
import operator
import os
import re
import tempfile
from typing import List, Dict, Any, Annotated

import requests
from dotenv import load_dotenv
from typing_extensions import TypedDict
# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
# HuggingFace imports
from huggingface_hub import InferenceClient
# Other imports
import wikipedia
from PIL import Image
import pandas as pd
import yt_dlp
from groq import Groq
# OCR alternative - fallback to basic image processing
try:
    # EasyOCR is optional: without it, ocr_image_node reports basic image
    # metadata (via PIL) instead of extracted text.
    import easyocr
    OCR_AVAILABLE = True
except ImportError:
    OCR_AVAILABLE = False
    print("⚠️ EasyOCR not available, using fallback image processing")
# Load environment variables (HF_TOKEN, GROQ_API_KEY, ...) from a .env file.
load_dotenv()
# =============================================================================
# STATE DEFINITION
# =============================================================================
class AgentState(TypedDict):
    """Shared state dict passed between the LangGraph nodes of the agent workflow."""

    # Message history merged with LangGraph's add_messages reducer; initialised
    # empty by process_question and not read by any node shown here.
    messages: Annotated[list, add_messages]
    # The user's question.
    question: str
    # Task identifier used to download the attached file from the scoring API.
    task_id: str
    # Name of the attached file, if any (included in the decision prompt).
    file_name: str
    # AI's decision about what to do: dict produced by Qwen3Brain.decide_action,
    # read via keys such as "action", "reasoning" and "params".
    ai_decision: Dict[str, Any]
    # Tool outputs keyed by tool (e.g. "wikipedia", "calculation"); passed to
    # the final-answer prompt.
    tool_results: Dict[str, Any]
    # Final answer text returned to the caller.
    answer: str
    # Set True by tool nodes to route to answer generation; False ends the graph.
    continue_workflow: bool
# =============================================================================
# QWEN3-8B AI BRAIN
# =============================================================================
class Qwen3Brain:
    """Main AI brain using Qwen3-8B for all decisions.

    Wraps a HuggingFace ``InferenceClient`` chat-completion call and exposes
    three operations: free-form prompting (``think``), choosing the next
    workflow action (``decide_action``) and composing the final answer
    (``final_answer``).
    """

    # Actions offered by the decision prompt. Any other action the model
    # returns is replaced with "answer_directly", so downstream routing always
    # receives a known key.
    VALID_ACTIONS = frozenset({
        "answer_directly",
        "transcribe_audio",
        "ocr_image",
        "read_file",
        "search_wikipedia",
        "calculate_math",
        "get_youtube",
        "download_file",
    })

    # Qwen3 can emit its reasoning inside <think>...</think> before the reply.
    # An unterminated block (e.g. cut off by max_tokens) runs to the end of text.
    _THINK_RE = re.compile(r"<think>.*?(?:</think>|\Z)", re.DOTALL)

    def __init__(self):
        """Create the inference client, using the ``HF_TOKEN`` env var as the API key."""
        self.client = InferenceClient(
            provider="auto",
            api_key=os.environ.get("HF_TOKEN", "")
        )
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Qwen3-8B AI Brain initialized")

    @staticmethod
    def _strip_think(text) -> str:
        """Return *text* without ``<think>`` blocks, stripped; ``None``/empty gives ``""``."""
        if not text:
            return ""
        return Qwen3Brain._THINK_RE.sub("", text).strip()

    @staticmethod
    def _fallback_decision(reasoning: str) -> Dict[str, Any]:
        """Build the safe default decision: answer without tools."""
        return {
            "action": "answer_directly",
            "reasoning": reasoning,
            "params": "",
            "can_answer_now": True
        }

    @staticmethod
    def _parse_decision(text) -> Dict[str, Any]:
        """Turn a raw model reply into a validated decision dict.

        Finds the first JSON object in the reply, ignoring ``<think>`` blocks,
        surrounding prose and markdown fences. Checks that ``action`` is one of
        ``VALID_ACTIONS`` and coerces ``params``/``reasoning`` to strings.
        Returns a fallback "answer_directly" decision when no usable object
        is found.
        """
        cleaned = Qwen3Brain._strip_think(text)
        decoder = json.JSONDecoder()
        decision = None
        # The model often wraps the JSON in prose or ```json fences, so try to
        # decode an object starting at each "{" until one succeeds.
        for match in re.finditer(r"\{", cleaned):
            try:
                candidate, _ = decoder.raw_decode(cleaned, match.start())
            except ValueError:
                continue
            if isinstance(candidate, dict):
                decision = candidate
                break
        if decision is None:
            return Qwen3Brain._fallback_decision("JSON parsing failed, answering directly")

        action = decision.get("action")
        if isinstance(action, str):
            action = action.strip()
        if not isinstance(action, str) or action not in Qwen3Brain.VALID_ACTIONS:
            return Qwen3Brain._fallback_decision(
                f"Unknown action {action!r}, answering directly"
            )

        params = decision.get("params", "")
        if params is None:
            params = ""
        elif not isinstance(params, str):
            # Tools consume params as text (a URL, a search term, ...).
            params = json.dumps(params, ensure_ascii=False)
        reasoning = decision.get("reasoning", "")

        normalized = dict(decision)
        normalized.update(
            action=action,
            reasoning=reasoning if isinstance(reasoning, str) else str(reasoning),
            params=params,
        )
        return normalized

    def think(self, prompt: str) -> str:
        """Send *prompt* as a single user message and return the reply text.

        Any ``<think>`` reasoning block is removed from the reply. Errors are
        not raised: they are returned as a string starting with ``"AI Error: "``.
        """
        try:
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                max_tokens=2048,
                temperature=0.6
            )
            return self._strip_think(completion.choices[0].message.content)
        except Exception as e:
            return f"AI Error: {str(e)}"

    def decide_action(self, question: str, task_id: str = "", file_name: str = "") -> Dict[str, Any]:
        """Ask the model which action to take next.

        Returns:
            A dict with at least ``action`` (always one of ``VALID_ACTIONS``),
            ``reasoning`` and ``params`` (both strings). A parse failure yields
            the "answer_directly" fallback.
        """
        prompt = f"""You are an intelligent AI agent. Analyze this question and decide the next action.
Question: {question}
Task ID: {task_id}
File name: {file_name}
Available actions:
1. "answer_directly" - if you can answer without tools
2. "transcribe_audio" - for audio files
3. "ocr_image" - for images with text
4. "read_file" - for Python/Excel/text files
5. "search_wikipedia" - for factual information
6. "calculate_math" - for math calculations
7. "get_youtube" - for YouTube videos
8. "download_file" - to get files from API
Respond in JSON format:
{{
"action": "action_name",
"reasoning": "why you chose this",
"params": "parameters needed (if any)",
"can_answer_now": true/false
}}
Be decisive and clear about your choice."""
        return self._parse_decision(self.think(prompt))

    def final_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Generate the final answer from the question and the collected tool results."""
        # ensure_ascii=False keeps non-English tool output readable for the model.
        prompt = f"""Generate the final answer based on the question and any tool results.
Question: {question}
Tool results: {json.dumps(tool_results, indent=2, ensure_ascii=False)}
Provide a clear, direct answer to the original question. Use the tool results if available."""
        return self.think(prompt)
# =============================================================================
# TOOLS AS LANGGRAPH NODES
# =============================================================================
# Initialize components
ai_brain = Qwen3Brain()

# Initialize the Groq client used for Whisper transcription. An empty key
# would only surface as an authentication error at transcription time, so a
# client is built only when GROQ_API_KEY is actually set.
_groq_api_key = os.environ.get("GROQ_API_KEY")
if _groq_api_key:
    try:
        groq_client = Groq(api_key=_groq_api_key)
        print("✅ Groq client initialized")
    except Exception as e:
        print(f"⚠️ Groq client initialization failed: {e}")
        groq_client = None
else:
    print("⚠️ Groq client initialization failed: GROQ_API_KEY is not set")
    groq_client = None

# Initialize OCR with fallback. Creating the reader can fail (e.g. if its model
# files cannot be obtained); degrade to the non-OCR path instead of aborting
# the module import.
ocr_reader = None
if OCR_AVAILABLE:
    try:
        ocr_reader = easyocr.Reader(['en'])
    except Exception as e:
        print(f"⚠️ EasyOCR reader initialization failed: {e}")
        ocr_reader = None
def ai_decision_node(state: AgentState) -> AgentState:
    """Ask the AI brain which action to take and store it in ``state["ai_decision"]``.

    The decision comes from model output, so a non-dict result or missing keys
    are tolerated instead of raising and aborting the whole run.
    """
    decision = ai_brain.decide_action(
        state["question"],
        state.get("task_id", ""),
        state.get("file_name", ""),
    )
    if not isinstance(decision, dict):
        # e.g. the model replied with a bare JSON list or string.
        decision = {
            "action": "answer_directly",
            "reasoning": "Decision was not a JSON object, answering directly",
            "params": "",
            "can_answer_now": True,
        }
    state["ai_decision"] = decision
    print(f"🧠 AI Decision: {decision.get('action', 'answer_directly')} - {decision.get('reasoning', '')}")
    return state
def answer_directly_node(state: AgentState) -> AgentState:
    """Let the model answer the question on its own; this ends the workflow."""
    state["answer"] = ai_brain.think(f"Answer this question directly: {state['question']}")
    state["continue_workflow"] = False
    return state
def transcribe_audio_node(state: AgentState) -> AgentState:
    """Download the task's audio attachment and transcribe it with Groq Whisper.

    Writes the transcript (or an explanatory error string) to
    ``state["tool_results"]["audio_transcript"]`` and flags the workflow to
    continue to answer generation. The temporary download is always deleted.
    """
    tool_results = state.setdefault("tool_results", {})
    task_id = state.get("task_id", "")
    file_path = None
    try:
        if not groq_client:
            # Nothing could consume the audio, so skip the download altogether.
            result = "Audio transcription not available - Groq client not initialized"
        else:
            file_path = download_file(task_id)
            if file_path.startswith("Error"):
                result = file_path
                file_path = None  # download failed; nothing on disk to remove
            else:
                with open(file_path, "rb") as f:
                    transcription = groq_client.audio.transcriptions.create(
                        file=(file_path, f.read()),
                        model="whisper-large-v3-turbo",
                        response_format="text",
                        language="en"
                    )
                # With response_format="text" the SDK may return a plain string
                # rather than an object with a .text attribute; accept both.
                text = getattr(transcription, "text", None)
                result = text if text is not None else str(transcription)
        tool_results["audio_transcript"] = result
    except Exception as e:
        tool_results["audio_transcript"] = f"Audio error: {str(e)}"
    finally:
        if file_path:
            try:
                os.remove(file_path)
            except OSError:
                pass  # best-effort cleanup; a leftover temp file is harmless
    state["continue_workflow"] = True
    return state
def ocr_image_node(state: AgentState) -> AgentState:
    """Download the task's image and extract its text.

    Uses EasyOCR when a reader is available. Otherwise it records basic image
    metadata (format, size, mode) so the answer step knows OCR was missing.
    The result or error goes to ``state["tool_results"]["ocr_text"]``, and the
    temporary download is deleted afterwards.
    """
    tool_results = state.setdefault("tool_results", {})
    task_id = state.get("task_id", "")
    file_path = None
    try:
        file_path = download_file(task_id)
        if file_path.startswith("Error"):
            result = file_path
            file_path = None  # download failed; nothing on disk to remove
        elif OCR_AVAILABLE and ocr_reader:
            detections = ocr_reader.readtext(file_path)
            # Element [1] of each detection is the recognised text.
            text = " ".join(detection[1] for detection in detections)
            result = text if text.strip() else "No text found"
        else:
            # Fallback: basic image info; the context manager closes the file.
            try:
                with Image.open(file_path) as img:
                    result = f"Image info: {img.format} {img.size} {img.mode} - OCR not available, please describe the image content"
            except Exception:
                result = "Image file detected but cannot process without OCR"
        tool_results["ocr_text"] = result
    except Exception as e:
        tool_results["ocr_text"] = f"OCR error: {str(e)}"
    finally:
        if file_path:
            try:
                os.remove(file_path)
            except OSError:
                pass  # best-effort cleanup; a leftover temp file is harmless
    state["continue_workflow"] = True
    return state
def read_file_node(state: AgentState) -> AgentState:
    """Download the task's attachment and load its contents as text.

    Python and plain-text files are read as UTF-8. Excel and CSV files are
    rendered with pandas. The result or error is stored under
    ``state["tool_results"]["file_content"]`` and the temporary download is
    deleted.
    """
    tool_results = state.setdefault("tool_results", {})
    task_id = state.get("task_id", "")
    file_name = state.get("file_name") or ""
    file_path = None
    try:
        file_path = download_file(task_id)
        if file_path.startswith("Error"):
            result = file_path
            file_path = None  # download failed; nothing on disk to remove
        else:
            # download_file picks the temp suffix from coarse MIME checks. The
            # standard .xlsx type (application/vnd.openxmlformats-...) does not
            # contain "excel" and so may be saved as .tmp. Trust the task's own
            # file name first, and the temp file's suffix second.
            ext = (os.path.splitext(file_name)[1] or os.path.splitext(file_path)[1]).lower()
            if ext == '.py':
                with open(file_path, 'r', encoding='utf-8') as f:
                    result = f"Python code:\n{f.read()}"
            elif ext in ('.xlsx', '.xls'):
                # With no engine given, pandas picks the reader from the file
                # content, so a .tmp suffix on the temp file is fine here.
                df = pd.read_excel(file_path)
                result = f"Excel data:\n{df.to_string()}"
            elif ext == '.csv':
                df = pd.read_csv(file_path)
                result = f"CSV data:\n{df.to_string()}"
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    result = f"File content:\n{f.read()}"
        tool_results["file_content"] = result
    except Exception as e:
        tool_results["file_content"] = f"File reading error: {str(e)}"
    finally:
        if file_path:
            try:
                os.remove(file_path)
            except OSError:
                pass  # best-effort cleanup; a leftover temp file is harmless
    state["continue_workflow"] = True
    return state
def search_wikipedia_node(state: AgentState) -> AgentState:
    """Look up a Wikipedia article relevant to the question.

    The search term comes from the AI decision's ``params``. If that is empty
    or unusable, the model is asked to extract one, falling back to the raw
    question. The article title plus up to 2000 characters of its summary (or
    a failure message) is stored under ``state["tool_results"]["wikipedia"]``.
    """
    question = state["question"]
    decision = state.get("ai_decision")
    params = decision.get("params", "") if isinstance(decision, dict) else ""
    search_query = params.strip() if isinstance(params, str) else ""

    # Use AI to determine the search query if params were not provided.
    if not search_query:
        query_prompt = f"Extract the main search term for Wikipedia from: '{question}'. Return only the search term."
        search_query = (ai_brain.think(query_prompt) or "").strip().strip('"\'')
        if not search_query or search_query.startswith("AI Error"):
            # The model call failed or returned nothing; search with the question.
            search_query = question

    def _summarize(title: str) -> str:
        # auto_suggest=False: the library's default suggestion step can replace
        # a correct title with a different article.
        try:
            page = wikipedia.page(title, auto_suggest=False)
        except wikipedia.exceptions.DisambiguationError as err:
            if not err.options:
                raise
            page = wikipedia.page(err.options[0], auto_suggest=False)
        return f"Title: {page.title}\nSummary: {page.summary[:2000]}"

    try:
        wikipedia.set_lang("en")
        result = _summarize(search_query)
    except Exception:
        try:
            hits = wikipedia.search(search_query, results=1)
            if hits:
                result = _summarize(hits[0])
            else:
                result = f"No Wikipedia results for: {search_query}"
        except Exception:
            result = f"Wikipedia search failed for: {search_query}"
    state.setdefault("tool_results", {})["wikipedia"] = result
    state["continue_workflow"] = True
    return state
# Binary/unary operators accepted by _safe_eval_math; any other syntax is rejected.
_MATH_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Pow: operator.pow,
}
_MATH_UNARYOPS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}
# Limits on exponentiation so inputs like 9**9**9 cannot exhaust CPU/memory.
_MATH_MAX_EXPONENT = 100
_MATH_MAX_RESULT_BITS = 100_000


def _extract_math_expression(text: str) -> str:
    """Return the longest run of arithmetic characters in *text* that contains a digit.

    ``<think>`` blocks are ignored, whitespace is collapsed to single spaces
    and a trailing sentence period is dropped. Returns ``""`` when nothing
    qualifies.
    """
    text = re.sub(r"<think>.*?(?:</think>|\Z)", "", text or "", flags=re.DOTALL)
    text = " ".join(text.split())
    candidates = [
        run.strip().rstrip(".").strip()
        for run in re.findall(r"[0-9+\-*/(). ]+", text)
    ]
    # Whitespace-only or operator-only runs are not expressions.
    candidates = [c for c in candidates if any(ch.isdigit() for ch in c)]
    return max(candidates, key=len, default="")


def _safe_eval_math(expression: str):
    """Evaluate a plain arithmetic expression: numbers, + - * / // **, parentheses.

    Uses ``ast`` instead of ``eval`` because the expression is model output
    derived from untrusted user text.

    Raises:
        SyntaxError: the expression does not parse.
        ValueError: unsupported syntax or an oversized power.
        ZeroDivisionError: division by zero.
    """
    def _evaluate(node):
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _MATH_BINOPS:
            left = _evaluate(node.left)
            right = _evaluate(node.right)
            if isinstance(node.op, ast.Pow):
                if abs(right) > _MATH_MAX_EXPONENT:
                    raise ValueError("exponent too large")
                if isinstance(left, int) and left.bit_length() * abs(right) > _MATH_MAX_RESULT_BITS:
                    raise ValueError("result too large")
            return _MATH_BINOPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _MATH_UNARYOPS:
            return _MATH_UNARYOPS[type(node.op)](_evaluate(node.operand))
        raise ValueError("unsupported expression")

    return _evaluate(ast.parse(expression, mode="eval").body)


def calculate_math_node(state: AgentState) -> AgentState:
    """Evaluate the arithmetic in the question and store it under ``tool_results["calculation"]``.

    The model extracts the expression text, and ``_safe_eval_math`` evaluates
    it without ``eval``. The stored value is the result as a string, or an
    error message.
    """
    question = state["question"]
    # Extract math expression using AI
    extract_prompt = f"Extract ONLY the mathematical expression from: '{question}'. Return just the expression like '15+27'."
    expression = _extract_math_expression(ai_brain.think(extract_prompt))
    if not expression:
        result = "Invalid mathematical expression"
    else:
        try:
            result = str(_safe_eval_math(expression))
        except Exception as e:
            result = f"Calculation error: {str(e)}"
    state.setdefault("tool_results", {})["calculation"] = result
    state["continue_workflow"] = True
    return state
# Matches youtube.com / youtu.be links (any subdomain) embedded in free text.
_YOUTUBE_URL_RE = re.compile(r"https?://(?:[\w-]+\.)?(?:youtube\.com|youtu\.be)/[^\s'\"<>)]+")


def get_youtube_node(state: AgentState) -> AgentState:
    """Fetch a video's title and description (up to 500 chars) with yt-dlp.

    The URL is taken from a YouTube link in the AI decision's ``params``, then
    from a link in the question, and finally from ``params`` verbatim. The
    result or error is stored under ``state["tool_results"]["youtube"]``.
    """
    decision = state.get("ai_decision")
    params = decision.get("params", "") if isinstance(decision, dict) else ""
    params = params if isinstance(params, str) else ""
    match = _YOUTUBE_URL_RE.search(params) or _YOUTUBE_URL_RE.search(state.get("question", ""))
    video_url = match.group(0).rstrip(".,") if match else params.strip()
    try:
        if not video_url:
            raise ValueError("no video URL found in the decision params or the question")
        # Subtitle options are set, but with download=False only metadata is
        # extracted; this node reports title and description only.
        ydl_opts = {
            'writesubtitles': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'skip_download': True,
            'quiet': True
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)
        title = info.get('title') or 'Unknown'
        # The description key can be present with a None value.
        description = (info.get('description') or 'No description')[:500]
        result = f"Video: {title}\nDescription: {description}"
    except Exception as e:
        result = f"YouTube error: {str(e)}"
    state.setdefault("tool_results", {})["youtube"] = result
    state["continue_workflow"] = True
    return state
def download_file_node(state: AgentState) -> AgentState:
    """Fetch the task attachment and record its local path (or an error) as a tool result."""
    try:
        outcome = download_file(state.get("task_id", ""))
    except Exception as exc:
        outcome = f"Download error: {str(exc)}"
    state["tool_results"]["downloaded_file"] = outcome
    state["continue_workflow"] = True
    return state
def final_answer_node(state: AgentState) -> AgentState:
    """Compose the final answer from the question plus every collected tool result."""
    state["answer"] = ai_brain.final_answer(state["question"], state.get("tool_results", {}))
    state["continue_workflow"] = False
    return state
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
# Exact MIME type -> temp-file suffix for the attachment types tools handle.
_MIME_SUFFIXES = {
    "audio/mpeg": ".mp3",
    "audio/mp3": ".mp3",
    "audio/wav": ".wav",
    "audio/x-wav": ".wav",
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/vnd.ms-excel": ".xls",
    "text/csv": ".csv",
    "text/x-python": ".py",
    "text/plain": ".txt",
    "application/json": ".json",
    "application/pdf": ".pdf",
}


def _guess_download_suffix(content_type: str, content_disposition: str = "") -> str:
    """Pick a temp-file suffix for a download.

    Order of preference: the extension of a filename in Content-Disposition,
    an exact MIME match, then coarse substring checks, else ``.tmp``.
    """
    match = re.search(
        r"filename\*?=(?:UTF-8'')?\"?([^\";]+)\"?", content_disposition or "", re.IGNORECASE
    )
    if match:
        ext = os.path.splitext(match.group(1).strip())[1]
        # Only accept a plain alphanumeric extension as the temp-file suffix.
        if re.fullmatch(r"\.[A-Za-z0-9]{1,10}", ext):
            return ext.lower()
    mime = (content_type or "").split(";")[0].strip().lower()
    if mime in _MIME_SUFFIXES:
        return _MIME_SUFFIXES[mime]
    if 'audio' in mime:
        return '.mp3'
    if 'image' in mime:
        return '.png'
    if 'excel' in mime or 'spreadsheet' in mime:
        return '.xlsx'
    if 'python' in mime:
        return '.py'
    return '.tmp'


def download_file(task_id: str) -> str:
    """Download the attachment for *task_id* from the scoring API into a temp file.

    Returns:
        The temp file path on success. The file is created with
        ``delete=False``, so it persists until someone removes it. On failure,
        returns a message starting with ``"Error"``; callers check
        ``result.startswith("Error")``.
    """
    if not task_id:
        return "Error: no task_id provided"
    try:
        api_url = "https://agents-course-unit4-scoring.hf.space"
        file_url = f"{api_url}/files/{task_id}"
        response = requests.get(file_url, timeout=30)
        if response.status_code != 200:
            return f"Error: HTTP {response.status_code}"
        suffix = _guess_download_suffix(
            response.headers.get('content-type', ''),
            response.headers.get('content-disposition', ''),
        )
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(response.content)
        return tmp_file.name
    except Exception as e:
        return f"Error: {str(e)}"
# =============================================================================
# LANGGRAPH WORKFLOW
# =============================================================================
def create_ai_agent_workflow():
    """Build and compile the LangGraph workflow with AI-driven routing.

    Graph shape:
    - ``decision`` routes to one tool node or to ``direct_answer``.
    - Each tool node goes to ``generate_answer`` when ``continue_workflow``
      is set, and to END otherwise.
    - ``direct_answer`` and ``generate_answer`` go to END.

    Returns:
        The compiled graph; its ``invoke`` takes an ``AgentState`` dict.
    """
    workflow = StateGraph(AgentState)

    # Add all nodes
    workflow.add_node("decision", ai_decision_node)
    workflow.add_node("direct_answer", answer_directly_node)
    workflow.add_node("audio_transcribe", transcribe_audio_node)
    workflow.add_node("image_ocr", ocr_image_node)
    workflow.add_node("file_read", read_file_node)
    workflow.add_node("wiki_search", search_wikipedia_node)
    workflow.add_node("math_calc", calculate_math_node)
    workflow.add_node("youtube_get", get_youtube_node)
    workflow.add_node("file_download", download_file_node)
    workflow.add_node("generate_answer", final_answer_node)

    # Action name chosen by the AI -> node that performs it.
    action_routes = {
        "answer_directly": "direct_answer",
        "transcribe_audio": "audio_transcribe",
        "ocr_image": "image_ocr",
        "read_file": "file_read",
        "search_wikipedia": "wiki_search",
        "calculate_math": "math_calc",
        "get_youtube": "youtube_get",
        "download_file": "file_download"
    }

    # Dynamic routing based on AI decision
    def route_by_ai_decision(state: AgentState) -> str:
        decision = state.get("ai_decision")
        action = decision.get("action", "answer_directly") if isinstance(decision, dict) else "answer_directly"
        if not isinstance(action, str) or action not in action_routes:
            # A route key missing from the mapping makes the conditional edge
            # fail and aborts the run; degrade to a direct answer instead.
            print(f"⚠️ Unknown action {action!r}, falling back to answer_directly")
            action = "answer_directly"
        print(f"🔀 Routing to: {action}")
        return action

    # Conditional routing from decision
    workflow.add_conditional_edges("decision", route_by_ai_decision, action_routes)

    # Continue or end based on workflow state
    def should_continue(state: AgentState) -> str:
        if state.get("continue_workflow", False):
            return "generate_answer"
        return END

    # Add continue/end logic for tool nodes
    tool_nodes = [
        "audio_transcribe", "image_ocr", "file_read",
        "wiki_search", "math_calc", "youtube_get", "file_download"
    ]
    for node in tool_nodes:
        workflow.add_conditional_edges(
            node,
            should_continue,
            {
                "generate_answer": "generate_answer",
                END: END
            }
        )

    # End edges
    workflow.add_edge("direct_answer", END)
    workflow.add_edge("generate_answer", END)

    # Set entry point
    workflow.set_entry_point("decision")

    return workflow.compile()
# =============================================================================
# MAIN AGENT CLASS
# =============================================================================
class LangGraphAIAgent:
    """LangGraph agent with AI-driven tool calling.

    The workflow is compiled once at construction; each call to
    ``process_question`` runs one question through it from a fresh state.
    """

    def __init__(self):
        self.workflow = create_ai_agent_workflow()
        print("🤖 LangGraph AI Agent with Qwen3-8B ready!")
        print("🔧 Available tools: transcribe_audio, ocr_image, read_file, search_wikipedia, calculate_math, get_youtube")

    def process_question(self, question: str, task_id: str = "", file_name: str = "") -> str:
        """Run *question* through the workflow and return the answer text.

        Never raises: any failure is reported as a string starting with
        ``"Agent error: "``.
        """
        blank_state = dict(
            messages=[],
            question=question,
            task_id=task_id,
            file_name=file_name,
            ai_decision={},
            tool_results={},
            answer="",
            continue_workflow=False,
        )
        try:
            final_state = self.workflow.invoke(blank_state)
            return final_state.get("answer", "No answer generated")
        except Exception as err:
            return f"Agent error: {str(err)}"
# =============================================================================
# GLOBAL AGENT
# =============================================================================
# Create the shared agent once, at import time; every call below reuses it.
agent = LangGraphAIAgent()


def process_question(question: str, task_id: str = "", file_name: str = "") -> str:
    """Main entry point: answer *question* with the module-level agent.

    ``task_id`` and ``file_name`` identify an optional attachment for the task.
    """
    return agent.process_question(question, task_id=task_id, file_name=file_name)
# =============================================================================
# TEST
# =============================================================================
if __name__ == "__main__":
test_questions = [
"What is 25 + 17?",
"Who was Mercedes Sosa?",
"What is the opposite of left?",
"How many continents are there?"
]
print("🧪 Testing LangGraph AI Agent:")
for i, q in enumerate(test_questions):
print(f"\n--- Test {i+1} ---")
print(f"Q: {q}")
answer = process_question(q)
print(f"A: {answer}")
print("-" * 50)