final_agent_course / agent.py
tuan3335's picture
use langchain
040a6c6
raw
history blame
13.9 kB
"""
AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION
Clean architecture with LangChain HuggingFace Pipeline
"""
import os
import json
import time
from typing import Dict, Any, List, Optional, Annotated
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
# LangChain HuggingFace Integration
from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace, HuggingFaceEndpoint
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from utils import (
process_question_with_tools,
get_agent_state,
reset_agent_state,
ToolOrchestrator,
get_system_prompt,
get_response_prompt,
build_context_summary,
analyze_question_type
)
load_dotenv()
class AgentState(TypedDict):
messages: Annotated[List, add_messages]
question: str
task_id: str
ai_analysis: Dict[str, Any]
should_use_tools: bool
tool_processing_result: Dict[str, Any]
final_answer: str
processing_complete: bool
class QuestionAnalysis(BaseModel):
question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
needs_tools: bool = Field(description="Whether tools are needed")
reasoning: str = Field(description="AI reasoning for the decision")
confidence: str = Field(description="Confidence level: high|medium|low")
class AIBrain:
def __init__(self):
self.model_name = "Qwen/Qwen3-8B"
print("🧠 Initializing Qwen3-8B with LangChain HuggingFace...")
# Load tokenizer with thinking disabled
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# Create text generation pipeline with Qwen3
self.hf_pipeline = pipeline(
"text-generation",
model=self.model_name,
tokenizer=self.tokenizer,
torch_dtype="auto",
device_map="auto",
max_new_tokens=2048,
temperature=0.7,
top_p=0.9,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id if self.tokenizer.eos_token_id else self.tokenizer.pad_token_id
)
# Wrap with LangChain HuggingFacePipeline
self.llm = HuggingFacePipeline(pipeline=self.hf_pipeline)
# Create ChatHuggingFace for chat interface
self.chat_model = ChatHuggingFace(llm=self.llm)
print("βœ… Qwen3 AI Brain with LangChain HuggingFace initialized")
def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str:
"""Generate text with Qwen3 via LangChain - thinking disabled"""
try:
# Prepare messages for chat template with thinking DISABLED
messages = [{"role": "user", "content": prompt}]
# Apply chat template with enable_thinking=False
text = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False # CRITICAL: Disable thinking mode
)
# Use LangChain HuggingFace pipeline for generation
response = self.llm.invoke(text)
# Clean up response - remove input prompt
if text in response:
response = response.replace(text, "").strip()
return response
except Exception as e:
print(f"⚠️ Qwen3 generation error: {str(e)}")
# Fallback to direct pipeline call
try:
result = self.hf_pipeline(prompt, max_new_tokens=max_tokens)
return result[0]['generated_text'].replace(prompt, "").strip()
except Exception as e2:
return f"AI generation failed: {str(e2)}"
def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
"""Analyze question type using Qwen3 with strict JSON output"""
prompt = f"""<instruction>
Analyze this question and determine the correct tool approach. Return ONLY valid JSON.
</instruction>
<question>{question}</question>
<task_id>{task_id}</task_id>
<classification_rules>
- YouTube URLs (youtube.com, youtu.be): "youtube"
- Images, photos, chess positions, visual content: "image"
- Audio files, voice, sound, mp3: "audio"
- Excel, CSV, documents, file uploads: "file"
- Wikipedia searches, historical facts, people info: "wiki"
- Math calculations, logic, text analysis: "text"
</classification_rules>
Return this exact JSON format:
{{
"question_type": "youtube|image|audio|wiki|file|text",
"needs_tools": true,
"reasoning": "Brief explanation of classification",
"confidence": "high"
}}"""
try:
response = self._generate_with_qwen3(prompt, 512)
# Extract JSON from response
import re
json_pattern = r'\{[^{}]*\}'
json_match = re.search(json_pattern, response)
if json_match:
result = json.loads(json_match.group())
# Validate required fields
required_fields = ["question_type", "needs_tools", "reasoning", "confidence"]
if all(field in result for field in required_fields):
return result
raise ValueError("Invalid JSON structure in response")
except Exception as e:
print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...")
# Fallback analysis
question_type = analyze_question_type(question)
return {
"question_type": question_type,
"needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"],
"reasoning": f"Fallback classification: detected {question_type}",
"confidence": "medium"
}
def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
"""Generate final answer using Qwen3 with context"""
if tool_results and tool_results.get("tool_results"):
context = build_context_summary(
tool_results.get("tool_results", []),
tool_results.get("cached_data", {})
)
else:
context = "No additional context available"
prompt = f"""<instruction>
Generate a comprehensive answer to the user's question using the provided context.
</instruction>
<question>{question}</question>
<context>
{context}
</context>
<output_rules>
- Provide direct, accurate answers
- Use context information when relevant
- Be concise but complete
- No thinking process in output
- Professional tone
</output_rules>
Answer:"""
response = self._generate_with_qwen3(prompt, 2048)
# Clean up response
if "Answer:" in response:
response = response.split("Answer:")[-1].strip()
return response
# Initialize AI Brain globally
ai_brain = AIBrain()
def analyze_question_node(state: AgentState) -> AgentState:
"""Analyze question using Qwen3 AI Brain"""
question = state["question"]
task_id = state.get("task_id", "")
print("πŸ” Analyzing question with Qwen3...")
analysis = ai_brain.analyze_question(question, task_id)
state["ai_analysis"] = analysis
state["should_use_tools"] = analysis.get("needs_tools", True)
print(f"πŸ“Š Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}")
return state
def process_with_tools_node(state: AgentState) -> AgentState:
"""Process question with appropriate tools"""
question = state["question"]
task_id = state.get("task_id", "")
print("πŸ”§ Processing with specialized tools...")
tool_results = process_question_with_tools(question, task_id)
state["tool_processing_result"] = tool_results
successful_tools = [result.tool_name for result in tool_results.get("tool_results", []) if result.success]
if successful_tools:
print(f"βœ… Successful tools: {successful_tools}")
else:
print("⚠️ No tools succeeded")
return state
def answer_directly_node(state: AgentState) -> AgentState:
"""Answer directly without tools using Qwen3"""
question = state["question"]
print("πŸ’­ Generating direct answer with Qwen3...")
answer = ai_brain.generate_answer(question, {})
state["final_answer"] = answer
state["processing_complete"] = True
return state
def generate_final_answer_node(state: AgentState) -> AgentState:
"""Generate final answer combining tool results and AI analysis"""
question = state["question"]
tool_results = state.get("tool_processing_result", {})
print("🎯 Generating final answer with context...")
answer = ai_brain.generate_answer(question, tool_results)
state["final_answer"] = answer
state["processing_complete"] = True
return state
def create_agent_workflow():
"""Create LangGraph workflow for question processing"""
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("analyze_question", analyze_question_node)
workflow.add_node("process_with_tools", process_with_tools_node)
workflow.add_node("answer_directly", answer_directly_node)
workflow.add_node("generate_final_answer", generate_final_answer_node)
# Define routing logic
def should_use_tools(state: AgentState) -> str:
return "process_with_tools" if state.get("should_use_tools", True) else "answer_directly"
# Set up the flow
workflow.set_entry_point("analyze_question")
workflow.add_conditional_edges("analyze_question", should_use_tools)
workflow.add_edge("process_with_tools", "generate_final_answer")
workflow.add_edge("answer_directly", END)
workflow.add_edge("generate_final_answer", END)
return workflow.compile()
class LangGraphUtilsAgent:
def __init__(self):
self.app = create_agent_workflow()
print("πŸš€ LangGraph Agent with Qwen3 + Utils System ready")
def process_question(self, question: str, task_id: str = "") -> str:
"""Process question through the workflow"""
try:
print(f"\n🎯 Processing: {question[:100]}...")
# Initialize state
initial_state = {
"messages": [HumanMessage(content=question)],
"question": question,
"task_id": task_id,
"ai_analysis": {},
"should_use_tools": True,
"tool_processing_result": {},
"final_answer": "",
"processing_complete": False
}
# Run workflow
start_time = time.time()
result = self.app.invoke(initial_state)
elapsed_time = time.time() - start_time
final_answer = result.get("final_answer", "No answer generated")
print(f"βœ… Completed in {elapsed_time:.2f}s")
return final_answer
except Exception as e:
print(f"❌ Agent error: {str(e)}")
return f"I apologize, but I encountered an error processing your question: {str(e)}"
# Global agent instance
agent = LangGraphUtilsAgent()
def process_question(question: str, task_id: str = "") -> str:
"""Main entry point for question processing"""
if not question or not question.strip():
return "Please provide a valid question."
return agent.process_question(question.strip(), task_id)
# =============================================================================
# TESTING
# =============================================================================
if __name__ == "__main__":
print("πŸ§ͺ Testing LangGraph Utils Agent\n")
test_cases = [
{
"question": "Who was Marie Curie?",
"task_id": "",
"description": "Wikipedia factual question"
},
{
"question": "What is 25 + 17 * 3?",
"task_id": "",
"description": "Math calculation"
},
{
"question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
"task_id": "",
"description": "Reversed text question"
},
{
"question": "How many continents are there?",
"task_id": "",
"description": "General knowledge"
}
]
for i, test_case in enumerate(test_cases, 1):
print(f"\n{'='*60}")
print(f"TEST {i}: {test_case['description']}")
print(f"{'='*60}")
print(f"Question: {test_case['question']}")
try:
answer = process_question(test_case["question"], test_case["task_id"])
print(f"\nAnswer: {answer}")
except Exception as e:
print(f"\nTest failed: {str(e)}")
print(f"\n{'-'*60}")
print("\nβœ… All tests completed!")
# Initialize Qwen3 with thinking mode disabled
primary_brain = HuggingFaceEndpoint(
repo_id=primary_model,
temperature=0.7,
max_new_tokens=300,
huggingfacehub_api_token=os.getenv("HF_API_KEY"),
model_kwargs={"enable_thinking": False, "thinking_prompt": "/no_thinking"}
)