SmartHeal-Agentic-AI / src /database.py
SmartHeal's picture
Update src/database.py
0165d1a verified
raw
history blame
30 kB
# database.py (SmartHeal) — dataset ID hardcoded
import os
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List
import mysql.connector
from mysql.connector import Error
from PIL import Image
# ----------------------------- Hardcoded Dataset -----------------------------
# Hugging Face dataset repo id that DatabaseManager uploads images to when a token is available.
DATASET_ID = "SmartHeal/wound-image-uploads" # <— hardcoded as requested
# Optional HF dataset integration
_HF_AVAILABLE = False
try:
from huggingface_hub import HfApi, HfFolder, create_repo
_HF_AVAILABLE = True
except Exception:
_HF_AVAILABLE = False
# ----------------------------- helpers -----------------------------
def _now():
return datetime.now()
def _abs(p: str) -> str:
try:
return os.path.abspath(p)
except Exception:
return p
def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def _file_url(path: str) -> str:
"""
For local files, return /file=/abs/path (works in Gradio/Spaces).
For remote (http/https), return as-is.
"""
if not path:
return ""
s = str(path)
if s.startswith("http://") or s.startswith("https://"):
return s
return f"/file={_abs(s)}"
# -------------------------- DatabaseManager --------------------------
class DatabaseManager:
    """
    Database operations manager for SmartHeal
    - Keeps patient_id (INT) consistent for questionnaire_responses / ai_analyses joins
    - Uses patients.uuid (CHAR 36) in wounds / wound_images where those tables store VARCHAR
    - Saves images locally OR to a hardcoded Hugging Face dataset and stores the URL
    - Opens a new MySQL connection for each operation and closes it afterwards
    - Most methods log database errors and return None / an empty result instead of raising
    """
def __init__(self, mysql_config: Dict[str, Any], hf_token: Optional[str] = None):
    """
    Configure image storage, then check the database and seed the default questionnaire.

    Args:
        mysql_config: Keyword arguments forwarded to mysql.connector.connect().
        hf_token: Hugging Face token. When omitted, HUGGINGFACE_TOKEN and then
            HF_TOKEN are read from the environment. Without a token (or without
            huggingface_hub installed) images are kept in the local uploads/ folder.
    """
    self.mysql_config = mysql_config
    # storage backend: hardcoded dataset id; token from arg or env
    self.dataset_id = DATASET_ID
    self.hf_token = hf_token or os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_TOKEN")
    self.use_dataset = bool(self.dataset_id and self.hf_token and _HF_AVAILABLE)
    # Always defined, so reading the attribute never raises when HF is disabled.
    self.hf_api = None
    if self.use_dataset:
        logging.info(f"HF dataset storage enabled: {self.dataset_id}")
        try:
            HfFolder.save_token(self.hf_token)
            try:
                create_repo(repo_id=self.dataset_id, repo_type="dataset", exist_ok=True, token=self.hf_token)
            except Exception as e:
                # Best effort: the repo may already exist or the token may not be
                # allowed to create it; uploads can still succeed afterwards.
                logging.warning(f"Could not create/verify HF dataset repo {self.dataset_id}: {e}")
            self.hf_api = HfApi(token=self.hf_token)
        except Exception as e:
            logging.warning(f"Could not initialize HF dataset backend, falling back to local. Err: {e}")
            self.use_dataset = False
    if not self.use_dataset:
        logging.info("Using LOCAL storage (uploads/)")
    # uploads/ is required in every mode: files are staged there before any HF
    # upload, and it is the final location after a fallback to local storage.
    _ensure_dir("uploads")
    self.test_connection()
    self._ensure_default_questionnaire()
# -------------------- low-level connection utils --------------------
def test_connection(self):
    """Open one connection and close it again, logging whether that worked. Returns nothing."""
    try:
        probe = self.get_connection()
        if not probe:
            logging.error("❌ Database connection failed")
            return
        probe.close()
        logging.info("βœ… Database connection successful")
    except Exception as exc:
        logging.error(f"Database connection test failed: {exc}")
def get_connection(self):
    """Open a new MySQL connection from self.mysql_config; on a connector Error, log it and return None."""
    try:
        connection = mysql.connector.connect(**self.mysql_config)
    except Error as exc:
        logging.error(f"Error connecting to MySQL: {exc}")
        return None
    return connection
def execute_query(self, query, params=None, fetch=False):
    """
    Run one statement on its own connection.
    With fetch=True the rows are returned as a list of dicts (nothing is
    committed); otherwise the statement is committed and the affected row
    count is returned. Returns None when no connection could be opened or
    MySQL reported an error (the error is logged and the work rolled back).
    """
    conn = self.get_connection()
    if not conn:
        return None
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(query, params or ())
        if not fetch:
            conn.commit()
            return cursor.rowcount
        return cursor.fetchall()
    except Error as exc:
        logging.error(f"Error executing query: {exc}\nSQL: {query}\nParams: {params}")
        if conn:
            conn.rollback()
        return None
    finally:
        # Always release the cursor and the connection, success or not.
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()
def execute_query_one(self, query, params=None):
    """
    Run a read query on its own connection and return only the first row.

    Args:
        query: SQL text with %s placeholders.
        params: Sequence of values for the placeholders (optional).

    Returns:
        The first row as a dict, or None when there is no row, no connection
        could be opened, or MySQL reported an error (which is logged).
    """
    conn = self.get_connection()
    if not conn:
        return None
    cur = None
    try:
        # Buffered: the whole result is read at execute() time, so taking just
        # one row leaves nothing unread. With an unbuffered cursor, a query
        # that yields more than one row can make close() fail with
        # "Unread result found".
        cur = conn.cursor(dictionary=True, buffered=True)
        cur.execute(query, params or ())
        return cur.fetchone()
    except Error as e:
        logging.error(f"Error executing query: {e}\nSQL: {query}\nParams: {params}")
        return None
    finally:
        if cur:
            cur.close()
        if conn and conn.is_connected():
            conn.close()
# ---------------------- One-time ensures ----------------------
def _ensure_default_questionnaire(self):
"""Ensure a 'Default Patient Assessment' row exists in questionnaires."""
try:
row = self.execute_query_one(
"SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
("Default Patient Assessment",)
)
if not row:
self.execute_query(
"INSERT INTO questionnaires (name, description, created_at, updated_at) "
"VALUES (%s, %s, NOW(), NOW())",
("Default Patient Assessment", "Standard patient wound assessment form")
)
logging.info("Created default questionnaire 'Default Patient Assessment'")
except Exception as e:
logging.error(f"Error ensuring default questionnaire: {e}")
# ------------------------------ patients ------------------------------
def _normalize_age(self, age_val: Any) -> Optional[int]:
try:
return int(age_val) if age_val not in [None, ""] else None
except Exception:
return None
def get_patient_by_id(self, patient_id: int) -> Optional[Dict[str, Any]]:
    """Look up one patient (id, uuid, name, age, gender) by numeric id; None when nothing comes back."""
    sql = "SELECT id, uuid, name, age, gender FROM patients WHERE id = %s LIMIT 1"
    return self.execute_query_one(sql, (patient_id,))
def get_patient_by_name_age_gender(self, name: str, age: Any, gender: str) -> Optional[Dict[str, Any]]:
    """
    Find a patient matching name, age and gender exactly.
    An age that normalizes to None is matched against rows whose age IS NULL.
    """
    normalized = self._normalize_age(age)
    select_part = "SELECT id, uuid, name, age, gender FROM patients "
    if normalized is None:
        where_part = "WHERE name = %s AND age IS NULL AND gender = %s LIMIT 1"
        args = (name, gender)
    else:
        where_part = "WHERE name = %s AND age = %s AND gender = %s LIMIT 1"
        args = (name, normalized, gender)
    return self.execute_query_one(select_part + where_part, args)
def create_patient(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Insert a patients row built from questionnaire-style keys in ``data``.
    Returns {"id": <lastrowid>, "uuid": <generated uuid4 string>}, or None when
    no connection is available or the insert fails (logged and rolled back).
    """
    import uuid as _uuid

    conn = None
    cur = None
    try:
        conn = self.get_connection()
        if not conn:
            return None
        cur = conn.cursor()
        new_uuid = str(_uuid.uuid4())
        values = (
            new_uuid,
            data.get("patient_name"),
            self._normalize_age(data.get("patient_age")),
            data.get("patient_gender"),
            data.get("medical_history", ""),
            data.get("allergies", ""),
            data.get("additional_notes", ""),
            _now(),
        )
        cur.execute("""
            INSERT INTO patients (uuid, name, age, gender, illness, allergy, notes, created_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
        """, values)
        conn.commit()
        return {"id": cur.lastrowid, "uuid": new_uuid}
    except Exception as exc:
        if conn:
            conn.rollback()
        logging.error(f"create_patient error: {exc}")
        return None
    finally:
        # Cleanup is best effort; a failure here must not mask the result.
        try:
            if cur:
                cur.close()
            if conn:
                conn.close()
        except Exception:
            pass
def list_patients_for_practitioner(self, practitioner_id: int) -> List[Dict[str, Any]]:
    """
    List the patients that have visits recorded by ``practitioner_id``, with
    visit count and first/last visit timestamps, most recently seen first.
    Returns [] when the query yields nothing or fails.
    """
    sql = """
        SELECT
        p.id, p.uuid, p.name, p.age, p.gender,
        COUNT(qr.id) AS total_visits,
        MAX(qr.submitted_at) AS last_visit,
        MIN(qr.submitted_at) AS first_visit
        FROM questionnaire_responses qr
        JOIN patients p ON p.id = qr.patient_id
        WHERE qr.practitioner_id = %s
        GROUP BY p.id, p.uuid, p.name, p.age, p.gender
        ORDER BY last_visit DESC
    """
    rows = self.execute_query(sql, (practitioner_id,), fetch=True)
    return rows or []
# --------------------------- questionnaires/visits ---------------------------
def _ensure_default_questionnaire_id(self, created_by_user_id: Optional[int] = None) -> int:
row = self.execute_query_one(
"SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1"
)
if row:
return int(row["id"])
self.execute_query("""
INSERT INTO questionnaires (name, description, created_by, created_at, updated_at)
VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', %s, NOW(), NOW())
""", (created_by_user_id,))
row = self.execute_query_one(
"SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' ORDER BY id DESC LIMIT 1"
)
return int(row["id"]) if row else 1
def save_questionnaire(self, questionnaire_data: Dict[str, Any], existing_patient_id: Optional[int] = None):
    """
    Record one visit (a questionnaire_responses row) for a patient.
    The patient is, in order of preference: the one identified by
    existing_patient_id, an existing match on name/age/gender, or a newly
    created record.
    Returns {'response_id', 'patient_id', 'patient_uuid'}, or None when the
    connection or any step fails (the failure is logged).
    """
    conn = None
    cur = None
    try:
        conn = self.get_connection()
        if not conn:
            return None
        cur = conn.cursor(dictionary=True)

        # Resolve patient
        if existing_patient_id:
            patient = self.get_patient_by_id(existing_patient_id)
            if not patient:
                raise Exception("Existing patient id not found")
        else:
            patient = self.get_patient_by_name_age_gender(
                questionnaire_data.get("patient_name"),
                questionnaire_data.get("patient_age"),
                questionnaire_data.get("patient_gender"),
            )
            if not patient:
                patient = self.create_patient(questionnaire_data)
                if not patient:
                    raise Exception("Failed to create patient")
        patient_id, patient_uuid = patient["id"], patient["uuid"]

        qid = self._ensure_default_questionnaire_id(questionnaire_data.get("user_id"))

        # (stored key, questionnaire_data key) pairs for each section of the JSON payload
        layout = {
            "patient_info": (
                ("name", "patient_name"),
                ("age", "patient_age"),
                ("gender", "patient_gender"),
            ),
            "wound_details": (
                ("location", "wound_location"),
                ("duration", "wound_duration"),
                ("pain_level", "pain_level"),
                ("moisture_level", "moisture_level"),
                ("infection_signs", "infection_signs"),
                ("diabetic_status", "diabetic_status"),
            ),
            "medical_history": (
                ("previous_treatment", "previous_treatment"),
                ("medical_history", "medical_history"),
                ("medications", "medications"),
                ("allergies", "allergies"),
                ("additional_notes", "additional_notes"),
            ),
        }
        response_data = {
            section: {stored: questionnaire_data.get(source) for stored, source in pairs}
            for section, pairs in layout.items()
        }

        # Visits without a user id are attributed to practitioner 1.
        practitioner_id = questionnaire_data.get("user_id") or 1
        cur.execute("""
            INSERT INTO questionnaire_responses
            (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
            VALUES (%s,%s,%s,%s,%s)
        """, (
            qid, patient_id, practitioner_id, json.dumps(response_data), _now()
        ))
        conn.commit()
        response_id = cur.lastrowid
        logging.info(f"βœ… Saved response ID {response_id}")
        return {"response_id": response_id, "patient_id": patient_id, "patient_uuid": patient_uuid}
    except Exception as exc:
        if conn:
            conn.rollback()
        logging.error(f"save_questionnaire error: {exc}")
        return None
    finally:
        # Cleanup is best effort; a failure here must not mask the result.
        try:
            if cur:
                cur.close()
            if conn:
                conn.close()
        except Exception:
            pass
# ------------------------------- wounds -------------------------------
def create_wound(self, patient_uuid: str, questionnaire_data: Dict[str, Any]) -> Optional[str]:
    """
    Insert a wounds row (category "Assessment") and return its generated uuid.
    The patient's uuid string is written to wounds.patient_id. Returns None
    when no connection is available or the insert fails (logged, rolled back).
    """
    import uuid as _uuid

    conn = None
    cur = None
    try:
        conn = self.get_connection()
        if not conn:
            return None
        cur = conn.cursor()
        new_wound_uuid = str(_uuid.uuid4())
        values = (
            new_wound_uuid,
            patient_uuid,
            questionnaire_data.get("wound_location") or "",
            "Assessment",
            questionnaire_data.get("moisture_level") or "",
            questionnaire_data.get("infection_signs") or "",
            questionnaire_data.get("additional_notes") or "",
            _now(),
        )
        cur.execute("""
            INSERT INTO wounds (uuid, patient_id, position, category, moisture, infection, notes, created_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
        """, values)
        conn.commit()
        return new_wound_uuid
    except Exception as exc:
        if conn:
            conn.rollback()
        logging.error(f"create_wound error: {exc}")
        return None
    finally:
        # Cleanup is best effort; a failure here must not mask the result.
        try:
            if cur:
                cur.close()
            if conn:
                conn.close()
        except Exception:
            pass
# --------------------------- image storage ---------------------------
def _read_image_size(self, image_path: str):
try:
with Image.open(image_path) as im:
return im.size # (w, h)
except Exception:
return (None, None)
def _copy_to_uploads(self, src_path: str, suffix: str) -> str:
ext = os.path.splitext(src_path)[1] or ".jpg"
base = f"wound_{datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f')}_{suffix}{ext}"
dst = os.path.join("uploads", base)
try:
with open(src_path, "rb") as r, open(dst, "wb") as w:
w.write(r.read())
return dst
except Exception:
return src_path
def _hf_upload_and_url(self, local_path: str, subdir: str) -> Optional[str]:
"""Upload a file to the (hardcoded) HF dataset and return a public resolve URL."""
if not self.use_dataset:
return None
try:
# destination path in repo (datasets use 'main' branch)
day = datetime.utcnow().strftime("%Y/%m/%d")
fname = os.path.basename(local_path)
repo_path = f"{subdir}/{day}/{fname}"
self.hf_api.upload_file(
path_or_fileobj=local_path,
path_in_repo=repo_path,
repo_id=self.dataset_id,
repo_type="dataset",
commit_message=f"Add {repo_path}"
)
return f"https://huggingface.co/datasets/{self.dataset_id}/resolve/main/{repo_path}"
except Exception as e:
logging.error(f"HF upload failed: {e}")
return None
def _insert_wound_image_row(self, conn, cur, patient_uuid: str, wound_uuid: Optional[str], image_path_or_url: str) -> Optional[int]:
    """
    Insert one wound_images row on the caller's connection/cursor, commit it,
    and return the cursor's lastrowid. Width and height are read only for
    local files; for http/https URLs they are stored as NULL.
    """
    import uuid as _uuid

    if image_path_or_url.startswith(("http://", "https://")):
        width = height = None
    else:
        width, height = self._read_image_size(image_path_or_url)
    values = (
        str(_uuid.uuid4()),
        patient_uuid,
        wound_uuid,
        image_path_or_url,
        int(width) if width else None,
        int(height) if height else None,
        _now(),
    )
    cur.execute("""
        INSERT INTO wound_images (uuid, patient_id, wound_id, image, width, height, created_at)
        VALUES (%s,%s,%s,%s,%s,%s,%s)
    """, values)
    conn.commit()
    return cur.lastrowid
def save_wound_images_bundle(
    self,
    patient_uuid: str,
    original_path: str,
    analysis_result: Dict[str, Any],
    wound_uuid: Optional[str] = None
) -> Dict[str, Any]:
    """
    Store the original image plus, when present on disk, the detection and
    segmentation images named in analysis_result["visual_analysis"].
    Each image is copied to uploads/, uploaded to the HF dataset when that
    backend is enabled, and recorded in wound_images (URL if uploaded, local
    path otherwise). A minimal wounds row is created when wound_uuid is not
    given.
    Returns a dict of row ids and display URLs; entries stay None for anything
    that was not saved (including everything after a failure).
    """
    out = dict.fromkeys((
        "original_id", "original_url",
        "detection_id", "detection_url",
        "segmentation_id", "segmentation_url",
    ))
    conn = self.get_connection()
    if not conn:
        return out
    cur = conn.cursor()

    def _persist(source_path, suffix, subdir, key):
        # copy -> optional upload -> DB row -> record id and display URL
        staged = self._copy_to_uploads(source_path, suffix)
        remote = self._hf_upload_and_url(staged, subdir=subdir) if self.use_dataset else None
        stored = remote or staged
        row_id = self._insert_wound_image_row(conn, cur, patient_uuid, wound_uuid, stored)
        out[f"{key}_id"] = row_id
        out[f"{key}_url"] = _file_url(stored)

    try:
        # Ensure wound row exists if not provided (minimal)
        if not wound_uuid:
            import uuid as _uuid
            wound_uuid = str(_uuid.uuid4())
            cur.execute("""
                INSERT INTO wounds (uuid, patient_id, category, notes, created_at)
                VALUES (%s,%s,%s,%s,%s)
            """, (wound_uuid, patient_uuid, "Assessment", "", _now()))
            conn.commit()

        # 1) Original
        _persist(original_path, "original", "original", "original")

        # 2) Detection / Segmentation from analysis_result
        visual = (analysis_result or {}).get("visual_analysis", {}) or {}
        detection_path = visual.get("detection_image_path")
        segmentation_path = visual.get("segmentation_image_path")
        derived = (
            (detection_path, "detect", "detection"),
            (segmentation_path, "segment", "segmentation"),
        )
        for candidate, tag, key in derived:
            if candidate and os.path.exists(candidate):
                _persist(candidate, tag, tag, key)
        return out
    except Exception as exc:
        logging.error(f"save_wound_images_bundle error: {exc}", exc_info=True)
        try:
            conn.rollback()
        except Exception:
            pass
        return out
    finally:
        # Cleanup is best effort; a failure here must not mask the result.
        try:
            cur.close()
            conn.close()
        except Exception:
            pass
# For older callers that only save one image
def save_wound_image(self, patient_id: int, image) -> Optional[Dict[str, Any]]:
    """
    Back-compat single-image save.

    The image is normalized to a JPEG staging file, copied to its final name
    in uploads/, optionally uploaded to the HF dataset, and recorded in
    wound_images with patients.uuid in wound_images.patient_id (VARCHAR).

    Args:
        patient_id: Numeric patients.id; used to look up the patient's uuid.
        image: Either an object with a ``save`` method (e.g. a PIL.Image) or
            the path of an existing image file.

    Returns:
        {"id", "filename", "path", "url"} for the stored image, or None when
        the input is invalid, the patient is unknown, no connection is
        available, or any step fails (failures are logged).
    """
    import uuid as _uuid

    tmp_local = None
    local_path = None
    conn = None
    cur = None
    try:
        # Normalize to a temporary local path first
        os.makedirs("uploads", exist_ok=True)
        image_uid = os.urandom(8).hex()
        tmp_local = os.path.join("uploads", f"wound_{image_uid}_tmp.jpg")
        if hasattr(image, "save"):
            # JPEG cannot store alpha/palette modes, so convert when needed.
            needs_rgb = hasattr(image, "convert") and getattr(image, "mode", "RGB") != "RGB"
            to_write = image.convert("RGB") if needs_rgb else image
            to_write.save(tmp_local, format="JPEG", quality=95)
        elif isinstance(image, str) and os.path.exists(image):
            with Image.open(image) as pil:
                pil.convert("RGB").save(tmp_local, format="JPEG", quality=95)
        else:
            logging.error("Invalid image object/path")
            return None

        # Resolve patients.uuid
        conn = self.get_connection()
        if not conn:
            return None
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT uuid FROM patients WHERE id = %s LIMIT 1", (patient_id,))
        row = cur.fetchone()
        cur.close()
        cur = None
        if not row:
            logging.error("Patient id not found while saving image")
            return None
        patient_uuid = row["uuid"]

        # Push to uploads/dataset with final name
        local_path = self._copy_to_uploads(tmp_local, "original")
        url = self._hf_upload_and_url(local_path, subdir="original") if self.use_dataset else None
        path_or_url = url or local_path
        width, height = self._read_image_size(local_path)

        cur = conn.cursor()
        cur.execute("""
            INSERT INTO wound_images (uuid, patient_id, image, width, height, created_at)
            VALUES (%s,%s,%s,%s,%s,%s)
        """, (
            str(_uuid.uuid4()),
            patient_uuid,
            path_or_url,
            int(width) if width else None,
            int(height) if height else None,
            _now()
        ))
        conn.commit()
        return {
            "id": cur.lastrowid,
            "filename": os.path.basename(local_path),
            "path": path_or_url,
            "url": _file_url(path_or_url),
        }
    except Exception as e:
        logging.error(f"Image save error: {e}", exc_info=True)
        return None
    finally:
        # Release DB resources on every path, including errors and early returns.
        for handle in (cur, conn):
            try:
                if handle:
                    handle.close()
            except Exception:
                pass
        # Remove the staging file unless the copy step fell back to it, in
        # which case it is the stored image.
        if tmp_local and tmp_local != local_path:
            try:
                if os.path.exists(tmp_local):
                    os.remove(tmp_local)
            except OSError:
                pass
# ------------------------------- analyses -------------------------------
def save_analysis(self, questionnaire_id: int, image_id: Optional[int], analysis_data: Dict[str, Any]) -> Optional[int]:
    """
    Insert one ai_analyses row and return its lastrowid.
    The full ``analysis_data`` is stored as JSON (NULL when empty); summary,
    recommendations, risk and timing fields are also copied into their own
    columns with defaults for missing keys. Returns None when no connection
    is available or the insert fails (logged, rolled back).
    """
    conn = None
    cur = None
    try:
        conn = self.get_connection()
        if not conn:
            return None
        cur = conn.cursor()
        details = analysis_data or {}
        values = (
            questionnaire_id,
            image_id,
            json.dumps(analysis_data) if analysis_data else None,
            details.get("summary", ""),
            details.get("recommendations", ""),
            int(details.get("risk_score", 0) or 0),
            details.get("risk_level", "Unknown"),
            float(details.get("processing_time", 0.0) or 0.0),
            details.get("model_version", "v1.0"),
            _now(),
        )
        cur.execute("""
            INSERT INTO ai_analyses (
            questionnaire_id, image_id, analysis_data, summary, recommendations,
            risk_score, risk_level, processing_time, model_version, created_at
            ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        """, values)
        conn.commit()
        return cur.lastrowid
    except Exception as exc:
        if conn:
            conn.rollback()
        logging.error(f"save_analysis error: {exc}")
        return None
    finally:
        # Cleanup is best effort; a failure here must not mask the result.
        try:
            if cur:
                cur.close()
            if conn:
                conn.close()
        except Exception:
            pass
def save_analysis_result(self, questionnaire_response_id: int, analysis_result: Dict[str, Any]):
    """
    Compatibility wrapper for callers that have no image_id.
    Looks up the questionnaire (template) id of the given response and saves
    the analysis under that id via save_analysis(). Returns the new row id, or
    None when the response is not found or saving fails.
    """
    # NOTE(review): the history queries in this class join
    # ai_analyses.questionnaire_id to questionnaire_responses.id, whereas this
    # stores the template id — confirm which key the schema intends.
    try:
        response_row = self.execute_query_one(
            "SELECT questionnaire_id FROM questionnaire_responses WHERE id = %s LIMIT 1",
            (questionnaire_response_id,)
        )
        if response_row:
            return self.save_analysis(
                questionnaire_id=response_row["questionnaire_id"],
                image_id=None,
                analysis_data=analysis_result or {}
            )
        logging.error("No questionnaire_response found for analysis_result save")
        return None
    except Exception as exc:
        logging.error(f"Error saving analysis result: {exc}")
        return None
# ------------------------------- queries for UI -------------------------------
def get_user_history_rows(self, user_id: int) -> List[Dict[str, Any]]:
    """
    Return up to 20 of the most recent visits recorded by practitioner
    ``user_id``, newest first, each with patient details, any joined analysis
    and (via ai_analyses.image_id -> wound_images) its image.
    Returns [] when the query yields nothing or fails.
    """
    sql = """
        SELECT
        qr.id AS response_id,
        qr.submitted_at AS visit_date,
        p.id AS patient_id,
        p.uuid AS patient_uuid,
        p.name AS patient_name,
        p.age AS patient_age,
        p.gender AS patient_gender,
        a.summary,
        a.recommendations,
        a.risk_score,
        a.risk_level,
        a.analysis_data,
        wi.image AS image_url
        FROM questionnaire_responses qr
        JOIN patients p ON p.id = qr.patient_id
        LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.id
        LEFT JOIN wound_images wi ON wi.id = a.image_id
        WHERE qr.practitioner_id = %s
        ORDER BY qr.submitted_at DESC
        LIMIT 20
    """
    rows = self.execute_query(sql, (user_id,), fetch=True)
    return rows or []
def get_patient_progression_rows(self, user_id: int, patient_id: int) -> List[Dict[str, Any]]:
    """
    Return every visit that practitioner ``user_id`` recorded for the patient
    with numeric id ``patient_id``, oldest first, with any joined analysis and
    image. Returns [] when the query yields nothing or fails.
    """
    sql = """
        SELECT
        qr.id AS response_id,
        qr.submitted_at AS visit_date,
        p.id AS patient_id,
        p.uuid AS patient_uuid,
        p.name AS patient_name,
        p.age AS patient_age,
        p.gender AS patient_gender,
        a.summary,
        a.recommendations,
        a.risk_score,
        a.risk_level,
        a.analysis_data,
        wi.image AS image_url
        FROM questionnaire_responses qr
        JOIN patients p ON p.id = qr.patient_id
        LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.id
        LEFT JOIN wound_images wi ON wi.id = a.image_id
        WHERE qr.practitioner_id = %s AND p.id = %s
        ORDER BY qr.submitted_at ASC
    """
    rows = self.execute_query(sql, (user_id, patient_id), fetch=True)
    return rows or []
# ----------------------------- organizations -----------------------------
def get_organizations(self):
    """
    List organizations (id, org_name, location) ordered by name.
    When the query returns nothing or raises, a single placeholder
    'Default Hospital' entry is returned instead.
    """
    placeholder = [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]
    try:
        rows = self.execute_query(
            "SELECT id, name as org_name, location FROM organizations ORDER BY name",
            fetch=True
        )
    except Exception as exc:
        logging.error(f"Error getting organizations: {exc}")
        return placeholder
    return rows or placeholder
def create_organization(self, org_data: Dict[str, Any]):
    """
    Insert an organizations row from ``org_data`` (missing fields become "")
    and return the id of the newest row carrying that name.
    Returns None when the insert reports no affected row, the row cannot be
    read back, or an error occurs (logged).
    """
    try:
        field_order = ('org_name', 'email', 'phone', 'country_code', 'department', 'location')
        values = tuple(org_data.get(field, '') for field in field_order) + (_now(),)
        inserted = self.execute_query("""
            INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s)
        """, values)
        if not inserted:
            return None
        # The insert helper does not expose the new id, so read it back by name.
        newest = self.execute_query_one(
            "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC LIMIT 1",
            (org_data.get('org_name', ''),)
        )
        return newest['id'] if newest else None
    except Exception as exc:
        logging.error(f"Error creating organization: {exc}")
        return None