"""
Tool Orchestrator - coordinates and manages tool usage.
"""

import time
from typing import Dict, Any, List, Optional, Callable

from .state_manager import AgentState, ToolResult, get_agent_state, analyze_question_type, detect_urls_in_question
from .text_tool import reverse_text_if_needed
from .youtube_tool import YouTubeTool
from .image_tool import ocr_image_with_nanonets, fallback_ocr_image
from .audio_tool import transcribe_audio_groq, fallback_audio_info
from .wiki_tool import search_wikipedia_from_question
from .file_tool import read_file_content
from .prompts import get_system_prompt, get_tool_prompt


class ToolOrchestrator:
    """Coordinate and manage the tools used to answer a question."""

    def __init__(self):
        self.state = get_agent_state()
        # Maps a tool name to the bound method that runs it. Every runner
        # takes (question, task_id).
        self.tools_registry = {
            "text_processor": self._run_text_processor,
            "youtube_tool": self._run_youtube_tool,
            "image_ocr": self._run_image_ocr,
            "audio_transcript": self._run_audio_transcript,
            "wiki_search": self._run_wiki_search,
            "file_reader": self._run_file_reader,
        }

    def analyze_and_prepare_question(self, question: str, task_id: str = "") -> str:
        """
        Analyze the question and prepare it for tool execution.

        Starts a new task on the shared state, detects URLs, runs the
        reversed-text check, classifies the question type and stores all of
        that on the task context.

        Args:
            question: The raw question text.
            task_id: Optional task identifier; when empty, a timestamp-based
                id is generated.

        Returns:
            The processed question (the original text when no processing
            was applied).
        """
        # Start new task
        task_context = self.state.start_new_task(task_id or f"task_{int(time.time())}", question)

        # Detect URLs
        urls = detect_urls_in_question(question)
        task_context.detected_urls = urls

        # Check if text might be reversed. The helper may return either a
        # dict (with a "processed_text" key) or a plain value.
        text_analysis = reverse_text_if_needed(question)
        if isinstance(text_analysis, dict):
            processed_question = text_analysis.get("processed_text", question)
        else:
            processed_question = text_analysis if text_analysis else question

        task_context.processed_question = processed_question

        # Analyze question type
        question_type = analyze_question_type(processed_question)
        task_context.question_type = question_type

        # Check for file attachments (assumes a task_id implies a file)
        task_context.has_file_attachment = bool(task_id)

        # Update state
        self.state.update_task_context(
            question_type=question_type,
            processed_question=processed_question,
            detected_urls=urls,
            has_file_attachment=bool(task_id)
        )

        print("📋 Question Analysis:")
        print(f" Type: {question_type}")
        print(f" URLs found: {len(urls)}")
        print(f" Has file: {task_context.has_file_attachment}")
        print(f" Processed: {processed_question != question}")

        return processed_question

    def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
        """
        Decide which tools to run based on the question type.

        Args:
            question_type: Question category produced by the analysis step.
            has_file: Currently unused; kept for interface compatibility.
            has_urls: Currently unused; kept for interface compatibility.

        Returns:
            A list of tool names. Types without a dedicated tool (including
            "math" and "unknown") fall back to ["text_processor"].
        """
        # Tool selected per question type
        tool_by_type = {
            "youtube": "youtube_tool",
            "image": "image_ocr",
            "audio": "audio_transcript",
            "wiki": "wiki_search",
            "file": "file_reader",
            "text": "text_processor",
        }

        tools_to_run = []
        selected = tool_by_type.get(question_type)
        if selected is not None:
            tools_to_run.append(selected)

        # Fallback for math, unknown, or any unmapped type
        if not tools_to_run:
            tools_to_run.append("text_processor")

        print(f"🎯 Tools to run: {tools_to_run}")
        return tools_to_run

    def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> List[ToolResult]:
        """
        Run the given tools in order and collect their results.

        Each result is also recorded on the shared state. The sequence stops
        early as soon as a tool succeeds and its result is judged sufficient.

        Args:
            tools_list: Tool names to run, in order.
            question: The (processed) question passed to every tool.
            task_id: Optional task identifier passed to every tool.

        Returns:
            One ToolResult per tool that was attempted.
        """
        results = []

        for tool_name in tools_list:
            print(f"🔧 Running tool: {tool_name}")
            start_time = time.time()

            try:
                if tool_name in self.tools_registry:
                    result = self.tools_registry[tool_name](question, task_id)
                    execution_time = time.time() - start_time

                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=result.get("success", True) if isinstance(result, dict) else bool(result),
                        result=result,
                        execution_time=execution_time
                    )

                    # If the tool succeeded with a useful result, the
                    # remaining tools can be skipped.
                    if tool_result.success and self._is_result_sufficient(tool_result, question):
                        print(f"✅ {tool_name} provided sufficient information, skipping remaining tools")
                        results.append(tool_result)
                        self.state.add_tool_result(tool_result)
                        break
                else:
                    # Set execution_time here too: the "Completed" print
                    # below reads it, and leaving it unset used to raise and
                    # record a second, misleading failure for the same tool.
                    execution_time = time.time() - start_time
                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=False,
                        result=None,
                        error_message=f"Unknown tool: {tool_name}",
                        execution_time=execution_time
                    )

                results.append(tool_result)
                self.state.add_tool_result(tool_result)

                print(f" Completed in {execution_time:.2f}s")

            except Exception as e:
                # Best effort: a failing tool must not abort the sequence.
                execution_time = time.time() - start_time
                tool_result = ToolResult(
                    tool_name=tool_name,
                    success=False,
                    result=None,
                    error_message=str(e),
                    execution_time=execution_time
                )
                results.append(tool_result)
                self.state.add_tool_result(tool_result)
                print(f"❌ {tool_name} failed: {str(e)}")

        return results

    def _is_result_sufficient(self, tool_result: ToolResult, question: str) -> bool:
        """
        Check whether a tool result is enough to answer the question.

        Args:
            tool_result: The result to evaluate.
            question: The question being answered (currently unused).

        Returns:
            True when the result is successful and passes the per-tool
            content check, False otherwise.
        """
        if not tool_result.success:
            return False

        result = tool_result.result

        # Check for content length and quality
        if isinstance(result, dict):
            tool_name = tool_result.tool_name
            if tool_name == "youtube_tool":
                return bool(
                    result.get("has_youtube")
                    and (result.get("transcript") or result.get("title"))
                )
            if tool_name == "wiki_search":
                # `or ""` guards against an explicit None summary.
                return bool(result.get("success")) and len(result.get("summary") or "") > 100
            if tool_name in ("image_ocr", "audio_transcript"):
                # A dict has no startswith(); check its string form instead.
                text = str(result)
                return len(text) > 50 and not text.startswith("Error")
            return False

        if isinstance(result, str):
            return len(result) > 50 and not result.startswith("Error")

        return False

    # Tool execution methods
    def _run_text_processor(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run text processing (reversed-text check) on the question."""
        text_analysis = reverse_text_if_needed(question)

        if isinstance(text_analysis, dict):
            processed = text_analysis.get("processed_text", question)
            was_reversed = text_analysis.get("should_reverse", False)
        else:
            processed = text_analysis if text_analysis else question
            was_reversed = processed != question

        return {
            "success": True,
            "original": question,
            "processed": processed,
            "was_reversed": was_reversed,
            "analysis": text_analysis if isinstance(text_analysis, dict) else None
        }

    def _run_youtube_tool(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run YouTube content extraction; errors become a failure dict."""
        try:
            youtube_tool = YouTubeTool()
            result = youtube_tool.process_youtube(question)
            if result.get("success"):
                return result.get("data", {})
            return {"success": False, "error": result.get("error", "Unknown error")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_image_ocr(self, question: str, task_id: str = "") -> str:
        """Run image OCR for the file attached to task_id."""
        try:
            if task_id:
                result = ocr_image_with_nanonets(task_id=task_id)
            else:
                # Without a task_id there is no file to OCR
                result = "No image file provided for OCR"
            return result
        except Exception:
            # Fallback to basic image info
            return fallback_ocr_image(task_id=task_id)

    def _run_audio_transcript(self, question: str, task_id: str = "") -> str:
        """Run audio transcription for the file attached to task_id."""
        try:
            if task_id:
                result = transcribe_audio_groq(task_id=task_id)
            else:
                result = "No audio file provided for transcription"
            return result
        except Exception:
            # Fallback to basic audio info
            return fallback_audio_info(task_id=task_id)

    def _run_wiki_search(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run a Wikipedia search; errors become a failure dict."""
        try:
            result = search_wikipedia_from_question(question)
            return result
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_file_reader(self, question: str, task_id: str = "") -> str:
        """Read the content of the file attached to task_id."""
        try:
            if task_id:
                result = read_file_content(task_id=task_id)
            else:
                result = "No file provided for reading"
            return result
        except Exception as e:
            return f"File reading error: {str(e)}"


# Orchestrator functions
def process_question_with_tools(question: str, task_id: str = "") -> Dict[str, Any]:
    """
    Main entry point: process a question with the appropriate tools.

    Args:
        question: The raw question text.
        task_id: Optional task identifier.

    Returns:
        A dict with the processed question, the question type, the names of
        the tools used / succeeded / failed, the raw tool results, and the
        state's cached data and context summary.
    """
    orchestrator = ToolOrchestrator()

    # 1. Analyze and prepare question
    processed_question = orchestrator.analyze_and_prepare_question(question, task_id)

    # 2. Determine tools to run
    task_context = orchestrator.state.current_task
    tools_to_run = orchestrator.determine_tools_to_run(
        task_context.question_type,
        task_context.has_file_attachment,
        bool(task_context.detected_urls)
    )

    print(f"🎯 Tools to run: {', '.join(tools_to_run)}")

    # 3. Run tools
    tool_results = orchestrator.run_tools_sequence(tools_to_run, processed_question, task_id)

    # 4. Compile results
    successful_results = [r for r in tool_results if r.success]
    failed_results = [r for r in tool_results if not r.success]

    return {
        "processed_question": processed_question,
        "question_type": task_context.question_type,
        "tools_used": [r.tool_name for r in tool_results],
        "successful_tools": [r.tool_name for r in successful_results],
        "failed_tools": [r.tool_name for r in failed_results],
        "tool_results": tool_results,
        "cached_data": orchestrator.state.cached_data,
        "context_summary": orchestrator.state.generate_context_summary()
    }


def get_best_available_content(tool_results: List[ToolResult]) -> Dict[str, Any]:
    """
    Pick the best available content from a list of tool results.

    Args:
        tool_results: Results from the tools that were run.

    Returns:
        A dict keyed by "youtube", "wikipedia", or the tool name for
        image/audio/file tools, containing only successful, usable results.
    """
    content = {}

    for result in tool_results:
        if not result.success:
            continue

        if result.tool_name == "youtube_tool" and isinstance(result.result, dict):
            if result.result.get("has_youtube"):
                content["youtube"] = result.result
        elif result.tool_name == "wiki_search" and isinstance(result.result, dict):
            if result.result.get("success"):
                content["wikipedia"] = result.result
        elif result.tool_name in ["image_ocr", "audio_transcript", "file_reader"]:
            # Short strings are the "no file provided" placeholders, not content
            if isinstance(result.result, str) and len(result.result) > 50:
                content[result.tool_name] = result.result

    return content


# Test function
if __name__ == "__main__":
    # Test orchestrator
    test_question = "Who was Marie Curie?"
    result = process_question_with_tools(test_question)

    print("Orchestration result:")
    print(f"Question type: {result['question_type']}")
    print(f"Tools used: {result['tools_used']}")
    print(f"Successful: {result['successful_tools']}")