""" AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION Clean architecture with LangChain HuggingFace Pipeline """ import os import json import time from typing import Dict, Any, List, Optional, Annotated from dotenv import load_dotenv from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser, StrOutputParser from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from typing_extensions import TypedDict from pydantic import BaseModel, Field # LangChain HuggingFace Integration from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace, HuggingFaceEndpoint from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline from utils import ( process_question_with_tools, get_agent_state, reset_agent_state, ToolOrchestrator, get_system_prompt, get_response_prompt, build_context_summary, analyze_question_type ) load_dotenv() class AgentState(TypedDict): messages: Annotated[List, add_messages] question: str task_id: str ai_analysis: Dict[str, Any] should_use_tools: bool tool_processing_result: Dict[str, Any] final_answer: str processing_complete: bool class QuestionAnalysis(BaseModel): question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math") needs_tools: bool = Field(description="Whether tools are needed") reasoning: str = Field(description="AI reasoning for the decision") confidence: str = Field(description="Confidence level: high|medium|low") class AIBrain: def __init__(self): self.model_name = "Qwen/Qwen3-8B" print("๐Ÿง  Initializing Qwen3-8B with LangChain HuggingFace...") # Load tokenizer with thinking disabled self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) # Create text generation pipeline with Qwen3 self.hf_pipeline = pipeline( "text-generation", model=self.model_name, tokenizer=self.tokenizer, torch_dtype="auto", device_map="auto", max_new_tokens=2048, temperature=0.7, top_p=0.9, do_sample=True, pad_token_id=self.tokenizer.eos_token_id if self.tokenizer.eos_token_id else self.tokenizer.pad_token_id ) # Wrap with LangChain HuggingFacePipeline self.llm = HuggingFacePipeline(pipeline=self.hf_pipeline) # Create ChatHuggingFace for chat interface self.chat_model = ChatHuggingFace(llm=self.llm) print("โœ… Qwen3 AI Brain with LangChain HuggingFace initialized") def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str: """Generate text with Qwen3 via LangChain - thinking disabled""" try: # Prepare messages for chat template with thinking DISABLED messages = [{"role": "user", "content": prompt}] # Apply chat template with enable_thinking=False text = self.tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False # CRITICAL: Disable thinking mode ) # Use LangChain HuggingFace pipeline for generation response = self.llm.invoke(text) # Clean up response - remove input prompt if text in response: response = response.replace(text, "").strip() return response except Exception as e: print(f"โš ๏ธ Qwen3 generation error: {str(e)}") # Fallback to direct pipeline call try: result = self.hf_pipeline(prompt, max_new_tokens=max_tokens) return result[0]['generated_text'].replace(prompt, "").strip() except Exception as e2: return f"AI generation failed: {str(e2)}" def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]: """Analyze question type using Qwen3 with strict JSON output""" prompt = f""" Analyze this question and determine the correct tool approach. Return ONLY valid JSON. {question} {task_id} - YouTube URLs (youtube.com, youtu.be): "youtube" - Images, photos, chess positions, visual content: "image" - Audio files, voice, sound, mp3: "audio" - Excel, CSV, documents, file uploads: "file" - Wikipedia searches, historical facts, people info: "wiki" - Math calculations, logic, text analysis: "text" Return this exact JSON format: {{ "question_type": "youtube|image|audio|wiki|file|text", "needs_tools": true, "reasoning": "Brief explanation of classification", "confidence": "high" }}""" try: response = self._generate_with_qwen3(prompt, 512) # Extract JSON from response import re json_pattern = r'\{[^{}]*\}' json_match = re.search(json_pattern, response) if json_match: result = json.loads(json_match.group()) # Validate required fields required_fields = ["question_type", "needs_tools", "reasoning", "confidence"] if all(field in result for field in required_fields): return result raise ValueError("Invalid JSON structure in response") except Exception as e: print(f"โš ๏ธ Qwen3 analysis failed: {str(e)[:100]}...") # Fallback analysis question_type = analyze_question_type(question) return { "question_type": question_type, "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"], "reasoning": f"Fallback classification: detected {question_type}", "confidence": "medium" } def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str: """Generate final answer using Qwen3 with context""" if tool_results and tool_results.get("tool_results"): context = build_context_summary( tool_results.get("tool_results", []), tool_results.get("cached_data", {}) ) else: context = "No additional context available" prompt = f""" Generate a comprehensive answer to the user's question using the provided context. {question} {context} - Provide direct, accurate answers - Use context information when relevant - Be concise but complete - No thinking process in output - Professional tone Answer:""" response = self._generate_with_qwen3(prompt, 2048) # Clean up response if "Answer:" in response: response = response.split("Answer:")[-1].strip() return response # Initialize AI Brain globally ai_brain = AIBrain() def analyze_question_node(state: AgentState) -> AgentState: """Analyze question using Qwen3 AI Brain""" question = state["question"] task_id = state.get("task_id", "") print("๐Ÿ” Analyzing question with Qwen3...") analysis = ai_brain.analyze_question(question, task_id) state["ai_analysis"] = analysis state["should_use_tools"] = analysis.get("needs_tools", True) print(f"๐Ÿ“Š Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}") return state def process_with_tools_node(state: AgentState) -> AgentState: """Process question with appropriate tools""" question = state["question"] task_id = state.get("task_id", "") print("๐Ÿ”ง Processing with specialized tools...") tool_results = process_question_with_tools(question, task_id) state["tool_processing_result"] = tool_results successful_tools = [result.tool_name for result in tool_results.get("tool_results", []) if result.success] if successful_tools: print(f"โœ… Successful tools: {successful_tools}") else: print("โš ๏ธ No tools succeeded") return state def answer_directly_node(state: AgentState) -> AgentState: """Answer directly without tools using Qwen3""" question = state["question"] print("๐Ÿ’ญ Generating direct answer with Qwen3...") answer = ai_brain.generate_answer(question, {}) state["final_answer"] = answer state["processing_complete"] = True return state def generate_final_answer_node(state: AgentState) -> AgentState: """Generate final answer combining tool results and AI analysis""" question = state["question"] tool_results = state.get("tool_processing_result", {}) print("๐ŸŽฏ Generating final answer with context...") answer = ai_brain.generate_answer(question, tool_results) state["final_answer"] = answer state["processing_complete"] = True return state def create_agent_workflow(): """Create LangGraph workflow for question processing""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("analyze_question", analyze_question_node) workflow.add_node("process_with_tools", process_with_tools_node) workflow.add_node("answer_directly", answer_directly_node) workflow.add_node("generate_final_answer", generate_final_answer_node) # Define routing logic def should_use_tools(state: AgentState) -> str: return "process_with_tools" if state.get("should_use_tools", True) else "answer_directly" # Set up the flow workflow.set_entry_point("analyze_question") workflow.add_conditional_edges("analyze_question", should_use_tools) workflow.add_edge("process_with_tools", "generate_final_answer") workflow.add_edge("answer_directly", END) workflow.add_edge("generate_final_answer", END) return workflow.compile() class LangGraphUtilsAgent: def __init__(self): self.app = create_agent_workflow() print("๐Ÿš€ LangGraph Agent with Qwen3 + Utils System ready") def process_question(self, question: str, task_id: str = "") -> str: """Process question through the workflow""" try: print(f"\n๐ŸŽฏ Processing: {question[:100]}...") # Initialize state initial_state = { "messages": [HumanMessage(content=question)], "question": question, "task_id": task_id, "ai_analysis": {}, "should_use_tools": True, "tool_processing_result": {}, "final_answer": "", "processing_complete": False } # Run workflow start_time = time.time() result = self.app.invoke(initial_state) elapsed_time = time.time() - start_time final_answer = result.get("final_answer", "No answer generated") print(f"โœ… Completed in {elapsed_time:.2f}s") return final_answer except Exception as e: print(f"โŒ Agent error: {str(e)}") return f"I apologize, but I encountered an error processing your question: {str(e)}" # Global agent instance agent = LangGraphUtilsAgent() def process_question(question: str, task_id: str = "") -> str: """Main entry point for question processing""" if not question or not question.strip(): return "Please provide a valid question." return agent.process_question(question.strip(), task_id) # ============================================================================= # TESTING # ============================================================================= if __name__ == "__main__": print("๐Ÿงช Testing LangGraph Utils Agent\n") test_cases = [ { "question": "Who was Marie Curie?", "task_id": "", "description": "Wikipedia factual question" }, { "question": "What is 25 + 17 * 3?", "task_id": "", "description": "Math calculation" }, { "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI", "task_id": "", "description": "Reversed text question" }, { "question": "How many continents are there?", "task_id": "", "description": "General knowledge" } ] for i, test_case in enumerate(test_cases, 1): print(f"\n{'='*60}") print(f"TEST {i}: {test_case['description']}") print(f"{'='*60}") print(f"Question: {test_case['question']}") try: answer = process_question(test_case["question"], test_case["task_id"]) print(f"\nAnswer: {answer}") except Exception as e: print(f"\nTest failed: {str(e)}") print(f"\n{'-'*60}") print("\nโœ… All tests completed!") # Initialize Qwen3 with thinking mode disabled primary_brain = HuggingFaceEndpoint( repo_id=primary_model, temperature=0.7, max_new_tokens=300, huggingfacehub_api_token=os.getenv("HF_API_KEY"), model_kwargs={"enable_thinking": False, "thinking_prompt": "/no_thinking"} )