Spaces:
Sleeping
Sleeping
File size: 13,005 Bytes
92d2175 040a6c6 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 a9b5cb5 92d2175 040a6c6 92d2175 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
"""
Tool Orchestrator - Điều phối và quản lý việc sử dụng tools
"""
import time
from typing import Dict, Any, List, Optional, Callable
from .state_manager import AgentState, ToolResult, get_agent_state, analyze_question_type, detect_urls_in_question
from .text_tool import reverse_text_if_needed
from .youtube_tool import YouTubeTool
from .image_tool import ocr_image_with_nanonets, fallback_ocr_image
from .audio_tool import transcribe_audio_groq, fallback_audio_info
from .wiki_tool import search_wikipedia_from_question
from .file_tool import read_file_content
from .prompts import get_system_prompt, get_tool_prompt
class ToolOrchestrator:
"""Điều phối và quản lý tools"""
def __init__(self):
self.state = get_agent_state()
self.tools_registry = {
"text_processor": self._run_text_processor,
"youtube_tool": self._run_youtube_tool,
"image_ocr": self._run_image_ocr,
"audio_transcript": self._run_audio_transcript,
"wiki_search": self._run_wiki_search,
"file_reader": self._run_file_reader
}
def analyze_and_prepare_question(self, question: str, task_id: str = "") -> str:
"""
Phân tích và chuẩn bị câu hỏi
"""
# Start new task
task_context = self.state.start_new_task(task_id or f"task_{int(time.time())}", question)
# Detect URLs
urls = detect_urls_in_question(question)
task_context.detected_urls = urls
# Check if text might be reversed
text_analysis = reverse_text_if_needed(question)
if isinstance(text_analysis, dict):
processed_question = text_analysis.get("processed_text", question)
else:
processed_question = text_analysis if text_analysis else question
task_context.processed_question = processed_question
# Analyze question type
question_type = analyze_question_type(processed_question)
task_context.question_type = question_type
# Check for file attachments (giả sử có task_id thì có file)
task_context.has_file_attachment = bool(task_id)
# Update state
self.state.update_task_context(
question_type=question_type,
processed_question=processed_question,
detected_urls=urls,
has_file_attachment=bool(task_id)
)
print(f"📋 Question Analysis:")
print(f" Type: {question_type}")
print(f" URLs found: {len(urls)}")
print(f" Has file: {task_context.has_file_attachment}")
print(f" Processed: {processed_question != question}")
return processed_question
def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
"""
Xác định tools cần chạy dựa trên question type - FIXED LOGIC
"""
tools_to_run = []
# Tools theo loại câu hỏi từ AI analysis
if question_type == "youtube":
tools_to_run.append("youtube_tool")
elif question_type == "image":
tools_to_run.append("image_ocr")
elif question_type == "audio":
tools_to_run.append("audio_transcript")
elif question_type == "wiki":
tools_to_run.append("wiki_search")
elif question_type == "file":
tools_to_run.append("file_reader")
elif question_type == "text":
tools_to_run.append("text_processor")
# Fallback cho math hoặc unknown
if question_type in ["math", "unknown"] or not tools_to_run:
tools_to_run.append("text_processor")
print(f"🎯 Tools to run: {tools_to_run}")
return tools_to_run
def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> List[ToolResult]:
"""
Chạy sequence các tools
"""
results = []
for tool_name in tools_list:
print(f"🔧 Running tool: {tool_name}")
start_time = time.time()
try:
if tool_name in self.tools_registry:
result = self.tools_registry[tool_name](question, task_id)
execution_time = time.time() - start_time
tool_result = ToolResult(
tool_name=tool_name,
success=result.get("success", True) if isinstance(result, dict) else bool(result),
result=result,
execution_time=execution_time
)
# Nếu tool thành công và có kết quả hữu ích, có thể skip các tools khác
if tool_result.success and self._is_result_sufficient(tool_result, question):
print(f"✅ {tool_name} provided sufficient information, skipping remaining tools")
results.append(tool_result)
self.state.add_tool_result(tool_result)
break
else:
tool_result = ToolResult(
tool_name=tool_name,
success=False,
result=None,
error_message=f"Unknown tool: {tool_name}"
)
results.append(tool_result)
self.state.add_tool_result(tool_result)
print(f" Completed in {execution_time:.2f}s")
except Exception as e:
execution_time = time.time() - start_time
tool_result = ToolResult(
tool_name=tool_name,
success=False,
result=None,
error_message=str(e),
execution_time=execution_time
)
results.append(tool_result)
self.state.add_tool_result(tool_result)
print(f"❌ {tool_name} failed: {str(e)}")
return results
def _is_result_sufficient(self, tool_result: ToolResult, question: str) -> bool:
"""
Kiểm tra xem kết quả tool có đủ để trả lời câu hỏi không
"""
if not tool_result.success:
return False
result = tool_result.result
# Check for content length and quality
if isinstance(result, dict):
if tool_result.tool_name == "youtube_tool":
return result.get("has_youtube") and (result.get("transcript") or result.get("title"))
elif tool_result.tool_name == "wiki_search":
return result.get("success") and len(result.get("summary", "")) > 100
elif tool_result.tool_name == "image_ocr":
return len(str(result)) > 50 and not result.startswith("Error")
elif tool_result.tool_name == "audio_transcript":
return len(str(result)) > 50 and not result.startswith("Error")
elif isinstance(result, str):
return len(result) > 50 and not result.startswith("Error")
return False
# Tool execution methods
def _run_text_processor(self, question: str, task_id: str = "") -> Dict[str, Any]:
"""Run text processing"""
text_analysis = reverse_text_if_needed(question)
if isinstance(text_analysis, dict):
processed = text_analysis.get("processed_text", question)
was_reversed = text_analysis.get("should_reverse", False)
else:
processed = text_analysis if text_analysis else question
was_reversed = processed != question
return {
"success": True,
"original": question,
"processed": processed,
"was_reversed": was_reversed,
"analysis": text_analysis if isinstance(text_analysis, dict) else None
}
def _run_youtube_tool(self, question: str, task_id: str = "") -> Dict[str, Any]:
"""Run YouTube content extraction"""
try:
youtube_tool = YouTubeTool()
result = youtube_tool.process_youtube(question)
return result.get("data", {}) if result.get("success") else {"success": False, "error": result.get("error", "Unknown error")}
except Exception as e:
return {"success": False, "error": str(e)}
def _run_image_ocr(self, question: str, task_id: str = "") -> str:
"""Run image OCR"""
try:
if task_id:
result = ocr_image_with_nanonets(task_id=task_id)
else:
# Không có task_id, không thể OCR
result = "No image file provided for OCR"
return result
except Exception as e:
# Fallback to basic image info
return fallback_ocr_image(task_id=task_id)
def _run_audio_transcript(self, question: str, task_id: str = "") -> str:
"""Run audio transcription"""
try:
if task_id:
result = transcribe_audio_groq(task_id=task_id)
else:
result = "No audio file provided for transcription"
return result
except Exception as e:
# Fallback to basic audio info
return fallback_audio_info(task_id=task_id)
def _run_wiki_search(self, question: str, task_id: str = "") -> Dict[str, Any]:
"""Run Wikipedia search"""
try:
result = search_wikipedia_from_question(question)
return result
except Exception as e:
return {"success": False, "error": str(e)}
def _run_file_reader(self, question: str, task_id: str = "") -> str:
"""Run file content reading"""
try:
if task_id:
result = read_file_content(task_id=task_id)
else:
result = "No file provided for reading"
return result
except Exception as e:
return f"File reading error: {str(e)}"
# Orchestrator functions
def process_question_with_tools(question: str, task_id: str = "") -> Dict[str, Any]:
"""
Main function: Xử lý câu hỏi với tools phù hợp
"""
orchestrator = ToolOrchestrator()
# 1. Analyze and prepare question
processed_question = orchestrator.analyze_and_prepare_question(question, task_id)
# 2. Determine tools to run
task_context = orchestrator.state.current_task
tools_to_run = orchestrator.determine_tools_to_run(
task_context.question_type,
task_context.has_file_attachment,
bool(task_context.detected_urls)
)
print(f"🎯 Tools to run: {', '.join(tools_to_run)}")
# 3. Run tools
tool_results = orchestrator.run_tools_sequence(tools_to_run, processed_question, task_id)
# 4. Compile results
successful_results = [r for r in tool_results if r.success]
failed_results = [r for r in tool_results if not r.success]
return {
"processed_question": processed_question,
"question_type": task_context.question_type,
"tools_used": [r.tool_name for r in tool_results],
"successful_tools": [r.tool_name for r in successful_results],
"failed_tools": [r.tool_name for r in failed_results],
"tool_results": tool_results,
"cached_data": orchestrator.state.cached_data,
"context_summary": orchestrator.state.generate_context_summary()
}
def get_best_available_content(tool_results: List[ToolResult]) -> Dict[str, Any]:
"""
Lấy nội dung tốt nhất từ tool results
"""
content = {}
for result in tool_results:
if result.success:
if result.tool_name == "youtube_tool" and isinstance(result.result, dict):
if result.result.get("has_youtube"):
content["youtube"] = result.result
elif result.tool_name == "wiki_search" and isinstance(result.result, dict):
if result.result.get("success"):
content["wikipedia"] = result.result
elif result.tool_name in ["image_ocr", "audio_transcript", "file_reader"]:
if isinstance(result.result, str) and len(result.result) > 50:
content[result.tool_name] = result.result
return content
# Test function
if __name__ == "__main__":
# Test orchestrator
test_question = "Who was Marie Curie?"
result = process_question_with_tools(test_question)
print("Orchestration result:")
print(f"Question type: {result['question_type']}")
print(f"Tools used: {result['tools_used']}")
print(f"Successful: {result['successful_tools']}") |