# src/patient_history.py
import logging
import json
import html
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any
class PatientHistoryManager:
    """
    Patient history and wound tracking queries plus HTML renderers (schema-aligned).

    Key schema expectations this class honors:
    - questionnaire_responses.patient_id -> patients.id (INT/BIGINT)
    - patients.uuid is the stable string identifier for string-FK tables
    - wounds.patient_id, wound_images.patient_id, notes.patient_id may be VARCHAR -> store patients.uuid
    - ai_analyses.questionnaire_id -> questionnaires.id (template-level linkage)

    The injected ``database_manager`` must provide
    ``execute_query(sql, params, fetch=...)`` and ``execute_query_one(sql, params)``.
    Fetched rows are treated as mutable dicts (extra keys are written into them).
    """

    # Shared SELECT/JOIN part of both wound-progression queries; callers append
    # the patient filter and the ORDER BY clause.
    _PROGRESSION_SELECT = """
        SELECT
            qr.submitted_at AS visit_date,
            qr.response_data,
            w.position AS wound_location,
            w.moisture,
            w.infection,
            a.risk_score,
            a.risk_level,
            a.summary,
            wi.image AS image_url,
            p.name AS patient_name
        FROM questionnaire_responses qr
        JOIN patients p ON p.id = qr.patient_id
        LEFT JOIN wounds w
            ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR))
        LEFT JOIN wound_images wi
            ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR))
        LEFT JOIN ai_analyses a
            ON a.questionnaire_id = qr.questionnaire_id
        WHERE qr.practitioner_id = %s
    """

    # (prefix of the lower-cased risk label, background colour, text colour)
    _RISK_COLOURS = (
        ("low", "#d4edda", "#155724"),
        ("moderate", "#fff3cd", "#856404"),
        ("high", "#f8d7da", "#721c24"),
    )

    def __init__(self, database_manager):
        self.db = database_manager

    # --------------------------- JSON helpers ---------------------------
    def _safe_json(self, maybe_json) -> Optional[dict]:
        """
        Parse JSON from str/bytes/dict without raising.

        Returns the dict unchanged when one is passed in, the parsed value for
        a non-blank JSON string (bytes are decoded as UTF-8, ignoring invalid
        sequences), and None for None, blank, unparsable or unsupported input.
        """
        if maybe_json is None:
            return None
        if isinstance(maybe_json, dict):
            return maybe_json
        try:
            if isinstance(maybe_json, (bytes, bytearray)):
                maybe_json = maybe_json.decode("utf-8", errors="ignore")
            if isinstance(maybe_json, str) and maybe_json.strip():
                return json.loads(maybe_json)
        except Exception:
            # Deliberately best-effort: malformed payloads are treated as absent.
            return None
        return None

    @staticmethod
    def _dig(data, path: List[str], default=None):
        """Walk ``path`` through nested dicts; return ``default`` on any miss."""
        cur = data
        try:
            for key in path:
                if isinstance(cur, dict) and key in cur:
                    cur = cur[key]
                else:
                    return default
        except TypeError:
            # Unhashable key in ``path``.
            return default
        return cur

    def _from_response(self, row: Dict, path: List[str], default=None):
        """
        Extract a nested field from the row's ``response_data`` JSON.

        path example: ["wound_details", "pain_level"]
        Returns ``default`` when the JSON is missing/invalid or the path does
        not resolve.
        """
        return self._dig(self._safe_json(row.get("response_data")), path, default)

    def _enrich_rows(self, rows: List[Dict]) -> List[Dict]:
        """
        Add JSON-derived fields to each row in place and return ``rows``.

        Sets ``pain_level`` from ``wound_details.pain_level`` (None if absent)
        and overrides ``wound_location`` with ``wound_details.location`` when
        that value is truthy.
        """
        for row in rows:
            # Parse once per row instead of once per extracted field.
            data = self._safe_json(row.get("response_data"))
            row["pain_level"] = self._dig(data, ["wound_details", "pain_level"])
            location = self._dig(data, ["wound_details", "location"])
            if location:
                row["wound_location"] = location
        return rows

    # --------------------------- Core queries ---------------------------
    def get_patient_complete_history(
        self,
        user_id: int,
        patient_name: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """
        Full visit list for a practitioner, optionally filtered by patient name.

        Joins wounds and wound_images (via patients.uuid or the stringified
        numeric id) and ai_analyses (via the questionnaire template), newest
        visit first. ``limit`` is clamped to 1..500 and ``offset`` to >= 0.
        Returns [] (after logging) on any error.

        NOTE(review): the LEFT JOINs match on patient / questionnaire only, so
        a patient with several wounds or images yields several rows per
        response, and LIMIT counts those joined rows — confirm this is intended.
        """
        try:
            # Defensive bounds for pagination
            limit = max(1, min(int(limit), 500))
            offset = max(0, int(offset))

            params: List[Any] = [user_id]
            name_filter = ""
            if patient_name:
                # Only the fixed clause text is interpolated; the value is bound.
                name_filter = "AND p.name = %s"
                params.append(patient_name)
            params.extend((limit, offset))

            sql = f"""
                SELECT
                    qr.id AS response_id,
                    qr.questionnaire_id,
                    qr.submitted_at AS visit_date,
                    qr.response_data,
                    p.id AS patient_id,
                    p.uuid AS patient_uuid,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    w.position AS wound_location,
                    w.moisture,
                    w.infection,
                    w.notes,
                    wi.image AS image_url,
                    a.analysis_data,
                    a.summary,
                    a.recommendations,
                    a.risk_score,
                    a.risk_level
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                LEFT JOIN wounds w
                    ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR))
                LEFT JOIN wound_images wi
                    ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR))
                LEFT JOIN ai_analyses a
                    ON a.questionnaire_id = qr.questionnaire_id
                WHERE qr.practitioner_id = %s
                {name_filter}
                ORDER BY qr.submitted_at DESC
                LIMIT %s OFFSET %s
            """
            rows = self.db.execute_query(sql, tuple(params), fetch=True) or []
            return self._enrich_rows(rows)
        except Exception as e:
            logging.error("Error fetching patient complete history: %s", e, exc_info=True)
            return []

    def get_patient_list(self, user_id: int) -> List[Dict]:
        """
        Unique patients seen by this practitioner with first/last visit and count.

        Ordered by most recent visit first. Returns [] (after logging) on error.
        """
        try:
            sql = """
                SELECT
                    p.id AS id,
                    p.uuid,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    COUNT(qr.id) AS total_visits,
                    MAX(qr.submitted_at) AS last_visit,
                    MIN(qr.submitted_at) AS first_visit
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                WHERE qr.practitioner_id = %s
                GROUP BY p.id, p.uuid, p.name, p.age, p.gender
                ORDER BY last_visit DESC
            """
            return self.db.execute_query(sql, (user_id,), fetch=True) or []
        except Exception as e:
            logging.error("Error fetching patient list: %s", e, exc_info=True)
            return []

    def _fetch_progression(self, patient_filter: str, params: Tuple) -> List[Dict]:
        """
        Run the shared progression query with one extra patient filter.

        ``patient_filter`` must be a fixed SQL fragment (never user input);
        user-supplied values go through ``params``.
        """
        sql = (
            self._PROGRESSION_SELECT
            + f"          AND {patient_filter}\n"
            + "        ORDER BY qr.submitted_at ASC\n"
        )
        rows = self.db.execute_query(sql, params, fetch=True) or []
        return self._enrich_rows(rows)

    def get_wound_progression(self, user_id: int, patient_name: str) -> List[Dict]:
        """
        Ascending temporal list for one patient (by name) — kept for backward compatibility.

        Prefer get_wound_progression_by_id(): names match exactly and every
        patient sharing the name is included. Returns [] (after logging) on error.
        """
        try:
            return self._fetch_progression("p.name = %s", (user_id, patient_name))
        except Exception as e:
            logging.error("Error fetching wound progression: %s", e, exc_info=True)
            return []

    def get_wound_progression_by_id(self, user_id: int, patient_id: int) -> List[Dict]:
        """
        Ascending temporal list for one patient (by numeric patient_id).

        Use this for "View Details" when a patient is chosen from a dropdown.
        A ``patient_id`` that cannot be converted to int is logged and yields [].
        """
        try:
            return self._fetch_progression("p.id = %s", (user_id, int(patient_id)))
        except Exception as e:
            logging.error("Error fetching wound progression by id: %s", e, exc_info=True)
            return []

    def save_patient_note(self, user_id: int, patient_name: str, note: str) -> bool:
        """
        Persist a clinical note for the patient's UUID into `notes`.

        The patient is resolved from the latest encounter with that name under
        this practitioner. Returns True when the insert reports a truthy
        result, False when the patient cannot be resolved or on any error.
        """
        try:
            row = self.db.execute_query_one(
                """
                SELECT p.uuid
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                WHERE qr.practitioner_id = %s AND p.name = %s
                ORDER BY qr.submitted_at DESC
                LIMIT 1
                """,
                (user_id, patient_name)
            )
            if not row or not row.get("uuid"):
                logging.error("save_patient_note: could not resolve patient uuid")
                return False
            patient_uuid = row["uuid"]
            rc = self.db.execute_query(
                """
                INSERT INTO notes (uuid, patient_id, note, added_by, created_at, updated_at)
                VALUES (UUID(), %s, %s, %s, NOW(), NOW())
                """,
                (patient_uuid, note, str(user_id))
            )
            return bool(rc)
        except Exception as e:
            logging.error("Error saving patient note: %s", e, exc_info=True)
            return False

    # --------------------------- UI Wrappers ---------------------------
    def get_user_patient_history(self, user_id: int) -> List[Dict]:
        """Wrapper used by UI: latest history for all patients."""
        return self.get_patient_complete_history(user_id=user_id, limit=100, offset=0)

    def search_patient_by_name(self, user_id: int, patient_name: str) -> List[Dict]:
        """Wrapper used by UI: history filtered to a single patient name."""
        return self.get_patient_complete_history(
            user_id=user_id, patient_name=patient_name, limit=100, offset=0
        )

    # --------------------------- Render helpers ---------------------------
    @staticmethod
    def _text(value, default: str = "N/A") -> str:
        """
        HTML-escape ``value`` for display, using ``default`` only when the
        value is None or an empty string (so 0 is shown as "0").
        """
        if value is None or value == "":
            return html.escape(default)
        return html.escape(str(value))

    def _fmt_dt(self, dt_obj) -> str:
        """
        Format a datetime-like object or ISO-8601 string as 'Jan 05, 2024 02:30 PM'.

        Strings that cannot be parsed are returned unchanged; other values are
        passed through ``str``. The result is NOT HTML-escaped.
        """
        try:
            if hasattr(dt_obj, "strftime"):
                return dt_obj.strftime('%b %d, %Y %I:%M %p')
            if isinstance(dt_obj, str):
                try:
                    # fromisoformat does not accept a trailing 'Z' on older Pythons.
                    dt = datetime.fromisoformat(dt_obj.replace('Z', '+00:00'))
                    return dt.strftime('%b %d, %Y %I:%M %p')
                except Exception:
                    return dt_obj
            return str(dt_obj)
        except Exception:
            return str(dt_obj)

    def _risk_chip(self, risk_level: Optional[str]) -> str:
        """
        Render the risk level as a small coloured HTML badge.

        Labels starting with low/moderate/high (case-insensitive) get green /
        amber / red colours; anything else, including None, is shown in grey,
        with "Unknown" as the text for missing values.
        """
        label = str(risk_level).strip() if risk_level is not None else ""
        label = label or "Unknown"
        key = label.lower()
        bg, fg = "#f0f0f0", "#333"
        for prefix, chip_bg, chip_fg in self._RISK_COLOURS:
            if key.startswith(prefix):
                bg, fg = chip_bg, chip_fg
                break
        return (
            "<span style='background:{bg};color:{fg};padding:2px 8px;"
            "border-radius:12px;font-size:12px;font-weight:600;'>{txt}</span>"
        ).format(bg=bg, fg=fg, txt=html.escape(label))

    def format_history_for_display(self, rows: List[Dict]) -> str:
        """
        Card-style HTML renderer for the general history list.

        Every value taken from ``rows`` is HTML-escaped, so the result is safe
        to embed in a Gradio HTML component. Returns a short placeholder when
        ``rows`` is empty.
        """
        if not rows:
            return "<div style='padding:12px;color:#666;'>No history found.</div>"

        parts = ["<div style='display:flex;flex-direction:column;gap:12px;'>"]
        for r in rows:
            # _fmt_dt may hand back a raw, unparsed string, so escape it here.
            dt = html.escape(self._fmt_dt(r.get("visit_date")))
            patient = self._text(r.get("patient_name"), "")
            age = self._text(r.get("patient_age"))
            gender = self._text(r.get("patient_gender"), "")
            wound_loc = self._text(r.get("wound_location"))
            pain = self._text(r.get("pain_level"))
            risk_chip = self._risk_chip(r.get("risk_level"))
            summary = r.get("summary")
            img = r.get("image_url")

            parts.append(
                "<div style='border:1px solid #e2e8f0;border-radius:10px;"
                "padding:12px 14px;background:#fff;'>"
            )
            parts.append(
                "<div style='display:flex;justify-content:space-between;"
                "align-items:center;'>"
                f"<div><strong>{patient}</strong> • {age} • {gender}</div>"
                f"<div style='color:#718096;font-size:13px;'>{dt}</div>"
                "</div>"
            )
            row2 = f"Wound: {wound_loc} • Pain: {pain} • Risk: {risk_chip}"
            parts.append(f"<div style='margin-top:6px;'>{row2}</div>")
            if summary:
                parts.append(
                    "<div style='margin-top:8px;color:#4a5568;'>"
                    f"{html.escape(str(summary))}"
                    "</div>"
                )
            if img:
                parts.append(
                    "<div style='margin-top:10px;'>"
                    "<img src='{}' alt='Wound image' "
                    "style='max-width:260px;border-radius:8px;'>"
                    "</div>".format(html.escape(str(img)))
                )
            parts.append("</div>")  # card
        parts.append("</div>")
        return "".join(parts)

    def format_patient_progress_for_display(self, rows: List[Dict]) -> str:
        """
        Timeline-style HTML for a single patient (used by 'View Details').

        The patient name is taken from the first row. Missing names, dates,
        risk levels and summaries are replaced by readable placeholders
        instead of the text "None". All row values are HTML-escaped.
        """
        if not rows:
            return "<div style='padding:12px;color:#666;'>No progression data available.</div>"

        pname = self._text(rows[0].get("patient_name"), "Unknown Patient")
        header = (
            "<div style='margin-bottom:16px;'>"
            f"<h2 style='margin:0;'>🧭 Wound Progress — {pname}</h2>"
            "</div>"
        )

        items: List[str] = []
        for r in rows:
            visit_date = r.get("visit_date")
            if hasattr(visit_date, "strftime"):
                disp_date = visit_date.strftime("%B %d, %Y")
            elif visit_date is None:
                disp_date = "Unknown date"
            else:
                disp_date = str(visit_date)

            img = r.get("image_url") or ""
            img_tag = (
                "<img src='{src}' alt='Wound image' "
                "style='max-width:220px;border-radius:8px;'>"
                .format(src=html.escape(str(img)))
                if img else ""
            )
            risk_chip = self._risk_chip(r.get("risk_level"))
            summary = html.escape(str(r.get("summary") or "No summary."))

            items.append(
                "<div style='display:flex;gap:16px;padding:12px 0;"
                "border-bottom:1px solid #e2e8f0;'>"
                "<div style='min-width:220px;'>"
                f"<div style='font-weight:600;margin-bottom:6px;'>{html.escape(disp_date)}</div>"
                f"<div>{img_tag}</div>"
                "</div>"
                "<div style='flex:1;'>"
                f"<div style='margin-bottom:6px;'>{risk_chip}</div>"
                f"<div style='color:#4a5568;'>{summary}</div>"
                "</div>"
                "</div>"
            )

        body = "<div>" + "\n".join(items) + "</div>"
        container = (
            "<div style='padding:8px 4px;'>"
            f"{header}"
            f"{body}"
            "</div>"
        )
        return container

    def format_patient_data_for_display(self, rows: List[Dict]) -> str:
        """Renderer for a single patient's history (reuses card format)."""
        return self.format_history_for_display(rows)
# ===================== REPORT GENERATOR =====================
class ReportGenerator:
    """Professional HTML report generator for wound analysis."""

    _NO_RECOMMENDATIONS = "<p>No specific recommendations available.</p>"

    # Plain (non f-) string so CSS braces need no doubling.
    _REPORT_CSS = """
        body { font-family: Arial, Helvetica, sans-serif; color: #2d3748; margin: 0; padding: 24px; }
        .report { max-width: 900px; margin: 0 auto; }
        .header { border-bottom: 3px solid #3182ce; margin-bottom: 20px; }
        .section { margin-bottom: 24px; }
        .grid { display: flex; flex-wrap: wrap; gap: 12px; }
        .card { flex: 1 1 200px; border: 1px solid #e2e8f0; border-radius: 8px; padding: 10px 12px; }
        .card h3 { margin: 0 0 4px 0; font-size: 13px; color: #718096; }
        .card p { margin: 0; font-weight: 600; }
        .risk-indicator { display: inline-block; padding: 6px 14px; border-radius: 16px;
                          font-weight: 700; background: #f0f0f0; color: #333; }
        .risk-low { background: #d4edda; color: #155724; }
        .risk-moderate { background: #fff3cd; color: #856404; }
        .risk-high { background: #f8d7da; color: #721c24; }
        .image-box img { max-width: 100%; border-radius: 8px; }
        @media print { body { padding: 0; } }
    """

    def __init__(self):
        pass

    @staticmethod
    def _field(mapping: Dict[str, Any], key: str, default: str = "N/A") -> str:
        """
        Return ``mapping[key]`` HTML-escaped for display; ``default`` is used
        only when the value is missing, None or an empty string (0 stays "0").
        """
        value = mapping.get(key)
        if value is None or value == "":
            value = default
        return html.escape(str(value))

    def _format_recommendations(self, recommendations) -> str:
        """
        Accept a str, list/tuple or dict and return an HTML <ul> of recommendations.

        - list/tuple: one item per element
        - dict: one "key: value" item per non-blank value
        - anything else: converted to str and split on the first delimiter
          present among newline, ". " and "; "

        Items of two characters or fewer are dropped as noise. When nothing
        remains, a "no recommendations" paragraph is returned instead of an
        empty list. Item text is HTML-escaped.
        """
        if isinstance(recommendations, (list, tuple)):
            items = [str(x).strip() for x in recommendations if x is not None]
        elif isinstance(recommendations, dict):
            items = [
                f"{k}: {v}"
                for k, v in recommendations.items()
                if v is not None and str(v).strip()
            ]
        else:
            text = str(recommendations or "").strip()
            items = [text]
            for delim in ("\n", ". ", "; "):
                if delim in text:
                    items = [piece.strip() for piece in text.split(delim)]
                    break

        # Filter before the emptiness check so an empty <ul> is never emitted.
        items = [item for item in items if len(item) > 2]
        if not items:
            return self._NO_RECOMMENDATIONS
        lis = "".join(f"<li>{html.escape(item)}</li>" for item in items)
        return f"<ul>{lis}</ul>"

    def generate_analysis_report(
        self, patient_data: Dict[str, Any], analysis_data: Dict[str, Any], image_url: Optional[str] = None
    ) -> str:
        """
        Generate a printable, self-contained HTML report for a single analysis.

        ``patient_data`` and ``analysis_data`` may be None or partially filled;
        missing fields are rendered with placeholders. Every value coming from
        the inputs is HTML-escaped, and the risk level is reduced to
        letters/digits/hyphens before being used as a CSS class name.
        The assessment date is the time the report is generated.
        """
        patient = patient_data or {}
        analysis = analysis_data or {}

        risk_level = analysis.get("risk_level") or "Unknown"
        # Restrict to safe characters: the value ends up inside an attribute.
        slug = "".join(ch if ch.isalnum() else "-" for ch in str(risk_level).lower())
        risk_class = f"risk-{slug}"
        summary = analysis.get("summary") or "No analysis summary available."
        recs = self._format_recommendations(analysis.get("recommendations", ""))

        if image_url:
            image_section = (
                "<div class='section'>"
                "<h2>Wound Image</h2>"
                "<div class='image-box'>"
                f"<img src='{html.escape(str(image_url))}' alt='Wound image'>"
                "</div>"
                "</div>"
            )
        else:
            image_section = ""

        field = self._field
        parts = [
            "<!DOCTYPE html>",
            "<html lang='en'>",
            "<head>",
            "<meta charset='UTF-8'>",
            "<title>SmartHeal AI - Wound Analysis Report</title>",
            f"<style>{self._REPORT_CSS}</style>",
            "</head>",
            "<body>",
            "<div class='report'>",
            "<div class='header'><h1>SmartHeal AI - Wound Analysis Report</h1></div>",
            "<div class='section'>",
            "<h2>Patient Information</h2>",
            "<div class='grid'>",
            f"<div class='card'><h3>Patient Name</h3><p>{field(patient, 'patient_name')}</p></div>",
            f"<div class='card'><h3>Age</h3><p>{field(patient, 'patient_age')} years</p></div>",
            f"<div class='card'><h3>Gender</h3><p>{field(patient, 'patient_gender')}</p></div>",
            "<div class='card'><h3>Assessment Date</h3>"
            f"<p>{datetime.now().strftime('%B %d, %Y at %I:%M %p')}</p></div>",
            "</div>",
            "</div>",
            "<div class='section'>",
            "<h2>Wound Assessment</h2>",
            "<div class='grid'>",
            f"<div class='card'><h3>Location</h3><p>{field(patient, 'wound_location')}</p></div>",
            f"<div class='card'><h3>Duration</h3><p>{field(patient, 'wound_duration')}</p></div>",
            f"<div class='card'><h3>Pain Level</h3><p>{field(patient, 'pain_level')} / 10</p></div>",
            "</div>",
            "</div>",
            "<div class='section'>",
            "<h2>Risk Assessment</h2>",
            f"<div class='risk-indicator {risk_class}'>{html.escape(str(risk_level))} Risk</div>",
            "</div>",
            image_section,
            "<div class='section'>",
            "<h2>AI Analysis Summary</h2>",
            f"<p>{html.escape(str(summary))}</p>",
            "</div>",
            "<div class='section'>",
            "<h2>🎯 Clinical Recommendations</h2>",
            recs,
            "</div>",
            "<div class='section'>",
            "<h2>Medical History</h2>",
            "<div class='grid'>",
            "<div class='card'><h3>Medical History</h3>"
            f"<p>{field(patient, 'medical_history', 'None reported')}</p></div>",
            "<div class='card'><h3>Current Medications</h3>"
            f"<p>{field(patient, 'medications', 'None reported')}</p></div>",
            "<div class='card'><h3>Known Allergies</h3>"
            f"<p>{field(patient, 'allergies', 'None reported')}</p></div>",
            "<div class='card'><h3>Additional Notes</h3>"
            f"<p>{field(patient, 'additional_notes', 'None')}</p></div>",
            "</div>",
            "</div>",
            "</div>",
            "</body>",
            "</html>",
        ]
        return "\n".join(part for part in parts if part)

    def generate_patient_history_report(self, patient_history: List[Dict]) -> str:
        """
        Printable HTML summarizing multiple visits for a patient.

        The patient name comes from the first entry. Visits are numbered
        ``len(patient_history) - index``, i.e. the list is presumably ordered
        newest first — confirm with callers. Values are HTML-escaped and
        missing ones are shown as placeholders rather than "None".
        """
        if not patient_history:
            return "<p>No patient history available.</p>"

        pname = self._field(patient_history[0], 'patient_name', 'Unknown Patient')
        total = len(patient_history)
        rows_html: List[str] = []
        for i, visit in enumerate(patient_history):
            dt = visit.get('visit_date')
            if hasattr(dt, "strftime"):
                dt_str = dt.strftime('%B %d, %Y')
            elif dt is None:
                dt_str = "Unknown date"
            else:
                dt_str = str(dt)

            wound_loc = self._field(visit, 'wound_location')
            pain = self._field(visit, 'pain_level')
            risk = self._field(visit, 'risk_level', 'Unknown')
            summary = visit.get('summary')
            if summary:
                summary_block = (
                    "<p style='margin:8px 0 0 0;'><strong>Summary:</strong> "
                    f"{html.escape(str(summary))}"
                    "</p>"
                )
            else:
                summary_block = ""

            rows_html.append(
                "<div style='border:1px solid #e2e8f0;border-radius:8px;"
                "padding:12px 14px;margin-bottom:12px;'>"
                "<div style='display:flex;justify-content:space-between;'>"
                f"<h3 style='margin:0;'>Visit #{total - i}</h3>"
                f"<span style='color:#718096;'>{html.escape(dt_str)}</span>"
                "</div>"
                "<div style='display:flex;gap:24px;margin-top:8px;'>"
                f"<div>Location: {wound_loc}</div>"
                f"<div>Pain: {pain}/10</div>"
                f"<div>Risk: {risk}</div>"
                "</div>"
                f"{summary_block}"
                "</div>"
            )

        return (
            "<div style='max-width:900px;margin:0 auto;font-family:Arial,sans-serif;'>"
            "<div style='border-bottom:3px solid #3182ce;margin-bottom:16px;'>"
            f"<h1 style='margin:0;'>📋 Patient History: {pname}</h1>"
            "<p style='color:#718096;'>Complete Treatment Timeline</p>"
            "</div>"
            "<div>"
            + "".join(rows_html) +
            "</div>"
            "</div>"
        )