# my-gradio-app/utils/memory.py
# Nguyen Trong Lap
# Recreate history without binary blobs
# eeb0f9c
"""
Conversation Memory - Shared state across all agents
Allows agents to remember user data and coordinate with each other
"""
import copy
import json
from datetime import datetime
from typing import Dict, Any, List, Optional
class ConversationMemory:
    """
    Shared memory system for all agents.

    Stores the user profile, per-agent extracted data and conversation state.
    When both a ``user_id`` and a ``session_store`` are supplied, the state is
    loaded from the store on construction and written back on updates.

    The session store is only used through three calls made in this class:
    ``save_session(user_id, data)``, ``load_session(user_id)`` and
    ``delete_session(user_id)``.
    """

    def __init__(self, user_id: Optional[str] = None, session_store=None):
        """
        Create a memory object, optionally bound to a persisted session.

        Args:
            user_id: Identifier used as the key in the session store.
            session_store: Object providing save/load/delete of sessions.
        """
        # Persistence settings are assigned first because _reset_state()
        # records the user id in the metadata.
        self.user_id = user_id
        self.session_store = session_store
        self.auto_save = True  # Auto-save on updates

        self._reset_state()

        # Load existing session if user_id provided
        if user_id and session_store:
            self._load_session()

    # ===== Internal helpers =====

    def _reset_state(self) -> None:
        """Reset all in-memory state to defaults; persistence settings are kept."""
        # User profile data (shared across all agents)
        self.user_profile = {
            'age': None,
            'gender': None,
            'weight': None,
            'height': None,
            'bmi': None,
            'activity_level': None,
            'fitness_level': None,
            'health_conditions': [],
            'medications': [],
            'allergies': [],
            'dietary_restrictions': [],
        }

        # Agent-specific extracted data
        self.extracted_data = {
            'nutrition': {},
            'exercise': {},
            'symptom': {},
            'mental_health': {},
            'general_health': {},
        }

        # Conversation state
        self.conversation_state = {
            'current_topic': None,
            'current_agent': None,
            'previous_agent': None,
            'data_collected': {},
            'pending_questions': [],
            'conversation_flow': [],
        }

        # Metadata; a single timestamp keeps started_at == last_updated
        # for a brand-new memory object.
        now = datetime.now().isoformat()
        self.metadata = {
            'session_id': None,
            'user_id': self.user_id,
            'started_at': now,
            'last_updated': now,
        }

    def _touch(self) -> None:
        """Stamp ``last_updated`` and persist when auto-save is active."""
        self.metadata['last_updated'] = datetime.now().isoformat()
        if self.auto_save and self.user_id and self.session_store:
            self._save_session()

    @staticmethod
    def _merge_section(defaults: Dict[str, Any], loaded: Any) -> Dict[str, Any]:
        """
        Return ``defaults`` overlaid with a deep copy of ``loaded``.

        Keys missing from ``loaded`` keep their default value, so partial
        payloads still produce a complete section. A ``loaded`` value that is
        not a dict (including ``None``) is ignored.
        """
        merged = dict(defaults)
        if isinstance(loaded, dict):
            # Deep copy so the memory never shares nested objects with the
            # caller's (or the store's) data structure.
            merged.update(copy.deepcopy(loaded))
        return merged

    def _apply_state(self, data: Dict[str, Any]) -> None:
        """Overlay the four serialized sections from ``data`` onto this object."""
        self.user_profile = self._merge_section(
            self.user_profile, data.get('user_profile'))
        self.extracted_data = self._merge_section(
            self.extracted_data, data.get('extracted_data'))
        self.conversation_state = self._merge_section(
            self.conversation_state, data.get('conversation_state'))
        self.metadata = self._merge_section(
            self.metadata, data.get('metadata'))

    # ===== User Profile Management =====

    def update_profile(self, key: str, value: Any) -> None:
        """Update a user profile field; keys not in the profile are ignored."""
        if key in self.user_profile:
            self.user_profile[key] = value
            self._touch()

    def get_profile(self, key: str) -> Any:
        """Get one user profile field (``None`` when the key is unknown)."""
        return self.user_profile.get(key)

    def get_full_profile(self) -> Dict[str, Any]:
        """Get an independent copy of the complete user profile."""
        # Deep copy: the profile holds lists, which a shallow copy would share.
        return copy.deepcopy(self.user_profile)

    def get_missing_fields(self, required_fields: List[str]) -> List[str]:
        """
        Return the required fields that are still missing.

        A field counts as missing when its value is falsy (``None``, empty
        list, empty string, ``0``) or the key is absent from the profile.
        """
        return [field for field in required_fields
                if not self.user_profile.get(field)]

    def has_complete_profile(self, required_fields: List[str]) -> bool:
        """Check if all required fields are filled."""
        return not self.get_missing_fields(required_fields)

    # ===== Agent Data Management =====

    def add_agent_data(self, agent_name: str, key: str, value: Any) -> None:
        """Add agent-specific data, creating the agent's bucket if needed."""
        self.extracted_data.setdefault(agent_name, {})[key] = value
        # Same bookkeeping as update_profile: timestamp + auto-save.
        self._touch()

    def get_agent_data(self, agent_name: str, key: Optional[str] = None) -> Any:
        """
        Get agent-specific data.

        Returns the value stored under ``key`` for the agent, or the agent's
        whole data dict when ``key`` is empty/``None``. Unknown agents yield
        an empty dict (or ``None`` when a key is given).
        """
        agent_data = self.extracted_data.get(agent_name, {})
        if key:
            return agent_data.get(key)
        return agent_data

    def get_all_agent_data(self) -> Dict[str, Any]:
        """Get an independent copy of all agent data."""
        # Deep copy: per-agent dicts would otherwise be shared with callers.
        return copy.deepcopy(self.extracted_data)

    # ===== Conversation State Management =====

    def set_current_agent(self, agent_name: str) -> None:
        """Set the active agent, remembering the one it replaces."""
        state = self.conversation_state
        state['previous_agent'] = state['current_agent']
        state['current_agent'] = agent_name

        # Add to conversation flow
        state['conversation_flow'].append({
            'agent': agent_name,
            'timestamp': datetime.now().isoformat(),
        })

    def get_current_agent(self) -> Optional[str]:
        """Get current active agent."""
        return self.conversation_state['current_agent']

    def get_previous_agent(self) -> Optional[str]:
        """Get previous agent."""
        return self.conversation_state['previous_agent']

    def set_current_topic(self, topic: str) -> None:
        """Set current conversation topic."""
        self.conversation_state['current_topic'] = topic

    def get_current_topic(self) -> Optional[str]:
        """Get current conversation topic."""
        return self.conversation_state['current_topic']

    def add_pending_question(self, question: str, priority: int = 0) -> None:
        """Queue a question to ask the user; higher priority is asked first."""
        pending = self.conversation_state['pending_questions']
        pending.append({
            'question': question,
            'priority': priority,
            'added_at': datetime.now().isoformat(),
        })

        # Sort by priority (higher first); the sort is stable, so questions
        # with equal priority keep their insertion order.
        pending.sort(key=lambda item: item['priority'], reverse=True)

    def get_next_pending_question(self) -> Optional[str]:
        """Return the highest-priority pending question without removing it."""
        pending = self.conversation_state['pending_questions']
        if pending:
            return pending[0]['question']
        return None

    def clear_pending_questions(self) -> None:
        """Clear all pending questions."""
        self.conversation_state['pending_questions'] = []

    def get_conversation_flow(self) -> List[Dict[str, Any]]:
        """Get conversation flow history (the live list, not a copy)."""
        return self.conversation_state['conversation_flow']

    # ===== Context Summary =====

    def get_context_summary(self) -> str:
        """Get a one-line summary of current context for agents."""
        summary_parts = []

        # User profile summary; .get() keeps this safe even if the profile
        # dict was replaced with one that lacks some keys.
        profile = self.user_profile
        if profile.get('age'):
            summary_parts.append(f"User: {profile['age']} tuổi")
        if profile.get('gender'):
            summary_parts.append(f"giới tính {profile['gender']}")
        if profile.get('weight') and profile.get('height'):
            summary_parts.append(f"{profile['weight']}kg, {profile['height']}cm")

        # Current topic
        state = self.conversation_state
        if state.get('current_topic'):
            summary_parts.append(f"Topic: {state['current_topic']}")

        # Previous agent
        if state.get('previous_agent'):
            summary_parts.append(f"Previous agent: {state['previous_agent']}")

        return " | ".join(summary_parts) if summary_parts else "No context yet"

    # ===== Serialization =====

    def to_dict(self) -> Dict[str, Any]:
        """Convert memory to a dictionary referencing the live state dicts."""
        return {
            'user_profile': self.user_profile,
            'extracted_data': self.extracted_data,
            'conversation_state': self.conversation_state,
            'metadata': self.metadata,
        }

    def to_json(self) -> str:
        """Convert memory to a JSON string."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConversationMemory':
        """
        Create memory from a dictionary shaped like ``to_dict()`` output.

        Missing sections or keys fall back to their defaults, and the input
        is deep-copied so later changes to the memory do not modify ``data``.
        """
        memory = cls()
        memory._apply_state(data)
        return memory

    # ===== Session Persistence =====

    def _save_session(self) -> None:
        """Save current memory state to the session store (no-op if unbound)."""
        if not self.user_id or not self.session_store:
            return

        self.session_store.save_session(self.user_id, self.to_dict())

    def _load_session(self) -> bool:
        """
        Load memory state from the session store.

        Stored sections are merged over the defaults, so sessions that lack
        some keys still load into a complete state.

        Returns:
            True if session loaded, False otherwise
        """
        if not self.user_id or not self.session_store:
            return False

        session_data = self.session_store.load_session(self.user_id)
        if session_data:
            self._apply_state(session_data)
            print(f"✅ Loaded session for user {self.user_id}")
            return True

        print(f"ℹ️ No existing session found for user {self.user_id}, starting fresh")
        return False

    def save_session_now(self) -> None:
        """Manually save session (useful when auto_save is disabled)."""
        self._save_session()

    def clear_session(self) -> None:
        """Delete this user's session from storage."""
        if self.user_id and self.session_store:
            self.session_store.delete_session(self.user_id)

    # ===== Utility Methods =====

    def clear(self) -> None:
        """
        Clear all memory (start fresh conversation).

        Only the in-memory state is reset. ``user_id``, ``session_store`` and
        ``auto_save`` are kept so the object stays bound to its session, and
        the stored session is neither reloaded nor deleted (use
        ``clear_session()`` for the latter).
        """
        self._reset_state()

    def __repr__(self) -> str:
        return f"<ConversationMemory: {self.get_context_summary()}>"