| """ | |
| AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION | |
| Clean architecture with LangChain HuggingFace Pipeline | |
| """ | |
| import os | |
| import json | |
| import time | |
| from typing import Dict, Any, List, Optional, Annotated | |
| from dotenv import load_dotenv | |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import JsonOutputParser, StrOutputParser | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.message import add_messages | |
| from typing_extensions import TypedDict | |
| from pydantic import BaseModel, Field | |
| # LangChain HuggingFace Integration | |
| from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace, HuggingFaceEndpoint | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
| from utils import ( | |
| process_question_with_tools, | |
| get_agent_state, | |
| reset_agent_state, | |
| ToolOrchestrator, | |
| get_system_prompt, | |
| get_response_prompt, | |
| build_context_summary, | |
| analyze_question_type | |
| ) | |
| load_dotenv() | |
class AgentState(TypedDict):
    messages: Annotated[List, add_messages]
    question: str
    task_id: str
    ai_analysis: Dict[str, Any]
    should_use_tools: bool
    tool_processing_result: Dict[str, Any]
    final_answer: str
    processing_complete: bool
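
# Note: the `add_messages` annotation makes LangGraph treat `messages` as an
# append-only channel - messages returned by nodes are merged into the list
# rather than replacing it.
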
class QuestionAnalysis(BaseModel):
    question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
    needs_tools: bool = Field(description="Whether tools are needed")
    reasoning: str = Field(description="AI reasoning for the decision")
    confidence: str = Field(description="Confidence level: high|medium|low")
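
# Sketch (not wired into the workflow below): QuestionAnalysis could validate
# the raw dict returned by AIBrain.analyze_question(), assuming Pydantic v2:
#   analysis = QuestionAnalysis.model_validate(raw_dict)
#   if analysis.needs_tools:
#       ...
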
class AIBrain:
    def __init__(self):
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Initializing Qwen3-8B with LangChain HuggingFace...")

        # Load tokenizer (thinking mode is disabled later, at chat-template time)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

        # Create text generation pipeline with Qwen3
        self.hf_pipeline = pipeline(
            "text-generation",
            model=self.model_name,
            tokenizer=self.tokenizer,
            torch_dtype="auto",
            device_map="auto",
            max_new_tokens=2048,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id or self.tokenizer.pad_token_id
        )

        # Wrap with LangChain HuggingFacePipeline
        self.llm = HuggingFacePipeline(pipeline=self.hf_pipeline)

        # Create ChatHuggingFace for a chat-style message interface
        self.chat_model = ChatHuggingFace(llm=self.llm)
        print("✅ Qwen3 AI Brain with LangChain HuggingFace initialized")
    def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str:
        """Generate text with Qwen3 via LangChain - thinking disabled"""
        try:
            # Prepare messages for chat template with thinking DISABLED
            messages = [{"role": "user", "content": prompt}]

            # Apply chat template with enable_thinking=False
            text = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False  # CRITICAL: Disable thinking mode
            )

            # Use LangChain HuggingFace pipeline for generation
            response = self.llm.invoke(text)

            # Clean up response - remove the echoed input prompt
            if text in response:
                response = response.replace(text, "").strip()
            return response
        except Exception as e:
            print(f"⚠️ Qwen3 generation error: {str(e)}")
            # Fallback to direct pipeline call
            try:
                result = self.hf_pipeline(prompt, max_new_tokens=max_tokens)
                return result[0]['generated_text'].replace(prompt, "").strip()
            except Exception as e2:
                return f"AI generation failed: {str(e2)}"
    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Analyze question type using Qwen3 with strict JSON output"""
        prompt = f"""<instruction>
Analyze this question and determine the correct tool approach. Return ONLY valid JSON.
</instruction>
<question>{question}</question>
<task_id>{task_id}</task_id>
<classification_rules>
- YouTube URLs (youtube.com, youtu.be): "youtube"
- Images, photos, chess positions, visual content: "image"
- Audio files, voice, sound, mp3: "audio"
- Excel, CSV, documents, file uploads: "file"
- Wikipedia searches, historical facts, people info: "wiki"
- Math calculations, logic, text analysis: "text"
</classification_rules>
Return this exact JSON format:
{{
    "question_type": "youtube|image|audio|wiki|file|text",
    "needs_tools": true,
    "reasoning": "Brief explanation of classification",
    "confidence": "high"
}}"""
        try:
            response = self._generate_with_qwen3(prompt, 512)

            # Extract the first flat JSON object from the response
            json_pattern = r'\{[^{}]*\}'
            json_match = re.search(json_pattern, response)
            if json_match:
                result = json.loads(json_match.group())
                # Validate required fields
                required_fields = ["question_type", "needs_tools", "reasoning", "confidence"]
                if all(field in result for field in required_fields):
                    return result
            raise ValueError("Invalid JSON structure in response")
        except Exception as e:
            print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...")
            # Fallback analysis
            question_type = analyze_question_type(question)
            return {
                "question_type": question_type,
                "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"],
                "reasoning": f"Fallback classification: detected {question_type}",
                "confidence": "medium"
            }
    def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Generate final answer using Qwen3 with context"""
        if tool_results and tool_results.get("tool_results"):
            context = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {})
            )
        else:
            context = "No additional context available"

        prompt = f"""<instruction>
Generate a comprehensive answer to the user's question using the provided context.
</instruction>
<question>{question}</question>
<context>
{context}
</context>
<output_rules>
- Provide direct, accurate answers
- Use context information when relevant
- Be concise but complete
- No thinking process in output
- Professional tone
</output_rules>
Answer:"""
        response = self._generate_with_qwen3(prompt, 2048)

        # Clean up response - keep only the text after the final "Answer:" marker
        if "Answer:" in response:
            response = response.split("Answer:")[-1].strip()
        return response

# Initialize AI Brain globally
ai_brain = AIBrain()


def analyze_question_node(state: AgentState) -> AgentState:
    """Analyze question using Qwen3 AI Brain"""
    question = state["question"]
    task_id = state.get("task_id", "")
    print("🔍 Analyzing question with Qwen3...")
    analysis = ai_brain.analyze_question(question, task_id)
    state["ai_analysis"] = analysis
    state["should_use_tools"] = analysis.get("needs_tools", True)
    print(f"📊 Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}")
    return state

def process_with_tools_node(state: AgentState) -> AgentState:
    """Process question with appropriate tools"""
    question = state["question"]
    task_id = state.get("task_id", "")
    print("🔧 Processing with specialized tools...")
    tool_results = process_question_with_tools(question, task_id)
    state["tool_processing_result"] = tool_results
    successful_tools = [result.tool_name for result in tool_results.get("tool_results", []) if result.success]
    if successful_tools:
        print(f"✅ Successful tools: {successful_tools}")
    else:
        print("⚠️ No tools succeeded")
    return state

def answer_directly_node(state: AgentState) -> AgentState:
    """Answer directly without tools using Qwen3"""
    question = state["question"]
    print("📝 Generating direct answer with Qwen3...")
    answer = ai_brain.generate_answer(question, {})
    state["final_answer"] = answer
    state["processing_complete"] = True
    return state


def generate_final_answer_node(state: AgentState) -> AgentState:
    """Generate final answer combining tool results and AI analysis"""
    question = state["question"]
    tool_results = state.get("tool_processing_result", {})
    print("🎯 Generating final answer with context...")
    answer = ai_brain.generate_answer(question, tool_results)
    state["final_answer"] = answer
    state["processing_complete"] = True
    return state

def create_agent_workflow():
    """Create LangGraph workflow for question processing"""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("analyze_question", analyze_question_node)
    workflow.add_node("process_with_tools", process_with_tools_node)
    workflow.add_node("answer_directly", answer_directly_node)
    workflow.add_node("generate_final_answer", generate_final_answer_node)

    # Define routing logic
    def should_use_tools(state: AgentState) -> str:
        return "process_with_tools" if state.get("should_use_tools", True) else "answer_directly"

    # Set up the flow
    workflow.set_entry_point("analyze_question")
    workflow.add_conditional_edges("analyze_question", should_use_tools)
    workflow.add_edge("process_with_tools", "generate_final_answer")
    workflow.add_edge("answer_directly", END)
    workflow.add_edge("generate_final_answer", END)
    return workflow.compile()
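
# Compiled graph, for reference:
#   analyze_question --(needs tools)--> process_with_tools --> generate_final_answer --> END
#   analyze_question --(no tools)-----> answer_directly ----------------------------- -> END
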
class LangGraphUtilsAgent:
    def __init__(self):
        self.app = create_agent_workflow()
        print("🚀 LangGraph Agent with Qwen3 + Utils System ready")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Process question through the workflow"""
        try:
            print(f"\n🎯 Processing: {question[:100]}...")

            # Initialize state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False
            }

            # Run workflow
            start_time = time.time()
            result = self.app.invoke(initial_state)
            elapsed_time = time.time() - start_time

            final_answer = result.get("final_answer", "No answer generated")
            print(f"✅ Completed in {elapsed_time:.2f}s")
            return final_answer
        except Exception as e:
            print(f"❌ Agent error: {str(e)}")
            return f"I apologize, but I encountered an error processing your question: {str(e)}"

# Global agent instance
agent = LangGraphUtilsAgent()


def process_question(question: str, task_id: str = "") -> str:
    """Main entry point for question processing"""
    if not question or not question.strip():
        return "Please provide a valid question."
    return agent.process_question(question.strip(), task_id)
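
# Usage sketch (assumes the Qwen3 weights can be loaded by the local pipeline):
#   answer = process_question("Who was Marie Curie?")
#   print(answer)
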
# =============================================================================
# TESTING
# =============================================================================
if __name__ == "__main__":
    print("🧪 Testing LangGraph Utils Agent\n")
    test_cases = [
        {
            "question": "Who was Marie Curie?",
            "task_id": "",
            "description": "Wikipedia factual question"
        },
        {
            "question": "What is 25 + 17 * 3?",
            "task_id": "",
            "description": "Math calculation"
        },
        {
            "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "task_id": "",
            "description": "Reversed text question"
        },
        {
            "question": "How many continents are there?",
            "task_id": "",
            "description": "General knowledge"
        }
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{'='*60}")
        print(f"TEST {i}: {test_case['description']}")
        print(f"{'='*60}")
        print(f"Question: {test_case['question']}")
        try:
            answer = process_question(test_case["question"], test_case["task_id"])
            print(f"\nAnswer: {answer}")
        except Exception as e:
            print(f"\nTest failed: {str(e)}")
        print(f"\n{'-'*60}")

    print("\n✅ All tests completed!")

# Optional remote alternative: initialize Qwen3 through HuggingFaceEndpoint
# with thinking mode disabled, instead of the local pipeline in AIBrain.
# Requires HF_API_KEY in the environment; not used by the workflow above.
def create_endpoint_brain(primary_model: str = "Qwen/Qwen3-8B") -> HuggingFaceEndpoint:
    return HuggingFaceEndpoint(
        repo_id=primary_model,
        temperature=0.7,
        max_new_tokens=300,
        huggingfacehub_api_token=os.getenv("HF_API_KEY"),
        model_kwargs={"enable_thinking": False, "thinking_prompt": "/no_thinking"}
    )
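
# Usage sketch (hypothetical; whether the thinking-mode kwargs are honored
# depends on the hosted endpoint):
#   primary_brain = create_endpoint_brain()
#   print(primary_brain.invoke("Briefly: who was Marie Curie?"))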