SmartHeal-Agentic-AI / src /patient_history.py
SmartHeal's picture
Update src/patient_history.py
39b256f verified
raw
history blame
27.5 kB
# src/patient_history.py
import logging
import json
import html
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any
class PatientHistoryManager:
    """
    Complete patient history and wound tracking system (schema-aligned).

    Key schema expectations this class honors:
    - questionnaire_responses.patient_id -> patients.id (INT/BIGINT)
    - patients.uuid is the stable string identifier for string-FK tables
    - wounds.patient_id, wound_images.patient_id, notes.patient_id may be VARCHAR -> store patients.uuid
    - ai_analyses.questionnaire_id -> questionnaires.id (template-level linkage)

    The injected database manager must provide ``execute_query(sql, params, fetch=...)``
    and ``execute_query_one(sql, params)``; rows are expected to be dict-like.
    """

    # SELECT/JOIN/WHERE prefix shared by both wound-progression queries; callers
    # append their own patient filter and ORDER BY clause.
    _PROGRESSION_SELECT = """
        SELECT
            qr.submitted_at AS visit_date,
            qr.response_data,
            w.position AS wound_location,
            w.moisture,
            w.infection,
            a.risk_score,
            a.risk_level,
            a.summary,
            wi.image AS image_url,
            p.name AS patient_name
        FROM questionnaire_responses qr
        JOIN patients p ON p.id = qr.patient_id
        LEFT JOIN wounds w
            ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR))
        LEFT JOIN wound_images wi
            ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR))
        LEFT JOIN ai_analyses a
            ON a.questionnaire_id = qr.questionnaire_id
        WHERE qr.practitioner_id = %s
    """

    def __init__(self, database_manager):
        """Store the database manager used for every query."""
        self.db = database_manager

    # --------------------------- JSON helpers ---------------------------
    def _safe_json(self, maybe_json) -> Optional[Any]:
        """
        Parse JSON from str/bytes/dict safely.

        Returns the dict unchanged when one is passed, the decoded JSON value for
        non-blank str/bytes input, and None for anything else or on parse failure.
        """
        if maybe_json is None:
            return None
        if isinstance(maybe_json, dict):
            return maybe_json
        try:
            if isinstance(maybe_json, (bytes, bytearray)):
                maybe_json = maybe_json.decode("utf-8", errors="ignore")
            if isinstance(maybe_json, str) and maybe_json.strip():
                return json.loads(maybe_json)
        except Exception:
            # Deliberate best-effort: malformed payloads are treated as "no data".
            return None
        return None

    @staticmethod
    def _dig(data, path: List[str], default=None):
        """Walk nested dicts along `path`; return `default` when any key is missing."""
        cur = data
        for key in path:
            if isinstance(cur, dict) and key in cur:
                cur = cur[key]
            else:
                return default
        return cur

    def _from_response(self, row: Dict, path: List[str], default=None):
        """
        Extract a nested field from qr.response_data JSON.
        path example: ["wound_details", "pain_level"]
        """
        data = self._safe_json(row.get("response_data"))
        try:
            return self._dig(data, path, default)
        except Exception:
            return default

    def _enrich_rows(self, rows: List[Dict]) -> List[Dict]:
        """
        Add JSON-derived fields to each row in place and return the list.

        Sets ``pain_level`` from response_data and overrides ``wound_location``
        when the response JSON carries a truthy location. The JSON is parsed
        once per row.
        """
        for row in rows:
            data = self._safe_json(row.get("response_data"))
            row["pain_level"] = self._dig(data, ["wound_details", "pain_level"])
            location = self._dig(data, ["wound_details", "location"])
            if location:
                row["wound_location"] = location
        return rows

    # --------------------------- Core queries ---------------------------
    def get_patient_complete_history(
        self,
        user_id: int,
        patient_name: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """
        Full visit list for a practitioner, optionally filtered by patient name.
        Includes joins to wounds, wound_images (via patients.uuid or string id)
        and ai_analyses (via questionnaire template).

        `limit` is clamped to 1..500 and `offset` to >= 0. Returns [] on any error.

        NOTE(review): the LEFT JOINs are keyed by patient / questionnaire template,
        not by visit, so one visit can appear once per matching wound/image/analysis
        combination and LIMIT counts those joined rows - confirm this is intended.
        """
        try:
            # Defensive bounds for pagination
            limit = max(1, min(int(limit), 500))
            offset = max(0, int(offset))
            # Only a fixed SQL fragment is interpolated; the name itself is bound as a parameter.
            name_filter = "AND p.name = %s" if patient_name else ""
            sql = f"""
                SELECT
                    qr.id AS response_id,
                    qr.questionnaire_id,
                    qr.submitted_at AS visit_date,
                    qr.response_data,
                    p.id AS patient_id,
                    p.uuid AS patient_uuid,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    w.position AS wound_location,
                    w.moisture,
                    w.infection,
                    w.notes,
                    wi.image AS image_url,
                    a.analysis_data,
                    a.summary,
                    a.recommendations,
                    a.risk_score,
                    a.risk_level
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                LEFT JOIN wounds w
                    ON (w.patient_id = p.uuid OR w.patient_id = CAST(p.id AS CHAR))
                LEFT JOIN wound_images wi
                    ON (wi.patient_id = p.uuid OR wi.patient_id = CAST(p.id AS CHAR))
                LEFT JOIN ai_analyses a
                    ON a.questionnaire_id = qr.questionnaire_id
                WHERE qr.practitioner_id = %s
                {name_filter}
                ORDER BY qr.submitted_at DESC
                LIMIT %s OFFSET %s
            """
            params: Tuple = (
                (user_id, patient_name, limit, offset) if patient_name else (user_id, limit, offset)
            )
            rows = self.db.execute_query(sql, params, fetch=True) or []
            return self._enrich_rows(rows)
        except Exception as e:
            logging.error("Error fetching patient complete history: %s", e, exc_info=True)
            return []

    def get_patient_list(self, user_id: int) -> List[Dict]:
        """
        Unique patients seen by this practitioner with first/last visit and count.
        Returns [] on any error.
        """
        try:
            sql = """
                SELECT
                    p.id AS id,
                    p.uuid,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    COUNT(qr.id) AS total_visits,
                    MAX(qr.submitted_at) AS last_visit,
                    MIN(qr.submitted_at) AS first_visit
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                WHERE qr.practitioner_id = %s
                GROUP BY p.id, p.uuid, p.name, p.age, p.gender
                ORDER BY last_visit DESC
            """
            return self.db.execute_query(sql, (user_id,), fetch=True) or []
        except Exception as e:
            logging.error("Error fetching patient list: %s", e, exc_info=True)
            return []

    def get_wound_progression(self, user_id: int, patient_name: str) -> List[Dict]:
        """
        Ascending temporal list for one patient (by name) — kept for backward compatibility.
        Prefer get_wound_progression_by_id().
        Returns [] on any error.
        """
        try:
            sql = self._PROGRESSION_SELECT + """
                AND p.name = %s
                ORDER BY qr.submitted_at ASC
            """
            rows = self.db.execute_query(sql, (user_id, patient_name), fetch=True) or []
            return self._enrich_rows(rows)
        except Exception as e:
            logging.error("Error fetching wound progression: %s", e, exc_info=True)
            return []

    def get_wound_progression_by_id(self, user_id: int, patient_id: int) -> List[Dict]:
        """
        Ascending temporal list for one patient (by numeric patient_id).
        Use this for “View Details” when a patient is chosen from a dropdown.
        Returns [] on any error (including a non-numeric patient_id).
        """
        try:
            sql = self._PROGRESSION_SELECT + """
                AND p.id = %s
                ORDER BY qr.submitted_at ASC
            """
            rows = self.db.execute_query(sql, (user_id, int(patient_id)), fetch=True) or []
            return self._enrich_rows(rows)
        except Exception as e:
            logging.error("Error fetching wound progression by id: %s", e, exc_info=True)
            return []

    def save_patient_note(self, user_id: int, patient_name: str, note: str) -> bool:
        """
        Persist a clinical note for the patient's UUID into `notes`.
        Finds the patient by latest encounter under this practitioner.

        Returns True when the insert reports a truthy result, False when the
        patient cannot be resolved or any error occurs.
        """
        try:
            row = self.db.execute_query_one(
                """
                SELECT p.uuid
                FROM questionnaire_responses qr
                JOIN patients p ON p.id = qr.patient_id
                WHERE qr.practitioner_id = %s AND p.name = %s
                ORDER BY qr.submitted_at DESC
                LIMIT 1
                """,
                (user_id, patient_name)
            )
            if not row or not row.get("uuid"):
                logging.error("save_patient_note: could not resolve patient uuid")
                return False
            patient_uuid = row["uuid"]
            rc = self.db.execute_query(
                """
                INSERT INTO notes (uuid, patient_id, note, added_by, created_at, updated_at)
                VALUES (UUID(), %s, %s, %s, NOW(), NOW())
                """,
                (patient_uuid, note, str(user_id))
            )
            return bool(rc)
        except Exception as e:
            logging.error("Error saving patient note: %s", e, exc_info=True)
            return False

    # --------------------------- UI Wrappers ---------------------------
    def get_user_patient_history(self, user_id: int) -> List[Dict]:
        """Wrapper used by UI: latest history for all patients."""
        return self.get_patient_complete_history(user_id=user_id, limit=100, offset=0)

    def search_patient_by_name(self, user_id: int, patient_name: str) -> List[Dict]:
        """Wrapper used by UI: history filtered to a single patient name."""
        return self.get_patient_complete_history(
            user_id=user_id, patient_name=patient_name, limit=100, offset=0
        )

    # --------------------------- Render helpers ---------------------------
    @staticmethod
    def _text(value, default: str = "N/A") -> str:
        """
        HTML-escape `value` for display.

        Only None and blank strings fall back to `default`, so legitimate falsy
        values such as a pain level or age of 0 are still shown.
        """
        if value is None or (isinstance(value, str) and not value.strip()):
            return html.escape(default)
        return html.escape(str(value))

    def _fmt_dt(self, dt_obj) -> str:
        """
        Format a datetime-like object or ISO-8601 string as 'Mon DD, YYYY HH:MM AM/PM'.
        Unparseable strings are returned unchanged; None becomes "N/A".
        """
        if dt_obj is None:
            return "N/A"
        try:
            if hasattr(dt_obj, "strftime"):
                return dt_obj.strftime('%b %d, %Y %I:%M %p')
            if isinstance(dt_obj, str):
                try:
                    # fromisoformat() on older Pythons rejects a trailing 'Z'.
                    dt = datetime.fromisoformat(dt_obj.replace('Z', '+00:00'))
                    return dt.strftime('%b %d, %Y %I:%M %p')
                except Exception:
                    return dt_obj
            return str(dt_obj)
        except Exception:
            return str(dt_obj)

    def _risk_chip(self, risk_level: Optional[str]) -> str:
        """
        Render a coloured pill for a risk level.

        Colours are chosen by case-insensitive prefix (low / moderate / high);
        anything else, including None or blank, is shown in neutral grey, with
        None/blank labelled "Unknown". Non-string values are stringified.
        """
        label = "" if risk_level is None else str(risk_level).strip()
        label = label or "Unknown"
        rl = label.lower()
        bg, fg = "#f0f0f0", "#333"
        if rl.startswith("low"):
            bg, fg = "#d4edda", "#155724"
        elif rl.startswith("moderate"):
            bg, fg = "#fff3cd", "#856404"
        elif rl.startswith("high"):
            bg, fg = "#f8d7da", "#721c24"
        return (
            "<span style='background:{bg};color:{fg};padding:2px 8px;border-radius:10px;"
            "font-size:12px;font-weight:600;'>{txt}</span>"
        ).format(bg=bg, fg=fg, txt=html.escape(label))

    def format_history_for_display(self, rows: List[Dict]) -> str:
        """
        Card-style HTML renderer for the general history list.
        Safe for embedding in Gradio HTML: every row value is HTML-escaped.
        """
        if not rows:
            return "<div class='status-warning'>No history found.</div>"
        parts = ["<div style='display:flex;flex-direction:column;gap:12px;'>"]
        for r in rows:
            # The formatted date may be a raw DB string, so escape it as well.
            dt = html.escape(self._fmt_dt(r.get("visit_date")))
            patient = self._text(r.get("patient_name"), "")
            age = self._text(r.get("patient_age"))
            gender = self._text(r.get("patient_gender"), "")
            wound_loc = self._text(r.get("wound_location"))
            pain = self._text(r.get("pain_level"))
            risk_chip = self._risk_chip(r.get("risk_level"))
            summary = r.get("summary")
            img = r.get("image_url")
            parts.append("<div style='padding:12px;border:1px solid #e2e8f0;border-radius:10px;background:#fff;'>")
            parts.append(
                "<div style='display:flex;justify-content:space-between;align-items:center;gap:8px;flex-wrap:wrap'>"
                f"<div><strong>{patient}</strong> • {age}{gender}</div>"
                f"<div style='color:#4a5568;'>{dt}</div>"
                "</div>"
            )
            row2 = f"Wound: {wound_loc} • Pain: {pain} • Risk: {risk_chip}"
            parts.append(f"<div style='margin-top:6px'>{row2}</div>")
            if summary:
                parts.append(
                    "<div style='margin-top:6px;color:#2d3748'><em>"
                    f"{html.escape(str(summary))}"
                    "</em></div>"
                )
            if img:
                parts.append(
                    "<div style='margin-top:10px'><img src='{}' "
                    "style='max-width:260px;border-radius:8px;border:1px solid #edf2f7'></div>"
                    .format(html.escape(str(img)))
                )
            parts.append("</div>")  # card
        parts.append("</div>")
        return "".join(parts)

    def format_patient_progress_for_display(self, rows: List[Dict]) -> str:
        """
        Professional timeline for a single patient (used by 'View Details').
        The patient name in the header is taken from the first row.
        """
        if not rows:
            return "<div class='status-warning'>No progression data available.</div>"
        pname = self._text(rows[0].get("patient_name"), "Unknown Patient")
        header = (
            "<div style='background:linear-gradient(135deg,#3182ce 0%,#2c5aa0 100%);color:#fff;"
            "padding:16px;border-radius:12px;margin-bottom:14px;'>"
            f"<h3 style='margin:0'>🧭 Wound Progress — {pname}</h3>"
            "</div>"
        )
        items: List[str] = []
        for r in rows:
            visit_date = r.get("visit_date")
            if hasattr(visit_date, "strftime"):
                disp_date = visit_date.strftime("%B %d, %Y")
            else:
                disp_date = self._text(visit_date)
                # _text() already escaped; undo so the single escape below stays correct.
                disp_date = html.unescape(disp_date)
            img = r.get("image_url") or ""
            img_tag = (
                "<img src='{src}' style='max-width:380px;border-radius:10px;box-shadow:0 2px 10px rgba(0,0,0,.08);' />"
                .format(src=html.escape(str(img)))
                if img else ""
            )
            # Pass the raw value: _risk_chip maps a NULL risk to "Unknown" instead of "None".
            risk_chip = self._risk_chip(r.get("risk_level"))
            summary = html.escape(str(r.get("summary") or "No summary."))
            items.append(
                "<div style='display:grid;grid-template-columns:1fr 2fr;gap:16px;align-items:start;"
                "background:#fff;border:1px solid #e9ecef;border-radius:12px;padding:16px;'>"
                "<div style='text-align:center;'>"
                f"<div style='font-weight:700;color:#0f172a;'>{html.escape(disp_date)}</div>"
                f"<div style='margin-top:8px;'>{img_tag}</div>"
                "</div>"
                "<div>"
                f"<div style='margin-bottom:10px;'>{risk_chip}</div>"
                f"<div style='color:#334155;line-height:1.6;'>{summary}</div>"
                "</div>"
                "</div>"
            )
        body = "<div style='display:flex;flex-direction:column;gap:14px;'>" + "\n".join(items) + "</div>"
        container = (
            "<div style='max-width:1200px;margin:0 auto;'>"
            f"{header}"
            f"{body}"
            "</div>"
        )
        return container

    def format_patient_data_for_display(self, rows: List[Dict]) -> str:
        """Renderer for a single patient's history (reuses card format)."""
        return self.format_history_for_display(rows)
# ===================== REPORT GENERATOR =====================
class ReportGenerator:
    """Professional HTML report generator for wound analysis."""

    # Returned whenever no usable recommendation item remains.
    _NO_RECOMMENDATIONS = "<p>No specific recommendations available.</p>"

    def __init__(self):
        pass

    @staticmethod
    def _text(value, default: str = "N/A") -> str:
        """
        HTML-escape `value` for display.

        Only None and blank strings fall back to `default`, so a stored NULL is
        not rendered as the literal text "None" and a value of 0 is still shown.
        """
        if value is None or (isinstance(value, str) and not value.strip()):
            return html.escape(default)
        return html.escape(str(value))

    def _with_suffix(self, value, suffix: str) -> str:
        """Return escaped `value` followed by `suffix`, or plain "N/A" when the value is missing."""
        text = self._text(value, "")
        return f"{text}{suffix}" if text else "N/A"

    def _format_recommendations(self, recommendations) -> str:
        """
        Accepts str OR list/tuple OR dict and returns an HTML <ul>.

        Strings are split on the first delimiter found among newline, ". " and
        "; ". Items of two characters or fewer are dropped; when nothing usable
        remains a "no recommendations" paragraph is returned instead of an
        empty list.
        """
        if isinstance(recommendations, (list, tuple)):
            candidates = [str(x).strip() for x in recommendations if x is not None]
        elif isinstance(recommendations, dict):
            # dict -> "key: value" entries, skipping empty values
            candidates = [
                f"{k}: {v}"
                for k, v in recommendations.items()
                if v is not None and str(v).strip()
            ]
        else:
            rec = str(recommendations or "").strip()
            delim = next((d for d in ("\n", ". ", "; ") if d in rec), None)
            pieces = rec.split(delim) if delim else [rec]
            candidates = [piece.strip() for piece in pieces]
        # Filter before the emptiness check so an all-filtered input cannot yield "<ul></ul>".
        items = [c for c in candidates if len(c) > 2]
        if not items:
            return self._NO_RECOMMENDATIONS
        lis = "".join(f"<li>{html.escape(i)}</li>" for i in items)
        return f"<ul>{lis}</ul>"

    def generate_analysis_report(
        self, patient_data: Dict[str, Any], analysis_data: Dict[str, Any], image_url: Optional[str] = None
    ) -> str:
        """
        Generate comprehensive printable HTML report for a single analysis.

        `patient_data` and `analysis_data` may be None or missing keys; absent
        or NULL values are rendered with their placeholder text. All inserted
        values are HTML-escaped. Returns a complete HTML document as a string.
        """
        patient = patient_data or {}
        analysis = analysis_data or {}

        raw_risk = analysis.get("risk_level")
        risk_label = ("" if raw_risk is None else str(raw_risk).strip()) or "Unknown"
        # Must match the stylesheet selectors below (.risk-low / .risk-moderate / .risk-high).
        risk_class = html.escape("risk-" + risk_label.lower().replace(" ", "-"))
        risk_text = html.escape(risk_label)

        summary = self._text(analysis.get("summary"), "No analysis summary available.")
        recs = self._format_recommendations(analysis.get("recommendations", ""))

        name = self._text(patient.get("patient_name"))
        age = self._with_suffix(patient.get("patient_age"), " years")
        gender = self._text(patient.get("patient_gender"))
        location = self._text(patient.get("wound_location"))
        duration = self._text(patient.get("wound_duration"))
        pain = self._with_suffix(patient.get("pain_level"), " / 10")
        history = self._text(patient.get("medical_history"), "None reported")
        medications = self._text(patient.get("medications"), "None reported")
        allergies = self._text(patient.get("allergies"), "None reported")
        notes = self._text(patient.get("additional_notes"), "None")

        # Single timestamp so the assessment date and footer always agree.
        generated_at = datetime.now().strftime('%B %d, %Y at %I:%M %p')

        # Build optional image section first to keep the template flat
        if image_url:
            image_section = (
                "<div class=\"section\">"
                "<h2>Wound Image</h2>"
                "<div class=\"image-wrap\">"
                f"<img src=\"{html.escape(str(image_url))}\" alt=\"Wound Image\" class=\"wound-image\" />"
                "</div></div>"
            )
        else:
            image_section = ""

        # Note: this is an f-string. All literal braces are doubled.
        return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SmartHeal AI - Wound Analysis Report</title>
<style>
body {{
font-family:'Segoe UI',Tahoma,Geneva,Verdana,sans-serif; line-height:1.6; margin:0; padding:20px; background-color:#f8f9fa;
}}
.report-container {{
max-width:920px; margin:0 auto; background:#fff; border-radius:10px; box-shadow:0 4px 20px rgba(0,0,0,0.1); overflow:hidden;
}}
.header {{
background:linear-gradient(135deg,#3182ce 0%,#2c5aa0 100%); color:#fff; padding:30px; text-align:center;
}}
.header h1 {{ margin:0; font-size:28px; font-weight:600; }}
.header p {{ margin:10px 0 0 0; opacity:.9; font-size:16px; }}
.content {{ padding:30px; }}
.section {{ margin-bottom:30px; border-left:4px solid #3182ce; padding-left:20px; }}
.section h2 {{ color:#2c5aa0; margin-top:0; font-size:20px; font-weight:600; }}
.grid2 {{ display:grid; grid-template-columns:1fr 1fr; gap:20px; }}
.card {{ background:#f8f9fa; padding:15px; border-radius:8px; border:1px solid #e9ecef; }}
.card h3 {{ margin:0 0 10px 0; color:#495057; font-size:13px; font-weight:700; text-transform:uppercase; }}
.card p {{ margin:0; font-weight:500; color:#212529; }}
.risk-indicator {{ display:inline-block; padding:8px 16px; border-radius:20px; font-weight:600; text-transform:uppercase; font-size:12px; letter-spacing:.5px; }}
.risk-low {{ background:#d4edda; color:#155724; }}
.risk-moderate {{ background:#fff3cd; color:#856404; }}
.risk-high {{ background:#f8d7da; color:#721c24; }}
.recommendations {{ background:#e7f3ff; border:1px solid #b3d9ff; border-radius:8px; padding:20px; margin-top:15px; }}
.image-wrap {{ text-align:center; margin:20px 0; }}
.wound-image {{ max-width:100%; height:auto; border-radius:8px; box-shadow:0 4px 12px rgba(0,0,0,.15); }}
.footer {{ background:#f8f9fa; padding:20px 30px; text-align:center; color:#6c757d; border-top:1px solid #e9ecef; }}
@media print {{ body {{ background:white; }} .report-container {{ box-shadow:none; }} }}
</style>
</head>
<body>
<div class="report-container">
<div class="header">
<h1>🩺 SmartHeal AI Wound Analysis Report</h1>
<p>Advanced AI-Powered Clinical Assessment</p>
</div>
<div class="content">
<div class="section">
<h2>Patient Information</h2>
<div class="grid2">
<div class="card"><h3>Patient Name</h3><p>{name}</p></div>
<div class="card"><h3>Age</h3><p>{age}</p></div>
<div class="card"><h3>Gender</h3><p>{gender}</p></div>
<div class="card"><h3>Assessment Date</h3><p>{generated_at}</p></div>
</div>
</div>
<div class="section">
<h2>Wound Assessment</h2>
<div class="grid2">
<div class="card"><h3>Location</h3><p>{location}</p></div>
<div class="card"><h3>Duration</h3><p>{duration}</p></div>
<div class="card"><h3>Pain Level</h3><p>{pain}</p></div>
<div class="card"><h3>Risk Assessment</h3>
<p><span class="risk-indicator {risk_class}">{risk_text} Risk</span></p>
</div>
</div>
</div>
{image_section}
<div class="section">
<h2>AI Analysis Summary</h2>
<p>{summary}</p>
<div class="recommendations">
<h3>🎯 Clinical Recommendations</h3>
{recs}
</div>
</div>
<div class="section">
<h2>Medical History</h2>
<div class="grid2">
<div class="card"><h3>Medical History</h3><p>{history}</p></div>
<div class="card"><h3>Current Medications</h3><p>{medications}</p></div>
<div class="card"><h3>Known Allergies</h3><p>{allergies}</p></div>
<div class="card"><h3>Additional Notes</h3><p>{notes}</p></div>
</div>
</div>
</div>
<div class="footer">
<p><strong>SmartHeal AI</strong> — Advanced Wound Care Analysis & Clinical Support System</p>
<p>Generated on {generated_at} | For professional medical use only</p>
<p>⚠️ This AI analysis is for clinical support only. Always consult qualified professionals.</p>
</div>
</div>
</body>
</html>
"""

    def generate_patient_history_report(self, patient_history: List[Dict]) -> str:
        """
        Large, printable HTML summarizing multiple visits for a patient.

        Visits are numbered from len(patient_history) down to 1 in list order,
        so the list is presumably newest-first - confirm against the caller.
        The patient name in the header comes from the first entry. Missing or
        NULL fields render as "N/A" / "Unknown".
        """
        if not patient_history:
            return "<p>No patient history available.</p>"
        pname = self._text(patient_history[0].get('patient_name'), 'Unknown Patient')
        total = len(patient_history)
        rows_html: List[str] = []
        for i, visit in enumerate(patient_history):
            dt = visit.get('visit_date')
            if hasattr(dt, "strftime"):
                dt_str = html.escape(dt.strftime('%B %d, %Y'))
            else:
                dt_str = self._text(dt)
            wound_loc = self._text(visit.get('wound_location'))
            pain = self._with_suffix(visit.get('pain_level'), "/10")
            risk = self._text(visit.get('risk_level'), 'Unknown')
            summary = visit.get('summary')
            # Build optional summary block separately to keep the row template flat
            if summary:
                summary_block = (
                    "<p style='margin:0.5rem 0 0 0'><strong>Summary:</strong> "
                    f"{html.escape(str(summary))}"
                    "</p>"
                )
            else:
                summary_block = ""
            rows_html.append(
                "<div style='padding:20px;border-bottom:1px solid #f0f0f0;'>"
                "<div style='display:flex;justify-content:space-between;align-items:center;margin-bottom:8px;'>"
                f"<h3 style='margin:0;color:#2c5aa0;'>Visit #{total - i}</h3>"
                f"<span style='color:#6c757d;font-size:14px;'>{dt_str}</span>"
                "</div>"
                "<div style='display:grid;grid-template-columns:1fr 1fr 1fr;gap:12px;margin:10px 0;'>"
                f"<div style='background:#f8f9fa;padding:10px;border-radius:6px;'><strong>Location:</strong> {wound_loc}</div>"
                f"<div style='background:#f8f9fa;padding:10px;border-radius:6px;'><strong>Pain:</strong> {pain}</div>"
                f"<div style='background:#f8f9fa;padding:10px;border-radius:6px;'><strong>Risk:</strong> {risk}</div>"
                "</div>"
                f"{summary_block}"
                "</div>"
            )
        return (
            "<div style=\"max-width:920px;margin:0 auto;font-family:'Segoe UI', sans-serif;\">"
            "<div style=\"background:linear-gradient(135deg,#3182ce 0%,#2c5aa0 100%);color:white;"
            "padding:20px;border-radius:10px 10px 0 0;\">"
            f"<h2 style=\"margin:0;\">📋 Patient History: {pname}</h2>"
            "<p style=\"margin:8px 0 0 0;opacity:0.9;\">Complete Treatment Timeline</p>"
            "</div>"
            "<div style=\"background:white;border:1px solid #e9ecef;border-top:none;border-radius:0 0 10px 10px;\">"
            + "".join(rows_html) +
            "</div>"
            "</div>"
        )