Spaces:
Paused
Paused
| import json | |
| import os | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| import shutil | |
| from core.game_mechanics import Monster | |
class StateManager:
    """Manages persistent state for users and monsters.

    Layout under ``data_dir``::

        users/<user_id>/monster_<name>_<YYYYmmdd_HHMMSS>.json   monster snapshots
        users/<user_id>/current_monster.json                    pointer to the latest snapshot
        users/<user_id>/profile.json                            per-user statistics/achievements
        monsters/                                               created here, not used by this class
        cache/                                                  export archives

    An in-memory session cache (``active_sessions``) sits in front of the
    files. Each session is a dict with the keys ``monster``, ``last_access``
    and ``dirty`` (``True`` while the cached monster has changes that were
    not yet written to disk).
    """

    # A cached monster younger than this is returned without touching disk.
    CACHE_TTL = timedelta(minutes=30)
    # Minimum time between two periodic saves triggered by update_monster().
    SAVE_INTERVAL = timedelta(minutes=5)
    # Sessions idle for longer than this are flushed by cleanup_old_sessions().
    SESSION_EXPIRY = timedelta(hours=1)
    # Below these care values update_monster() saves on every call.
    CRITICAL_HEALTH = 30
    CRITICAL_HUNGER = 20

    def __init__(self, data_dir: Path):
        """Create the storage directories below ``data_dir`` and empty caches.

        Args:
            data_dir: Root directory for all persisted data (str or Path).
        """
        self.data_dir = Path(data_dir)
        self.users_dir = self.data_dir / "users"
        self.monsters_dir = self.data_dir / "monsters"
        self.cache_dir = self.data_dir / "cache"

        # Create directories if they don't exist
        for dir_path in (self.users_dir, self.monsters_dir, self.cache_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        # In-memory cache for active sessions, keyed by user_id
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        # Time of the last successful save per user_id
        self.last_save_time: Dict[str, datetime] = {}

    def get_user_dir(self, user_id: str) -> Path:
        """Get or create the directory that holds one user's files.

        Args:
            user_id: Identifier used as the directory name below ``users/``.

        Returns:
            Path of the (now existing) user directory.

        Raises:
            ValueError: If ``user_id`` does not resolve to a location strictly
                inside the users directory (e.g. ``""``, ``".."``, ``"../x"``
                or an absolute path).
        """
        user_dir = self.users_dir / user_id

        # user_id becomes part of a filesystem path, so make sure it cannot
        # point at the users directory itself or anywhere outside of it.
        root = self.users_dir.resolve()
        resolved = user_dir.resolve()
        if resolved == root or root not in resolved.parents:
            raise ValueError(f"Invalid user_id: {user_id!r}")

        user_dir.mkdir(parents=True, exist_ok=True)
        return user_dir

    def save_monster(self, user_id: str, monster: "Monster") -> bool:
        """Save monster to persistent storage and refresh the caches.

        Writes a timestamped snapshot of ``monster.to_dict()``, repoints
        ``current_monster.json`` at it, updates the user profile and stores
        the monster in the session cache as clean.

        NOTE(review): every call writes a new ``monster_<name>_<timestamp>``
        file (second resolution), so snapshots accumulate over time and
        get_user_monsters() lists each of them - confirm this is intended.

        Args:
            user_id: Owner of the monster.
            monster: Object providing ``name`` and ``to_dict()``.

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            now = datetime.now()

            # Save monster data
            monster_file = user_dir / f"monster_{monster.name}_{now.strftime('%Y%m%d_%H%M%S')}.json"
            with open(monster_file, 'w', encoding='utf-8') as f:
                json.dump(monster.to_dict(), f, indent=2)

            # Update current monster reference
            current_file = user_dir / "current_monster.json"
            current_data = {
                'monster_file': str(monster_file.name),
                'monster_name': monster.name,
                'last_updated': now.isoformat(),
            }
            with open(current_file, 'w', encoding='utf-8') as f:
                json.dump(current_data, f, indent=2)

            # Update user profile
            self._update_user_profile(user_id, monster)

            # Cache in memory; the cached copy now matches what is on disk
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': now,
                'dirty': False,
            }
            # Recorded only after a successful write so that a failed save is
            # retried on the next update instead of being throttled.
            self.last_save_time[user_id] = now
            return True
        except Exception as e:
            print(f"Error saving monster: {e}")
            return False

    def get_current_monster(self, user_id: str) -> Optional["Monster"]:
        """Get the current active monster for a user.

        A cached monster accessed within ``CACHE_TTL`` is returned directly.
        Otherwise the monster referenced by ``current_monster.json`` is loaded
        with ``Monster.from_dict`` and cached.

        Args:
            user_id: Owner of the monster.

        Returns:
            The monster, or None if the user has none or loading failed
            (the error is printed).
        """
        # Check memory cache first
        session = self.active_sessions.get(user_id)
        if session is not None:
            now = datetime.now()
            if now - session['last_access'] < self.CACHE_TTL:
                session['last_access'] = now
                return session['monster']

            # The entry is stale and about to be replaced by the disk copy.
            # Flush changes that update_monster() deferred, otherwise they
            # would be lost; if flushing fails keep serving the cached object.
            if session.get('dirty') and not self.save_monster(user_id, session['monster']):
                session['last_access'] = now
                return session['monster']

        # Load from disk
        try:
            user_dir = self.get_user_dir(user_id)
            current_file = user_dir / "current_monster.json"
            if not current_file.exists():
                return None

            with open(current_file, 'r', encoding='utf-8') as f:
                current_data = json.load(f)

            monster_file = user_dir / current_data['monster_file']
            if not monster_file.exists():
                return None

            with open(monster_file, 'r', encoding='utf-8') as f:
                monster_data = json.load(f)
            monster = Monster.from_dict(monster_data)

            # Update cache
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': datetime.now(),
                'dirty': False,
            }
            return monster
        except Exception as e:
            print(f"Error loading monster: {e}")
            return None

    def update_monster(self, user_id: str, monster: "Monster") -> bool:
        """Update existing monster data, saving to disk only when needed.

        The monster is always placed in the session cache. It is written to
        disk when the user has no recorded successful save, when the last one
        is older than ``SAVE_INTERVAL``, or when health/hunger are below the
        critical thresholds.

        Args:
            user_id: Owner of the monster.
            monster: Object providing ``care_state`` with ``health`` and
                ``hunger`` entries (plus whatever save_monster() needs).

        Returns:
            True if no save was needed or the save succeeded, False if the
            save failed.
        """
        now = datetime.now()

        # Update in memory; marked dirty until a save succeeds
        self.active_sessions[user_id] = {
            'monster': monster,
            'last_access': now,
            'dirty': True,
        }

        # Save periodically or in critical states
        last_saved = self.last_save_time.get(user_id)
        should_save = (
            last_saved is None
            or now - last_saved > self.SAVE_INTERVAL
            or monster.care_state['health'] < self.CRITICAL_HEALTH
            or monster.care_state['hunger'] < self.CRITICAL_HUNGER
        )

        if should_save:
            # save_monster() records the save time and clears the dirty flag,
            # but only if the write actually succeeded.
            return self.save_monster(user_id, monster)
        return True

    def get_user_monsters(self, user_id: str) -> List[Dict[str, Any]]:
        """Get summaries of all monster snapshot files of a user.

        Args:
            user_id: Owner of the monsters.

        Returns:
            One dict per readable ``monster_*.json`` file with the keys
            ``file``, ``name``, ``species``, ``stage`` and ``birth_time``,
            sorted by birth time (newest first, entries without a birth time
            last). Unreadable files are skipped; an empty list is returned if
            the listing itself fails (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            monsters = []

            # "current_monster.json" never matches this pattern, so no extra
            # filtering is required.
            for file_path in user_dir.glob("monster_*.json"):
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        monster_data = json.load(f)
                except (OSError, ValueError) as e:
                    # One corrupt snapshot must not hide all the others.
                    print(f"Error getting user monsters: {e}")
                    continue
                if not isinstance(monster_data, dict):
                    continue

                monsters.append({
                    'file': file_path.name,
                    'name': monster_data.get('name'),
                    'species': monster_data.get('species'),
                    'stage': monster_data.get('stage'),
                    'birth_time': monster_data.get('birth_time'),
                })

            # Sort by birth time (newest first). The leading flag keeps None
            # from being compared with real values.
            monsters.sort(
                key=lambda x: (x['birth_time'] is not None, x['birth_time']),
                reverse=True,
            )
            return monsters
        except Exception as e:
            print(f"Error getting user monsters: {e}")
            return []

    def _update_user_profile(self, user_id: str, monster: "Monster"):
        """Update user profile with monster statistics.

        ``monsters_created`` is increased only the first time a monster name
        is seen for this user; names already counted are kept in the profile
        under ``monster_names``. Two different monsters sharing one name are
        therefore counted once. Errors are printed, not raised.

        Args:
            user_id: Owner of the profile.
            monster: Monster that was just saved.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            profile_file = user_dir / "profile.json"

            # Load existing profile or create new
            if profile_file.exists():
                with open(profile_file, 'r', encoding='utf-8') as f:
                    profile = json.load(f)
            else:
                profile = {
                    'user_id': user_id,
                    'created': datetime.now().isoformat(),
                    'monsters_created': 0,
                    'total_training_sessions': 0,
                    'achievements': [],
                }

            known_names = profile.get('monster_names')
            if known_names is None:
                # Profile written before names were tracked: the only monster
                # it is known to have counted is its recorded current one.
                previous = profile.get('current_monster')
                known_names = [] if previous is None else [previous]

            # Update statistics. This method runs on every save (including
            # periodic autosaves), so only a new name counts as a creation.
            if monster.name not in known_names:
                known_names.append(monster.name)
                profile['monsters_created'] = profile.get('monsters_created', 0) + 1
            profile['monster_names'] = known_names
            profile['last_active'] = datetime.now().isoformat()
            profile['current_monster'] = monster.name

            # Check for achievements
            new_achievements = self._check_achievements(profile, monster)
            profile.setdefault('achievements', []).extend(new_achievements)

            # Save profile
            with open(profile_file, 'w', encoding='utf-8') as f:
                json.dump(profile, f, indent=2)
        except Exception as e:
            print(f"Error updating user profile: {e}")

    def _check_achievements(self, profile: Dict, monster: "Monster") -> List[Dict[str, Any]]:
        """Check for new achievements.

        Args:
            profile: Profile dict; ``monsters_created`` and ``achievements``
                are read (missing entries count as 0 / empty).
            monster: Monster whose ``care_state`` is inspected.

        Returns:
            Achievement dicts (``id``, ``name``, ``description``, ``icon``,
            ``unlocked``) that are earned and not yet present in the profile.
        """
        # NOTE(review): the 'icon' literals below look like mis-decoded emoji;
        # they are kept exactly as found - confirm against the upstream file.
        achievements = []
        current_achievements = {a['id'] for a in profile.get('achievements', [])}
        created = profile.get('monsters_created', 0)

        # First monster achievement (">=" so a profile whose counter already
        # passed the threshold can still unlock it; the id check prevents
        # duplicates)
        if created >= 1 and 'first_monster' not in current_achievements:
            achievements.append({
                'id': 'first_monster',
                'name': 'Digital Pioneer',
                'description': 'Created your first digital monster',
                'icon': 'π₯',
                'unlocked': datetime.now().isoformat(),
            })

        # Multiple monsters achievement
        if created >= 5 and 'monster_collector' not in current_achievements:
            achievements.append({
                'id': 'monster_collector',
                'name': 'Monster Collector',
                'description': 'Created 5 digital monsters',
                'icon': 'π',
                'unlocked': datetime.now().isoformat(),
            })

        # Perfect care achievement; a missing stat counts as 0 instead of
        # raising, which would abort the whole profile update
        if 'perfect_care' not in current_achievements:
            if all(monster.care_state.get(stat, 0) >= 90 for stat in ('hunger', 'happiness', 'health')):
                achievements.append({
                    'id': 'perfect_care',
                    'name': 'Perfect Caretaker',
                    'description': 'Achieved perfect care status',
                    'icon': 'π',
                    'unlocked': datetime.now().isoformat(),
                })

        return achievements

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get user profile.

        Args:
            user_id: Owner of the profile.

        Returns:
            The parsed ``profile.json``, or None if it does not exist or
            could not be read (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            profile_file = user_dir / "profile.json"
            if profile_file.exists():
                with open(profile_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            return None
        except Exception as e:
            print(f"Error loading user profile: {e}")
            return None

    def cleanup_old_sessions(self):
        """Clean up old sessions from memory.

        Sessions not accessed for more than ``SESSION_EXPIRY`` are saved and
        then removed from the session cache together with their save-time
        bookkeeping.
        """
        current_time = datetime.now()
        # Collect first: the dict must not change size while iterating it.
        expired_users = [
            user_id
            for user_id, session in self.active_sessions.items()
            if current_time - session['last_access'] > self.SESSION_EXPIRY
        ]

        for user_id in expired_users:
            session = self.active_sessions[user_id]
            # Save before removing from cache
            if 'monster' in session:
                self.save_monster(user_id, session['monster'])
            # save_monster() re-inserts both entries on success, so remove
            # them afterwards.
            self.active_sessions.pop(user_id, None)
            self.last_save_time.pop(user_id, None)

    def export_user_data(self, user_id: str) -> Optional[str]:
        """Export all user data as a zip file.

        Args:
            user_id: Owner of the data.

        Returns:
            Path of the created archive inside the cache directory as a
            string, or None if the export failed (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            export_path = self.cache_dir / f"export_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

            # Create zip archive (make_archive appends the ".zip" suffix)
            shutil.make_archive(str(export_path), 'zip', user_dir)
            return f"{export_path}.zip"
        except Exception as e:
            print(f"Error exporting user data: {e}")
            return None