# SmartHeal-Agentic-AI / src/database.py
# Repository page residue: "Upload 33 files", commit 185c377 (verified), 22.9 kB.
import mysql.connector
from mysql.connector import Error
import logging
from datetime import datetime
import json
import uuid
import os
from PIL import Image
class DatabaseManager:
    """Database operations manager for the SmartHeal application.

    Each operation opens its own short-lived MySQL connection built from
    ``mysql_config`` and closes it before returning. Failures are logged and
    reported through the return value (``None``, an empty list or a default
    list, depending on the method) rather than raised to the caller.
    """

    def __init__(self, mysql_config):
        """Store the MySQL configuration and log whether a connection works.

        Args:
            mysql_config: Keyword arguments forwarded to
                ``mysql.connector.connect``.
        """
        self.mysql_config = mysql_config
        self.test_connection()

    def test_connection(self):
        """Open and immediately close a connection, logging the outcome."""
        try:
            connection = self.get_connection()
            if connection:
                connection.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error(f"Database connection test failed: {e}")

    def get_connection(self):
        """Return a new MySQL connection, or None (after logging) on error."""
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error(f"Error connecting to MySQL: {e}")
            return None

    def execute_query(self, query, params=None, fetch=False):
        """Run one statement on a fresh connection.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Sequence of placeholder values (optional).
            fetch: When True, return all rows and do not commit; otherwise
                commit and return the affected row count.

        Returns:
            A list of dict rows (``fetch=True``), the row count
            (``fetch=False``), or None if connecting or executing failed.
        """
        connection = self.get_connection()
        if not connection:
            return None
        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, params or ())
            if fetch:
                return cursor.fetchall()
            connection.commit()
            return cursor.rowcount
        except Error as e:
            logging.error(f"Error executing query: {e}")
            connection.rollback()
            return None
        finally:
            if cursor:
                cursor.close()
            if connection.is_connected():
                connection.close()

    def execute_query_one(self, query, params=None):
        """Run a query on a fresh connection and return its first row.

        Returns:
            The first row as a dict, or None when there is no row or the
            query failed.
        """
        connection = self.get_connection()
        if not connection:
            return None
        cursor = None
        try:
            # Buffered: the full result set is read up front, so taking only
            # the first row of a multi-row result leaves nothing unread when
            # the cursor is closed.
            cursor = connection.cursor(dictionary=True, buffered=True)
            cursor.execute(query, params or ())
            return cursor.fetchone()
        except Error as e:
            logging.error(f"Error executing query: {e}")
            return None
        finally:
            if cursor:
                cursor.close()
            if connection.is_connected():
                connection.close()

    def _insert_and_get_id(self, query, params, error_label):
        """Run one INSERT on a fresh connection and return its generated id.

        Args:
            query: INSERT statement with ``%s`` placeholders.
            params: Placeholder values.
            error_label: Prefix for the log message on a database error.

        Returns:
            ``cursor.lastrowid`` after a successful commit, or None when the
            connection could not be opened or the statement failed.
        """
        connection = self.get_connection()
        if not connection:
            return None
        cursor = None
        try:
            cursor = connection.cursor()
            cursor.execute(query, params)
            connection.commit()
            return cursor.lastrowid
        except Error as e:
            logging.error(f"{error_label}: {e}")
            connection.rollback()
            return None
        finally:
            # The cursor may never have been created; the connection must be
            # released regardless.
            if cursor:
                cursor.close()
            connection.close()

    def create_tables(self):
        """Create the tables defined below if they do not already exist.

        NOTE(review): other methods of this class use tables and columns that
        these definitions do not contain (``patients``,
        ``questionnaire_responses``, ``wounds``, ``questionnaires.name``,
        ``wound_images.uuid/patient_id/image``). Presumably the deployed
        schema differs from this one - confirm before relying on it.
        """
        tables = {
            "users": """
                CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                email VARCHAR(100) UNIQUE NOT NULL,
                password VARCHAR(255) NOT NULL,
                name VARCHAR(100) NOT NULL,
                role ENUM('practitioner', 'organization') NOT NULL,
                org INT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_login TIMESTAMP NULL,
                is_active BOOLEAN DEFAULT TRUE,
                INDEX idx_username (username),
                INDEX idx_email (email),
                INDEX idx_role (role)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
            "organizations": """
                CREATE TABLE IF NOT EXISTS organizations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(200) NOT NULL,
                email VARCHAR(100) UNIQUE NOT NULL,
                phone VARCHAR(20),
                country_code VARCHAR(10),
                department VARCHAR(100),
                location VARCHAR(200),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                is_active BOOLEAN DEFAULT TRUE,
                INDEX idx_name (name),
                INDEX idx_email (email)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
            "questionnaires": """
                CREATE TABLE IF NOT EXISTS questionnaires (
                id INT AUTO_INCREMENT PRIMARY KEY,
                user_id INT NOT NULL,
                patient_name VARCHAR(100) NOT NULL,
                patient_age INT,
                patient_gender ENUM('Male', 'Female', 'Other'),
                wound_location VARCHAR(200),
                wound_duration VARCHAR(100),
                pain_level INT DEFAULT 0,
                previous_treatment TEXT,
                medical_history TEXT,
                medications TEXT,
                allergies TEXT,
                additional_notes TEXT,
                moisture_level VARCHAR(50),
                infection_signs VARCHAR(50),
                diabetic_status VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_user_id (user_id),
                INDEX idx_patient_name (patient_name),
                INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
            "wound_images": """
                CREATE TABLE IF NOT EXISTS wound_images (
                id INT AUTO_INCREMENT PRIMARY KEY,
                questionnaire_id INT NOT NULL,
                image_url VARCHAR(500) NOT NULL,
                original_filename VARCHAR(255),
                file_size INT,
                image_width INT,
                image_height INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_questionnaire_id (questionnaire_id),
                INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
            "ai_analyses": """
                CREATE TABLE IF NOT EXISTS ai_analyses (
                id INT AUTO_INCREMENT PRIMARY KEY,
                questionnaire_id INT NOT NULL,
                image_id INT,
                analysis_data TEXT,
                summary TEXT,
                recommendations TEXT,
                risk_score INT DEFAULT 0,
                risk_level ENUM('Low', 'Moderate', 'High', 'Unknown') DEFAULT 'Unknown',
                wound_type VARCHAR(100),
                wound_dimensions VARCHAR(100),
                processing_time DECIMAL(5,2),
                model_version VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_questionnaire_id (questionnaire_id),
                INDEX idx_risk_level (risk_level),
                INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
            "analysis_sessions": """
                CREATE TABLE IF NOT EXISTS analysis_sessions (
                id INT AUTO_INCREMENT PRIMARY KEY,
                user_id INT NOT NULL,
                questionnaire_id INT NOT NULL,
                image_id INT,
                analysis_id INT,
                session_duration DECIMAL(5,2),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_user_id (user_id),
                INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
        }
        for table_name, table_sql in tables.items():
            try:
                # execute_query logs its own failures and returns None then.
                result = self.execute_query(table_sql)
                if result is not None:
                    logging.info(f"Table '{table_name}' created or already exists")
            except Exception as e:
                logging.error(f"Error creating table '{table_name}': {e}")

    @staticmethod
    def _build_response_data(questionnaire_data):
        """Group the flat questionnaire fields into the stored JSON layout.

        Raises:
            KeyError: If any of the expected questionnaire keys is missing.
        """
        return {
            'patient_info': {
                'name': questionnaire_data['patient_name'],
                'age': questionnaire_data['patient_age'],
                'gender': questionnaire_data['patient_gender'],
            },
            'wound_details': {
                'location': questionnaire_data['wound_location'],
                'duration': questionnaire_data['wound_duration'],
                'pain_level': questionnaire_data['pain_level'],
                'moisture_level': questionnaire_data['moisture_level'],
                'infection_signs': questionnaire_data['infection_signs'],
                'diabetic_status': questionnaire_data['diabetic_status'],
            },
            'medical_history': {
                'previous_treatment': questionnaire_data['previous_treatment'],
                'medical_history': questionnaire_data['medical_history'],
                'medications': questionnaire_data['medications'],
                'allergies': questionnaire_data['allergies'],
                'additional_notes': questionnaire_data['additional_notes'],
            },
        }

    def save_questionnaire(self, questionnaire_data):
        """Save a questionnaire response against the default questionnaire.

        Looks up (or creates) the patient and the 'Default Patient Assessment'
        questionnaire, then inserts one ``questionnaire_responses`` row whose
        ``response_data`` is the JSON-encoded form content. All writes share
        one transaction: they are committed together at the end, or rolled
        back together on any error.

        Args:
            questionnaire_data: Dict holding the form fields plus ``user_id``
                (stored as ``practitioner_id``).

        Returns:
            The id of the new response row, or None on any failure.
        """
        connection = None
        cursor = None
        try:
            connection = self.get_connection()
            if not connection:
                return None
            cursor = connection.cursor()

            # (1) Create or get patient
            patient_id = self._create_or_get_patient(cursor, questionnaire_data)
            if not patient_id:
                raise Exception("Failed to get or create patient")

            # (2) Get the default questionnaire id, creating the row if needed
            cursor.execute("SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1")
            questionnaire_row = cursor.fetchone()
            if questionnaire_row:
                questionnaire_id = questionnaire_row[0]
            else:
                cursor.execute("""
                    INSERT INTO questionnaires (name, description, created_at)
                    VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', NOW())
                """)
                # No commit here: committing now would make the patient and
                # questionnaire rows permanent even if step (4) fails.
                questionnaire_id = cursor.lastrowid

            # (3) Prepare response_data JSON
            response_data = self._build_response_data(questionnaire_data)

            # (4) Insert into questionnaire_responses with the questionnaire id
            insert_resp = """
                INSERT INTO questionnaire_responses
                (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s, %s, %s, %s, %s)
            """
            cursor.execute(insert_resp, (
                questionnaire_id,
                patient_id,
                questionnaire_data['user_id'],
                json.dumps(response_data),
                datetime.now(),
            ))
            response_id = cursor.lastrowid
            connection.commit()
            logging.info(f"✅ Saved response ID {response_id} for questionnaire {questionnaire_id}")
            return response_id
        except Exception as e:
            logging.error(f"❌ Error saving questionnaire: {e}")
            if connection:
                connection.rollback()
            return None
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()

    def _create_or_get_patient(self, cursor, questionnaire_data):
        """Return the id of the matching patient, inserting one if none exists.

        A patient matches on name, age and gender. Runs on the caller's
        cursor and does not commit.

        Returns:
            The patient id, or None if anything failed (the error is logged).
        """
        try:
            # LIMIT 1 so that several matching patients cannot leave unread
            # rows on the cursor, which would break the caller's next execute.
            select_query = """
                SELECT id FROM patients
                WHERE name = %s AND age = %s AND gender = %s
                LIMIT 1
            """
            cursor.execute(select_query, (
                questionnaire_data['patient_name'],
                questionnaire_data['patient_age'],
                questionnaire_data['patient_gender'],
            ))
            existing_patient = cursor.fetchone()
            if existing_patient:
                return existing_patient[0]

            # Create new patient
            insert_query = """
                INSERT INTO patients (
                uuid, name, age, gender, illness, allergy, notes, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                str(uuid.uuid4()),
                questionnaire_data['patient_name'],
                questionnaire_data['patient_age'],
                questionnaire_data['patient_gender'],
                questionnaire_data.get('medical_history', ''),
                questionnaire_data.get('allergies', ''),
                questionnaire_data.get('additional_notes', ''),
                datetime.now(),
            ))
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error creating/getting patient: {e}")
            return None

    def _create_wound_record(self, cursor, patient_id, questionnaire_data):
        """Insert a ``wounds`` row for the patient on the caller's cursor.

        Does not commit.

        Returns:
            The new wound id, or None if the insert failed (error is logged).
        """
        try:
            query = """
                INSERT INTO wounds (
                uuid, patient_id, position, category, moisture, infection,
                notes, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(query, (
                str(uuid.uuid4()),
                str(patient_id),
                questionnaire_data.get('wound_location', ''),
                'Assessment',  # Default category
                questionnaire_data.get('moisture_level', ''),
                questionnaire_data.get('infection_signs', ''),
                questionnaire_data.get('additional_notes', ''),
                datetime.now(),
            ))
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error creating wound record: {e}")
            return None

    def save_wound_image(self, patient_id, image):
        """Save a wound image as a JPEG under ``uploads/`` and record it.

        Args:
            patient_id: Patient the image belongs to (stored as a string).
            image: Object exposing ``save`` and ``size``, e.g. a PIL image.

        Returns:
            Dict with keys ``id`` (database row id), ``filename`` and
            ``path``, or None if the image is invalid or saving failed.
        """
        try:
            if not hasattr(image, 'save'):
                logging.error("Invalid image object")
                return None

            # Generate unique filename
            filename = f"wound_{uuid.uuid4()}.jpg"
            file_path = os.path.join("uploads", filename)
            os.makedirs("uploads", exist_ok=True)

            # JPEG cannot store alpha or palette modes; convert those first
            # so that e.g. PNG uploads with transparency do not fail.
            if getattr(image, 'mode', None) in ("RGBA", "LA", "P", "PA"):
                image = image.convert("RGB")
            image.save(file_path, format='JPEG', quality=95)
            width, height = image.size

            query = """
                INSERT INTO wound_images (
                uuid, patient_id, image, width, height, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s)
            """
            params = (
                str(uuid.uuid4()),
                str(patient_id),
                file_path,
                str(width),
                str(height),
                datetime.now(),
            )
            image_db_id = self._insert_and_get_id(
                query, params, "Error saving image to database"
            )
            if image_db_id is None:
                return None
            logging.info(f"Image saved with ID: {image_db_id}")
            return {
                'id': image_db_id,
                'filename': filename,
                'path': file_path,
            }
        except Exception as e:
            logging.error(f"Image save error: {e}")
            return None

    def save_analysis(self, questionnaire_id, image_id, analysis_data):
        """Save AI analysis results to the ``ai_analyses`` table.

        Args:
            questionnaire_id: Id stored in ``questionnaire_id``.
            image_id: Id stored in ``image_id``.
            analysis_data: Dict of results. A falsy value (None or empty) is
                stored as NULL JSON with the default field values.

        Returns:
            The id of the new row, or None on failure.
        """
        try:
            fields = analysis_data or {}
            query = """
                INSERT INTO ai_analyses (
                questionnaire_id, image_id, analysis_data, summary,
                recommendations, risk_score, risk_level, wound_type,
                wound_dimensions, processing_time, model_version, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                questionnaire_id,
                image_id,
                json.dumps(analysis_data) if analysis_data else None,
                fields.get('summary', ''),
                fields.get('recommendations', ''),
                fields.get('risk_score', 0),
                fields.get('risk_level', 'Unknown'),
                fields.get('wound_type', ''),
                fields.get('wound_dimensions', ''),
                fields.get('processing_time', 0.0),
                fields.get('model_version', 'v1.0'),
                datetime.now(),
            )
            analysis_id = self._insert_and_get_id(
                query, params, "Error saving analysis"
            )
            if analysis_id is None:
                return None
            logging.info(f"Analysis saved with ID: {analysis_id}")
            return analysis_id
        except Exception as e:
            logging.error(f"Analysis save error: {e}")
            return None

    def get_user_history(self, user_id):
        """Return up to 20 of the user's most recent questionnaires.

        Each row is joined (LEFT JOIN) with its analysis, if any.

        Returns:
            A list of dict rows, newest first; empty on error or no data.
        """
        try:
            query = """
                SELECT
                q.patient_name,
                q.wound_location,
                q.created_at,
                a.risk_level,
                a.summary,
                a.recommendations
                FROM questionnaires q
                LEFT JOIN ai_analyses a ON q.id = a.questionnaire_id
                WHERE q.user_id = %s
                ORDER BY q.created_at DESC
                LIMIT 20
            """
            result = self.execute_query(query, (user_id,), fetch=True)
            return result or []
        except Exception as e:
            logging.error(f"Error fetching user history: {e}")
            return []

    def get_organizations(self):
        """Return all organizations ordered by name.

        Returns:
            A list of dicts with keys ``id``, ``org_name`` and ``location``.
            When the query fails or yields no rows, a single placeholder
            'Default Hospital' entry is returned instead.
        """
        fallback = [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]
        try:
            query = "SELECT id, name as org_name, location FROM organizations ORDER BY name"
            result = self.execute_query(query, fetch=True)
            return result or fallback
        except Exception as e:
            logging.error(f"Error getting organizations: {e}")
            return fallback

    def create_organization(self, org_data):
        """Create a new organization.

        Args:
            org_data: Dict with optional keys ``org_name``, ``email``,
                ``phone``, ``country_code``, ``department``, ``location``;
                missing keys are stored as empty strings.

        Returns:
            The id of the created organization, or None on failure.
        """
        try:
            query = """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s)"""
            params = (
                org_data.get('org_name', ''),
                org_data.get('email', ''),
                org_data.get('phone', ''),
                org_data.get('country_code', ''),
                org_data.get('department', ''),
                org_data.get('location', ''),
                datetime.now(),
            )
            # Take the id from the insert itself. Looking it up afterwards by
            # name could return a different row with the same name.
            return self._insert_and_get_id(query, params, "Error executing query")
        except Exception as e:
            logging.error(f"Error creating organization: {e}")
            return None

    def save_analysis_result(self, questionnaire_id, analysis_result):
        """Save an analysis result with only its JSON and summary.

        Args:
            questionnaire_id: Id stored in ``questionnaire_id``.
            analysis_result: Dict of results. A falsy value is stored as NULL
                JSON with the summary 'Analysis completed'.

        Returns:
            The affected row count from the insert, or None on failure.
        """
        try:
            fields = analysis_result or {}
            query = """INSERT INTO ai_analyses (questionnaire_id, analysis_data, summary, created_at)
                VALUES (%s, %s, %s, %s)"""
            params = (
                questionnaire_id,
                json.dumps(analysis_result) if analysis_result else None,
                fields.get('summary', 'Analysis completed'),
                datetime.now(),
            )
            return self.execute_query(query, params)
        except Exception as e:
            logging.error(f"Error saving analysis result: {e}")
            return None