# final_agent_course / agent.py -- tuan3335 -- "final code" -- 3c4b2fd
"""
AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION
LangGraph workflow; Qwen3 is called through the huggingface_hub InferenceClient
"""
import os
import json
import time
from typing import Dict, Any, List, Optional, Annotated
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
# Hugging Face Hub inference client (used for Qwen3 chat completions)
from huggingface_hub import InferenceClient
from utils import (
process_question_with_tools,
get_agent_state,
reset_agent_state,
ToolOrchestrator,
get_system_prompt,
get_response_prompt,
build_context_summary,
analyze_question_type
)
# Load variables from a local .env file (if any) before AIBrain reads
# HF_TOKEN from os.environ further down in this module.
load_dotenv()
class AgentState(TypedDict):
    """State dictionary passed between the LangGraph nodes in this module."""

    # Conversation messages; merged with LangGraph's add_messages reducer.
    messages: Annotated[List, add_messages]
    # The user question being processed.
    question: str
    # Identifier forwarded to the analysis step and to the tool layer.
    task_id: str
    # Dict returned by AIBrain.analyze_question (question_type, needs_tools, ...).
    ai_analysis: Dict[str, Any]
    # Routing flag: True -> process_with_tools, False -> answer_directly.
    should_use_tools: bool
    # Value returned by process_question_with_tools.
    tool_processing_result: Dict[str, Any]
    # Answer text written by the answer-generating nodes.
    final_answer: str
    # Set to True by the nodes that write final_answer.
    processing_complete: bool
class QuestionAnalysis(BaseModel):
    """Schema describing the question-classification result.

    NOTE(review): not referenced elsewhere in the visible source;
    AIBrain.analyze_question returns a plain dict with the same four keys.
    """

    # Category label; the description lists the expected values.
    question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
    # Whether the tool pipeline should run for this question.
    needs_tools: bool = Field(description="Whether tools are needed")
    # Free-text justification produced by the model.
    reasoning: str = Field(description="AI reasoning for the decision")
    # Self-reported confidence label.
    confidence: str = Field(description="Confidence level: high|medium|low")
class AIBrain:
    """Qwen3 wrapper that classifies questions and writes final answers.

    Every model call goes through ``huggingface_hub.InferenceClient`` chat
    completions (see ``_generate_with_qwen3``).
    """

    def __init__(self):
        """Create the inference client.

        Raises:
            KeyError: if the ``HF_TOKEN`` environment variable is not set.
        """
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Initializing Qwen3-8B với huggingface_hub InferenceClient...")
        self.client = InferenceClient(
            provider="auto",
            api_key=os.environ["HF_TOKEN"],
        )
        print("✅ Qwen3 AI Brain với huggingface_hub InferenceClient đã sẵn sàng")

    @staticmethod
    def _strip_thinking(text: Optional[str]) -> str:
        """Return *text* without ``<think>...</think>`` reasoning blocks.

        ``None`` or empty input yields ``""``. An orphan ``</think>`` keeps
        only what follows it; an orphan ``<think>`` (e.g. a reply truncated by
        ``max_tokens``) keeps only what precedes it.
        """
        import re  # local import: ``re`` is not imported at file level

        if not text:
            return ""
        cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
        if "</think>" in cleaned:
            cleaned = cleaned.rsplit("</think>", 1)[1]
        elif "<think>" in cleaned:
            cleaned = cleaned.split("<think>", 1)[0]
        return cleaned.strip()

    @staticmethod
    def _extract_json_object(text: str, required_fields: List[str]) -> Optional[Dict[str, Any]]:
        """Return the first flat ``{...}`` group in *text* that parses as JSON
        and contains every key in *required_fields*, or ``None``."""
        import re  # local import: ``re`` is not imported at file level

        for match in re.finditer(r'\{[^{}]*\}', text):
            try:
                candidate = json.loads(match.group())
            except json.JSONDecodeError:
                # Not JSON (e.g. braces in prose); keep scanning later groups.
                continue
            if all(field in candidate for field in required_fields):
                return candidate
        return None

    def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str:
        """Generate text with Qwen3 through the huggingface_hub InferenceClient.

        Args:
            prompt: user prompt; the ``/no_think`` soft switch is appended.
            max_tokens: completion token limit passed to the API.

        Returns:
            The reply with reasoning blocks removed and whitespace stripped.
            On any client error the string ``"AI generation failed: <error>"``
            is returned instead of raising.
        """
        try:
            messages = [
                # "/no_think" is the documented Qwen3 switch that disables
                # thinking output ("/no_thinking" is not).
                {"role": "user", "content": prompt + "\n/no_think"}
            ]
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=max_tokens,
            )
            # content may be None, and may still carry an (empty) think block.
            return self._strip_thinking(completion.choices[0].message.content)
        except Exception as e:
            print(f"⚠️ Qwen3 InferenceClient error: {str(e)}")
            return f"AI generation failed: {str(e)}"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Classify *question* with Qwen3 and return the analysis dict.

        Returns:
            A dict with ``question_type``, ``needs_tools``, ``reasoning`` and
            ``confidence``. When the model reply holds no usable JSON object,
            the result is built from ``analyze_question_type`` instead.
        """
        prompt = f"""<instruction>
Analyze this question and determine the correct tool approach. Return ONLY valid JSON.
- If the question is about a historical event, a specific person, place, object, or something that requires searching the internet (e.g., Wikipedia), you MUST choose "wiki".
- If the question is about an event in the past or future (e.g., "when was", "in what year", "has ever", "will happen", "history", "prediction"), choose "wiki".
- If the question asks about a specific topic, person, place, object, or event (e.g., "who is", "what is", "where is", "when is", "why", "how"), choose "wiki".
- If the data source is unclear or you are not sure, prefer "wiki".
- Other types: youtube (if there is a link/video), image (if there is an image), audio (if there is an audio file), file (if there is an attachment), text (if it is text analysis or math).
</instruction>
<question>{question}</question>
<task_id>{task_id}</task_id>
<classification_rules>
- YouTube URLs (youtube.com, youtu.be): "youtube"
- Images, photos, chess positions, visual content: "image"
- Audio files, voice, sound, mp3: "audio"
- Excel, CSV, documents, file uploads: "file"
- Wikipedia searches, historical facts, people info, events, future/past, specific topics: "wiki"
- Math calculations, logic, text analysis: "text"
</classification_rules>
Return this exact JSON format:
{{
"question_type": "youtube|image|audio|wiki|file|text",
"needs_tools": true,
"reasoning": "Brief explanation of classification",
"confidence": "high"
}}"""
        required_fields = ["question_type", "needs_tools", "reasoning", "confidence"]
        try:
            response = self._generate_with_qwen3(prompt, 512)
            result = self._extract_json_object(response, required_fields)
            if result is not None:
                return result
            raise ValueError("Invalid JSON structure in response")
        except Exception as e:
            print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...")
            # Fallback analysis
            question_type = analyze_question_type(question)
            return {
                "question_type": question_type,
                "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"],
                "reasoning": f"Fallback classification: detected {question_type}",
                "confidence": "medium"
            }

    def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Generate the final short answer, using tool output as context.

        Args:
            question: the user question.
            tool_results: tool-processing result; its ``tool_results`` and
                ``cached_data`` entries feed ``build_context_summary``. An
                empty or falsy value means "no context".

        Returns:
            The model reply with known answer prefixes and, for answers of at
            most two words, a trailing period removed.
        """
        if tool_results and tool_results.get("tool_results"):
            context = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {})
            )
        else:
            context = ""
        prompt = f"""
Answer the following question with only the answer. Do not explain, do not add any extra text, do not repeat the question, do not add punctuation or any prefix/suffix. Just output the answer as short and direct as possible. If the answer is not available, reply with 'No data'.
Context (if any): {context}
Question: {question}
"""
        response = self._generate_with_qwen3(prompt, 2048)
        answer = (response or "").strip()

        # Remove common prefixes (case-insensitive). Repeat until stable so
        # stacked prefixes such as "Final answer: The answer is 42" are fully
        # removed; a colon left behind by "The answer is: 42" is dropped too.
        prefixes = ("final answer:", "the answer is", "answer:")
        changed = True
        while changed:
            changed = False
            lowered = answer.lower()
            for prefix in prefixes:
                if lowered.startswith(prefix):
                    answer = answer[len(prefix):].strip().lstrip(":").strip()
                    changed = True
                    break

        # Remove the trailing period for very short answers (at most one
        # space, i.e. one or two words/numbers).
        if answer.endswith(".") and answer.count(" ") < 2:
            answer = answer[:-1].strip()
        return answer
# Module-level AIBrain shared by the graph nodes below. It is constructed at
# import time, so importing this module requires HF_TOKEN to be set
# (AIBrain.__init__ reads os.environ["HF_TOKEN"]).
ai_brain = AIBrain()
def analyze_question_node(state: AgentState) -> AgentState:
    """Graph node: classify the question and record the routing decision.

    Writes ``ai_analysis`` and ``should_use_tools`` into *state* (tools are
    assumed necessary when the analysis has no ``needs_tools`` key) and
    returns the same state object.
    """
    question, task_id = state["question"], state.get("task_id", "")
    print("🔍 Analyzing question with Qwen3...")

    analysis = ai_brain.analyze_question(question, task_id)
    state.update(
        ai_analysis=analysis,
        should_use_tools=analysis.get("needs_tools", True),
    )

    print(f"📊 Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}")
    return state
def process_with_tools_node(state: AgentState) -> AgentState:
    """Graph node: run the tool pipeline and store its output.

    The value returned by ``process_question_with_tools`` is saved under
    ``tool_processing_result``. Wiki and audio-transcript results are echoed
    for debugging, followed by a summary of the tools that succeeded.
    """
    question, task_id = state["question"], state.get("task_id", "")
    print("🔧 Processing with specialized tools...")

    outcome = process_question_with_tools(question, task_id)
    state["tool_processing_result"] = outcome
    entries = outcome.get("tool_results", [])

    # Log the wiki / audio responses if present.
    for entry in entries:
        tool = entry.tool_name
        if tool == "wiki_search":
            print(f"[DEBUG] Wiki tool response: {entry.result}")
        elif tool == "audio_transcript":
            print(f"[DEBUG] Audio transcript: {entry.result}")

    successful_tools = []
    for entry in entries:
        if entry.success:
            successful_tools.append(entry.tool_name)

    if not successful_tools:
        print("⚠️ No tools succeeded")
    else:
        print(f"✅ Successful tools: {successful_tools}")
    return state
def answer_directly_node(state: AgentState) -> AgentState:
    """Graph node: answer from the model alone, with no tool context.

    Stores the reply in ``final_answer``, marks ``processing_complete`` and
    returns the same state object.
    """
    question = state["question"]
    print("💭 Generating direct answer with Qwen3...")
    state.update(
        final_answer=ai_brain.generate_answer(question, {}),
        processing_complete=True,
    )
    return state
def generate_final_answer_node(state: AgentState) -> AgentState:
    """Graph node: produce the final answer using the stored tool output.

    Reads ``tool_processing_result`` (empty dict when absent), stores the
    reply in ``final_answer`` and marks ``processing_complete``.
    """
    question = state["question"]
    gathered = state.get("tool_processing_result", {})
    print("🎯 Generating final answer with context...")
    state.update(
        final_answer=ai_brain.generate_answer(question, gathered),
        processing_complete=True,
    )
    return state
def create_agent_workflow():
    """Build and compile the question-processing LangGraph.

    Flow: ``analyze_question`` routes either to ``process_with_tools`` and
    then ``generate_final_answer``, or straight to ``answer_directly``; both
    branches finish at ``END``.
    """
    graph = StateGraph(AgentState)

    # Register the nodes.
    node_table = (
        ("analyze_question", analyze_question_node),
        ("process_with_tools", process_with_tools_node),
        ("answer_directly", answer_directly_node),
        ("generate_final_answer", generate_final_answer_node),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    def should_use_tools(state: AgentState) -> str:
        """Router: return the name of the node to run after the analysis."""
        if state.get("should_use_tools", True):
            return "process_with_tools"
        return "answer_directly"

    # Wire the flow.
    graph.set_entry_point("analyze_question")
    graph.add_conditional_edges("analyze_question", should_use_tools)
    fixed_edges = (
        ("process_with_tools", "generate_final_answer"),
        ("answer_directly", END),
        ("generate_final_answer", END),
    )
    for source, target in fixed_edges:
        graph.add_edge(source, target)

    return graph.compile()
class LangGraphUtilsAgent:
    """Facade that owns the compiled workflow and runs questions through it."""

    def __init__(self):
        """Compile the workflow once and keep it on ``self.app``."""
        self.app = create_agent_workflow()
        print("🚀 LangGraph Agent with Qwen3 + Utils System ready")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Run *question* through the workflow and return the final answer.

        Any exception raised while building the state or invoking the graph
        is reported and turned into an apology string rather than propagated.
        """
        try:
            print(f"\n🎯 Processing: {question[:100]}...")

            # Seed state handed to the graph.
            seed = dict(
                messages=[HumanMessage(content=question)],
                question=question,
                task_id=task_id,
                ai_analysis={},
                should_use_tools=True,
                tool_processing_result={},
                final_answer="",
                processing_complete=False,
            )

            # Timed run of the workflow.
            started = time.time()
            outcome = self.app.invoke(seed)
            elapsed_time = time.time() - started

            final_answer = outcome.get("final_answer", "No answer generated")
            print(f"✅ Completed in {elapsed_time:.2f}s")
        except Exception as e:
            print(f"❌ Agent error: {str(e)}")
            return f"I apologize, but I encountered an error processing your question: {str(e)}"
        return final_answer
# Global agent instance used by process_question() below. Created at import
# time, which also compiles the workflow (see LangGraphUtilsAgent.__init__).
agent = LangGraphUtilsAgent()
def process_question(question: str, task_id: str = "") -> str:
    """Main entry point for question processing.

    Args:
        question: the user question; surrounding whitespace is ignored.
        task_id: optional task identifier forwarded to the agent.

    Returns:
        The agent's answer, or a fixed validation message when *question* is
        missing, blank, or not a string.
    """
    # Reject non-string input up front: calling .strip() on e.g. a number
    # would raise AttributeError instead of giving the validation message.
    if not isinstance(question, str):
        return "Please provide a valid question."
    cleaned = question.strip()
    if not cleaned:
        return "Please provide a valid question."
    return agent.process_question(cleaned, task_id)
# =============================================================================
# TESTING
# =============================================================================
if __name__ == "__main__":
    # Manual smoke test: runs a few sample questions through the agent and
    # prints each answer (or the error, if the call raises).
    print("🧪 Testing LangGraph Utils Agent\n")

    samples = (
        ("Who was Marie Curie?", "Wikipedia factual question"),
        ("What is 25 + 17 * 3?", "Math calculation"),
        (
            ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "Reversed text question",
        ),
        ("How many continents are there?", "General knowledge"),
    )
    test_cases = [
        {"question": text, "task_id": "", "description": label}
        for text, label in samples
    ]

    for number, case in enumerate(test_cases, start=1):
        print(f"\n{'='*60}")
        print(f"TEST {number}: {case['description']}")
        print(f"{'='*60}")
        print(f"Question: {case['question']}")
        try:
            answer = process_question(case["question"], case["task_id"])
        except Exception as e:
            print(f"\nTest failed: {str(e)}")
        else:
            print(f"\nAnswer: {answer}")
        print(f"\n{'-'*60}")

    print("\n✅ All tests completed!")