Spaces:
Sleeping
Sleeping
| import logging | |
| from datetime import datetime | |
| import hashlib | |
| class AuthManager: | |
| """Authentication and user management""" | |
| def __init__(self, db_manager): | |
| """Initialize auth manager with database manager""" | |
| self.db = db_manager | |
| def hash_password(self, password): | |
| """Simple password hashing (in production, use bcrypt or similar)""" | |
| return hashlib.sha256(password.encode()).hexdigest() | |
| def authenticate_user(self, username, password): | |
| """Authenticate user and return user data""" | |
| try: | |
| user = self.db.execute_query_one( | |
| "SELECT id, username, email, name, role, org FROM users WHERE username = %s", | |
| (username,) | |
| ) | |
| if not user: | |
| logging.warning(f"User not found: {username}") | |
| return None | |
| user_password = self.db.execute_query_one( | |
| "SELECT password FROM users WHERE username = %s", | |
| (username,) | |
| ) | |
| if not user_password: | |
| logging.warning(f"Password not found for user: {username}") | |
| return None | |
| stored_password = user_password['password'] | |
| # Check if stored password is hashed (64 chars) or plain text | |
| is_hashed = len(stored_password) == 64 and all(c in '0123456789abcdef' for c in stored_password) | |
| if is_hashed: | |
| # Compare with hashed input | |
| password_match = stored_password == self.hash_password(password) | |
| else: | |
| # Compare with plain text (for legacy compatibility) | |
| password_match = stored_password == password | |
| logging.info(f"Debug - Username: {username}, Password type: {'hashed' if is_hashed else 'plain'}") | |
| if not password_match: | |
| logging.warning(f"Invalid password for user: {username}") | |
| return None | |
| # Update last login (use updated_at since last_login column may not exist) | |
| try: | |
| self.db.execute_query( | |
| "UPDATE users SET updated_at = %s WHERE id = %s", | |
| (datetime.now(), user['id']) | |
| ) | |
| except Exception as e: | |
| logging.warning(f"Could not update last login: {e}") | |
| logging.info(f"User authenticated successfully: {username}") | |
| return user | |
| except Exception as e: | |
| logging.error(f"Authentication error: {e}") | |
| return None | |
| def get_user_data(self, username): | |
| """Get user data by username - for compatibility with UI""" | |
| try: | |
| user = self.db.execute_query_one( | |
| "SELECT id, username, email, name, role, org FROM users WHERE username = %s", | |
| (username,) | |
| ) | |
| if user: | |
| logging.info(f"Retrieved user data for: {username}") | |
| return user | |
| else: | |
| logging.warning(f"User not found: {username}") | |
| return None | |
| except Exception as e: | |
| logging.error(f"Error getting user data: {e}") | |
| return None | |
| def create_user(self, username, email, password, name, role, org_name="", | |
| phone="", country_code="", department="", location="", organization_id=None): | |
| """Create a new user account""" | |
| try: | |
| # Check if user already exists | |
| existing_user = self.db.execute_query_one( | |
| "SELECT id FROM users WHERE username = %s OR email = %s", | |
| (username, email) | |
| ) | |
| if existing_user: | |
| logging.warning(f"User already exists: {username} or {email}") | |
| return False | |
| # Create organization if role is organization | |
| if role == 'organization': | |
| org_result = self.db.execute_query( | |
| """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at) | |
| VALUES (%s, %s, %s, %s, %s, %s, %s)""", | |
| (org_name or name, email, phone, country_code, department, location, datetime.now()) | |
| ) | |
| if not org_result: | |
| logging.error("Failed to create organization") | |
| return False | |
| # Get the organization ID | |
| org_id = self.db.execute_query_one( | |
| "SELECT id FROM organizations WHERE email = %s ORDER BY created_at DESC LIMIT 1", | |
| (email,) | |
| ) | |
| organization_id = org_id['id'] if org_id else None | |
| # Create user with hashed password | |
| hashed_password = self.hash_password(password) | |
| user_result = self.db.execute_query( | |
| """INSERT INTO users (username, email, password, name, role, org, created_at) | |
| VALUES (%s, %s, %s, %s, %s, %s, %s)""", | |
| (username, email, hashed_password, name, role, organization_id, datetime.now()) | |
| ) | |
| if user_result: | |
| logging.info(f"User created successfully: {username}") | |
| return True | |
| else: | |
| logging.error(f"Failed to create user: {username}") | |
| return False | |
| except Exception as e: | |
| logging.error(f"User creation error: {e}") | |
| return False | |
| def get_user_by_id(self, user_id): | |
| """Get user by ID""" | |
| try: | |
| user = self.db.execute_query_one( | |
| "SELECT id, username, email, name, role, org FROM users WHERE id = %s", | |
| (user_id,) | |
| ) | |
| return user | |
| except Exception as e: | |
| logging.error(f"Error fetching user by ID: {e}") | |
| return None | |
| def update_user_last_login(self, user_id): | |
| """Update user's last login timestamp""" | |
| try: | |
| result = self.db.execute_query( | |
| "UPDATE users SET updated_at = %s WHERE id = %s", | |
| (datetime.now(), user_id) | |
| ) | |
| return bool(result) | |
| except Exception as e: | |
| logging.error(f"Error updating last login: {e}") | |
| return False | |