Spaces:
Sleeping
Sleeping
| """ | |
| AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION | |
| Clean architecture built on the huggingface_hub InferenceClient | |
| """ | |
| import os | |
| import json | |
| import time | |
| from typing import Dict, Any, List, Optional, Annotated | |
| from dotenv import load_dotenv | |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import JsonOutputParser, StrOutputParser | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.message import add_messages | |
| from typing_extensions import TypedDict | |
| from pydantic import BaseModel, Field | |
| # Hugging Face Hub inference client | |
| from huggingface_hub import InferenceClient | |
| from utils import ( | |
| process_question_with_tools, | |
| get_agent_state, | |
| reset_agent_state, | |
| ToolOrchestrator, | |
| get_system_prompt, | |
| get_response_prompt, | |
| build_context_summary, | |
| analyze_question_type | |
| ) | |
# Pull variables from a local .env file into the environment before any
# code below reads os.environ (AIBrain looks up HF_TOKEN there).
load_dotenv()
class AgentState(TypedDict):
    """State dictionary handed from node to node in the LangGraph workflow."""

    # Conversation messages; ``add_messages`` is attached as the reducer.
    messages: Annotated[List, add_messages]
    # Text of the question being processed.
    question: str
    # Task identifier forwarded to the tool layer ("" when not supplied).
    task_id: str
    # Classification dict returned by ``AIBrain.analyze_question``.
    ai_analysis: Dict[str, Any]
    # Routing flag read after analysis: True -> tool node, False -> direct answer.
    should_use_tools: bool
    # Return value of ``process_question_with_tools``.
    tool_processing_result: Dict[str, Any]
    # Answer text written by the answering nodes.
    final_answer: str
    # Set to True by the answering nodes once ``final_answer`` is filled in.
    processing_complete: bool
class QuestionAnalysis(BaseModel):
    """Pydantic schema for the classification of a single question."""

    question_type: str = Field(
        description="Type: youtube|image|audio|wiki|file|text|math",
    )
    needs_tools: bool = Field(
        description="Whether tools are needed",
    )
    reasoning: str = Field(
        description="AI reasoning for the decision",
    )
    confidence: str = Field(
        description="Confidence level: high|medium|low",
    )
class AIBrain:
    """Wrapper around a hosted Qwen3 chat model.

    The model is reached through ``huggingface_hub.InferenceClient`` and is
    used for two jobs: classifying a question (``analyze_question``) and
    writing the short final answer (``generate_answer``).
    """

    def __init__(self):
        """Create the inference client.

        Raises:
            KeyError: If the ``HF_TOKEN`` environment variable is not set.
        """
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Initializing Qwen3-8B với huggingface_hub InferenceClient...")
        self.client = InferenceClient(
            provider="auto",
            api_key=os.environ["HF_TOKEN"],
        )
        print("✅ Qwen3 AI Brain với huggingface_hub InferenceClient đã sẵn sàng")

    @staticmethod
    def _strip_thinking(text: str) -> str:
        """Return *text* without Qwen3 ``<think>...</think>`` reasoning blocks."""
        import re

        cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
        # If only a closing tag is left (opening tag not part of the returned
        # text), keep just what follows it.
        if "</think>" in cleaned:
            cleaned = cleaned.rsplit("</think>", 1)[1]
        return cleaned.strip()

    @staticmethod
    def _clean_answer(answer: str) -> str:
        """Trim boilerplate prefixes and a lone trailing period from *answer*."""
        answer = answer.strip()
        # Matching is case-insensitive, so one lowercase entry per prefix is enough.
        for prefix in ("answer:", "the answer is", "final answer:"):
            if answer.lower().startswith(prefix):
                # Also drop a separator left behind, e.g. "The answer is: 42".
                answer = answer[len(prefix):].lstrip(" :").strip()
        # Drop the trailing period only for very short answers (at most two
        # words) so full sentences keep their punctuation.
        if answer.endswith(".") and answer.count(" ") < 2:
            answer = answer[:-1].strip()
        return answer

    def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str:
        """Send *prompt* to Qwen3 as a single user message and return the reply.

        Args:
            prompt: Text of the user message.
            max_tokens: Completion token limit passed to the client.

        Returns:
            The reply text with any ``<think>`` block removed. On any client
            error the string ``"AI generation failed: <error>"`` is returned
            instead of raising.
        """
        try:
            messages = [
                # "/no_think" is the soft switch Qwen3 documents for turning
                # thinking off for a turn; "/no_thinking" is not that switch.
                {"role": "user", "content": prompt + "\n/no_think"}
            ]
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=max_tokens,
            )
            # ``content`` can be None; normalise to "" so callers can treat
            # the result as a string.
            content = completion.choices[0].message.content or ""
            return self._strip_thinking(content)
        except Exception as e:
            print(f"⚠️ Qwen3 InferenceClient error: {str(e)}")
            return f"AI generation failed: {str(e)}"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Classify *question* and decide whether tools are needed.

        Args:
            question: The question text.
            task_id: Optional task identifier, included in the prompt.

        Returns:
            A dict with the keys ``question_type``, ``needs_tools``,
            ``reasoning`` and ``confidence``. When the model reply contains no
            usable JSON object, the result is built from
            ``analyze_question_type`` instead.
        """
        prompt = f"""<instruction>
Analyze this question and determine the correct tool approach. Return ONLY valid JSON.
- If the question is about a historical event, a specific person, place, object, or something that requires searching the internet (e.g., Wikipedia), you MUST choose "wiki".
- If the question is about an event in the past or future (e.g., "when was", "in what year", "has ever", "will happen", "history", "prediction"), choose "wiki".
- If the question asks about a specific topic, person, place, object, or event (e.g., "who is", "what is", "where is", "when is", "why", "how"), choose "wiki".
- If the data source is unclear or you are not sure, prefer "wiki".
- Other types: youtube (if there is a link/video), image (if there is an image), audio (if there is an audio file), file (if there is an attachment), text (if it is text analysis or math).
</instruction>
<question>{question}</question>
<task_id>{task_id}</task_id>
<classification_rules>
- YouTube URLs (youtube.com, youtu.be): "youtube"
- Images, photos, chess positions, visual content: "image"
- Audio files, voice, sound, mp3: "audio"
- Excel, CSV, documents, file uploads: "file"
- Wikipedia searches, historical facts, people info, events, future/past, specific topics: "wiki"
- Math calculations, logic, text analysis: "text"
</classification_rules>
Return this exact JSON format:
{{
"question_type": "youtube|image|audio|wiki|file|text",
"needs_tools": true,
"reasoning": "Brief explanation of classification",
"confidence": "high"
}}"""
        required_fields = ["question_type", "needs_tools", "reasoning", "confidence"]
        try:
            import re

            response = self._generate_with_qwen3(prompt, 512)
            # Try every flat {...} candidate: the reply may contain other
            # brace-delimited text before the actual JSON object.
            for candidate in re.finditer(r'\{[^{}]*\}', response):
                try:
                    result = json.loads(candidate.group())
                except json.JSONDecodeError:
                    continue
                if all(field in result for field in required_fields):
                    return result
            raise ValueError("Invalid JSON structure in response")
        except Exception as e:
            # Best effort by design: any failure falls through to the fallback.
            print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...")
        # Fallback analysis
        question_type = analyze_question_type(question)
        return {
            "question_type": question_type,
            "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"],
            "reasoning": f"Fallback classification: detected {question_type}",
            "confidence": "medium"
        }

    def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Produce the short final answer for *question*.

        Args:
            question: The question text.
            tool_results: Tool output; when its ``"tool_results"`` entry is
                non-empty, a context summary built by ``build_context_summary``
                is added to the prompt. Pass ``{}`` to answer without context.

        Returns:
            The model reply with boilerplate prefixes and a lone trailing
            period removed. If generation failed, this is the
            ``"AI generation failed: ..."`` text from ``_generate_with_qwen3``.
        """
        if tool_results and tool_results.get("tool_results"):
            context = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {})
            )
        else:
            context = ""
        prompt = f"""
Answer the following question with only the answer. Do not explain, do not add any extra text, do not repeat the question, do not add punctuation or any prefix/suffix. Just output the answer as short and direct as possible. If the answer is not available, reply with 'No data'.
Context (if any): {context}
Question: {question}
"""
        response = self._generate_with_qwen3(prompt, 2048)
        return self._clean_answer(response)
# Module-level AIBrain shared by the graph nodes below. It is built at import
# time, so importing this module requires HF_TOKEN to be set.
ai_brain = AIBrain()
def analyze_question_node(state: AgentState) -> AgentState:
    """Graph node: classify the question and record the routing decision.

    Writes ``ai_analysis`` and ``should_use_tools`` into *state* (tools default
    to enabled when the analysis has no ``needs_tools`` key) and returns the
    same state object.
    """
    question, task_id = state["question"], state.get("task_id", "")
    print("🔍 Analyzing question with Qwen3...")

    verdict = ai_brain.analyze_question(question, task_id)
    state["ai_analysis"] = verdict
    state["should_use_tools"] = verdict.get("needs_tools", True)

    kind, wants_tools, confidence = (
        verdict.get(key) for key in ("question_type", "needs_tools", "confidence")
    )
    print(f"📊 Type: {kind} | Tools: {wants_tools} | Confidence: {confidence}")
    return state
def process_with_tools_node(state: AgentState) -> AgentState:
    """Graph node: run the tool pipeline and store its output in *state*.

    The return value of ``process_question_with_tools`` is saved under
    ``tool_processing_result``. Wiki and audio-transcript results are echoed
    for debugging, followed by a summary of the tools that succeeded.
    """
    question, task_id = state["question"], state.get("task_id", "")
    print("🔧 Processing with specialized tools...")

    outcome = process_question_with_tools(question, task_id)
    state["tool_processing_result"] = outcome

    # Echo the raw output of the tools that are worth inspecting by eye.
    debug_labels = (
        ("wiki_search", "Wiki tool response"),
        ("audio_transcript", "Audio transcript"),
    )
    for item in outcome.get("tool_results", []):
        for tool, label in debug_labels:
            if item.tool_name == tool:
                print(f"[DEBUG] {label}: {item.result}")

    succeeded = []
    for item in outcome.get("tool_results", []):
        if item.success:
            succeeded.append(item.tool_name)

    print(f"✅ Successful tools: {succeeded}" if succeeded else "⚠️ No tools succeeded")
    return state
def answer_directly_node(state: AgentState) -> AgentState:
    """Graph node: answer the question without any tool context.

    Stores the generated text in ``final_answer``, marks processing as
    complete, and returns the same state object.
    """
    question = state["question"]
    print("💭 Generating direct answer with Qwen3...")
    # An empty tool-result dict makes generate_answer use an empty context.
    state.update(
        final_answer=ai_brain.generate_answer(question, {}),
        processing_complete=True,
    )
    return state
def generate_final_answer_node(state: AgentState) -> AgentState:
    """Graph node: answer the question using the stored tool results.

    Reads ``tool_processing_result`` (empty dict when absent), stores the
    generated text in ``final_answer``, marks processing as complete, and
    returns the same state object.
    """
    question = state["question"]
    gathered = state.get("tool_processing_result", {})
    print("🎯 Generating final answer with context...")
    state.update(
        final_answer=ai_brain.generate_answer(question, gathered),
        processing_complete=True,
    )
    return state
def create_agent_workflow():
    """Build and compile the question-processing graph.

    Flow: ``analyze_question`` runs first; depending on the state's
    ``should_use_tools`` flag it is followed either by ``process_with_tools``
    and then ``generate_final_answer``, or by ``answer_directly``. Both
    branches end the run.
    """

    def should_use_tools(state: AgentState) -> str:
        """Return the name of the node that follows question analysis."""
        if state.get("should_use_tools", True):
            return "process_with_tools"
        return "answer_directly"

    graph = StateGraph(AgentState)

    # Register the nodes.
    node_table = (
        ("analyze_question", analyze_question_node),
        ("process_with_tools", process_with_tools_node),
        ("answer_directly", answer_directly_node),
        ("generate_final_answer", generate_final_answer_node),
    )
    for node_name, handler in node_table:
        graph.add_node(node_name, handler)

    # Wire the flow.
    graph.set_entry_point("analyze_question")
    graph.add_conditional_edges("analyze_question", should_use_tools)
    fixed_edges = (
        ("process_with_tools", "generate_final_answer"),
        ("answer_directly", END),
        ("generate_final_answer", END),
    )
    for source, target in fixed_edges:
        graph.add_edge(source, target)

    return graph.compile()
class LangGraphUtilsAgent:
    """Runs questions through the compiled LangGraph workflow."""

    def __init__(self):
        """Compile the workflow once so it can be reused for every question."""
        self.app = create_agent_workflow()
        print("🚀 LangGraph Agent with Qwen3 + Utils System ready")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Run *question* through the workflow and return the answer text.

        Args:
            question: The question text.
            task_id: Optional task identifier placed in the initial state.

        Returns:
            The workflow's ``final_answer``; ``"No answer generated"`` when the
            workflow produced an empty answer; or an apology message
            containing the error text when anything in the run raises.
        """
        try:
            print(f"\n🎯 Processing: {question[:100]}...")
            # Initialize state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False
            }
            # Run workflow; perf_counter is monotonic, so the elapsed time
            # cannot be skewed by wall-clock adjustments.
            start_time = time.perf_counter()
            result = self.app.invoke(initial_state)
            elapsed_time = time.perf_counter() - start_time
            # The initial state seeds "final_answer" with "", so a plain
            # .get() default would never fire; `or` makes the fallback
            # reachable when the answer comes back empty.
            final_answer = result.get("final_answer") or "No answer generated"
            print(f"✅ Completed in {elapsed_time:.2f}s")
            return final_answer
        except Exception as e:
            # Top-level boundary: report the failure as the answer instead of
            # propagating it to the caller.
            print(f"❌ Agent error: {str(e)}")
            return f"I apologize, but I encountered an error processing your question: {str(e)}"
# Module-level agent used by process_question() below; the workflow is
# compiled once, when this module is imported.
agent = LangGraphUtilsAgent()
def process_question(question: str, task_id: str = "") -> str:
    """Answer *question* with the module-level agent.

    Empty or whitespace-only input is rejected with a fixed message; otherwise
    the stripped question and *task_id* are handed to the agent.
    """
    cleaned = question.strip() if question else ""
    if cleaned:
        return agent.process_question(cleaned, task_id)
    return "Please provide a valid question."
| # ============================================================================= | |
| # TESTING | |
| # ============================================================================= | |
if __name__ == "__main__":
    # Manual smoke test: run a few sample questions through process_question()
    # and print each answer.
    print("🧪 Testing LangGraph Utils Agent\n")

    samples = (
        ("Who was Marie Curie?", "Wikipedia factual question"),
        ("What is 25 + 17 * 3?", "Math calculation"),
        (
            ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "Reversed text question",
        ),
        ("How many continents are there?", "General knowledge"),
    )
    test_cases = [
        {"question": text, "task_id": "", "description": label}
        for text, label in samples
    ]

    heavy_rule = "=" * 60
    light_rule = "-" * 60
    for number, case in enumerate(test_cases, start=1):
        print(f"\n{heavy_rule}")
        print(f"TEST {number}: {case['description']}")
        print(f"{heavy_rule}")
        print(f"Question: {case['question']}")
        try:
            answer = process_question(case["question"], case["task_id"])
            print(f"\nAnswer: {answer}")
        except Exception as e:
            print(f"\nTest failed: {str(e)}")
        print(f"\n{light_rule}")

    print("\n✅ All tests completed!")