# database.py (SmartHeal) — dataset ID hardcoded
import os
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Tuple

import mysql.connector
from mysql.connector import Error
from PIL import Image

# ----------------------------- Hardcoded Dataset -----------------------------
DATASET_ID = "SmartHeal/wound-image-uploads"  # <— hardcoded as requested

# Optional HF dataset integration
_HF_AVAILABLE = False
try:
    from huggingface_hub import HfApi, HfFolder, create_repo
    _HF_AVAILABLE = True
except Exception:
    _HF_AVAILABLE = False

# Local directory that receives a copy of every stored image.
_UPLOAD_DIR = "uploads"

# Pillow modes that can be written as JPEG without conversion.
_JPEG_MODES = ("L", "RGB", "CMYK")


# ----------------------------- helpers -----------------------------
def _now():
    """Return the current local time as a naive ``datetime``."""
    return datetime.now()


def _utc_stamp(fmt: str) -> str:
    """Format the current UTC time with *fmt* (used for file and repo names)."""
    return datetime.now(timezone.utc).strftime(fmt)


def _abs(p: str) -> str:
    """Return the absolute form of *p*, or *p* unchanged if that fails."""
    try:
        return os.path.abspath(p)
    except Exception:
        return p


def _ensure_dir(path: str):
    """Create *path* (and parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)


def _is_remote(path: str) -> bool:
    """Return True when *path* is an http(s) URL rather than a local path."""
    return path.startswith(("http://", "https://"))


def _file_url(path: str) -> str:
    """
    For local files, return /file=/abs/path (works in Gradio/Spaces).
    For remote (http/https), return as-is.
    """
    if not path:
        return ""
    s = str(path)
    if _is_remote(s):
        return s
    return f"/file={_abs(s)}"


def _close_quietly(*resources) -> None:
    """
    Close each cursor/connection independently.

    Cleanup is best-effort: a failure to close one resource is logged at
    debug level and must not prevent the remaining ones from being closed.
    """
    for res in resources:
        if res is None:
            continue
        try:
            res.close()
        except Exception as e:
            logging.debug(f"Ignoring error while closing DB resource: {e}")


# -------------------------- DatabaseManager --------------------------
class DatabaseManager:
    """
    Database operations manager for SmartHeal
    - Keeps patient_id (INT) consistent for questionnaire_responses / ai_analyses joins
    - Uses patients.uuid (CHAR 36) in wounds / wound_images where those tables store VARCHAR
    - Saves images locally OR to a hardcoded Hugging Face dataset and stores the URL
    """

    def __init__(self, mysql_config: Dict[str, Any], hf_token: Optional[str] = None):
        """
        Args:
            mysql_config: keyword arguments passed to ``mysql.connector.connect``.
            hf_token: Hugging Face token; falls back to the ``HUGGINGFACE_TOKEN``
                and then ``HF_TOKEN`` environment variables.
        """
        self.mysql_config = mysql_config

        # storage backend: hardcoded dataset id; token from arg or env
        self.dataset_id = DATASET_ID
        self.hf_token = hf_token or os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_TOKEN")
        self.use_dataset = bool(self.dataset_id and self.hf_token and _HF_AVAILABLE)

        if self.use_dataset:
            logging.info(f"HF dataset storage enabled: {self.dataset_id}")
            try:
                HfFolder.save_token(self.hf_token)
                try:
                    create_repo(
                        repo_id=self.dataset_id,
                        repo_type="dataset",
                        exist_ok=True,
                        token=self.hf_token,
                    )
                except Exception as e:
                    # Best-effort: the repo may already exist or the token may
                    # lack create rights; uploads can still succeed.
                    logging.debug(f"create_repo skipped/failed for {self.dataset_id}: {e}")
                self.hf_api = HfApi(token=self.hf_token)
            except Exception as e:
                logging.warning(f"Could not initialize HF dataset backend, falling back to local. Err: {e}")
                self.use_dataset = False
        else:
            logging.info("Using LOCAL storage (uploads/)")

        # Images are always staged in uploads/ before an optional HF upload,
        # and HF init may have fallen back to local above, so the directory
        # is needed in every mode.
        _ensure_dir(_UPLOAD_DIR)

        self.test_connection()
        self._ensure_default_questionnaire()

    # -------------------- low-level connection utils --------------------
    def test_connection(self):
        """Open and close one connection, logging whether it succeeded."""
        try:
            conn = self.get_connection()
            if conn:
                conn.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error(f"Database connection test failed: {e}")

    def get_connection(self):
        """Return a new MySQL connection, or None if connecting fails."""
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error(f"Error connecting to MySQL: {e}")
            return None

    def execute_query(self, query, params=None, fetch=False):
        """
        Run *query* on a fresh connection.

        Returns:
            With ``fetch=True``: all rows as a list of dicts.
            Otherwise: the affected row count after committing.
            None when no connection is available or a MySQL error occurs.
        """
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            if fetch:
                return cur.fetchall()
            conn.commit()
            return cur.rowcount
        except Error as e:
            logging.error(f"Error executing query: {e}\nSQL: {query}\nParams: {params}")
            if conn:
                conn.rollback()
            return None
        finally:
            if cur:
                cur.close()
            if conn and conn.is_connected():
                conn.close()

    def execute_query_one(self, query, params=None):
        """Run *query* and return its first row as a dict, or None."""
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            return cur.fetchone()
        except Error as e:
            logging.error(f"Error executing query: {e}\nSQL: {query}\nParams: {params}")
            return None
        finally:
            if cur:
                cur.close()
            if conn and conn.is_connected():
                conn.close()

    # ---------------------- One-time ensures ----------------------
    def _ensure_default_questionnaire(self):
        """Ensure a 'Default Patient Assessment' row exists in questionnaires."""
        try:
            row = self.execute_query_one(
                "SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                ("Default Patient Assessment",)
            )
            if not row:
                self.execute_query(
                    "INSERT INTO questionnaires (name, description, created_at, updated_at) "
                    "VALUES (%s, %s, NOW(), NOW())",
                    ("Default Patient Assessment", "Standard patient wound assessment form")
                )
                logging.info("Created default questionnaire 'Default Patient Assessment'")
        except Exception as e:
            logging.error(f"Error ensuring default questionnaire: {e}")

    # ------------------------------ patients ------------------------------
    def _normalize_age(self, age_val: Any) -> Optional[int]:
        """Convert *age_val* to int; None/""/unparseable values become None."""
        try:
            return int(age_val) if age_val not in [None, ""] else None
        except Exception:
            return None

    def get_patient_by_id(self, patient_id: int) -> Optional[Dict[str, Any]]:
        """Return the patient row (id, uuid, name, age, gender) for an INT id."""
        return self.execute_query_one(
            "SELECT id, uuid, name, age, gender FROM patients WHERE id = %s LIMIT 1",
            (patient_id,)
        )

    def get_patient_by_name_age_gender(self, name: str, age: Any, gender: str) -> Optional[Dict[str, Any]]:
        """Find a patient by exact name/age/gender; a missing age matches NULL."""
        age_val = self._normalize_age(age)
        if age_val is None:
            # "age = NULL" never matches in SQL, so NULL needs its own query.
            return self.execute_query_one(
                "SELECT id, uuid, name, age, gender FROM patients "
                "WHERE name = %s AND age IS NULL AND gender = %s LIMIT 1",
                (name, gender),
            )
        return self.execute_query_one(
            "SELECT id, uuid, name, age, gender FROM patients "
            "WHERE name = %s AND age = %s AND gender = %s LIMIT 1",
            (name, age_val, gender),
        )

    def create_patient(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Insert a patient from questionnaire-style *data*.

        Returns:
            {'id': <INT id>, 'uuid': <generated uuid4 string>}, or None on failure.
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor()
            p_uuid = str(uuid.uuid4())
            cur.execute("""
                INSERT INTO patients (uuid, name, age, gender, illness, allergy, notes, created_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                p_uuid,
                data.get("patient_name"),
                self._normalize_age(data.get("patient_age")),
                data.get("patient_gender"),
                data.get("medical_history", ""),
                data.get("allergies", ""),
                data.get("additional_notes", ""),
                _now()
            ))
            conn.commit()
            return {"id": cur.lastrowid, "uuid": p_uuid}
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"create_patient error: {e}")
            return None
        finally:
            _close_quietly(cur, conn)

    def list_patients_for_practitioner(self, practitioner_id: int) -> List[Dict[str, Any]]:
        """Return per-patient visit stats for a practitioner, most recent first."""
        return self.execute_query("""
            SELECT
                p.id, p.uuid, p.name, p.age, p.gender,
                COUNT(qr.id) AS total_visits,
                MAX(qr.submitted_at) AS last_visit,
                MIN(qr.submitted_at) AS first_visit
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            WHERE qr.practitioner_id = %s
            GROUP BY p.id, p.uuid, p.name, p.age, p.gender
            ORDER BY last_visit DESC
        """, (practitioner_id,), fetch=True) or []

    # --------------------------- questionnaires/visits ---------------------------
    def _ensure_default_questionnaire_id(self, created_by_user_id: Optional[int] = None) -> int:
        """
        Return the id of the 'Default Patient Assessment' questionnaire,
        creating the row first if needed. Falls back to 1 if it still cannot
        be read back.
        """
        row = self.execute_query_one(
            "SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1"
        )
        if row:
            return int(row["id"])
        self.execute_query("""
            INSERT INTO questionnaires (name, description, created_by, created_at, updated_at)
            VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', %s, NOW(), NOW())
        """, (created_by_user_id,))
        row = self.execute_query_one(
            "SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' ORDER BY id DESC LIMIT 1"
        )
        return int(row["id"]) if row else 1

    def save_questionnaire(self, questionnaire_data: Dict[str, Any], existing_patient_id: Optional[int] = None):
        """
        Create a visit (questionnaire_responses) for an existing or new patient.
        Returns: {'response_id', 'patient_id', 'patient_uuid'}, or None on failure.
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor(dictionary=True)

            # Resolve patient
            if existing_patient_id:
                p = self.get_patient_by_id(existing_patient_id)
                if not p:
                    raise Exception("Existing patient id not found")
                patient_id = p["id"]
                patient_uuid = p["uuid"]
            else:
                found = self.get_patient_by_name_age_gender(
                    questionnaire_data.get("patient_name"),
                    questionnaire_data.get("patient_age"),
                    questionnaire_data.get("patient_gender"),
                )
                if found:
                    patient_id, patient_uuid = found["id"], found["uuid"]
                else:
                    created = self.create_patient(questionnaire_data)
                    if not created:
                        raise Exception("Failed to create patient")
                    patient_id, patient_uuid = created["id"], created["uuid"]

            qid = self._ensure_default_questionnaire_id(questionnaire_data.get("user_id"))

            response_data = {
                "patient_info": {
                    "name": questionnaire_data.get("patient_name"),
                    "age": questionnaire_data.get("patient_age"),
                    "gender": questionnaire_data.get("patient_gender"),
                },
                "wound_details": {
                    "location": questionnaire_data.get("wound_location"),
                    "duration": questionnaire_data.get("wound_duration"),
                    "pain_level": questionnaire_data.get("pain_level"),
                    "moisture_level": questionnaire_data.get("moisture_level"),
                    "infection_signs": questionnaire_data.get("infection_signs"),
                    "diabetic_status": questionnaire_data.get("diabetic_status"),
                },
                "medical_history": {
                    "previous_treatment": questionnaire_data.get("previous_treatment"),
                    "medical_history": questionnaire_data.get("medical_history"),
                    "medications": questionnaire_data.get("medications"),
                    "allergies": questionnaire_data.get("allergies"),
                    "additional_notes": questionnaire_data.get("additional_notes"),
                },
            }

            # Visits without a user id are attributed to practitioner 1.
            practitioner_id = questionnaire_data.get("user_id") or 1

            cur.execute("""
                INSERT INTO questionnaire_responses
                    (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s,%s,%s,%s,%s)
            """, (
                qid, patient_id, practitioner_id, json.dumps(response_data), _now()
            ))
            conn.commit()
            response_id = cur.lastrowid
            logging.info(f"✅ Saved response ID {response_id}")
            return {"response_id": response_id, "patient_id": patient_id, "patient_uuid": patient_uuid}

        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"save_questionnaire error: {e}")
            return None
        finally:
            _close_quietly(cur, conn)

    # ------------------------------- wounds -------------------------------
    def create_wound(self, patient_uuid: str, questionnaire_data: Dict[str, Any]) -> Optional[str]:
        """Create wound (returns wound_uuid). Stores patient_uuid in wounds.patient_id (VARCHAR)."""
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor()
            wound_uuid = str(uuid.uuid4())
            cur.execute("""
                INSERT INTO wounds (uuid, patient_id, position, category, moisture, infection, notes, created_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                wound_uuid,
                patient_uuid,
                questionnaire_data.get("wound_location") or "",
                "Assessment",
                questionnaire_data.get("moisture_level") or "",
                questionnaire_data.get("infection_signs") or "",
                questionnaire_data.get("additional_notes") or "",
                _now()
            ))
            conn.commit()
            return wound_uuid
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"create_wound error: {e}")
            return None
        finally:
            _close_quietly(cur, conn)

    # --------------------------- image storage ---------------------------
    def _read_image_size(self, image_path: str):
        """Return (width, height) of a local image, or (None, None) if unreadable."""
        try:
            with Image.open(image_path) as im:
                return im.size  # (w, h)
        except Exception:
            return (None, None)

    def _copy_to_uploads(self, src_path: str, suffix: str) -> str:
        """
        Copy *src_path* into uploads/ under a timestamped name.

        Returns the new path, or *src_path* itself when the copy fails so the
        caller can still store a usable reference.
        """
        ext = os.path.splitext(src_path)[1] or ".jpg"
        base = f"wound_{_utc_stamp('%Y%m%d_%H%M%S_%f')}_{suffix}{ext}"
        dst = os.path.join(_UPLOAD_DIR, base)
        try:
            # The directory may be missing (dataset mode, or removed at runtime).
            _ensure_dir(_UPLOAD_DIR)
            with open(src_path, "rb") as r, open(dst, "wb") as w:
                w.write(r.read())
            return dst
        except Exception as e:
            logging.warning(f"Could not copy {src_path} to {dst}; using source path. Err: {e}")
            return src_path

    def _hf_upload_and_url(self, local_path: str, subdir: str) -> Optional[str]:
        """Upload a file to the (hardcoded) HF dataset and return a public resolve URL."""
        if not self.use_dataset:
            return None
        try:
            # destination path in repo (datasets use 'main' branch)
            day = _utc_stamp("%Y/%m/%d")
            fname = os.path.basename(local_path)
            repo_path = f"{subdir}/{day}/{fname}"
            self.hf_api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=repo_path,
                repo_id=self.dataset_id,
                repo_type="dataset",
                commit_message=f"Add {repo_path}"
            )
            return f"https://huggingface.co/datasets/{self.dataset_id}/resolve/main/{repo_path}"
        except Exception as e:
            logging.error(f"HF upload failed: {e}")
            return None

    def _insert_wound_image_row(self, conn, cur, patient_uuid: str, wound_uuid: Optional[str],
                                image_path_or_url: str) -> Optional[int]:
        """Insert one row in wound_images; width/height detected for local files only."""
        width, height = (None, None)
        if not _is_remote(image_path_or_url):
            width, height = self._read_image_size(image_path_or_url)

        img_uuid = str(uuid.uuid4())
        cur.execute("""
            INSERT INTO wound_images (uuid, patient_id, wound_id, image, width, height, created_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s)
        """, (
            img_uuid,
            patient_uuid,
            wound_uuid,
            image_path_or_url,
            int(width) if width else None,
            int(height) if height else None,
            _now()
        ))
        # Each image row is committed on its own, so earlier images survive
        # a failure on a later one.
        conn.commit()
        return cur.lastrowid

    def _store_bundle_image(self, conn, cur, patient_uuid: str, wound_uuid: Optional[str],
                            src_path: str, kind: str) -> Tuple[Optional[int], str]:
        """
        Stage one image in uploads/, upload it to the dataset when enabled,
        and record it in wound_images.

        *kind* is used both as the local filename suffix and as the dataset
        sub-directory. Returns (wound_images row id, display URL).
        """
        local_copy = self._copy_to_uploads(src_path, kind)
        remote_url = self._hf_upload_and_url(local_copy, subdir=kind) if self.use_dataset else None
        stored = remote_url or local_copy  # fall back to the local path if the upload failed
        row_id = self._insert_wound_image_row(conn, cur, patient_uuid, wound_uuid, stored)
        return row_id, _file_url(stored)

    def save_wound_images_bundle(
        self,
        patient_uuid: str,
        original_path: str,
        analysis_result: Dict[str, Any],
        wound_uuid: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Save original + detection + segmentation images to wound_images.
        Stores **HF dataset URLs** when configured, else local paths.
        Returns display URLs for UI.

        On error the dict is returned with whatever entries were filled in
        before the failure; missing entries stay None.
        """
        out = {
            "original_id": None, "original_url": None,
            "detection_id": None, "detection_url": None,
            "segmentation_id": None, "segmentation_url": None
        }

        conn = self.get_connection()
        if not conn:
            return out
        cur = None
        try:
            cur = conn.cursor()

            # Ensure wound row exists if not provided (minimal)
            if not wound_uuid:
                wound_uuid = str(uuid.uuid4())
                cur.execute("""
                    INSERT INTO wounds (uuid, patient_id, category, notes, created_at)
                    VALUES (%s,%s,%s,%s,%s)
                """, (wound_uuid, patient_uuid, "Assessment", "", _now()))
                conn.commit()

            # 1) Original
            out["original_id"], out["original_url"] = self._store_bundle_image(
                conn, cur, patient_uuid, wound_uuid, original_path, "original"
            )

            # 2) Detection / Segmentation from analysis_result
            va = (analysis_result or {}).get("visual_analysis", {}) or {}
            det = va.get("detection_image_path")
            seg = va.get("segmentation_image_path")

            if det and os.path.exists(det):
                out["detection_id"], out["detection_url"] = self._store_bundle_image(
                    conn, cur, patient_uuid, wound_uuid, det, "detect"
                )

            if seg and os.path.exists(seg):
                out["segmentation_id"], out["segmentation_url"] = self._store_bundle_image(
                    conn, cur, patient_uuid, wound_uuid, seg, "segment"
                )

            return out

        except Exception as e:
            logging.error(f"save_wound_images_bundle error: {e}", exc_info=True)
            try:
                conn.rollback()
            except Exception:
                pass
            return out
        finally:
            _close_quietly(cur, conn)

    # For older callers that only save one image
    def save_wound_image(self, patient_id: int, image) -> Optional[Dict[str, Any]]:
        """
        Back-compat single-image save:
          - If `image` is a PIL.Image -> we write to uploads and (optionally) the dataset.
          - If `image` is a filepath -> we copy + (optionally) upload.
        Stores patients.uuid into wound_images.patient_id (VARCHAR).

        Returns:
            {'id', 'filename', 'path', 'url'} for the stored image, or None when
            the input is invalid, the patient is unknown, or any step fails.
        """
        tmp_local = None
        local_path = None
        conn = None
        lookup_cur = None
        cur = None
        try:
            # Normalize to a temporary local path first
            _ensure_dir(_UPLOAD_DIR)
            image_uid = os.urandom(8).hex()
            tmp_local = os.path.join(_UPLOAD_DIR, f"wound_{image_uid}_tmp.jpg")

            if hasattr(image, "save"):
                # JPEG cannot encode modes such as RGBA or P; convert those
                # first, as the filepath branch below already does.
                if hasattr(image, "convert") and getattr(image, "mode", "RGB") not in _JPEG_MODES:
                    image = image.convert("RGB")
                image.save(tmp_local, format="JPEG", quality=95)
            elif isinstance(image, str) and os.path.exists(image):
                with Image.open(image) as pil:
                    pil.convert("RGB").save(tmp_local, format="JPEG", quality=95)
            else:
                logging.error("Invalid image object/path")
                return None

            # Resolve patients.uuid
            conn = self.get_connection()
            if not conn:
                return None
            lookup_cur = conn.cursor(dictionary=True)
            lookup_cur.execute("SELECT uuid FROM patients WHERE id = %s LIMIT 1", (patient_id,))
            row = lookup_cur.fetchone()
            lookup_cur.close()
            if not row:
                logging.error("Patient id not found while saving image")
                return None
            patient_uuid = row["uuid"]

            # Push to uploads/dataset with final name
            local_path = self._copy_to_uploads(tmp_local, "original")
            url = self._hf_upload_and_url(local_path, subdir="original") if self.use_dataset else None
            path_or_url = url or local_path

            width, height = self._read_image_size(local_path)
            img_uuid = str(uuid.uuid4())
            cur = conn.cursor()
            cur.execute("""
                INSERT INTO wound_images (uuid, patient_id, image, width, height, created_at)
                VALUES (%s,%s,%s,%s,%s,%s)
            """, (
                img_uuid,
                patient_uuid,
                path_or_url,
                int(width) if width else None,
                int(height) if height else None,
                _now()
            ))
            conn.commit()
            image_db_id = cur.lastrowid

            return {
                "id": image_db_id,
                "filename": os.path.basename(local_path),
                "path": path_or_url,
                "url": _file_url(path_or_url),
            }
        except Exception as e:
            logging.error(f"Image save error: {e}", exc_info=True)
            return None
        finally:
            # Runs on every exit path so no connection/cursor is leaked.
            _close_quietly(lookup_cur, cur, conn)
            # Drop the scratch JPEG, unless _copy_to_uploads fell back to
            # returning it as the stored file.
            if tmp_local and tmp_local != local_path:
                try:
                    os.remove(tmp_local)
                except OSError:
                    pass  # never created or already gone

    # ------------------------------- analyses -------------------------------
    def save_analysis(self, questionnaire_id: int, image_id: Optional[int], analysis_data: Dict[str, Any]) -> Optional[int]:
        """
        Insert one ai_analyses row built from *analysis_data*.

        Missing keys default to: summary/recommendations "", risk_score 0,
        risk_level "Unknown", processing_time 0.0, model_version "v1.0".
        Returns the new row id, or None on failure.
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor()
            data = analysis_data or {}
            cur.execute("""
                INSERT INTO ai_analyses (
                    questionnaire_id, image_id, analysis_data, summary,
                    recommendations, risk_score, risk_level,
                    processing_time, model_version, created_at
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                questionnaire_id,
                image_id,
                json.dumps(analysis_data) if analysis_data else None,
                data.get("summary", ""),
                data.get("recommendations", ""),
                int(data.get("risk_score", 0) or 0),
                data.get("risk_level", "Unknown"),
                float(data.get("processing_time", 0.0) or 0.0),
                data.get("model_version", "v1.0"),
                _now()
            ))
            conn.commit()
            return cur.lastrowid
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"save_analysis error: {e}")
            return None
        finally:
            _close_quietly(cur, conn)

    def save_analysis_result(self, questionnaire_response_id: int, analysis_result: Dict[str, Any]):
        """
        Compatibility wrapper used by some callers (without image_id).
        Stores under the questionnaire template linked to the response.
        """
        try:
            row = self.execute_query_one(
                "SELECT questionnaire_id FROM questionnaire_responses WHERE id = %s LIMIT 1",
                (questionnaire_response_id,)
            )
            if not row:
                logging.error("No questionnaire_response found for analysis_result save")
                return None
            questionnaire_id = row["questionnaire_id"]
            return self.save_analysis(
                questionnaire_id=questionnaire_id,
                image_id=None,
                analysis_data=analysis_result or {}
            )
        except Exception as e:
            logging.error(f"Error saving analysis result: {e}")
            return None

    # ------------------------------- queries for UI -------------------------------
    def get_user_history_rows(self, user_id: int) -> List[Dict[str, Any]]:
        """
        Latest 20 visits across patients; joins ai_analyses.image_id -> wound_images to pull display image.
        """
        # NOTE(review): this joins ai_analyses.questionnaire_id against qr.id
        # (the response id), while save_analysis_result stores
        # questionnaire_responses.questionnaire_id (the template id) in that
        # column — confirm which one the schema intends.
        return self.execute_query("""
            SELECT
                qr.id AS response_id,
                qr.submitted_at AS visit_date,
                p.id AS patient_id,
                p.uuid AS patient_uuid,
                p.name AS patient_name,
                p.age AS patient_age,
                p.gender AS patient_gender,
                a.summary,
                a.recommendations,
                a.risk_score,
                a.risk_level,
                a.analysis_data,
                wi.image AS image_url
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.id
            LEFT JOIN wound_images wi ON wi.id = a.image_id
            WHERE qr.practitioner_id = %s
            ORDER BY qr.submitted_at DESC
            LIMIT 20
        """, (user_id,), fetch=True) or []

    def get_patient_progression_rows(self, user_id: int, patient_id: int) -> List[Dict[str, Any]]:
        """
        Chronological visits for a given patient (INT id).
        """
        # NOTE(review): same ai_analyses.questionnaire_id = qr.id join as
        # get_user_history_rows — confirm it matches what save_analysis stores.
        return self.execute_query("""
            SELECT
                qr.id AS response_id,
                qr.submitted_at AS visit_date,
                p.id AS patient_id,
                p.uuid AS patient_uuid,
                p.name AS patient_name,
                p.age AS patient_age,
                p.gender AS patient_gender,
                a.summary,
                a.recommendations,
                a.risk_score,
                a.risk_level,
                a.analysis_data,
                wi.image AS image_url
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.id
            LEFT JOIN wound_images wi ON wi.id = a.image_id
            WHERE qr.practitioner_id = %s AND p.id = %s
            ORDER BY qr.submitted_at ASC
        """, (user_id, patient_id), fetch=True) or []

    # ----------------------------- organizations -----------------------------
    def get_organizations(self):
        """Return all organizations by name; a single default entry if none/failed."""
        try:
            rows = self.execute_query(
                "SELECT id, name as org_name, location FROM organizations ORDER BY name",
                fetch=True
            )
            return rows or [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]
        except Exception as e:
            logging.error(f"Error getting organizations: {e}")
            return [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]

    def create_organization(self, org_data: Dict[str, Any]):
        """Insert an organization and return its id (looked up by name), or None."""
        try:
            res = self.execute_query("""
                INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s)
            """, (
                org_data.get('org_name', ''),
                org_data.get('email', ''),
                org_data.get('phone', ''),
                org_data.get('country_code', ''),
                org_data.get('department', ''),
                org_data.get('location', ''),
                _now()
            ))
            if res:
                # execute_query returns only a row count, so read the id back.
                row = self.execute_query_one(
                    "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC LIMIT 1",
                    (org_data.get('org_name', ''),)
                )
                return row['id'] if row else None
            return None
        except Exception as e:
            logging.error(f"Error creating organization: {e}")
            return None