Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI compatible authentication module | |
| """ | |
| import jwt | |
| import bcrypt | |
| import json | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any | |
| from pathlib import Path | |
| class AuthManager: | |
| def __init__(self, secret_key: str = None): | |
| self.secret_key = secret_key or os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production') | |
| # Use absolute paths and handle Hugging Face Spaces | |
| self.base_dir = Path(__file__).parent.parent # Go up from src/components | |
| self.data_dir = self.base_dir / 'data' | |
| # Fallback to current directory if base_dir doesn't work | |
| if not self.data_dir.exists(): | |
| self.data_dir = Path.cwd() / 'data' | |
| self.users_file = self.data_dir / 'users.json' | |
| self.session_file = self.data_dir / 'active_sessions.json' | |
| # In-memory fallback for read-only environments | |
| self.users_memory = {} | |
| self.sessions_memory = {} | |
| self.use_memory = False | |
| self.ensure_users_file() | |
| self.ensure_admin_user() # Ensure admin exists on startup | |
| print(f"β AuthManager initialized") | |
| print(f"π Data directory: {self.data_dir}") | |
| print(f"π Users file: {self.users_file}") | |
| print(f"πΎ Using memory storage: {self.use_memory}") | |
| def ensure_users_file(self): | |
| """Ensure users file exists with better error handling""" | |
| try: | |
| self.data_dir.mkdir(parents=True, exist_ok=True) | |
| if not self.users_file.exists(): | |
| with open(self.users_file, 'w') as f: | |
| json.dump({}, f) | |
| print(f"β Created users file: {self.users_file}") | |
| # Test write permissions | |
| test_data = self.load_users() | |
| self.save_users(test_data) | |
| print(f"β Write permissions confirmed") | |
| except Exception as e: | |
| print(f"β οΈ File system error: {e}") | |
| print(f"π Switching to in-memory storage") | |
| self.use_memory = True | |
| self.users_memory = {} | |
| self.sessions_memory = {} | |
| def ensure_admin_user(self): | |
| """Ensure admin user exists on startup""" | |
| try: | |
| result = self.create_default_admin() | |
| if result.get('success'): | |
| print(f"β Admin user ready: {result.get('message', 'Available')}") | |
| except Exception as e: | |
| print(f"β οΈ Admin user creation failed: {e}") | |
| def hash_password(self, password: str) -> str: | |
| """Hash password with bcrypt""" | |
| return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
| def verify_password(self, password: str, hashed: str) -> bool: | |
| """Verify password against hash""" | |
| try: | |
| return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) | |
| except Exception as e: | |
| print(f"β Password verification error: {e}") | |
| return False | |
| def load_users(self) -> Dict[str, Any]: | |
| """Load users from file or memory""" | |
| if self.use_memory: | |
| return self.users_memory.copy() | |
| try: | |
| if self.users_file.exists(): | |
| with open(self.users_file, 'r') as f: | |
| users = json.load(f) | |
| return users | |
| return {} | |
| except Exception as e: | |
| print(f"β Error loading users: {e}") | |
| return {} | |
| def save_users(self, users: Dict[str, Any]): | |
| """Save users to file or memory""" | |
| if self.use_memory: | |
| self.users_memory = users.copy() | |
| return | |
| try: | |
| with open(self.users_file, 'w') as f: | |
| json.dump(users, f, indent=2) | |
| except Exception as e: | |
| print(f"β Error saving users: {e}") | |
| def create_user(self, username: str, email: str, password: str) -> Dict[str, Any]: | |
| """Create a new user""" | |
| users = self.load_users() | |
| if username in users: | |
| return {'success': False, 'error': 'Username already exists'} | |
| # Check if email already exists | |
| for user_data in users.values(): | |
| if user_data.get('email') == email: | |
| return {'success': False, 'error': 'Email already registered'} | |
| # Create user | |
| user_id = f"user_{len(users) + 1}" | |
| users[username] = { | |
| 'user_id': user_id, | |
| 'email': email, | |
| 'password_hash': self.hash_password(password), | |
| 'created_at': datetime.now().isoformat(), | |
| 'is_active': True | |
| } | |
| self.save_users(users) | |
| return {'success': True, 'user_id': user_id} | |
| def authenticate_user(self, username: str, password: str) -> Dict[str, Any]: | |
| """Authenticate user credentials with detailed logging""" | |
| print(f"π Authentication attempt for: {username}") | |
| users = self.load_users() | |
| print(f"π Total users in database: {len(users)}") | |
| if username not in users: | |
| print(f"β Username '{username}' not found") | |
| print(f"π Available usernames: {list(users.keys())}") | |
| return {'success': False, 'error': 'Invalid username or password'} | |
| user = users[username] | |
| if not self.verify_password(password, user['password_hash']): | |
| print(f"β Invalid password for '{username}'") | |
| return {'success': False, 'error': 'Invalid username or password'} | |
| if not user.get('is_active', True): | |
| print(f"β User '{username}' is not active") | |
| return {'success': False, 'error': 'Account is disabled'} | |
| # Generate JWT token | |
| try: | |
| token = jwt.encode({ | |
| 'user_id': user['user_id'], | |
| 'username': username, | |
| 'exp': datetime.utcnow() + timedelta(hours=8) | |
| }, self.secret_key, algorithm='HS256') | |
| # Track active session | |
| self.add_active_session(user['user_id'], token) | |
| print(f"β Authentication successful for '{username}'") | |
| return { | |
| 'success': True, | |
| 'token': token, | |
| 'user_id': user['user_id'], | |
| 'username': username | |
| } | |
| except Exception as e: | |
| print(f"β JWT generation failed: {e}") | |
| return {'success': False, 'error': 'Authentication failed'} | |
| def verify_token(self, token: str) -> Optional[Dict[str, Any]]: | |
| """Verify JWT token""" | |
| try: | |
| payload = jwt.decode(token, self.secret_key, algorithms=['HS256']) | |
| user_id = payload.get('user_id') | |
| # Check if session is still active | |
| if not self.is_session_active(user_id, token): | |
| return None | |
| # Update session activity | |
| self.update_session_activity(user_id) | |
| return payload | |
| except jwt.ExpiredSignatureError: | |
| print("π Token expired") | |
| return None | |
| except jwt.InvalidTokenError: | |
| print("β Invalid token") | |
| return None | |
| except Exception as e: | |
| print(f"β Token verification error: {e}") | |
| return None | |
| def create_default_admin(self) -> Dict[str, Any]: | |
| """Create default admin user if it doesn't exist""" | |
| users = self.load_users() | |
| admin_username = "admin" | |
| admin_user_id = "admin_user" | |
| # Check if admin already exists | |
| if admin_username in users: | |
| return {'success': True, 'message': 'Admin user already exists'} | |
| # Create admin user | |
| try: | |
| users[admin_username] = { | |
| 'user_id': admin_user_id, | |
| 'email': 'admin@researchmate.local', | |
| 'password_hash': self.hash_password('admin123'), | |
| 'created_at': datetime.now().isoformat(), | |
| 'is_active': True, | |
| 'is_admin': True | |
| } | |
| self.save_users(users) | |
| return { | |
| 'success': True, | |
| 'message': 'Default admin user created', | |
| 'username': admin_username, | |
| 'password': 'admin123' | |
| } | |
| except Exception as e: | |
| print(f"β Failed to create admin user: {e}") | |
| return {'success': False, 'error': str(e)} | |
| def load_active_sessions(self) -> Dict[str, Any]: | |
| """Load active sessions""" | |
| if self.use_memory: | |
| return self.sessions_memory.copy() | |
| try: | |
| if self.session_file.exists(): | |
| with open(self.session_file, 'r') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| print(f"β Error loading sessions: {e}") | |
| return {} | |
| def save_active_sessions(self, sessions: Dict[str, Any]): | |
| """Save active sessions""" | |
| if self.use_memory: | |
| self.sessions_memory = sessions.copy() | |
| return | |
| try: | |
| with open(self.session_file, 'w') as f: | |
| json.dump(sessions, f, indent=2) | |
| except Exception as e: | |
| print(f"β Error saving sessions: {e}") | |
| def add_active_session(self, user_id: str, token: str): | |
| """Add an active session""" | |
| sessions = self.load_active_sessions() | |
| sessions[user_id] = { | |
| 'token': token, | |
| 'created_at': datetime.now().isoformat(), | |
| 'last_activity': datetime.now().isoformat() | |
| } | |
| self.save_active_sessions(sessions) | |
| def is_session_active(self, user_id: str, token: str) -> bool: | |
| """Check if a session is active""" | |
| sessions = self.load_active_sessions() | |
| if user_id not in sessions: | |
| return False | |
| session = sessions[user_id] | |
| if session.get('token') != token: | |
| return False | |
| # Check if session is expired | |
| try: | |
| created_at = datetime.fromisoformat(session['created_at']) | |
| if datetime.now() - created_at > timedelta(hours=8): | |
| self.remove_active_session(user_id) | |
| return False | |
| except: | |
| return False | |
| return True | |
| def remove_active_session(self, user_id: str): | |
| """Remove an active session""" | |
| sessions = self.load_active_sessions() | |
| if user_id in sessions: | |
| del sessions[user_id] | |
| self.save_active_sessions(sessions) | |
| def update_session_activity(self, user_id: str): | |
| """Update last activity time for a session""" | |
| sessions = self.load_active_sessions() | |
| if user_id in sessions: | |
| sessions[user_id]['last_activity'] = datetime.now().isoformat() | |
| self.save_active_sessions(sessions) | |
| def logout_user(self, user_id: str): | |
| """Logout user and invalidate session""" | |
| self.remove_active_session(user_id) | |
| return {'success': True, 'message': 'Logged out successfully'} | |
| def cleanup_expired_sessions(self): | |
| """Clean up expired sessions""" | |
| sessions = self.load_active_sessions() | |
| current_time = datetime.now() | |
| expired_sessions = [] | |
| for user_id, session in sessions.items(): | |
| try: | |
| created_at = datetime.fromisoformat(session['created_at']) | |
| if current_time - created_at > timedelta(hours=8): | |
| expired_sessions.append(user_id) | |
| except: | |
| expired_sessions.append(user_id) | |
| for user_id in expired_sessions: | |
| del sessions[user_id] | |
| if expired_sessions: | |
| self.save_active_sessions(sessions) | |
| return len(expired_sessions) | |
| def debug_status(self): | |
| """Debug authentication status""" | |
| users = self.load_users() | |
| sessions = self.load_active_sessions() | |
| print("=== AUTH DEBUG STATUS ===") | |
| print(f"Storage mode: {'Memory' if self.use_memory else 'File'}") | |
| print(f"Users file exists: {self.users_file.exists() if not self.use_memory else 'N/A'}") | |
| print(f"Total users: {len(users)}") | |
| print(f"Active sessions: {len(sessions)}") | |
| if users: | |
| print("Users:") | |
| for username, user_data in users.items(): | |
| print(f" - {username}: {user_data.get('user_id', 'No ID')}") | |
| print("========================") | |
| # Global auth manager instance | |
| auth_manager = AuthManager() |