Spaces:
Sleeping
Sleeping
| """ | |
| Tool Orchestrator - Điều phối và quản lý việc sử dụng tools | |
| """ | |
| import time | |
| from typing import Dict, Any, List, Optional, Callable | |
| from .state_manager import AgentState, ToolResult, get_agent_state, analyze_question_type, detect_urls_in_question | |
| from .text_tool import reverse_text_if_needed | |
| from .youtube_tool import YouTubeTool | |
| from .image_tool import ocr_image_with_nanonets, fallback_ocr_image | |
| from .audio_tool import transcribe_audio_groq, fallback_audio_info | |
| from .wiki_tool import search_wikipedia_from_question | |
| from .file_tool import read_file_content | |
| from .prompts import get_system_prompt, get_tool_prompt | |
class ToolOrchestrator:
    """Coordinate and manage the tools used to answer a question.

    The orchestrator analyzes a question, picks tools from its detected
    question type, runs them in order and records every ``ToolResult`` in the
    shared agent state obtained from ``get_agent_state()``.
    """

    # Primary tool for each question type. Any type not listed here
    # (e.g. "math", "unknown") falls back to "text_processor".
    _TOOL_BY_QUESTION_TYPE = {
        "youtube": "youtube_tool",
        "image": "image_ocr",
        "audio": "audio_transcript",
        "wiki": "wiki_search",
        "file": "file_reader",
        "text": "text_processor",
    }

    def __init__(self):
        self.state = get_agent_state()
        # Tool name -> bound runner; every runner is called as runner(question, task_id).
        self.tools_registry = {
            "text_processor": self._run_text_processor,
            "youtube_tool": self._run_youtube_tool,
            "image_ocr": self._run_image_ocr,
            "audio_transcript": self._run_audio_transcript,
            "wiki_search": self._run_wiki_search,
            "file_reader": self._run_file_reader,
        }

    @staticmethod
    def _analyze_text(question: str):
        """Run ``reverse_text_if_needed`` on *question* and normalize its result.

        The helper's result is handled either as a dict (``processed_text`` /
        ``should_reverse`` keys) or as a plain string.

        Returns:
            ``(processed_text, was_reversed, analysis)``. ``analysis`` is the
            raw dict result, or ``None`` when a non-dict was returned.
        """
        analysis = reverse_text_if_needed(question)
        if isinstance(analysis, dict):
            processed = analysis.get("processed_text", question)
            return processed, analysis.get("should_reverse", False), analysis
        # A falsy result means "nothing to change": keep the original text.
        processed = analysis if analysis else question
        return processed, processed != question, None

    @staticmethod
    def _is_text_sufficient(text: str) -> bool:
        """Return True for text longer than 50 chars that does not start with "Error"."""
        return len(text) > 50 and not text.startswith("Error")

    def analyze_and_prepare_question(self, question: str, task_id: str = "") -> str:
        """Analyze the question and prepare it for tool execution.

        This starts a new task in the agent state. It then detects URLs,
        un-reverses the text when needed and classifies the question type.
        All of this is stored on the current task context.

        Args:
            question: Raw question text.
            task_id: Task identifier. When empty, a timestamp-based id is
                generated. A non-empty id is also taken to mean the task has
                an attached file.

        Returns:
            The processed (possibly un-reversed) question text.
        """
        task_context = self.state.start_new_task(task_id or f"task_{int(time.time())}", question)

        urls = detect_urls_in_question(question)
        task_context.detected_urls = urls

        processed_question, _, _ = self._analyze_text(question)
        task_context.processed_question = processed_question

        question_type = analyze_question_type(processed_question)
        task_context.question_type = question_type

        # Design assumption kept from the original: a task_id implies an attached file.
        has_file = bool(task_id)
        task_context.has_file_attachment = has_file

        self.state.update_task_context(
            question_type=question_type,
            processed_question=processed_question,
            detected_urls=urls,
            has_file_attachment=has_file,
        )

        print("📋 Question Analysis:")
        print(f" Type: {question_type}")
        print(f" URLs found: {len(urls)}")
        print(f" Has file: {task_context.has_file_attachment}")
        print(f" Processed: {processed_question != question}")
        return processed_question

    def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
        """Return the list of tool names to run for *question_type*.

        Exactly one tool is selected from ``_TOOL_BY_QUESTION_TYPE``. Unknown
        types, including "math" and "unknown", fall back to
        ``"text_processor"``. ``has_file`` and ``has_urls`` are accepted for
        API compatibility but are currently unused.
        """
        tools_to_run = [self._TOOL_BY_QUESTION_TYPE.get(question_type, "text_processor")]
        print(f"🎯 Tools to run: {tools_to_run}")
        return tools_to_run

    def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> List["ToolResult"]:
        """Run the given tools in order and collect their results.

        Each attempt produces exactly one ``ToolResult``. This includes unknown
        tool names and tools that raise. The result is appended to the returned
        list and to the agent state. The loop stops early once a tool's result
        is judged sufficient by ``_is_result_sufficient``.

        Args:
            tools_list: Tool names, looked up in ``self.tools_registry``.
            question: Question text passed to every tool.
            task_id: Task identifier passed to every tool.

        Returns:
            The ``ToolResult`` objects, in execution order.
        """
        results = []
        for tool_name in tools_list:
            print(f"🔧 Running tool: {tool_name}")
            start_time = time.time()
            sufficient = False
            try:
                runner = self.tools_registry.get(tool_name)
                if runner is None:
                    # Time is recorded here too, so the log line below never
                    # reads an unbound or stale execution_time.
                    execution_time = time.time() - start_time
                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=False,
                        result=None,
                        error_message=f"Unknown tool: {tool_name}",
                        execution_time=execution_time,
                    )
                else:
                    result = runner(question, task_id)
                    execution_time = time.time() - start_time
                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=result.get("success", True) if isinstance(result, dict) else bool(result),
                        result=result,
                        execution_time=execution_time,
                    )
                    sufficient = bool(tool_result.success) and self._is_result_sufficient(tool_result, question)
            except Exception as e:
                execution_time = time.time() - start_time
                tool_result = ToolResult(
                    tool_name=tool_name,
                    success=False,
                    result=None,
                    error_message=str(e),
                    execution_time=execution_time,
                )
                results.append(tool_result)
                self.state.add_tool_result(tool_result)
                print(f"❌ {tool_name} failed: {str(e)}")
                continue

            results.append(tool_result)
            self.state.add_tool_result(tool_result)
            if sufficient:
                # A successful, useful result makes the remaining tools unnecessary.
                print(f"✅ {tool_name} provided sufficient information, skipping remaining tools")
                break
            print(f" Completed in {execution_time:.2f}s")
        return results

    def _is_result_sufficient(self, tool_result: "ToolResult", question: str) -> bool:
        """Return True if *tool_result* is good enough to answer the question.

        Rules:
        * youtube_tool (dict): ``has_youtube`` plus a transcript or title.
        * wiki_search (dict): ``success`` plus a summary longer than 100 chars.
        * image_ocr / audio_transcript (dict): the string form of the dict
          must pass the text rule below.
        * any string result: longer than 50 chars and not starting with "Error".

        ``question`` is currently unused.
        """
        if not tool_result.success:
            return False

        result = tool_result.result
        name = tool_result.tool_name

        if isinstance(result, dict):
            if name == "youtube_tool":
                return bool(result.get("has_youtube") and (result.get("transcript") or result.get("title")))
            if name == "wiki_search":
                # "or ''" guards against an explicit None summary.
                return bool(result.get("success") and len(result.get("summary") or "") > 100)
            if name in ("image_ocr", "audio_transcript"):
                # These tools normally return text. If a dict comes back, judge
                # its string form instead of calling str methods on the dict.
                return self._is_text_sufficient(str(result))
            return False

        if isinstance(result, str):
            return self._is_text_sufficient(result)
        return False

    # Tool execution methods

    def _run_text_processor(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Un-reverse the question when needed and report what was done."""
        processed, was_reversed, analysis = self._analyze_text(question)
        return {
            "success": True,
            "original": question,
            "processed": processed,
            "was_reversed": was_reversed,
            "analysis": analysis,
        }

    def _run_youtube_tool(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Extract YouTube content referenced by the question.

        Returns the tool's ``data`` payload on success. Otherwise it returns
        ``{"success": False, "error": ...}``.
        """
        try:
            youtube_tool = YouTubeTool()
            result = youtube_tool.process_youtube(question)
            return result.get("data", {}) if result.get("success") else {"success": False, "error": result.get("error", "Unknown error")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_image_ocr(self, question: str, task_id: str = "") -> str:
        """OCR the image attached to *task_id*.

        Falls back to ``fallback_ocr_image`` if the primary OCR raises.
        """
        if not task_id:
            # Without a task id there is no attached image to OCR.
            return "No image file provided for OCR"
        try:
            return ocr_image_with_nanonets(task_id=task_id)
        except Exception as e:
            print(f"⚠️ image_ocr primary OCR failed, using fallback: {e}")
            return fallback_ocr_image(task_id=task_id)

    def _run_audio_transcript(self, question: str, task_id: str = "") -> str:
        """Transcribe the audio attached to *task_id*.

        Falls back to ``fallback_audio_info`` if transcription raises.
        """
        if not task_id:
            return "No audio file provided for transcription"
        try:
            return transcribe_audio_groq(task_id=task_id)
        except Exception as e:
            print(f"⚠️ audio_transcript primary transcription failed, using fallback: {e}")
            return fallback_audio_info(task_id=task_id)

    def _run_wiki_search(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Search Wikipedia for the question. Errors become ``{"success": False, ...}``."""
        try:
            return search_wikipedia_from_question(question)
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_file_reader(self, question: str, task_id: str = "") -> str:
        """Read the file attached to *task_id*. Errors are returned as a message string."""
        if not task_id:
            return "No file provided for reading"
        try:
            return read_file_content(task_id=task_id)
        except Exception as e:
            return f"File reading error: {str(e)}"
| # Orchestrator functions | |
def process_question_with_tools(question: str, task_id: str = "") -> Dict[str, Any]:
    """Process a question end-to-end with the appropriate tools.

    Steps: analyze and prepare the question, choose tools from the detected
    question type, run them, and summarize the outcome.

    Args:
        question: Raw question text.
        task_id: Optional task identifier, forwarded to the orchestrator and
            the tools. The orchestrator treats a non-empty id as "has an
            attached file".

    Returns:
        A dict with these keys:
        * ``processed_question``: the prepared question text.
        * ``question_type``: the detected type.
        * ``tools_used``, ``successful_tools``, ``failed_tools``: tool names.
        * ``tool_results``: the raw ``ToolResult`` list.
        * ``cached_data`` and ``context_summary``: taken from the agent state.
    """
    orchestrator = ToolOrchestrator()

    # 1. Analyze and prepare the question (this also starts a new task in the state).
    processed_question = orchestrator.analyze_and_prepare_question(question, task_id)

    # 2. Choose tools from the analysis stored on the current task.
    task_context = orchestrator.state.current_task
    tools_to_run = orchestrator.determine_tools_to_run(
        task_context.question_type,
        task_context.has_file_attachment,
        bool(task_context.detected_urls),
    )
    # determine_tools_to_run() already logs the chosen tools, so they are not logged again here.

    # 3. Run the tools.
    tool_results = orchestrator.run_tools_sequence(tools_to_run, processed_question, task_id)

    # 4. Compile the results.
    successful_results = [r for r in tool_results if r.success]
    failed_results = [r for r in tool_results if not r.success]

    return {
        "processed_question": processed_question,
        "question_type": task_context.question_type,
        "tools_used": [r.tool_name for r in tool_results],
        "successful_tools": [r.tool_name for r in successful_results],
        "failed_tools": [r.tool_name for r in failed_results],
        "tool_results": tool_results,
        "cached_data": orchestrator.state.cached_data,
        "context_summary": orchestrator.state.generate_context_summary(),
    }
def get_best_available_content(tool_results: List[ToolResult]) -> Dict[str, Any]:
    """Collect the most useful content from a list of tool results.

    Only successful results are considered:

    * ``youtube_tool`` dict results with a truthy ``has_youtube`` go under
      the ``"youtube"`` key.
    * ``wiki_search`` dict results with a truthy ``success`` go under the
      ``"wikipedia"`` key.
    * ``image_ocr`` / ``audio_transcript`` / ``file_reader`` string results
      longer than 50 characters are keyed by the tool name.

    When several results map to the same key, the later one wins.
    """
    text_tools = ("image_ocr", "audio_transcript", "file_reader")
    collected = {}
    for item in tool_results:
        if not item.success:
            continue
        payload = item.result
        name = item.tool_name
        if name == "youtube_tool":
            if isinstance(payload, dict) and payload.get("has_youtube"):
                collected["youtube"] = payload
        elif name == "wiki_search":
            if isinstance(payload, dict) and payload.get("success"):
                collected["wikipedia"] = payload
        elif name in text_tools:
            if isinstance(payload, str) and len(payload) > 50:
                collected[name] = payload
    return collected
# Manual smoke test: run the orchestrator on a sample question and print a summary.
if __name__ == "__main__":
    sample_question = "Who was Marie Curie?"
    outcome = process_question_with_tools(sample_question)
    print("Orchestration result:")
    for label, key in (
        ("Question type", "question_type"),
        ("Tools used", "tools_used"),
        ("Successful", "successful_tools"),
    ):
        print(f"{label}: {outcome[key]}")