# database.py (SmartHeal) — dataset ID hardcoded, wound_images.patient_id uses INT id
import os
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple, Union

import mysql.connector
from mysql.connector import Error
from PIL import Image

# ----------------------------- Hardcoded Dataset -----------------------------
DATASET_ID = "SmartHeal/wound-image-uploads"  # <— hardcoded as requested

# Optional HF dataset integration
_HF_AVAILABLE = False
try:
    from huggingface_hub import HfApi, HfFolder, create_repo
    _HF_AVAILABLE = True
except Exception:
    _HF_AVAILABLE = False

# ----------------------------- helpers -----------------------------
def _now():
    return datetime.now()

def _abs(p: str) -> str:
    try:
        return os.path.abspath(p)
    except Exception:
        return p

def _ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

def _file_url(path: str) -> str:
    """
    For local files, return /file=/abs/path (works in Gradio/Spaces).
    For remote (http/https), return as-is.
    """
    if not path:
        return ""
    s = str(path)
    if s.startswith("http://") or s.startswith("https://"):
        return s
    return f"/file={_abs(s)}"
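# Illustrative behaviour of _file_url (paths are examples, not real files):
#   _file_url("uploads/wound_1.jpg")                  -> "/file=/abs/cwd/uploads/wound_1.jpg"
#   _file_url("https://huggingface.co/.../wound.jpg") -> returned unchanged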
# -------------------------- DatabaseManager --------------------------
class DatabaseManager:
    """
    Database operations manager for SmartHeal
    - Keeps patient_id (INT) consistent for questionnaire_responses / joins
    - Uses patients.id (INT) in wounds.patient_id and wound_images.patient_id (stored as string in VARCHAR columns)
    - Saves images locally OR to a hardcoded Hugging Face dataset and stores the URL
    """
    def __init__(self, mysql_config: Dict[str, Any], hf_token: Optional[str] = None):
        self.mysql_config = mysql_config
        # storage backend: hardcoded dataset id; token from arg or env
        self.dataset_id = DATASET_ID
        self.hf_token = hf_token or os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_TOKEN")
        self.use_dataset = bool(self.dataset_id and self.hf_token and _HF_AVAILABLE)
        if self.use_dataset:
            logging.info(f"HF dataset storage enabled: {self.dataset_id}")
            try:
                HfFolder.save_token(self.hf_token)
                try:
                    create_repo(repo_id=self.dataset_id, repo_type="dataset", exist_ok=True, token=self.hf_token)
                except Exception:
                    pass
                self.hf_api = HfApi(token=self.hf_token)
            except Exception as e:
                logging.warning(f"Could not initialize HF dataset backend, falling back to local. Err: {e}")
                self.use_dataset = False
        else:
            logging.info("Using LOCAL storage (uploads/)")
        # uploads/ is also used as a local staging area when the HF backend is active
        _ensure_dir("uploads")
        self.test_connection()
        self._ensure_default_questionnaire()
    # -------------------- low-level connection utils --------------------
    def test_connection(self):
        try:
            conn = self.get_connection()
            if conn:
                conn.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error(f"Database connection test failed: {e}")

    def get_connection(self):
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error(f"Error connecting to MySQL: {e}")
            return None

    def execute_query(self, query, params=None, fetch=False):
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            if fetch:
                return cur.fetchall()
            conn.commit()
            return cur.rowcount
        except Error as e:
            logging.error(f"Error executing query: {e}\nSQL: {query}\nParams: {params}")
            if conn:
                conn.rollback()
            return None
        finally:
            if cur: cur.close()
            if conn and conn.is_connected(): conn.close()
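    # Usage sketch (illustrative queries/params):
    #   rows = self.execute_query("SELECT id FROM patients WHERE gender = %s", ("Female",), fetch=True)
    #   affected = self.execute_query("UPDATE patients SET notes = %s WHERE id = %s", ("", 7))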
    def execute_query_one(self, query, params=None):
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            return cur.fetchone()
        except Error as e:
            logging.error(f"Error executing query: {e}\nSQL: {query}\nParams: {params}")
            return None
        finally:
            if cur: cur.close()
            if conn and conn.is_connected(): conn.close()
    # ---------------------- One-time ensures ----------------------
    def _ensure_default_questionnaire(self):
        """Ensure a 'Default Patient Assessment' row exists in questionnaires."""
        try:
            row = self.execute_query_one(
                "SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                ("Default Patient Assessment",)
            )
            if not row:
                self.execute_query(
                    "INSERT INTO questionnaires (name, description, created_at, updated_at) "
                    "VALUES (%s, %s, NOW(), NOW())",
                    ("Default Patient Assessment", "Standard patient wound assessment form")
                )
                logging.info("Created default questionnaire 'Default Patient Assessment'")
        except Exception as e:
            logging.error(f"Error ensuring default questionnaire: {e}")
    # ------------------------------ patients ------------------------------
    def _normalize_age(self, age_val: Any) -> Optional[int]:
        try:
            return int(age_val) if age_val not in [None, ""] else None
        except Exception:
            return None

    def get_patient_by_id(self, patient_id: int) -> Optional[Dict[str, Any]]:
        return self.execute_query_one(
            "SELECT id, uuid, name, age, gender FROM patients WHERE id = %s LIMIT 1",
            (patient_id,)
        )

    def get_patient_by_uuid(self, patient_uuid: str) -> Optional[Dict[str, Any]]:
        return self.execute_query_one(
            "SELECT id, uuid, name, age, gender FROM patients WHERE uuid = %s LIMIT 1",
            (patient_uuid,)
        )

    def get_patient_by_name_age_gender(self, name: str, age: Any, gender: str) -> Optional[Dict[str, Any]]:
        age_val = self._normalize_age(age)
        if age_val is None:
            return self.execute_query_one(
                "SELECT id, uuid, name, age, gender FROM patients "
                "WHERE name = %s AND age IS NULL AND gender = %s LIMIT 1",
                (name, gender),
            )
        return self.execute_query_one(
            "SELECT id, uuid, name, age, gender FROM patients "
            "WHERE name = %s AND age = %s AND gender = %s LIMIT 1",
            (name, age_val, gender),
        )
    def _resolve_patient_ids(
        self,
        patient_ref: Union[int, str]
    ) -> Optional[Tuple[int, str]]:
        """
        Accepts either a numeric ID or a UUID and returns (id_int, uuid_str).
        """
        try:
            # numeric ID?
            if isinstance(patient_ref, int) or (isinstance(patient_ref, str) and patient_ref.isdigit()):
                rid = int(patient_ref)
                row = self.get_patient_by_id(rid)
                if row:
                    return (row["id"], row["uuid"])
                return None
            # UUID path
            row = self.get_patient_by_uuid(str(patient_ref))
            if row:
                return (row["id"], row["uuid"])
            return None
        except Exception as e:
            logging.error(f"_resolve_patient_ids error: {e}")
            return None
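    # Illustrative inputs (values are examples): all of these resolve to the same (id, uuid) pair
    # when patient 17 exists:
    #   self._resolve_patient_ids(17)
    #   self._resolve_patient_ids("17")
    #   self._resolve_patient_ids("3f2a8c1e-....-....")   # patients.uuid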
    def create_patient(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn: return None
            cur = conn.cursor()
            import uuid as _uuid
            p_uuid = str(_uuid.uuid4())
            cur.execute("""
                INSERT INTO patients (uuid, name, age, gender, illness, allergy, notes, created_at, updated_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                p_uuid,
                data.get("patient_name"),
                self._normalize_age(data.get("patient_age")),
                data.get("patient_gender"),
                data.get("medical_history", ""),
                data.get("allergies", ""),
                data.get("additional_notes", ""),
                _now(),
                _now()
            ))
            conn.commit()
            return {"id": cur.lastrowid, "uuid": p_uuid}
        except Exception as e:
            if conn: conn.rollback()
            logging.error(f"create_patient error: {e}")
            return None
        finally:
            try:
                if cur: cur.close()
                if conn: conn.close()
            except Exception:
                pass
    def list_patients_for_practitioner(self, practitioner_id: int) -> List[Dict[str, Any]]:
        return self.execute_query("""
            SELECT
                p.id, p.uuid, p.name, p.age, p.gender,
                COUNT(qr.id) AS total_visits,
                MAX(qr.submitted_at) AS last_visit,
                MIN(qr.submitted_at) AS first_visit
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            WHERE qr.practitioner_id = %s
            GROUP BY p.id, p.uuid, p.name, p.age, p.gender
            ORDER BY last_visit DESC
        """, (practitioner_id,), fetch=True) or []
    # --------------------------- questionnaires/visits ---------------------------
    def _ensure_default_questionnaire_id(self, created_by_user_id: Optional[int] = None) -> int:
        row = self.execute_query_one(
            "SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1"
        )
        if row:
            return int(row["id"])
        self.execute_query("""
            INSERT INTO questionnaires (name, description, created_by, created_at, updated_at)
            VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', %s, NOW(), NOW())
        """, (created_by_user_id,))
        row = self.execute_query_one(
            "SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' ORDER BY id DESC LIMIT 1"
        )
        return int(row["id"]) if row else 1
    def save_questionnaire(self, questionnaire_data: Dict[str, Any], existing_patient_id: Optional[int] = None):
        """
        Create a visit (questionnaire_responses) for an existing or new patient.
        Returns: {'response_id', 'patient_id', 'patient_uuid'}
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn: return None
            cur = conn.cursor(dictionary=True)
            # Resolve patient
            if existing_patient_id:
                p = self.get_patient_by_id(existing_patient_id)
                if not p:
                    raise Exception("Existing patient id not found")
                patient_id = p["id"]
                patient_uuid = p["uuid"]
            else:
                found = self.get_patient_by_name_age_gender(
                    questionnaire_data.get("patient_name"),
                    questionnaire_data.get("patient_age"),
                    questionnaire_data.get("patient_gender"),
                )
                if found:
                    patient_id, patient_uuid = found["id"], found["uuid"]
                else:
                    created = self.create_patient(questionnaire_data)
                    if not created:
                        raise Exception("Failed to create patient")
                    patient_id, patient_uuid = created["id"], created["uuid"]
            qid = self._ensure_default_questionnaire_id(questionnaire_data.get("user_id"))
            response_data = {
                "patient_info": {
                    "name": questionnaire_data.get("patient_name"),
                    "age": questionnaire_data.get("patient_age"),
                    "gender": questionnaire_data.get("patient_gender"),
                },
                "wound_details": {
                    "location": questionnaire_data.get("wound_location"),
                    "duration": questionnaire_data.get("wound_duration"),
                    "pain_level": questionnaire_data.get("pain_level"),
                    "moisture_level": questionnaire_data.get("moisture_level"),
                    "infection_signs": questionnaire_data.get("infection_signs"),
                    "diabetic_status": questionnaire_data.get("diabetic_status"),
                },
                "medical_history": {
                    "previous_treatment": questionnaire_data.get("previous_treatment"),
                    "medical_history": questionnaire_data.get("medical_history"),
                    "medications": questionnaire_data.get("medications"),
                    "allergies": questionnaire_data.get("allergies"),
                    "additional_notes": questionnaire_data.get("additional_notes"),
                },
            }
            practitioner_id = questionnaire_data.get("user_id") or 1
            cur.execute("""
                INSERT INTO questionnaire_responses
                    (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s,%s,%s,%s,%s)
            """, (
                qid, patient_id, practitioner_id, json.dumps(response_data), _now()
            ))
            conn.commit()
            response_id = cur.lastrowid
            logging.info(f"✅ Saved response ID {response_id}")
            return {"response_id": response_id, "patient_id": patient_id, "patient_uuid": patient_uuid}
        except Exception as e:
            if conn: conn.rollback()
            logging.error(f"save_questionnaire error: {e}")
            return None
        finally:
            try:
                if cur: cur.close()
                if conn: conn.close()
            except Exception:
                pass
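    # Call sketch (field values are illustrative; keys mirror the .get(...) lookups above):
    #   visit = db.save_questionnaire({
    #       "user_id": 3, "patient_name": "Jane Doe", "patient_age": 54, "patient_gender": "Female",
    #       "wound_location": "left heel", "pain_level": "4", "moisture_level": "moderate",
    #   })
    #   # -> {"response_id": ..., "patient_id": ..., "patient_uuid": ...} on success, None on failure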
    # ------------------------------- wounds -------------------------------
    def create_wound(self, patient_ref: Union[int, str], questionnaire_data: Dict[str, Any]) -> Optional[str]:
        """
        Create wound (returns wound_uuid).
        Stores **patients.id** (as string) in wounds.patient_id.
        """
        conn = None
        cur = None
        try:
            resolved = self._resolve_patient_ids(patient_ref)
            if not resolved:
                raise Exception("Could not resolve patient by id/uuid")
            patient_id_int, _patient_uuid = resolved
            conn = self.get_connection()
            if not conn: return None
            cur = conn.cursor()
            import uuid as _uuid
            wound_uuid = str(_uuid.uuid4())
            cur.execute("""
                INSERT INTO wounds (uuid, patient_id, position, category, moisture, infection, notes, created_at, updated_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                wound_uuid,
                str(patient_id_int),  # <-- store INT id as string
                questionnaire_data.get("wound_location") or "",
                "Assessment",
                questionnaire_data.get("moisture_level") or "",
                questionnaire_data.get("infection_signs") or "",
                questionnaire_data.get("additional_notes") or "",
                _now(),
                _now()
            ))
            conn.commit()
            return wound_uuid
        except Exception as e:
            if conn: conn.rollback()
            logging.error(f"create_wound error: {e}")
            return None
        finally:
            try:
                if cur: cur.close()
                if conn: conn.close()
            except Exception:
                pass
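    # Example call (illustrative values):
    #   wound_uuid = db.create_wound(17, {"wound_location": "left heel", "moisture_level": "moderate"})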
    # --------------------------- image storage ---------------------------
    def _read_image_size(self, image_path: str):
        try:
            with Image.open(image_path) as im:
                return im.size  # (w, h)
        except Exception:
            return (None, None)

    def _copy_to_uploads(self, src_path: str, suffix: str) -> str:
        ext = os.path.splitext(src_path)[1] or ".jpg"
        base = f"wound_{datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f')}_{suffix}{ext}"
        dst = os.path.join("uploads", base)
        try:
            with open(src_path, "rb") as r, open(dst, "wb") as w:
                w.write(r.read())
            return dst
        except Exception:
            return src_path
    def _hf_upload_and_url(self, local_path: str, subdir: str) -> Optional[str]:
        """Upload a file to the (hardcoded) HF dataset and return a public resolve URL."""
        if not self.use_dataset:
            return None
        try:
            # destination path in repo (datasets use 'main' branch)
            day = datetime.utcnow().strftime("%Y/%m/%d")
            fname = os.path.basename(local_path)
            repo_path = f"{subdir}/{day}/{fname}"
            self.hf_api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=repo_path,
                repo_id=self.dataset_id,
                repo_type="dataset",
                commit_message=f"Add {repo_path}"
            )
            return f"https://huggingface.co/datasets/{self.dataset_id}/resolve/main/{repo_path}"
        except Exception as e:
            logging.error(f"HF upload failed: {e}")
            return None
    def _insert_wound_image_row(self, conn, cur, patient_id_str: str, wound_uuid: Optional[str], image_path_or_url: str) -> Optional[int]:
        """
        Insert one row in wound_images.
        Stores **patients.id** as string in wound_images.patient_id.
        """
        width, height = (None, None)
        if not (str(image_path_or_url).startswith("http://") or str(image_path_or_url).startswith("https://")):
            width, height = self._read_image_size(image_path_or_url)
        import uuid as _uuid
        img_uuid = str(_uuid.uuid4())
        cur.execute("""
            INSERT INTO wound_images (uuid, patient_id, wound_id, image, width, height, created_at, updated_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
        """, (
            img_uuid,
            patient_id_str,  # <-- store INT id (as string)
            wound_uuid,
            image_path_or_url,
            int(width) if width else None,
            int(height) if height else None,
            _now(),
            _now()
        ))
        conn.commit()
        return cur.lastrowid
    def save_wound_images_bundle(
        self,
        patient_ref: Union[int, str],
        original_path: str,
        analysis_result: Dict[str, Any],
        wound_uuid: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Save original + detection + segmentation images to wound_images.
        - Accepts patient_ref as INT id or UUID
        - Stores **patient_id (INT as string)** in wound_images.patient_id
        - Ensures a wound row exists (wound_uuid) if not provided
        Returns display URLs for UI.
        """
        out = {
            "original_id": None, "original_url": None,
            "detection_id": None, "detection_url": None,
            "segmentation_id": None, "segmentation_url": None,
            "wound_id": None
        }
        resolved = self._resolve_patient_ids(patient_ref)
        if not resolved:
            logging.error("save_wound_images_bundle: could not resolve patient")
            return out
        patient_id_int, _patient_uuid = resolved
        patient_id_str = str(patient_id_int)
        conn = self.get_connection()
        if not conn: return out
        cur = conn.cursor()
        try:
            # Ensure wound row exists if not provided (minimal)
            if not wound_uuid:
                import uuid as _uuid
                wound_uuid = str(_uuid.uuid4())
                cur.execute("""
                    INSERT INTO wounds (uuid, patient_id, category, notes, created_at, updated_at)
                    VALUES (%s,%s,%s,%s,%s,%s)
                """, (wound_uuid, patient_id_str, "Assessment", "", _now(), _now()))
                conn.commit()
            out["wound_id"] = wound_uuid
            # 1) Original
            local_orig = self._copy_to_uploads(original_path, "original")
            url_orig = self._hf_upload_and_url(local_orig, subdir="original") if self.use_dataset else None
            store_orig = url_orig or local_orig
            img_id = self._insert_wound_image_row(conn, cur, patient_id_str, wound_uuid, store_orig)
            out["original_id"] = img_id
            out["original_url"] = _file_url(store_orig)
            # 2) Detection / Segmentation from analysis_result
            va = (analysis_result or {}).get("visual_analysis", {}) or {}
            det = va.get("detection_image_path")
            seg = va.get("segmentation_image_path")
            if det and os.path.exists(det):
                local_det = self._copy_to_uploads(det, "detect")
                url_det = self._hf_upload_and_url(local_det, subdir="detect") if self.use_dataset else None
                store_det = url_det or local_det
                did = self._insert_wound_image_row(conn, cur, patient_id_str, wound_uuid, store_det)
                out["detection_id"] = did
                out["detection_url"] = _file_url(store_det)
            if seg and os.path.exists(seg):
                local_seg = self._copy_to_uploads(seg, "segment")
                url_seg = self._hf_upload_and_url(local_seg, subdir="segment") if self.use_dataset else None
                store_seg = url_seg or local_seg
                sid = self._insert_wound_image_row(conn, cur, patient_id_str, wound_uuid, store_seg)
                out["segmentation_id"] = sid
                out["segmentation_url"] = _file_url(store_seg)
            return out
        except Exception as e:
            logging.error(f"save_wound_images_bundle error: {e}", exc_info=True)
            try:
                conn.rollback()
            except Exception:
                pass
            return out
        finally:
            try:
                cur.close()
                conn.close()
            except Exception:
                pass
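    # Call sketch (paths and values are illustrative; detection/segmentation paths are read
    # from analysis_result["visual_analysis"] exactly as above):
    #   urls = db.save_wound_images_bundle(
    #       patient_ref=17,
    #       original_path="/tmp/capture.jpg",
    #       analysis_result={"visual_analysis": {"detection_image_path": "/tmp/det.png",
    #                                            "segmentation_image_path": "/tmp/seg.png"}},
    #   )
    #   # urls["original_url"] is either a /file=... path or an HF dataset resolve URL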
    # For older callers that only save one image
    def save_wound_image(self, patient_id: int, image) -> Optional[Dict[str, Any]]:
        """
        Back-compat single-image save:
        - If `image` is a PIL.Image -> we write to uploads and (optionally) the dataset.
        - If `image` is a filepath -> we copy + (optionally) upload.
        Stores **patients.id** (as string) into wound_images.patient_id.
        """
        try:
            # Normalize to a temporary local path first
            _ensure_dir("uploads")
            image_uid = os.urandom(8).hex()
            tmp_local = os.path.join("uploads", f"wound_{image_uid}_tmp.jpg")
            if hasattr(image, "save"):
                image.save(tmp_local, format="JPEG", quality=95)
            elif isinstance(image, str) and os.path.exists(image):
                with Image.open(image) as pil:
                    pil = pil.convert("RGB")
                    pil.save(tmp_local, format="JPEG", quality=95)
            else:
                logging.error("Invalid image object/path")
                return None
            # Push to uploads/dataset with final name
            local_path = self._copy_to_uploads(tmp_local, "original")
            url = self._hf_upload_and_url(local_path, subdir="original") if self.use_dataset else None
            path_or_url = url or local_path
            width, height = self._read_image_size(local_path)
            conn = self.get_connection()
            if not conn: return None
            cur = conn.cursor()
            import uuid as _uuid
            img_uuid = str(_uuid.uuid4())
            cur.execute("""
                INSERT INTO wound_images (uuid, patient_id, image, width, height, created_at, updated_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s)
            """, (
                img_uuid,
                str(int(patient_id)),  # <-- store INT id as string
                path_or_url,
                int(width) if width else None,
                int(height) if height else None,
                _now(),
                _now()
            ))
            conn.commit()
            image_db_id = cur.lastrowid
            cur.close()
            conn.close()
            return {
                "id": image_db_id,
                "filename": os.path.basename(local_path),
                "path": path_or_url,
                "url": _file_url(path_or_url),
            }
        except Exception as e:
            logging.error(f"Image save error: {e}", exc_info=True)
            return None
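    # Back-compat call sketch (arguments illustrative):
    #   db.save_wound_image(17, "/tmp/photo.jpg")                 # filepath input
    #   db.save_wound_image(17, Image.open("/tmp/photo.png"))     # PIL input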
    # ------------------------------- analyses -------------------------------
    def save_analysis(self, questionnaire_id: int, image_id: Optional[int], analysis_data: Dict[str, Any]) -> Optional[int]:
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn: return None
            cur = conn.cursor()
            cur.execute("""
                INSERT INTO ai_analyses (
                    questionnaire_id, image_id, analysis_data, summary, recommendations,
                    risk_score, risk_level, processing_time, model_version, created_at
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """, (
                questionnaire_id,
                image_id,
                json.dumps(analysis_data) if analysis_data else None,
                (analysis_data or {}).get("summary", ""),
                (analysis_data or {}).get("recommendations", ""),
                int((analysis_data or {}).get("risk_score", 0) or 0),
                (analysis_data or {}).get("risk_level", "Unknown"),
                float((analysis_data or {}).get("processing_time", 0.0) or 0.0),
                (analysis_data or {}).get("model_version", "v1.0"),
                _now()
            ))
            conn.commit()
            return cur.lastrowid
        except Exception as e:
            if conn: conn.rollback()
            logging.error(f"save_analysis error: {e}")
            return None
        finally:
            try:
                if cur: cur.close()
                if conn: conn.close()
            except Exception:
                pass
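    # Field sketch (values illustrative; keys mirror the .get(...) lookups in save_analysis):
    #   db.save_analysis(questionnaire_id=1, image_id=None, analysis_data={
    #       "summary": "...", "recommendations": "...", "risk_score": 42,
    #       "risk_level": "Moderate", "processing_time": 1.8, "model_version": "v1.0",
    #   })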
    def save_analysis_result(self, questionnaire_response_id: int, analysis_result: Dict[str, Any]):
        """
        Compatibility wrapper used by some callers (without image_id).
        Stores under the questionnaire template linked to the response.
        """
        try:
            row = self.execute_query_one(
                "SELECT questionnaire_id FROM questionnaire_responses WHERE id = %s LIMIT 1",
                (questionnaire_response_id,)
            )
            if not row:
                logging.error("No questionnaire_response found for analysis_result save")
                return None
            questionnaire_id = row["questionnaire_id"]
            return self.save_analysis(
                questionnaire_id=questionnaire_id,
                image_id=None,
                analysis_data=analysis_result or {}
            )
        except Exception as e:
            logging.error(f"Error saving analysis result: {e}")
            return None
    # ------------------------------- queries for UI -------------------------------
    def get_user_history_rows(self, user_id: int) -> List[Dict[str, Any]]:
        """
        Latest 20 visits across patients; joins ai_analyses (by questionnaire_id -> questionnaires.id)
        and pulls image via a.image_id -> wound_images.id.
        """
        return self.execute_query("""
            SELECT
                qr.id AS response_id,
                qr.submitted_at AS visit_date,
                p.id AS patient_id,
                p.uuid AS patient_uuid,
                p.name AS patient_name,
                p.age AS patient_age,
                p.gender AS patient_gender,
                a.summary,
                a.recommendations,
                a.risk_score,
                a.risk_level,
                a.analysis_data,
                wi.image AS image_url
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.questionnaire_id
            LEFT JOIN wound_images wi ON wi.id = a.image_id
            WHERE qr.practitioner_id = %s
            ORDER BY qr.submitted_at DESC
            LIMIT 20
        """, (user_id,), fetch=True) or []

    def get_patient_progression_rows(self, user_id: int, patient_id: int) -> List[Dict[str, Any]]:
        """
        Chronological visits for a given patient (INT id).
        """
        return self.execute_query("""
            SELECT
                qr.id AS response_id,
                qr.submitted_at AS visit_date,
                p.id AS patient_id,
                p.uuid AS patient_uuid,
                p.name AS patient_name,
                p.age AS patient_age,
                p.gender AS patient_gender,
                a.summary,
                a.recommendations,
                a.risk_score,
                a.risk_level,
                a.analysis_data,
                wi.image AS image_url
            FROM questionnaire_responses qr
            JOIN patients p ON p.id = qr.patient_id
            LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.questionnaire_id
            LEFT JOIN wound_images wi ON wi.id = a.image_id
            WHERE qr.practitioner_id = %s AND p.id = %s
            ORDER BY qr.submitted_at ASC
        """, (user_id, patient_id), fetch=True) or []
    # ----------------------------- organizations -----------------------------
    def get_organizations(self):
        try:
            rows = self.execute_query(
                "SELECT id, name as org_name, location FROM organizations ORDER BY name",
                fetch=True
            )
            return rows or [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]
        except Exception as e:
            logging.error(f"Error getting organizations: {e}")
            return [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]

    def create_organization(self, org_data: Dict[str, Any]):
        try:
            res = self.execute_query("""
                INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s)
            """, (
                org_data.get('org_name', ''),
                org_data.get('email', ''),
                org_data.get('phone', ''),
                org_data.get('country_code', ''),
                org_data.get('department', ''),
                org_data.get('location', ''),
                _now()
            ))
            if res:
                row = self.execute_query_one(
                    "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC LIMIT 1",
                    (org_data.get('org_name', ''),)
                )
                return row['id'] if row else None
            return None
        except Exception as e:
            logging.error(f"Error creating organization: {e}")
            return None
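

# Minimal smoke-test sketch (assumptions: a reachable MySQL instance with the SmartHeal schema
# already exists; the env variable names and credential defaults below are placeholders).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    db = DatabaseManager({
        "host": os.getenv("MYSQL_HOST", "localhost"),
        "user": os.getenv("MYSQL_USER", "smartheal"),
        "password": os.getenv("MYSQL_PASSWORD", ""),
        "database": os.getenv("MYSQL_DATABASE", "smartheal"),
    })
    # Cheap read-only check of connectivity and table access
    print(db.get_organizations())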