Spaces:
Sleeping
Sleeping
File size: 5,636 Bytes
185c377 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import logging
from datetime import datetime
from src.database_manager import DatabaseManager
class AuthManager:
def __init__(self):
self.db_manager = DatabaseManager()
def authenticate_user(self, username, password):
"""Authenticate user and return user data"""
try:
# Find user by username
user = self.db_manager.execute_query_one(
"SELECT id, username, email, name, role, org FROM users WHERE username = %s",
(username,)
)
if not user:
return None
# Check password (plaintext comparison as per requirements)
user_password = self.db_manager.execute_query_one(
"SELECT password FROM users WHERE username = %s",
(username,)
)
if not user_password or user_password['password'] != password:
return None
# Update last login
self.db_manager.execute_query(
"UPDATE users SET last_login = %s WHERE id = %s",
(datetime.now(), user['id'])
)
return user
except Exception as e:
logging.error(f"Authentication error: {e}")
return None
def create_user(self, username, email, password, name, role, org_name="", phone="",
country_code="", department="", location="", organization_id=None):
"""Create a new user account"""
try:
# Check if username or email already exists
existing_user = self.db_manager.execute_query_one(
"SELECT id FROM users WHERE username = %s OR email = %s",
(username, email)
)
if existing_user:
return False
# Handle organization creation
if role == 'organization':
# Create organization entry first
org_result = self.db_manager.execute_query(
"""INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(org_name or name, email, phone, country_code, department, location, datetime.now())
)
if not org_result:
return False
# Get the organization ID
org_id = self.db_manager.execute_query_one(
"SELECT id FROM organizations WHERE email = %s ORDER BY created_at DESC LIMIT 1",
(email,)
)
organization_id = org_id['id'] if org_id else None
# Create user entry
user_result = self.db_manager.execute_query(
"""INSERT INTO users (username, email, password, name, role, org, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(username, email, password, name, role, organization_id, datetime.now())
)
return bool(user_result)
except Exception as e:
logging.error(f"User creation error: {e}")
return False
def get_organizations(self):
"""Get list of all organizations"""
try:
organizations = self.db_manager.execute_query(
"SELECT id, name FROM organizations ORDER BY name",
fetch=True
)
return organizations or []
except Exception as e:
logging.error(f"Error fetching organizations: {e}")
return []
def get_organization_practitioners(self, organization_id):
"""Get practitioners for an organization"""
try:
practitioners = self.db_manager.execute_query(
"""SELECT id, username, name, email, created_at, last_login
FROM users WHERE org = %s AND role = 'practitioner'
ORDER BY created_at DESC""",
(organization_id,),
fetch=True
)
return practitioners or []
except Exception as e:
logging.error(f"Error fetching practitioners: {e}")
return []
def update_user_profile(self, user_id, name=None, email=None):
"""Update user profile information"""
try:
if name:
self.db_manager.execute_query(
"UPDATE users SET name = %s WHERE id = %s",
(name, user_id)
)
if email:
# Check if email is already taken by another user
existing = self.db_manager.execute_query_one(
"SELECT id FROM users WHERE email = %s AND id != %s",
(email, user_id)
)
if not existing:
self.db_manager.execute_query(
"UPDATE users SET email = %s WHERE id = %s",
(email, user_id)
)
return True
else:
return False
return True
except Exception as e:
logging.error(f"Error updating user profile: {e}")
return False
|