Spaces:
Sleeping
Sleeping
File size: 8,600 Bytes
92d2175 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
"""
State Manager - Quản lý trạng thái và context của agent
"""
import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class ToolResult:
"""Kết quả từ một tool"""
tool_name: str
success: bool
result: Any
error_message: Optional[str] = None
execution_time: Optional[float] = None
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().isoformat()
@dataclass
class TaskContext:
"""Context của một task"""
task_id: str
question: str
original_question: str # Câu hỏi gốc (trước khi reverse)
question_type: str = "unknown" # youtube, image, audio, wiki, text, file, math
has_file_attachment: bool = False
detected_urls: List[str] = None
processed_question: str = "" # Câu hỏi sau khi xử lý
def __post_init__(self):
if self.detected_urls is None:
self.detected_urls = []
class AgentState:
"""Quản lý trạng thái của Agent"""
def __init__(self):
self.current_task: Optional[TaskContext] = None
self.tool_results: List[ToolResult] = []
self.conversation_history: List[Dict[str, Any]] = []
self.cached_data: Dict[str, Any] = {}
self.session_id: str = datetime.now().strftime("%Y%m%d_%H%M%S")
def start_new_task(self, task_id: str, question: str) -> TaskContext:
"""Bắt đầu task mới"""
self.current_task = TaskContext(
task_id=task_id,
question=question,
original_question=question
)
self.tool_results = [] # Reset tool results cho task mới
return self.current_task
def update_task_context(self, **kwargs) -> None:
"""Cập nhật context của task hiện tại"""
if self.current_task:
for key, value in kwargs.items():
if hasattr(self.current_task, key):
setattr(self.current_task, key, value)
def add_tool_result(self, tool_result: ToolResult) -> None:
"""Thêm kết quả tool"""
self.tool_results.append(tool_result)
# Cache một số kết quả quan trọng
if tool_result.success:
if tool_result.tool_name == "youtube_tool":
self.cached_data["youtube_content"] = tool_result.result
elif tool_result.tool_name == "wiki_search":
self.cached_data["wiki_content"] = tool_result.result
elif tool_result.tool_name == "image_ocr":
self.cached_data["image_text"] = tool_result.result
elif tool_result.tool_name == "audio_transcript":
self.cached_data["audio_text"] = tool_result.result
def get_tool_results(self, tool_name: Optional[str] = None) -> List[ToolResult]:
"""Lấy kết quả tools"""
if tool_name:
return [r for r in self.tool_results if r.tool_name == tool_name]
return self.tool_results
def has_successful_tool(self, tool_name: str) -> bool:
"""Kiểm tra xem tool đã chạy thành công chưa"""
return any(r.tool_name == tool_name and r.success for r in self.tool_results)
def get_cached_data(self, key: str) -> Any:
"""Lấy cached data"""
return self.cached_data.get(key)
def add_conversation_turn(self, role: str, content: str, metadata: Dict[str, Any] = None):
"""Thêm lượt hội thoại"""
turn = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
self.conversation_history.append(turn)
def get_conversation_context(self, max_turns: int = 5) -> List[Dict[str, Any]]:
"""Lấy context hội thoại gần nhất"""
return self.conversation_history[-max_turns:] if self.conversation_history else []
def generate_context_summary(self) -> str:
"""Tạo tóm tắt context cho AI"""
if not self.current_task:
return "No active task"
summary_parts = []
# Task info
summary_parts.append(f"Current Task: {self.current_task.task_id}")
summary_parts.append(f"Question Type: {self.current_task.question_type}")
summary_parts.append(f"Original Question: {self.current_task.original_question}")
if self.current_task.processed_question != self.current_task.original_question:
summary_parts.append(f"Processed Question: {self.current_task.processed_question}")
# Tools used
successful_tools = [r.tool_name for r in self.tool_results if r.success]
if successful_tools:
summary_parts.append(f"Successful Tools: {', '.join(set(successful_tools))}")
failed_tools = [r.tool_name for r in self.tool_results if not r.success]
if failed_tools:
summary_parts.append(f"Failed Tools: {', '.join(set(failed_tools))}")
# Available cached data
if self.cached_data:
summary_parts.append(f"Available Data: {', '.join(self.cached_data.keys())}")
return "\n".join(summary_parts)
def export_state(self) -> Dict[str, Any]:
"""Export state để debug hoặc logging"""
return {
"session_id": self.session_id,
"current_task": asdict(self.current_task) if self.current_task else None,
"tool_results": [asdict(r) for r in self.tool_results],
"conversation_history": self.conversation_history,
"cached_data_keys": list(self.cached_data.keys())
}
def clear_state(self):
"""Xóa state (cho task mới)"""
self.current_task = None
self.tool_results = []
self.cached_data = {}
# Giữ conversation_history để maintain context
# Singleton instance
_agent_state = AgentState()
def get_agent_state() -> AgentState:
"""Lấy global agent state"""
return _agent_state
def reset_agent_state():
"""Reset global agent state"""
global _agent_state
_agent_state = AgentState()
# Utility functions
def analyze_question_type(question: str) -> str:
"""Phân tích loại câu hỏi"""
# Type check and convert if needed
if not isinstance(question, str):
if hasattr(question, 'get'): # Is dict-like
question = str(question.get('question', question.get('content', str(question))))
else:
question = str(question)
question_lower = question.lower()
# Check for URLs/file attachments
if "youtube.com" in question or "youtu.be" in question:
return "youtube"
elif "http" in question and any(ext in question for ext in ['.jpg', '.png', '.pdf', '.doc']):
return "file_url"
elif any(word in question_lower for word in ['image', 'picture', 'photo', 'diagram']):
return "image"
elif any(word in question_lower for word in ['audio', 'sound', 'voice', 'music']):
return "audio"
elif any(word in question_lower for word in ['who is', 'what is', 'when was', 'where is']):
return "wiki"
elif any(word in question_lower for word in ['calculate', 'solve', 'math', 'equation']):
return "math"
elif "file:" in question_lower or "attachment" in question_lower:
return "file"
else:
return "text"
def detect_urls_in_question(question: str) -> List[str]:
"""Detect URLs trong câu hỏi"""
# Type check and convert if needed
if not isinstance(question, str):
if hasattr(question, 'get'): # Is dict-like
question = str(question.get('question', question.get('content', str(question))))
else:
question = str(question)
import re
url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
return re.findall(url_pattern, question)
# Test functions
if __name__ == "__main__":
# Test state manager
state = get_agent_state()
# Test task
task = state.start_new_task("test_001", "Who is Marie Curie?")
print("Task created:", task.task_id)
# Test tool result
result = ToolResult(
tool_name="wiki_search",
success=True,
result={"title": "Marie Curie", "summary": "Polish physicist..."}
)
state.add_tool_result(result)
print("State summary:")
print(state.generate_context_summary()) |