final_agent_course / utils /tool_orchestrator.py
tuan3335's picture
use langchain
040a6c6
"""
Tool Orchestrator - Điều phối và quản lý việc sử dụng tools
"""
import time
from typing import Dict, Any, List, Optional, Callable
from .state_manager import AgentState, ToolResult, get_agent_state, analyze_question_type, detect_urls_in_question
from .text_tool import reverse_text_if_needed
from .youtube_tool import YouTubeTool
from .image_tool import ocr_image_with_nanonets, fallback_ocr_image
from .audio_tool import transcribe_audio_groq, fallback_audio_info
from .wiki_tool import search_wikipedia_from_question
from .file_tool import read_file_content
from .prompts import get_system_prompt, get_tool_prompt
class ToolOrchestrator:
    """Coordinate and manage tool usage for a question.

    The orchestrator analyses a question, picks the tool(s) to run for its
    type, executes them in order and records every outcome on the agent
    state returned by ``get_agent_state()``.
    """

    # Question type -> name of the tool that handles it. Any type that is not
    # listed here (including "math" and "unknown") falls back to
    # "text_processor".
    _TOOL_BY_QUESTION_TYPE = {
        "youtube": "youtube_tool",
        "image": "image_ocr",
        "audio": "audio_transcript",
        "wiki": "wiki_search",
        "file": "file_reader",
        "text": "text_processor",
    }

    # Prefixes marking a string result as an error message rather than real
    # content. "File reading error" is the prefix emitted by _run_file_reader.
    _ERROR_PREFIXES = ("Error", "File reading error")

    def __init__(self):
        """Fetch the agent state and register the available tool runners."""
        self.state = get_agent_state()
        # Every runner is called as runner(question, task_id).
        self.tools_registry = {
            "text_processor": self._run_text_processor,
            "youtube_tool": self._run_youtube_tool,
            "image_ocr": self._run_image_ocr,
            "audio_transcript": self._run_audio_transcript,
            "wiki_search": self._run_wiki_search,
            "file_reader": self._run_file_reader,
        }

    def analyze_and_prepare_question(self, question: str, task_id: str = "") -> str:
        """Analyse the question and prepare it for tool execution.

        Starts a new task on the agent state, detects URLs, optionally
        un-reverses the text, classifies the question and stores all of that
        on the task context.

        Args:
            question: Raw question text.
            task_id: Task identifier; when empty a time-based id is generated
                for the new task.

        Returns:
            The processed question text (equal to ``question`` when no
            processing was applied).
        """
        # Start new task
        task_context = self.state.start_new_task(
            task_id or f"task_{int(time.time())}", question
        )

        # Detect URLs
        urls = detect_urls_in_question(question)
        task_context.detected_urls = urls

        # Check if text might be reversed. The helper may return either a
        # dict carrying "processed_text" or a plain (possibly empty) value.
        text_analysis = reverse_text_if_needed(question)
        if isinstance(text_analysis, dict):
            processed_question = text_analysis.get("processed_text", question)
        else:
            processed_question = text_analysis if text_analysis else question
        task_context.processed_question = processed_question

        # Analyze question type
        question_type = analyze_question_type(processed_question)
        task_context.question_type = question_type

        # Check for file attachments (assumes a caller-supplied task_id
        # implies that a file is attached).
        has_file = bool(task_id)
        task_context.has_file_attachment = has_file

        # Update state
        self.state.update_task_context(
            question_type=question_type,
            processed_question=processed_question,
            detected_urls=urls,
            has_file_attachment=has_file,
        )

        print("📋 Question Analysis:")
        print(f" Type: {question_type}")
        print(f" URLs found: {len(urls)}")
        print(f" Has file: {task_context.has_file_attachment}")
        print(f" Processed: {processed_question != question}")
        return processed_question

    def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
        """Pick the tools to run for a question type.

        Args:
            question_type: Type label produced by the question analysis.
            has_file: Currently unused; kept for interface compatibility.
            has_urls: Currently unused; kept for interface compatibility.

        Returns:
            A one-element list with the tool name; ``"text_processor"`` is
            the fallback for "math", "unknown" and any unrecognised type.
        """
        tools_to_run = [
            self._TOOL_BY_QUESTION_TYPE.get(question_type, "text_processor")
        ]
        print(f"🎯 Tools to run: {tools_to_run}")
        return tools_to_run

    def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> "List[ToolResult]":
        """Run the given tools in order and collect their results.

        Each tool yields exactly one ``ToolResult`` which is appended to the
        returned list and also recorded via ``self.state.add_tool_result``.
        The sequence stops early as soon as a tool succeeds and
        ``_is_result_sufficient`` accepts its result.

        Args:
            tools_list: Names of the tools to run, in order.
            question: Question text handed to every tool.
            task_id: Task identifier handed to every tool.

        Returns:
            The ``ToolResult`` objects of the tools that were run.
        """
        results = []
        for tool_name in tools_list:
            print(f"🔧 Running tool: {tool_name}")
            start_time = time.time()
            error = None
            sufficient = False
            try:
                runner = self.tools_registry.get(tool_name)
                if runner is None:
                    # Unknown tools get a timing too, so the completion
                    # message below never reads an unassigned variable.
                    execution_time = time.time() - start_time
                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=False,
                        result=None,
                        error_message=f"Unknown tool: {tool_name}",
                        execution_time=execution_time,
                    )
                else:
                    result = runner(question, task_id)
                    execution_time = time.time() - start_time
                    # Dict results report their own "success" flag (default
                    # True); any other result counts as success when truthy.
                    if isinstance(result, dict):
                        succeeded = result.get("success", True)
                    else:
                        succeeded = bool(result)
                    tool_result = ToolResult(
                        tool_name=tool_name,
                        success=succeeded,
                        result=result,
                        execution_time=execution_time,
                    )
                    sufficient = bool(
                        tool_result.success
                        and self._is_result_sufficient(tool_result, question)
                    )
            except Exception as e:
                # Best effort: a failing tool is recorded and the sequence
                # continues with the next one.
                error = e
                sufficient = False
                execution_time = time.time() - start_time
                tool_result = ToolResult(
                    tool_name=tool_name,
                    success=False,
                    result=None,
                    error_message=str(e),
                    execution_time=execution_time,
                )

            if sufficient:
                print(f"✅ {tool_name} provided sufficient information, skipping remaining tools")

            # Record the outcome exactly once per tool.
            results.append(tool_result)
            self.state.add_tool_result(tool_result)

            if error is not None:
                print(f"❌ {tool_name} failed: {str(error)}")
            elif sufficient:
                break
            else:
                print(f" Completed in {execution_time:.2f}s")
        return results

    @classmethod
    def _is_useful_text(cls, text: str) -> bool:
        """Return True when text is longer than 50 chars and not an error message."""
        return len(text) > 50 and not text.startswith(cls._ERROR_PREFIXES)

    def _is_result_sufficient(self, tool_result: "ToolResult", question: str) -> bool:
        """Check whether a tool result is enough to answer the question.

        Args:
            tool_result: Result to inspect; ``success``, ``tool_name`` and
                ``result`` are read.
            question: Currently unused.

        Returns:
            True when the result looks substantial enough that the remaining
            tools can be skipped, otherwise False.
        """
        if not tool_result.success:
            return False

        result = tool_result.result
        # Check for content length and quality
        if isinstance(result, dict):
            tool_name = tool_result.tool_name
            if tool_name == "youtube_tool":
                return bool(
                    result.get("has_youtube")
                    and (result.get("transcript") or result.get("title"))
                )
            if tool_name == "wiki_search":
                # "summary" may be missing or None; treat both as empty.
                summary = result.get("summary") or ""
                return bool(result.get("success")) and len(summary) > 100
            if tool_name in ("image_ocr", "audio_transcript"):
                # A dict has no startswith(); judge its text rendering.
                return self._is_useful_text(str(result))
            return False
        if isinstance(result, str):
            return self._is_useful_text(result)
        return False

    # Tool execution methods
    def _run_text_processor(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run text processing (reversal detection) on the question.

        Returns:
            A dict with "success", "original", "processed", "was_reversed"
            and "analysis" (the raw analysis dict, or None when the helper
            did not return a dict).
        """
        text_analysis = reverse_text_if_needed(question)
        if isinstance(text_analysis, dict):
            processed = text_analysis.get("processed_text", question)
            was_reversed = text_analysis.get("should_reverse", False)
            analysis = text_analysis
        else:
            processed = text_analysis if text_analysis else question
            was_reversed = processed != question
            analysis = None
        return {
            "success": True,
            "original": question,
            "processed": processed,
            "was_reversed": was_reversed,
            "analysis": analysis,
        }

    def _run_youtube_tool(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run YouTube content extraction on the question.

        Returns:
            The "data" payload of the tool on success, otherwise a dict with
            ``"success": False`` and an "error" message.
        """
        try:
            youtube_tool = YouTubeTool()
            result = youtube_tool.process_youtube(question)
            if result.get("success"):
                return result.get("data", {})
            return {"success": False, "error": result.get("error", "Unknown error")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_image_ocr(self, question: str, task_id: str = "") -> str:
        """Run image OCR for the file attached to ``task_id``.

        Without a task id there is no image to read and a fixed notice is
        returned. If the OCR call raises, ``fallback_ocr_image`` is used.
        """
        try:
            if task_id:
                return ocr_image_with_nanonets(task_id=task_id)
            # No task_id means there is no image to run OCR on.
            return "No image file provided for OCR"
        except Exception:
            # Fallback to basic image info
            return fallback_ocr_image(task_id=task_id)

    def _run_audio_transcript(self, question: str, task_id: str = "") -> str:
        """Run audio transcription for the file attached to ``task_id``.

        Without a task id a fixed notice is returned. If the transcription
        call raises, ``fallback_audio_info`` is used.
        """
        try:
            if task_id:
                return transcribe_audio_groq(task_id=task_id)
            return "No audio file provided for transcription"
        except Exception:
            # Fallback to basic audio info
            return fallback_audio_info(task_id=task_id)

    def _run_wiki_search(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Run a Wikipedia search for the question.

        Returns:
            Whatever ``search_wikipedia_from_question`` returns, or a dict
            with ``"success": False`` and an "error" message if it raises.
        """
        try:
            return search_wikipedia_from_question(question)
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _run_file_reader(self, question: str, task_id: str = "") -> str:
        """Read the content of the file attached to ``task_id``.

        Returns:
            The file content, a fixed notice when no task id is given, or a
            "File reading error: ..." message if reading raises.
        """
        try:
            if task_id:
                return read_file_content(task_id=task_id)
            return "No file provided for reading"
        except Exception as e:
            return f"File reading error: {str(e)}"
# Orchestrator functions
def process_question_with_tools(question: str, task_id: str = "") -> Dict[str, Any]:
    """Main entry point: process a question with the appropriate tools.

    Creates a ``ToolOrchestrator``, prepares the question, selects and runs
    the tools, then summarises the outcome.

    Args:
        question: Raw question text.
        task_id: Optional task identifier passed on to the orchestrator and
            to every tool.

    Returns:
        A dict with the processed question, the question type, the names of
        the used / successful / failed tools, the raw tool results, the
        state's cached data and its context summary.
    """
    orchestrator = ToolOrchestrator()

    # Step 1: analyse and prepare the question.
    prepared = orchestrator.analyze_and_prepare_question(question, task_id)

    # Step 2: choose the tools from what the analysis stored on the task.
    context = orchestrator.state.current_task
    selected = orchestrator.determine_tools_to_run(
        context.question_type,
        context.has_file_attachment,
        bool(context.detected_urls),
    )
    print(f"🎯 Tools to run: {', '.join(selected)}")

    # Step 3: run them.
    outcomes = orchestrator.run_tools_sequence(selected, prepared, task_id)

    # Step 4: split the tool names by outcome in a single pass.
    used_names = []
    succeeded_names = []
    failed_names = []
    for outcome in outcomes:
        used_names.append(outcome.tool_name)
        if outcome.success:
            succeeded_names.append(outcome.tool_name)
        else:
            failed_names.append(outcome.tool_name)

    return {
        "processed_question": prepared,
        "question_type": context.question_type,
        "tools_used": used_names,
        "successful_tools": succeeded_names,
        "failed_tools": failed_names,
        "tool_results": outcomes,
        "cached_data": orchestrator.state.cached_data,
        "context_summary": orchestrator.state.generate_context_summary(),
    }
def get_best_available_content(tool_results: "List[ToolResult]") -> Dict[str, Any]:
    """Collect the usable content from a list of tool results.

    Only successful results are considered:

    * ``youtube_tool`` dict results with a truthy "has_youtube" are stored
      under "youtube".
    * ``wiki_search`` dict results with a truthy "success" are stored under
      "wikipedia".
    * ``image_ocr`` / ``audio_transcript`` / ``file_reader`` string results
      longer than 50 characters are stored under the tool name, unless the
      text is an error message.

    Args:
        tool_results: Results to inspect; ``success``, ``tool_name`` and
            ``result`` are read from each.

    Returns:
        A dict mapping a content key to the corresponding result.
    """
    # Strings starting with one of these are error messages, not content.
    # "File reading error" is what the file reader in this module emits;
    # "Error" matches the prefix rejected by the orchestrator's own
    # sufficiency check.
    error_prefixes = ("Error", "File reading error")
    text_tools = ("image_ocr", "audio_transcript", "file_reader")

    content = {}
    for item in tool_results:
        if not item.success:
            continue
        name = item.tool_name
        payload = item.result
        if name == "youtube_tool":
            if isinstance(payload, dict) and payload.get("has_youtube"):
                content["youtube"] = payload
        elif name == "wiki_search":
            if isinstance(payload, dict) and payload.get("success"):
                content["wikipedia"] = payload
        elif name in text_tools:
            if (
                isinstance(payload, str)
                and len(payload) > 50
                and not payload.startswith(error_prefixes)
            ):
                content[name] = payload
    return content
# Test function
if __name__ == "__main__":
    # Manual smoke test: run one sample question through the orchestrator
    # and print a short summary of the outcome.
    outcome = process_question_with_tools("Who was Marie Curie?")
    print("Orchestration result:")
    summary_fields = (
        ("Question type", "question_type"),
        ("Tools used", "tools_used"),
        ("Successful", "successful_tools"),
    )
    for label, key in summary_fields:
        print(f"{label}: {outcome[key]}")