SmartHeal's picture
Upload 33 files
185c377 verified
raw
history blame
6.72 kB
import logging
from datetime import datetime
import hashlib
class AuthManager:
"""Authentication and user management"""
def __init__(self, db_manager):
"""Initialize auth manager with database manager"""
self.db = db_manager
def hash_password(self, password):
"""Simple password hashing (in production, use bcrypt or similar)"""
return hashlib.sha256(password.encode()).hexdigest()
def authenticate_user(self, username, password):
"""Authenticate user and return user data"""
try:
user = self.db.execute_query_one(
"SELECT id, username, email, name, role, org FROM users WHERE username = %s",
(username,)
)
if not user:
logging.warning(f"User not found: {username}")
return None
user_password = self.db.execute_query_one(
"SELECT password FROM users WHERE username = %s",
(username,)
)
if not user_password:
logging.warning(f"Password not found for user: {username}")
return None
stored_password = user_password['password']
# Check if stored password is hashed (64 chars) or plain text
is_hashed = len(stored_password) == 64 and all(c in '0123456789abcdef' for c in stored_password)
if is_hashed:
# Compare with hashed input
password_match = stored_password == self.hash_password(password)
else:
# Compare with plain text (for legacy compatibility)
password_match = stored_password == password
logging.info(f"Debug - Username: {username}, Password type: {'hashed' if is_hashed else 'plain'}")
if not password_match:
logging.warning(f"Invalid password for user: {username}")
return None
# Update last login (use updated_at since last_login column may not exist)
try:
self.db.execute_query(
"UPDATE users SET updated_at = %s WHERE id = %s",
(datetime.now(), user['id'])
)
except Exception as e:
logging.warning(f"Could not update last login: {e}")
logging.info(f"User authenticated successfully: {username}")
return user
except Exception as e:
logging.error(f"Authentication error: {e}")
return None
def get_user_data(self, username):
"""Get user data by username - for compatibility with UI"""
try:
user = self.db.execute_query_one(
"SELECT id, username, email, name, role, org FROM users WHERE username = %s",
(username,)
)
if user:
logging.info(f"Retrieved user data for: {username}")
return user
else:
logging.warning(f"User not found: {username}")
return None
except Exception as e:
logging.error(f"Error getting user data: {e}")
return None
def create_user(self, username, email, password, name, role, org_name="",
phone="", country_code="", department="", location="", organization_id=None):
"""Create a new user account"""
try:
# Check if user already exists
existing_user = self.db.execute_query_one(
"SELECT id FROM users WHERE username = %s OR email = %s",
(username, email)
)
if existing_user:
logging.warning(f"User already exists: {username} or {email}")
return False
# Create organization if role is organization
if role == 'organization':
org_result = self.db.execute_query(
"""INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(org_name or name, email, phone, country_code, department, location, datetime.now())
)
if not org_result:
logging.error("Failed to create organization")
return False
# Get the organization ID
org_id = self.db.execute_query_one(
"SELECT id FROM organizations WHERE email = %s ORDER BY created_at DESC LIMIT 1",
(email,)
)
organization_id = org_id['id'] if org_id else None
# Create user with hashed password
hashed_password = self.hash_password(password)
user_result = self.db.execute_query(
"""INSERT INTO users (username, email, password, name, role, org, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(username, email, hashed_password, name, role, organization_id, datetime.now())
)
if user_result:
logging.info(f"User created successfully: {username}")
return True
else:
logging.error(f"Failed to create user: {username}")
return False
except Exception as e:
logging.error(f"User creation error: {e}")
return False
def get_user_by_id(self, user_id):
"""Get user by ID"""
try:
user = self.db.execute_query_one(
"SELECT id, username, email, name, role, org FROM users WHERE id = %s",
(user_id,)
)
return user
except Exception as e:
logging.error(f"Error fetching user by ID: {e}")
return None
def update_user_last_login(self, user_id):
"""Update user's last login timestamp"""
try:
result = self.db.execute_query(
"UPDATE users SET updated_at = %s WHERE id = %s",
(datetime.now(), user_id)
)
return bool(result)
except Exception as e:
logging.error(f"Error updating last login: {e}")
return False