# SmartHeal-Agentic-AI / src/database.py
# Revision abdcfa7 (verified): "Update src/database.py" (18.1 kB)
import mysql.connector
from mysql.connector import Error
import logging
from datetime import datetime
import json
import uuid
import os
from PIL import Image
class DatabaseManager:
    """Database operations manager for SmartHeal application (aligned to existing schema).

    Each operation opens its own short-lived MySQL connection through
    ``get_connection``. Failures are logged and generally reported to the
    caller as ``None`` (or an empty/default list) instead of an exception.
    """

    # Name of the template row every questionnaire response is attached to.
    DEFAULT_QUESTIONNAIRE_NAME = "Default Patient Assessment"
    # Used when the caller supplies no `user_id`; the insert needs a value here.
    DEFAULT_PRACTITIONER_ID = 1
    # Directory (relative to the working directory) where wound images are stored.
    UPLOAD_DIR = "uploads"

    def __init__(self, mysql_config):
        """Store the connection settings, probe the database and seed the default questionnaire.

        Args:
            mysql_config: Mapping of keyword arguments passed verbatim to
                ``mysql.connector.connect``.
        """
        self.mysql_config = mysql_config
        self.test_connection()
        self._ensure_default_questionnaire()

    # ---------------------- Connection helpers ----------------------
    def test_connection(self):
        """Open and immediately close a connection, logging whether it worked."""
        try:
            conn = self.get_connection()
            if conn:
                conn.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error(f"Database connection test failed: {e}")

    def get_connection(self):
        """Return a new MySQL connection, or ``None`` if connecting raised a connector ``Error``."""
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error(f"Error connecting to MySQL: {e}")
            return None

    @staticmethod
    def _safe_rollback(conn):
        """Roll back `conn`, logging (not raising) if the rollback itself fails."""
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception as e:
            # A dead connection cannot roll back; the server discards the
            # transaction anyway, so this must not mask the original error.
            logging.error(f"Rollback failed: {e}")

    @staticmethod
    def _release(cur, conn):
        """Close the cursor and then the connection, never letting one failure skip the other."""
        for resource in (cur, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                logging.warning(f"Error releasing database resource: {e}")

    def execute_query(self, query, params=None, fetch=False):
        """Run one statement on a fresh connection.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Sequence of values for the placeholders (optional).
            fetch: When true, return all rows and do not commit; otherwise
                commit and return the affected row count.

        Returns:
            A list of dict rows (``fetch=True``), the row count
            (``fetch=False``), or ``None`` when no connection could be opened
            or the connector raised an ``Error``.
        """
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            if fetch:
                return cur.fetchall()
            conn.commit()
            return cur.rowcount
        except Error as e:
            logging.error(f"Error executing query: {e} | SQL: {query} | Params: {params}")
            self._safe_rollback(conn)
            return None
        finally:
            self._release(cur, conn)

    def execute_query_one(self, query, params=None):
        """Run a read query and return its first row as a dict (``None`` if no row or on error)."""
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            # Buffered: the whole result set is read on execute, so taking only
            # the first row cannot leave unread rows behind when the cursor is
            # closed (an unbuffered cursor raises "Unread result found").
            cur = conn.cursor(dictionary=True, buffered=True)
            cur.execute(query, params or ())
            return cur.fetchone()
        except Error as e:
            logging.error(f"Error executing query: {e} | SQL: {query} | Params: {params}")
            return None
        finally:
            self._release(cur, conn)

    # ---------------------- One-time ensures ----------------------
    def _ensure_default_questionnaire(self):
        """Ensure a 'Default Patient Assessment' row exists in questionnaires."""
        try:
            row = self.execute_query_one(
                "SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                (self.DEFAULT_QUESTIONNAIRE_NAME,)
            )
            if not row:
                self.execute_query(
                    "INSERT INTO questionnaires (name, description, created_at, updated_at) VALUES (%s, %s, NOW(), NOW())",
                    (self.DEFAULT_QUESTIONNAIRE_NAME, "Standard patient wound assessment form")
                )
                logging.info("Created default questionnaire 'Default Patient Assessment'")
        except Exception as e:
            logging.error(f"Error ensuring default questionnaire: {e}")

    # ---------------------- Business ops ----------------------
    def save_questionnaire(self, questionnaire_data):
        """
        Creates/gets patient in EXISTING `patients` table, then creates a row in `questionnaire_responses`.
        Both steps share one connection and are committed together.

        Args:
            questionnaire_data: Dict of form fields (``patient_name``,
                ``patient_age``, ``wound_location``, ``user_id``, ...).
                Missing keys are stored as null in the JSON payload.

        Returns the `questionnaire_response_id` (int) or None.
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor(dictionary=True)

            # 1) Create or get patient (EXISTING `patients` table)
            patient_id = self._create_or_get_patient(cur, questionnaire_data)
            if not patient_id:
                raise Exception("Failed to get or create patient")

            # 2) Get template questionnaire id
            cur.execute("SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                        (self.DEFAULT_QUESTIONNAIRE_NAME,))
            row = cur.fetchone()
            questionnaire_id = row["id"] if row else None
            if not questionnaire_id:
                raise Exception("Default questionnaire not found")

            # 3) Build response_data
            response_data = {
                'patient_info': {
                    'name': questionnaire_data.get('patient_name'),
                    'age': questionnaire_data.get('patient_age'),
                    'gender': questionnaire_data.get('patient_gender')
                },
                'wound_details': {
                    'location': questionnaire_data.get('wound_location'),
                    'duration': questionnaire_data.get('wound_duration'),
                    'pain_level': questionnaire_data.get('pain_level'),
                    'moisture_level': questionnaire_data.get('moisture_level'),
                    'infection_signs': questionnaire_data.get('infection_signs'),
                    'diabetic_status': questionnaire_data.get('diabetic_status')
                },
                'medical_history': {
                    'previous_treatment': questionnaire_data.get('previous_treatment'),
                    'medical_history': questionnaire_data.get('medical_history'),
                    'medications': questionnaire_data.get('medications'),
                    'allergies': questionnaire_data.get('allergies'),
                    'additional_notes': questionnaire_data.get('additional_notes')
                }
            }

            # Fall back gracefully; the schema expects NOT NULL here.
            practitioner_id = questionnaire_data.get('user_id') or self.DEFAULT_PRACTITIONER_ID

            # 4) Insert into `questionnaire_responses`
            insert_sql = """
                INSERT INTO questionnaire_responses
                (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s, %s, %s, %s, %s)
            """
            cur.execute(insert_sql, (
                questionnaire_id,
                patient_id,  # patients.id of the row found/created in step 1
                practitioner_id,
                json.dumps(response_data, ensure_ascii=False),
                datetime.now()
            ))
            response_id = cur.lastrowid
            conn.commit()
            logging.info(f"✅ Saved questionnaire response ID {response_id} (patient_id={patient_id})")
            return response_id
        except Exception as e:
            logging.error(f"❌ Error saving questionnaire: {e}")
            self._safe_rollback(conn)
            return None
        finally:
            self._release(cur, conn)

    @staticmethod
    def _normalize_age(age):
        """Convert a form-supplied age to a non-negative int, or ``None`` if it is not one.

        Accepts ints, whole-number floats (``30.0``) and digit strings
        (surrounding whitespace ignored). Booleans, negatives, fractional
        floats and anything else yield ``None``.
        """
        if age is None or isinstance(age, bool):
            return None
        if isinstance(age, int):
            return age if age >= 0 else None
        if isinstance(age, float):
            return int(age) if age.is_integer() and age >= 0 else None
        if isinstance(age, str):
            text = age.strip()
            # isascii() guards against characters such as "²" that pass
            # isdigit() but are rejected by int().
            return int(text) if text.isascii() and text.isdigit() else None
        return None

    def _create_or_get_patient(self, cur, questionnaire_data):
        """
        Works against the EXISTING `patients` table:
        columns include id (PK), uuid, name, age (int), gender (varchar), illness, allergy, notes, etc.

        Uses the caller's cursor so the work joins the caller's transaction;
        nothing is committed here.

        Returns patients.id (int), or None if the lookup/insert failed.
        """
        try:
            name = questionnaire_data.get('patient_name')
            # Normalize once so the lookup and the insert agree on the value;
            # otherwise a float like 30.0 is stored as NULL yet searched as
            # 30.0, and every submission creates a new patient row.
            age = self._normalize_age(questionnaire_data.get('patient_age'))
            gender = questionnaire_data.get('patient_gender')

            # Try to find an existing patient via (name, age, gender);
            # a NULL age/gender parameter disables that part of the filter.
            cur.execute("""
                SELECT id FROM patients
                WHERE name = %s AND (age = %s OR %s IS NULL) AND (gender = %s OR %s IS NULL)
                LIMIT 1
            """, (name, age, age, gender, gender))
            row = cur.fetchone()
            if row:
                return row["id"]

            # Create new patient
            cur.execute("""
                INSERT INTO patients (uuid, name, age, gender, illness, allergy, notes, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (
                str(uuid.uuid4()),
                name,
                age,
                gender,
                questionnaire_data.get('medical_history', ''),
                questionnaire_data.get('allergies', ''),
                questionnaire_data.get('additional_notes', '')
            ))
            return cur.lastrowid
        except Exception as e:
            logging.error(f"Error creating/getting patient: {e}")
            return None

    def _get_patient_uuid(self, patient_id, cur=None):
        """Fetch patients.uuid for a given patients.id (helps when tables store patient_id as VARCHAR uuid).

        Args:
            patient_id: Value of ``patients.id``.
            cur: Optional open cursor to reuse. When omitted, a temporary
                connection and cursor are opened and closed here.

        Returns:
            The uuid string, or ``None`` if the patient is missing or the
            query failed.
        """
        owns_cursor = False
        conn = None
        try:
            if cur is None:
                conn = self.get_connection()
                if not conn:
                    return None
                cur = conn.cursor(dictionary=True)
                owns_cursor = True
            cur.execute("SELECT uuid FROM patients WHERE id = %s LIMIT 1", (patient_id,))
            row = cur.fetchone()
            if not row:
                return None
            # A caller-supplied cursor may yield tuples instead of dicts.
            return row["uuid"] if isinstance(row, dict) else row[0]
        except Exception as e:
            logging.error(f"Error fetching patient uuid: {e}")
            return None
        finally:
            if owns_cursor:
                self._release(cur, conn)

    @staticmethod
    def _write_jpeg(image, file_path):
        """Save `image` as a JPEG at `file_path` and return its ``(width, height)``."""
        # JPEG cannot store alpha or palette modes, so normalize to RGB first.
        if getattr(image, "mode", "RGB") != "RGB" and hasattr(image, "convert"):
            image = image.convert('RGB')
        image.save(file_path, format='JPEG', quality=95)
        return image.size

    @staticmethod
    def _discard_file(file_path):
        """Best-effort removal of a file written for a save that did not complete."""
        if not file_path or not os.path.exists(file_path):
            return
        try:
            os.remove(file_path)
        except OSError as e:
            logging.warning(f"Could not remove orphaned image {file_path}: {e}")

    def save_wound_image(self, patient_id, image):
        """
        Save wound image to filesystem and EXISTING `wound_images` table.
        `wound_images.patient_id` is VARCHAR, so patients.uuid is stored there.

        Args:
            patient_id: Value of ``patients.id`` for the image's owner.
            image: Either an object exposing ``save``/``size`` (e.g. a PIL
                image) or a path string to an existing image file.

        Returns dict {id, filename, path} or None. If the database step fails,
        the JPEG written to disk is removed again.
        """
        file_path = None
        try:
            is_image_object = hasattr(image, 'save')
            is_image_path = isinstance(image, str) and os.path.exists(image)
            if not (is_image_object or is_image_path):
                logging.error("Invalid image object/path")
                return None

            # 1) Persist to disk
            filename = f"wound_{uuid.uuid4()}.jpg"
            os.makedirs(self.UPLOAD_DIR, exist_ok=True)
            file_path = os.path.join(self.UPLOAD_DIR, filename)
            if is_image_object:
                width, height = self._write_jpeg(image, file_path)
            else:
                with Image.open(image) as pil:
                    width, height = self._write_jpeg(pil, file_path)

            # 2) Resolve patients.uuid (VARCHAR target) and insert the row
            conn = self.get_connection()
            if not conn:
                self._discard_file(file_path)
                return None
            cur = None
            try:
                cur = conn.cursor(dictionary=True)
                # Reuse this cursor instead of opening a second connection.
                patient_uuid = self._get_patient_uuid(patient_id, cur=cur)
                if not patient_uuid:
                    raise Exception("Patient UUID not found for given patient_id")

                # Insert into existing wound_images schema (uuid, patient_id (varchar), image, width, height, etc.)
                cur.execute("""
                    INSERT INTO wound_images (
                        uuid, patient_id, image, width, height, area, notes, created_at, updated_at
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                """, (
                    str(uuid.uuid4()),
                    patient_uuid,  # VARCHAR column → store patient's UUID
                    file_path,
                    str(width),
                    str(height),
                    None,
                    None
                ))
                image_db_id = cur.lastrowid
                conn.commit()
            finally:
                self._release(cur, conn)

            logging.info(f"Image saved with ID: {image_db_id}")
            return {'id': image_db_id, 'filename': filename, 'path': file_path}
        except Exception as e:
            logging.error(f"Image save error: {e}")
            self._discard_file(file_path)
            return None

    def _resolve_template_id(self, questionnaire_response_id):
        """Return the template `questionnaire_id` of a response row, or ``None`` if it cannot be read."""
        row = self.execute_query_one(
            "SELECT questionnaire_id FROM questionnaire_responses WHERE id = %s LIMIT 1",
            (questionnaire_response_id,)
        )
        return row["questionnaire_id"] if row else None

    def save_analysis(self, questionnaire_response_id, image_id, analysis_data):
        """
        Save AI analysis results.
        The live `ai_analyses` table expects `questionnaire_id` (the TEMPLATE), not the response id,
        so the template id is first looked up from questionnaire_responses and stored instead.

        Args:
            questionnaire_response_id: ``questionnaire_responses.id``.
            image_id: Id of the related wound image row.
            analysis_data: Dict of results; ``summary``, ``recommendations``,
                ``risk_score``, ``risk_level``, ``processing_time`` and
                ``model_version`` are copied to their own columns with
                defaults when absent.

        Returns:
            The affected row count from the insert, or ``None`` on failure.
        """
        try:
            questionnaire_id = self._resolve_template_id(questionnaire_response_id)
            if questionnaire_id is None:
                logging.error("No questionnaire_response found for analysis save")
                return None

            data = analysis_data or {}
            query = """
                INSERT INTO ai_analyses (
                    questionnaire_id, image_id, analysis_data, summary,
                    recommendations, risk_score, risk_level,
                    processing_time, model_version, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                questionnaire_id,
                image_id,
                json.dumps(analysis_data) if analysis_data else None,
                data.get('summary', ''),
                data.get('recommendations', ''),
                data.get('risk_score', 0),
                data.get('risk_level', 'Unknown'),
                data.get('processing_time', 0.0),
                data.get('model_version', 'v1.0'),
                datetime.now()
            )
            return self.execute_query(query, params, fetch=False)
        except Exception as e:
            logging.error(f"Analysis save error: {e}")
            return None

    def get_user_history(self, user_id):
        """
        Latest 20 submissions for a practitioner, with optional analysis summary if present.
        Aligns to existing schema: joins questionnaire_responses -> questionnaires and LEFT joins ai_analyses (by template).

        Returns a list of dict rows; an empty list on error or when nothing matches.
        """
        try:
            # NOTE(review): ai_analyses is joined on the TEMPLATE id, which is
            # shared by every response using that template. Each response row
            # is therefore paired with every analysis of the template, so the
            # 20-row limit can be filled with repeats of one response. Fixing
            # this needs a response-level key on ai_analyses; confirm the
            # schema before changing the join.
            query = """
                SELECT
                    r.id AS response_id,
                    r.submitted_at,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    q.name AS questionnaire_name,
                    a.risk_level,
                    a.summary,
                    a.recommendations
                FROM questionnaire_responses r
                JOIN patients p ON p.id = r.patient_id
                JOIN questionnaires q ON q.id = r.questionnaire_id
                LEFT JOIN ai_analyses a ON a.questionnaire_id = r.questionnaire_id
                WHERE r.practitioner_id = %s
                ORDER BY r.submitted_at DESC
                LIMIT 20
            """
            result = self.execute_query(query, (user_id,), fetch=True)
            return result or []
        except Exception as e:
            logging.error(f"Error fetching user history: {e}")
            return []

    # Convenience for existing callers (kept signature)
    def create_organization(self, org_data):
        """Insert an organization and return its id (``None`` on failure).

        Args:
            org_data: Dict with ``org_name``, ``email``, ``phone``,
                ``country_code``, ``department`` and ``location``; missing
                keys default to an empty string.
        """
        try:
            query = """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at, updated_at)
                       VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())"""
            org_name = org_data.get('org_name', '')
            params = (
                org_name,
                org_data.get('email', ''),
                org_data.get('phone', ''),
                org_data.get('country_code', ''),
                org_data.get('department', ''),
                org_data.get('location', '')
            )
            if not self.execute_query(query, params):
                return None
            # The insert ran on its own connection, so the new id is read back
            # by name. `id DESC` breaks ties between same-named rows whose
            # created_at values are equal.
            row = self.execute_query_one(
                "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC, id DESC LIMIT 1",
                (org_name,)
            )
            return row['id'] if row else None
        except Exception as e:
            logging.error(f"Error creating organization: {e}")
            return None

    def get_organizations(self):
        """Return all organizations ordered by name, or one placeholder entry if none can be read."""
        fallback = [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]
        try:
            query = "SELECT id, name as org_name, location FROM organizations ORDER BY name"
            result = self.execute_query(query, fetch=True)
            return result or fallback
        except Exception as e:
            logging.error(f"Error getting organizations: {e}")
            return fallback

    # Back-compat helper kept (writes same as save_analysis, minimal data)
    def save_analysis_result(self, questionnaire_response_id, analysis_result):
        """Store a minimal analysis row (payload + summary) under the response's template id.

        Returns the affected row count from the insert, or ``None`` on failure.
        """
        try:
            # store under template id like save_analysis()
            questionnaire_id = self._resolve_template_id(questionnaire_response_id)
            if questionnaire_id is None:
                logging.error("No questionnaire_response found for analysis_result save")
                return None

            query = """INSERT INTO ai_analyses (questionnaire_id, analysis_data, summary, created_at)
                       VALUES (%s, %s, %s, %s)"""
            params = (
                questionnaire_id,
                json.dumps(analysis_result) if analysis_result else None,
                (analysis_result or {}).get('summary', 'Analysis completed'),
                datetime.now()
            )
            return self.execute_query(query, params)
        except Exception as e:
            logging.error(f"Error saving analysis result: {e}")
            return None