# src/patient_history.py import logging import json import html from datetime import datetime from typing import List, Dict, Optional, Tuple, Any class PatientHistoryManager: """ Complete patient history and wound tracking system (schema-aligned). Key schema expectations this class honors: - questionnaire_responses.patient_id -> patients.id (INT/BIGINT) - patients.uuid is the stable string identifier for string-FK tables - wounds.patient_id, wound_images.patient_id, notes.patient_id may be VARCHAR -> store patients.uuid - ai_analyses.questionnaire_id -> questionnaires.id (template-level linkage) """ def __init__(self, database_manager): self.db = database_manager # --------------------------- JSON helpers --------------------------- def _safe_json(self, maybe_json) -> Optional[dict]: """Parse JSON from str/dict safely.""" try: if maybe_json is None: return None if isinstance(maybe_json, dict): return maybe_json if isinstance(maybe_json, (bytes, bytearray)): maybe_json = maybe_json.decode("utf-8", errors="ignore") if isinstance(maybe_json, str) and maybe_json.strip(): return json.loads(maybe_json) except Exception: return None return None def _from_response(self, row: Dict, path: List[str], default=None): """ Extract a nested field from qr.response_data JSON. path example: ["wound_details", "pain_level"] """ data = self._safe_json(row.get("response_data")) cur = data try: for key in path: if isinstance(cur, dict) and key in cur: cur = cur[key] else: return default return cur except Exception: return default # --------------------------- Core queries --------------------------- def get_patient_complete_history( self, user_id: int, patient_name: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict]: """ Full visit list for a practitioner, optionally filtered by patient name. Includes joins to wounds, wound_images (via patients.uuid or string id) and ai_analyses (via questionnaire template). """ try: # Defensive bounds for pagination limit = max(1, min(int(limit), 500)) offset = max(0, int(offset)) sql = f""" SELECT qr.id AS response_id, qr.questionnaire_id, qr.submitted_at AS visit_date, qr.response_data, p.id AS patient_id, p.uuid AS patient_uuid, p.name AS patient_name, p.age AS patient_age, p.gender AS patient_gender, w.position AS wound_location, w.moisture, w.infection, w.notes, wi.image AS image_url, a.analysis_data, a.summary, a.recommendations, a.risk_score, a.risk_level FROM questionnaire_responses qr JOIN patients p ON p.id = qr.patient_id LEFT JOIN wounds w ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR)) LEFT JOIN wound_images wi ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR)) LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.questionnaire_id WHERE qr.practitioner_id = %s { "AND p.name = %s" if patient_name else "" } ORDER BY qr.submitted_at DESC LIMIT %s OFFSET %s """ params: Tuple = (user_id, patient_name, limit, offset) if patient_name else (user_id, limit, offset) rows = self.db.execute_query(sql, params, fetch=True) or [] # Enrich with JSON-derived fields for r in rows: r["pain_level"] = self._from_response(r, ["wound_details", "pain_level"]) loc_json = self._from_response(r, ["wound_details", "location"]) if loc_json: r["wound_location"] = loc_json return rows except Exception as e: logging.error(f"Error fetching patient complete history: {e}", exc_info=True) return [] def get_patient_list(self, user_id: int) -> List[Dict]: """ Unique patients seen by this practitioner with first/last visit and count. """ try: sql = """ SELECT p.id AS id, p.uuid, p.name AS patient_name, p.age AS patient_age, p.gender AS patient_gender, COUNT(qr.id) AS total_visits, MAX(qr.submitted_at) AS last_visit, MIN(qr.submitted_at) AS first_visit FROM questionnaire_responses qr JOIN patients p ON p.id = qr.patient_id WHERE qr.practitioner_id = %s GROUP BY p.id, p.uuid, p.name, p.age, p.gender ORDER BY last_visit DESC """ return self.db.execute_query(sql, (user_id,), fetch=True) or [] except Exception as e: logging.error(f"Error fetching patient list: {e}", exc_info=True) return [] def get_wound_progression(self, user_id: int, patient_name: str) -> List[Dict]: """ Ascending temporal list for one patient (by name) — kept for backward compatibility. Prefer get_wound_progression_by_id(). """ try: sql = """ SELECT qr.submitted_at AS visit_date, qr.response_data, w.position AS wound_location, w.moisture, w.infection, a.risk_score, a.risk_level, a.summary, wi.image AS image_url, p.name AS patient_name FROM questionnaire_responses qr JOIN patients p ON p.id = qr.patient_id LEFT JOIN wounds w ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR)) LEFT JOIN wound_images wi ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR)) LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.questionnaire_id WHERE qr.practitioner_id = %s AND p.name = %s ORDER BY qr.submitted_at ASC """ rows = self.db.execute_query(sql, (user_id, patient_name), fetch=True) or [] for r in rows: r["pain_level"] = self._from_response(r, ["wound_details", "pain_level"]) loc_json = self._from_response(r, ["wound_details", "location"]) if loc_json: r["wound_location"] = loc_json return rows except Exception as e: logging.error(f"Error fetching wound progression: {e}", exc_info=True) return [] def get_wound_progression_by_id(self, user_id: int, patient_id: int) -> List[Dict]: """ Ascending temporal list for one patient (by numeric patient_id). Use this for “View Details” when a patient is chosen from a dropdown. """ try: sql = """ SELECT qr.submitted_at AS visit_date, qr.response_data, w.position AS wound_location, w.moisture, w.infection, a.risk_score, a.risk_level, a.summary, wi.image AS image_url, p.name AS patient_name FROM questionnaire_responses qr JOIN patients p ON p.id = qr.patient_id LEFT JOIN wounds w ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR)) LEFT JOIN wound_images wi ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR)) LEFT JOIN ai_analyses a ON a.questionnaire_id = qr.questionnaire_id WHERE qr.practitioner_id = %s AND p.id = %s ORDER BY qr.submitted_at ASC """ rows = self.db.execute_query(sql, (user_id, int(patient_id)), fetch=True) or [] for r in rows: r["pain_level"] = self._from_response(r, ["wound_details", "pain_level"]) loc_json = self._from_response(r, ["wound_details", "location"]) if loc_json: r["wound_location"] = loc_json return rows except Exception as e: logging.error(f"Error fetching wound progression by id: {e}", exc_info=True) return [] def save_patient_note(self, user_id: int, patient_name: str, note: str) -> bool: """ Persist a clinical note for the patient's UUID into `notes`. Finds the patient by latest encounter under this practitioner. """ try: row = self.db.execute_query_one( """ SELECT p.uuid FROM questionnaire_responses qr JOIN patients p ON p.id = qr.patient_id WHERE qr.practitioner_id = %s AND p.name = %s ORDER BY qr.submitted_at DESC LIMIT 1 """, (user_id, patient_name) ) if not row or not row.get("uuid"): logging.error("save_patient_note: could not resolve patient uuid") return False patient_uuid = row["uuid"] rc = self.db.execute_query( """ INSERT INTO notes (uuid, patient_id, note, added_by, created_at, updated_at) VALUES (UUID(), %s, %s, %s, NOW(), NOW()) """, (patient_uuid, note, str(user_id)) ) return bool(rc) except Exception as e: logging.error(f"Error saving patient note: {e}", exc_info=True) return False # --------------------------- UI Wrappers --------------------------- def get_user_patient_history(self, user_id: int) -> List[Dict]: """Wrapper used by UI: latest history for all patients.""" return self.get_patient_complete_history(user_id=user_id, limit=100, offset=0) def search_patient_by_name(self, user_id: int, patient_name: str) -> List[Dict]: """Wrapper used by UI: history filtered to a single patient name.""" return self.get_patient_complete_history(user_id=user_id, patient_name=patient_name, limit=100, offset=0) # --------------------------- Render helpers --------------------------- def _fmt_dt(self, dt_obj) -> str: try: if hasattr(dt_obj, "strftime"): return dt_obj.strftime('%b %d, %Y %I:%M %p') if isinstance(dt_obj, str): try: dt = datetime.fromisoformat(dt_obj.replace('Z', '+00:00')) return dt.strftime('%b %d, %Y %I:%M %p') except Exception: return dt_obj return str(dt_obj) except Exception: return str(dt_obj) def _risk_chip(self, risk_level: Optional[str]) -> str: rl = (risk_level or "Unknown").strip().lower() bg = "#f0f0f0"; fg = "#333" if rl.startswith("low"): bg, fg = "#d4edda", "#155724" elif rl.startswith("moderate"): bg, fg = "#fff3cd", "#856404" elif rl.startswith("high"): bg, fg = "#f8d7da", "#721c24" return ( "{txt}" ).format(bg=bg, fg=fg, txt=html.escape(risk_level or "Unknown")) def format_history_for_display(self, rows: List[Dict]) -> str: """ Card-style HTML renderer for the general history list. Safe for embedding in Gradio HTML. """ if not rows: return "
No history found.
" parts = ["
"] for r in rows: dt = self._fmt_dt(r.get("visit_date")) patient = f"{html.escape(str(r.get('patient_name') or ''))}" age = html.escape(str(r.get("patient_age") or "N/A")) gender = html.escape(str(r.get("patient_gender") or "")) wound_loc = html.escape(str(r.get("wound_location") or "N/A")) pain = html.escape(str(r.get("pain_level") or "N/A")) risk_chip = self._risk_chip(r.get("risk_level")) summary = r.get("summary") img = r.get("image_url") parts.append("
") parts.append( "
" f"
{patient} • {age} • {gender}
" f"
{dt}
" "
" ) row2 = f"Wound: {wound_loc} • Pain: {pain} • Risk: {risk_chip}" parts.append(f"
{row2}
") if summary: parts.append( "
" f"{html.escape(str(summary))}" "
" ) if img: parts.append( "
" .format(html.escape(img)) ) parts.append("
") # card parts.append("
") return "".join(parts) def format_patient_progress_for_display(self, rows: List[Dict]) -> str: """ Professional timeline for a single patient (used by 'View Details'). """ if not rows: return "
No progression data available.
" pname = html.escape(str(rows[0].get("patient_name", "Unknown Patient"))) header = ( "
" f"

🧭 Wound Progress — {pname}

" "
" ) items: List[str] = [] for r in rows: visit_date = r.get("visit_date") disp_date = visit_date.strftime("%B %d, %Y") if hasattr(visit_date, "strftime") else str(visit_date) img = r.get("image_url") or "" img_tag = ( "" .format(src=html.escape(img)) if img else "" ) risk = str(r.get("risk_level", "Unknown")) risk_chip = self._risk_chip(risk) summary = html.escape(str(r.get("summary") or "No summary.")) items.append( "
" "
" f"
{html.escape(disp_date)}
" f"
{img_tag}
" "
" "
" f"
{risk_chip}
" f"
{summary}
" "
" "
" ) body = "
" + "\n".join(items) + "
" container = ( "
" f"{header}" f"{body}" "
" ) return container def format_patient_data_for_display(self, rows: List[Dict]) -> str: """Renderer for a single patient's history (reuses card format).""" return self.format_history_for_display(rows) # ===================== REPORT GENERATOR ===================== class ReportGenerator: """Professional HTML report generator for wound analysis.""" def __init__(self): pass def _format_recommendations(self, recommendations) -> str: """ Accepts str OR list OR dict and returns an HTML