File size: 8,600 Bytes
92d2175
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
State Manager - Quản lý trạng thái và context của agent
"""

import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class ToolResult:
    """Kết quả từ một tool"""
    tool_name: str
    success: bool
    result: Any
    error_message: Optional[str] = None
    execution_time: Optional[float] = None
    timestamp: str = ""
    
    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()

@dataclass 
class TaskContext:
    """Context của một task"""
    task_id: str
    question: str
    original_question: str  # Câu hỏi gốc (trước khi reverse)
    question_type: str = "unknown"  # youtube, image, audio, wiki, text, file, math
    has_file_attachment: bool = False
    detected_urls: List[str] = None
    processed_question: str = ""  # Câu hỏi sau khi xử lý
    
    def __post_init__(self):
        if self.detected_urls is None:
            self.detected_urls = []

class AgentState:
    """Quản lý trạng thái của Agent"""
    
    def __init__(self):
        self.current_task: Optional[TaskContext] = None
        self.tool_results: List[ToolResult] = []
        self.conversation_history: List[Dict[str, Any]] = []
        self.cached_data: Dict[str, Any] = {}
        self.session_id: str = datetime.now().strftime("%Y%m%d_%H%M%S")
        
    def start_new_task(self, task_id: str, question: str) -> TaskContext:
        """Bắt đầu task mới"""
        self.current_task = TaskContext(
            task_id=task_id,
            question=question,
            original_question=question
        )
        self.tool_results = []  # Reset tool results cho task mới
        return self.current_task
    
    def update_task_context(self, **kwargs) -> None:
        """Cập nhật context của task hiện tại"""
        if self.current_task:
            for key, value in kwargs.items():
                if hasattr(self.current_task, key):
                    setattr(self.current_task, key, value)
    
    def add_tool_result(self, tool_result: ToolResult) -> None:
        """Thêm kết quả tool"""
        self.tool_results.append(tool_result)
        
        # Cache một số kết quả quan trọng
        if tool_result.success:
            if tool_result.tool_name == "youtube_tool":
                self.cached_data["youtube_content"] = tool_result.result
            elif tool_result.tool_name == "wiki_search":
                self.cached_data["wiki_content"] = tool_result.result
            elif tool_result.tool_name == "image_ocr":
                self.cached_data["image_text"] = tool_result.result
            elif tool_result.tool_name == "audio_transcript":
                self.cached_data["audio_text"] = tool_result.result
    
    def get_tool_results(self, tool_name: Optional[str] = None) -> List[ToolResult]:
        """Lấy kết quả tools"""
        if tool_name:
            return [r for r in self.tool_results if r.tool_name == tool_name]
        return self.tool_results
    
    def has_successful_tool(self, tool_name: str) -> bool:
        """Kiểm tra xem tool đã chạy thành công chưa"""
        return any(r.tool_name == tool_name and r.success for r in self.tool_results)
    
    def get_cached_data(self, key: str) -> Any:
        """Lấy cached data"""
        return self.cached_data.get(key)
    
    def add_conversation_turn(self, role: str, content: str, metadata: Dict[str, Any] = None):
        """Thêm lượt hội thoại"""
        turn = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        self.conversation_history.append(turn)
    
    def get_conversation_context(self, max_turns: int = 5) -> List[Dict[str, Any]]:
        """Lấy context hội thoại gần nhất"""
        return self.conversation_history[-max_turns:] if self.conversation_history else []
    
    def generate_context_summary(self) -> str:
        """Tạo tóm tắt context cho AI"""
        if not self.current_task:
            return "No active task"
        
        summary_parts = []
        
        # Task info
        summary_parts.append(f"Current Task: {self.current_task.task_id}")
        summary_parts.append(f"Question Type: {self.current_task.question_type}")
        summary_parts.append(f"Original Question: {self.current_task.original_question}")
        
        if self.current_task.processed_question != self.current_task.original_question:
            summary_parts.append(f"Processed Question: {self.current_task.processed_question}")
        
        # Tools used
        successful_tools = [r.tool_name for r in self.tool_results if r.success]
        if successful_tools:
            summary_parts.append(f"Successful Tools: {', '.join(set(successful_tools))}")
        
        failed_tools = [r.tool_name for r in self.tool_results if not r.success]
        if failed_tools:
            summary_parts.append(f"Failed Tools: {', '.join(set(failed_tools))}")
        
        # Available cached data
        if self.cached_data:
            summary_parts.append(f"Available Data: {', '.join(self.cached_data.keys())}")
        
        return "\n".join(summary_parts)
    
    def export_state(self) -> Dict[str, Any]:
        """Export state để debug hoặc logging"""
        return {
            "session_id": self.session_id,
            "current_task": asdict(self.current_task) if self.current_task else None,
            "tool_results": [asdict(r) for r in self.tool_results],
            "conversation_history": self.conversation_history,
            "cached_data_keys": list(self.cached_data.keys())
        }
    
    def clear_state(self):
        """Xóa state (cho task mới)"""
        self.current_task = None
        self.tool_results = []
        self.cached_data = {}
        # Giữ conversation_history để maintain context

# Singleton instance
_agent_state = AgentState()

def get_agent_state() -> AgentState:
    """Lấy global agent state"""
    return _agent_state

def reset_agent_state():
    """Reset global agent state"""
    global _agent_state
    _agent_state = AgentState()

# Utility functions
def analyze_question_type(question: str) -> str:
    """Phân tích loại câu hỏi"""
    # Type check and convert if needed
    if not isinstance(question, str):
        if hasattr(question, 'get'):  # Is dict-like
            question = str(question.get('question', question.get('content', str(question))))
        else:
            question = str(question)
    
    question_lower = question.lower()
    
    # Check for URLs/file attachments
    if "youtube.com" in question or "youtu.be" in question:
        return "youtube"
    elif "http" in question and any(ext in question for ext in ['.jpg', '.png', '.pdf', '.doc']):
        return "file_url"
    elif any(word in question_lower for word in ['image', 'picture', 'photo', 'diagram']):
        return "image"
    elif any(word in question_lower for word in ['audio', 'sound', 'voice', 'music']):
        return "audio"
    elif any(word in question_lower for word in ['who is', 'what is', 'when was', 'where is']):
        return "wiki"
    elif any(word in question_lower for word in ['calculate', 'solve', 'math', 'equation']):
        return "math"
    elif "file:" in question_lower or "attachment" in question_lower:
        return "file"
    else:
        return "text"

def detect_urls_in_question(question: str) -> List[str]:
    """Detect URLs trong câu hỏi"""
    # Type check and convert if needed
    if not isinstance(question, str):
        if hasattr(question, 'get'):  # Is dict-like
            question = str(question.get('question', question.get('content', str(question))))
        else:
            question = str(question)
    
    import re
    url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
    return re.findall(url_pattern, question)

# Test functions
if __name__ == "__main__":
    # Test state manager
    state = get_agent_state()
    
    # Test task
    task = state.start_new_task("test_001", "Who is Marie Curie?")
    print("Task created:", task.task_id)
    
    # Test tool result
    result = ToolResult(
        tool_name="wiki_search",
        success=True,
        result={"title": "Marie Curie", "summary": "Polish physicist..."}
    )
    state.add_tool_result(result)
    
    print("State summary:")
    print(state.generate_context_summary())