Spaces:
Runtime error
Runtime error
| """ | |
| Conversation Memory - Shared state across all agents | |
| Allows agents to remember user data and coordinate with each other | |
| """ | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| import json | |
| class ConversationMemory: | |
| """ | |
| Shared memory system for all agents | |
| Stores user profile, extracted data, and conversation state | |
| Supports session persistence across app restarts | |
| """ | |
| def __init__(self, user_id: Optional[str] = None, session_store=None): | |
| # User profile data (shared across all agents) | |
| self.user_profile = { | |
| 'age': None, | |
| 'gender': None, | |
| 'weight': None, | |
| 'height': None, | |
| 'bmi': None, | |
| 'activity_level': None, | |
| 'fitness_level': None, | |
| 'health_conditions': [], | |
| 'medications': [], | |
| 'allergies': [], | |
| 'dietary_restrictions': [] | |
| } | |
| # Agent-specific extracted data | |
| self.extracted_data = { | |
| 'nutrition': {}, | |
| 'exercise': {}, | |
| 'symptom': {}, | |
| 'mental_health': {}, | |
| 'general_health': {} | |
| } | |
| # Conversation state | |
| self.conversation_state = { | |
| 'current_topic': None, | |
| 'current_agent': None, | |
| 'previous_agent': None, | |
| 'data_collected': {}, | |
| 'pending_questions': [], | |
| 'conversation_flow': [] | |
| } | |
| # Metadata | |
| self.metadata = { | |
| 'session_id': None, | |
| 'user_id': user_id, | |
| 'started_at': datetime.now().isoformat(), | |
| 'last_updated': datetime.now().isoformat() | |
| } | |
| # Session persistence | |
| self.user_id = user_id | |
| self.session_store = session_store | |
| self.auto_save = True # Auto-save on updates | |
| # Load existing session if user_id provided | |
| if user_id and session_store: | |
| self._load_session() | |
| # ===== User Profile Management ===== | |
| def update_profile(self, key: str, value: Any) -> None: | |
| """Update user profile data""" | |
| if key in self.user_profile: | |
| self.user_profile[key] = value | |
| self.metadata['last_updated'] = datetime.now().isoformat() | |
| # Auto-save if enabled | |
| if self.auto_save and self.user_id and self.session_store: | |
| self._save_session() | |
| def get_profile(self, key: str) -> Any: | |
| """Get user profile data""" | |
| return self.user_profile.get(key) | |
| def get_full_profile(self) -> Dict[str, Any]: | |
| """Get complete user profile""" | |
| return self.user_profile.copy() | |
| def get_missing_fields(self, required_fields: List[str]) -> List[str]: | |
| """Check what required fields are still missing""" | |
| return [field for field in required_fields | |
| if not self.user_profile.get(field)] | |
| def has_complete_profile(self, required_fields: List[str]) -> bool: | |
| """Check if all required fields are filled""" | |
| return len(self.get_missing_fields(required_fields)) == 0 | |
| # ===== Agent Data Management ===== | |
| def add_agent_data(self, agent_name: str, key: str, value: Any) -> None: | |
| """Add agent-specific data""" | |
| if agent_name not in self.extracted_data: | |
| self.extracted_data[agent_name] = {} | |
| self.extracted_data[agent_name][key] = value | |
| self.metadata['last_updated'] = datetime.now().isoformat() | |
| def get_agent_data(self, agent_name: str, key: str = None) -> Any: | |
| """Get agent-specific data""" | |
| agent_data = self.extracted_data.get(agent_name, {}) | |
| if key: | |
| return agent_data.get(key) | |
| return agent_data | |
| def get_all_agent_data(self) -> Dict[str, Any]: | |
| """Get all agent data""" | |
| return self.extracted_data.copy() | |
| # ===== Conversation State Management ===== | |
| def set_current_agent(self, agent_name: str) -> None: | |
| """Set current active agent""" | |
| self.conversation_state['previous_agent'] = self.conversation_state['current_agent'] | |
| self.conversation_state['current_agent'] = agent_name | |
| # Add to conversation flow | |
| self.conversation_state['conversation_flow'].append({ | |
| 'agent': agent_name, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| def get_current_agent(self) -> Optional[str]: | |
| """Get current active agent""" | |
| return self.conversation_state['current_agent'] | |
| def get_previous_agent(self) -> Optional[str]: | |
| """Get previous agent""" | |
| return self.conversation_state['previous_agent'] | |
| def set_current_topic(self, topic: str) -> None: | |
| """Set current conversation topic""" | |
| self.conversation_state['current_topic'] = topic | |
| def get_current_topic(self) -> Optional[str]: | |
| """Get current conversation topic""" | |
| return self.conversation_state['current_topic'] | |
| def add_pending_question(self, question: str, priority: int = 0) -> None: | |
| """Add a pending question to ask user""" | |
| self.conversation_state['pending_questions'].append({ | |
| 'question': question, | |
| 'priority': priority, | |
| 'added_at': datetime.now().isoformat() | |
| }) | |
| # Sort by priority (higher first) | |
| self.conversation_state['pending_questions'].sort( | |
| key=lambda x: x['priority'], | |
| reverse=True | |
| ) | |
| def get_next_pending_question(self) -> Optional[str]: | |
| """Get next pending question""" | |
| if self.conversation_state['pending_questions']: | |
| return self.conversation_state['pending_questions'][0]['question'] | |
| return None | |
| def clear_pending_questions(self) -> None: | |
| """Clear all pending questions""" | |
| self.conversation_state['pending_questions'] = [] | |
| def get_conversation_flow(self) -> List[Dict[str, Any]]: | |
| """Get conversation flow history""" | |
| return self.conversation_state['conversation_flow'] | |
| # ===== Context Summary ===== | |
| def get_context_summary(self) -> str: | |
| """Get a summary of current context for agents""" | |
| summary_parts = [] | |
| # User profile summary | |
| profile = self.user_profile | |
| if profile['age']: | |
| summary_parts.append(f"User: {profile['age']} tuổi") | |
| if profile['gender']: | |
| summary_parts.append(f"giới tính {profile['gender']}") | |
| if profile['weight'] and profile['height']: | |
| summary_parts.append(f"{profile['weight']}kg, {profile['height']}cm") | |
| # Current topic | |
| if self.conversation_state['current_topic']: | |
| summary_parts.append(f"Topic: {self.conversation_state['current_topic']}") | |
| # Previous agent | |
| if self.conversation_state['previous_agent']: | |
| summary_parts.append(f"Previous agent: {self.conversation_state['previous_agent']}") | |
| return " | ".join(summary_parts) if summary_parts else "No context yet" | |
| # ===== Serialization ===== | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert memory to dictionary""" | |
| return { | |
| 'user_profile': self.user_profile, | |
| 'extracted_data': self.extracted_data, | |
| 'conversation_state': self.conversation_state, | |
| 'metadata': self.metadata | |
| } | |
| def to_json(self) -> str: | |
| """Convert memory to JSON string""" | |
| return json.dumps(self.to_dict(), ensure_ascii=False, indent=2) | |
| def from_dict(cls, data: Dict[str, Any]) -> 'ConversationMemory': | |
| """Create memory from dictionary""" | |
| memory = cls() | |
| memory.user_profile = data.get('user_profile', memory.user_profile) | |
| memory.extracted_data = data.get('extracted_data', memory.extracted_data) | |
| memory.conversation_state = data.get('conversation_state', memory.conversation_state) | |
| memory.metadata = data.get('metadata', memory.metadata) | |
| return memory | |
| # ===== Session Persistence ===== | |
| def _save_session(self) -> None: | |
| """Save current memory state to session store""" | |
| if not self.user_id or not self.session_store: | |
| return | |
| session_data = self.to_dict() | |
| self.session_store.save_session(self.user_id, session_data) | |
| def _load_session(self) -> bool: | |
| """ | |
| Load memory state from session store | |
| Returns: | |
| True if session loaded, False otherwise | |
| """ | |
| if not self.user_id or not self.session_store: | |
| return False | |
| session_data = self.session_store.load_session(self.user_id) | |
| if session_data: | |
| self.user_profile = session_data.get('user_profile', self.user_profile) | |
| self.extracted_data = session_data.get('extracted_data', self.extracted_data) | |
| self.conversation_state = session_data.get('conversation_state', self.conversation_state) | |
| self.metadata = session_data.get('metadata', self.metadata) | |
| print(f"✅ Loaded session for user {self.user_id}") | |
| return True | |
| print(f"ℹ️ No existing session found for user {self.user_id}, starting fresh") | |
| return False | |
| def save_session_now(self) -> None: | |
| """Manually save session (useful when auto_save is disabled)""" | |
| self._save_session() | |
| def clear_session(self) -> None: | |
| """Clear session from storage""" | |
| if self.user_id and self.session_store: | |
| self.session_store.delete_session(self.user_id) | |
| # ===== Utility Methods ===== | |
| def clear(self) -> None: | |
| """Clear all memory (start fresh conversation)""" | |
| self.__init__() | |
| def __repr__(self) -> str: | |
| return f"<ConversationMemory: {self.get_context_summary()}>" | |