# smartheal_ai_processor.py
# Verbose, instrumented version — preserves public class/function names
# Turn on deep logging: export LOGLEVEL=DEBUG SMARTHEAL_DEBUG=1

import os
import logging
from datetime import datetime
from typing import Optional, Dict, List, Tuple
# ---- Environment defaults (do NOT globally hint CUDA here) ----
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
LOGLEVEL = os.getenv("LOGLEVEL", "INFO").upper()
SMARTHEAL_DEBUG = os.getenv("SMARTHEAL_DEBUG", "0") == "1"

# --- Logging config (must run before the first logging call below, otherwise
# the implicit basicConfig from logging.info/warning wins and this is a no-op) ---
logging.basicConfig(
    level=getattr(logging, LOGLEVEL, logging.INFO),
    format="%(asctime)s - %(levelname)s - %(message)s",
)

import cv2
import numpy as np
from PIL import Image
from PIL.ExifTags import TAGS
from huggingface_hub import HfFolder

# Read from env (prefer standard uppercase)
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("hf_token")
if HF_TOKEN:
    # Persist for the ubuntu user so HF/Transformers can reuse it
    HfFolder.save_token(HF_TOKEN)
    # Also keep it in-process for libraries that accept a token kwarg
    os.environ["HF_TOKEN"] = HF_TOKEN
    logging.info("✅ Hugging Face token configured without interactive login.")
else:
    logging.warning("⚠️ HF_TOKEN not set. Set it in /etc/default/smartheal for private/gated models.")

def _log_kv(prefix: str, kv: Dict):
    logging.debug(prefix + " | " + " | ".join(f"{k}={v}" for k, v in kv.items()))
# --- Spaces GPU decorator (REQUIRED) ---
from spaces import GPU as _SPACES_GPU

@_SPACES_GPU
def smartheal_gpu_stub(ping: int = 0) -> str:
    # Minimal GPU-decorated entry point so Spaces provisions a GPU worker.
    return "ready"
# ---- Paths / constants ----
UPLOADS_DIR = "uploads"
os.makedirs(UPLOADS_DIR, exist_ok=True)
YOLO_MODEL_PATH = "src/best.pt"
SEG_MODEL_PATH = "src/segmentation_model.keras"  # native Keras format; load_segmentation_model reads this same path
GUIDELINE_PDFS = ["src/eHealth in Wound Care.pdf", "src/IWGDF Guideline.pdf", "src/evaluation.pdf"]
DATASET_ID = "SmartHeal/wound-image-uploads"
DEFAULT_PX_PER_CM = 38.0
PX_PER_CM_MIN, PX_PER_CM_MAX = 5.0, 1200.0

# Segmentation preprocessing knobs
SEG_EXPECTS_RGB = os.getenv("SEG_EXPECTS_RGB", "1") == "1"  # most TF models trained on RGB
SEG_NORM = os.getenv("SEG_NORM", "0to1")  # "0to1" | "imagenet"
SEG_THRESH = float(os.getenv("SEG_THRESH", "0.5"))

models_cache: Dict[str, object] = {}
knowledge_base_cache: Dict[str, object] = {}
# ---------- Utilities to prevent CUDA in main process ----------
from contextlib import contextmanager

@contextmanager
def _no_cuda_env():
    """
    Mask GPUs so any library imported/constructed in the main process
    cannot see CUDA (required for Spaces Stateless GPU).
    """
    prev = os.environ.get("CUDA_VISIBLE_DEVICES")
    os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
    try:
        yield
    finally:
        if prev is None:
            os.environ.pop("CUDA_VISIBLE_DEVICES", None)
        else:
            os.environ["CUDA_VISIBLE_DEVICES"] = prev
# ---------- Lazy imports (wrapped where needed) ----------
def _import_ultralytics():
    # Prevent Ultralytics from probing CUDA on import
    with _no_cuda_env():
        from ultralytics import YOLO
    return YOLO

def _import_tf_loader():
    import tensorflow as tf
    tf.config.set_visible_devices([], "GPU")
    from tensorflow.keras.models import load_model
    return load_model

def _import_hf_cls():
    from transformers import pipeline
    return pipeline

def _import_embeddings():
    from langchain_community.embeddings import HuggingFaceEmbeddings
    return HuggingFaceEmbeddings

def _import_langchain_pdf():
    from langchain_community.document_loaders import PyPDFLoader
    return PyPDFLoader

def _import_langchain_faiss():
    from langchain_community.vectorstores import FAISS
    return FAISS

def _import_hf_hub():
    from huggingface_hub import HfApi, HfFolder
    return HfApi, HfFolder
# ---------- SmartHeal prompts (system + user prefix) ----------
SMARTHEAL_SYSTEM_PROMPT = """\
You are SmartHeal Clinical Assistant, a wound-care decision-support system.
You analyze wound photographs and brief patient context to produce careful,
specific, guideline-informed recommendations WITHOUT diagnosing.
Output requirements (strict):
- Treat the vision pipeline measurements as ground truth; restate them once.
- Write in concise, clinical bullets with clear, actionable steps (no filler).
- Use EXACT section headings and order: Analysis; Medication and Treatment; Disclaimer.
- Provide a single primary plan plus sensible alternatives when appropriate (e.g., by exudate level).
- For dressings: name the category (e.g., foam/alginate/hydrogel/silver/iodine/PHMB/honey), typical wear time,
  change frequency, and what to switch to if too wet/dry or if maceration appears.
- For offloading/compression/NPWT: state the indication criteria and practical device choice.
- For medications: suggest evidence-based options (generic names), with typical adult dose ranges, route, and duration;
  include key contraindications/interactions and mark as “for clinician review”.
- Include a follow-up cadence (in days) and explicit switch/stop rules and escalation triggers.
- If information is missing, state assumptions briefly and proceed with a best-practice plan.
- Tone: professional, precise, conservative. Avoid definitive diagnoses or promises of cure.
- Length target: 220–350 words total. No preamble or closing beyond the specified sections.
"""
SMARTHEAL_USER_PREFIX = """\
Patient: {patient_info}
Visual findings: type={wound_type}, size={length_cm}x{breadth_cm} cm, area={area_cm2} cm^2,
detection_conf={det_conf:.2f}, calibration={px_per_cm} px/cm.
Guideline context (principles you may draw from—summarize, don’t quote verbatim):
{guideline_context}
Write a structured, actionable answer with these headings EXACTLY and nothing else:
Analysis
- Restate the measured size/area once and interpret exudate burden, likely bioburden risk, and peri-wound skin status.
- Note key risks tied to the wound type (e.g., DFU → pressure/neuropathy/ischemia), and any uncertainties or data gaps
  (e.g., PAD status, glycemic control, duration). Be specific.
Medication and Treatment
- Cleansing/irrigation: solution, volume, and frequency.
- Debridement: if/when indicated; method options (conservative sharp, autolytic, enzymatic) and when to avoid.
- Dressing strategy: pick ONE primary dressing category based on the current exudate level; include change frequency,
  expected wear time, and a backup option if too wet/dry or if maceration/odor occurs.
- Adjuncts: offloading (preferred device and when to use TCC vs removable walker), compression (only if appropriate; note ABI threshold),
  barrier films/silicone contact layers, and criteria for NPWT (size, depth, exudate, surgical wounds).
- Medications (for clinician review): generic names with typical adult dose ranges, route, and duration:
  * Analgesia (acetaminophen/NSAID with max daily dose cautions).
  * Antimicrobials: topical options for localized critical colonization; systemic options ONLY if clinical infection criteria met.
    Include top interactions/contraindications and monitoring (renal/hepatic disease, anticoagulation, pregnancy, allergy).
- Follow-up cadence (explicit days) and objective response criteria (area ↓, exudate ↓, pain ↓, granulation ↑).
- Clear switch/stop rules for dressings and antimicrobials based on response or intolerance.
Disclaimer
- This is decision support, not a diagnosis or prescription. All medications/interventions require clinician review.
- Advise urgent evaluation for red flags (spreading erythema, fever, rapidly worsening pain, necrosis, malodor, suspected ischemia),
  and tailor to local guidelines/formulary and patient comorbidities.
"""
# ---------- MedGemma-only text generator ----------
def vlm_generate(prompt, image_pil, model_id="unsloth/medgemma-4b-it-bnb-4bit",
                 max_new_tokens=1024, token=None):
    """
    Simple helper: messages-style image+text → text using a 4-bit MedGemma pipeline.
    - No explicit `device` argument (pipeline will auto-detect).
    - Uses HF token from arg or HF_TOKEN env.
    """
    import torch
    from transformers import pipeline, BitsAndBytesConfig

    # Unmask GPU if it was masked upstream (harmless on CPU too)
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    hf_token = token or os.getenv("HF_TOKEN")
    dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32
    # 4-bit quantization config (required by the Unsloth 4-bit model)
    bnb = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=dtype,
    )
    # Note: the pipeline is rebuilt on every call; cache it upstream if latency matters.
    pipe = pipeline(
        "image-text-to-text",
        model=model_id,
        model_kwargs={"quantization_config": bnb},
        torch_dtype=dtype,
        token=hf_token,
        trust_remote_code=True,
    )
    messages = [{
        "role": "user",
        "content": [
            {"type": "image", "image": image_pil},
            {"type": "text", "text": prompt},
        ],
    }]
    out = pipe(
        text=messages,
        max_new_tokens=int(max_new_tokens),
        do_sample=False,  # greedy decoding; a temperature would be ignored here
        return_full_text=False,
    )
    if isinstance(out, list) and out and isinstance(out[0], dict) and "generated_text" in out[0]:
        return (out[0]["generated_text"] or "").strip()
    return (str(out) or "").strip() or "⚠️ Empty response"
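# Usage sketch (hypothetical file path; requires GPU quota and access to the model):
#   from PIL import Image
#   img = Image.open("uploads/example.png")   # hypothetical sample image
#   text = vlm_generate("Describe this wound.", img, max_new_tokens=128)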
def generate_medgemma_report(
    patient_info: str,
    visual_results: dict,
    guideline_context: str,
    image_pil,  # PIL.Image
    max_new_tokens: int | None = None,
) -> str:
    """
    Build SmartHeal prompt and generate with the Unsloth MedGemma 4-bit VLM.
    No fallback to any other model.
    """
    if os.getenv("SMARTHEAL_ENABLE_VLM", "1") != "1":
        return "⚠️ VLM disabled"
    uprompt = SMARTHEAL_USER_PREFIX.format(
        patient_info=patient_info,
        wound_type=visual_results.get("wound_type", "Unknown"),
        length_cm=visual_results.get("length_cm", 0),
        breadth_cm=visual_results.get("breadth_cm", 0),
        area_cm2=visual_results.get("surface_area_cm2", 0),
        det_conf=float(visual_results.get("detection_confidence", 0.0)),
        px_per_cm=visual_results.get("px_per_cm", "?"),
        guideline_context=(guideline_context or "")[:900],
    )
    prompt = f"{SMARTHEAL_SYSTEM_PROMPT}\n\n{uprompt}\n\nAnswer:"
    model_id = os.getenv("SMARTHEAL_MEDGEMMA_MODEL", "unsloth/medgemma-4b-it-bnb-4bit")
    max_new_tokens = max_new_tokens or int(os.getenv("SMARTHEAL_VLM_MAX_TOKENS", "600"))
    # Uses the simple messages-based VLM helper above (no device param).
    return vlm_generate(
        prompt=prompt,
        image_pil=image_pil,
        model_id=model_id,
        max_new_tokens=max_new_tokens,
        token=os.getenv("HF_TOKEN"),
    )
# ---------- Input-shape helpers (avoid `.as_list()` on strings) ----------
def _shape_to_hw(shape) -> Tuple[Optional[int], Optional[int]]:
    try:
        if hasattr(shape, "as_list"):
            shape = shape.as_list()
    except Exception:
        pass
    if isinstance(shape, (tuple, list)):
        if len(shape) == 4:  # (None, H, W, C)
            H, W = shape[1], shape[2]
        elif len(shape) == 3:  # (H, W, C)
            H, W = shape[0], shape[1]
        else:
            return (None, None)
        try:
            H = int(H) if (H is not None and str(H).lower() != "none") else None
        except Exception:
            H = None
        try:
            W = int(W) if (W is not None and str(W).lower() != "none") else None
        except Exception:
            W = None
        return (H, W)
    return (None, None)
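# Expected behavior (follows directly from the branches above):
#   _shape_to_hw((None, 224, 224, 3))  -> (224, 224)   # (N, H, W, C)
#   _shape_to_hw((512, 512, 1))        -> (512, 512)   # (H, W, C)
#   _shape_to_hw("input_1")            -> (None, None) # strings rejected safely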
def _get_model_input_hw(model, default_hw: Tuple[int, int] = (224, 224)) -> Tuple[int, int]:
    H, W = _shape_to_hw(getattr(model, "input_shape", None))
    if H and W:
        return H, W
    try:
        inputs = getattr(model, "inputs", None)
        if inputs:
            H, W = _shape_to_hw(inputs[0].shape)
            if H and W:
                return H, W
    except Exception:
        pass
    try:
        cfg = model.get_config() if hasattr(model, "get_config") else None
        if isinstance(cfg, dict):
            for layer in cfg.get("layers", []):
                conf = (layer or {}).get("config", {})
                cand = conf.get("batch_input_shape") or conf.get("batch_shape")
                H, W = _shape_to_hw(cand)
                if H and W:
                    return H, W
    except Exception:
        pass
    logging.warning(f"Could not resolve model input shape; using default {default_hw}.")
    return default_hw
# ---------- Initialize CPU models ----------
def load_yolo_model():
    YOLO = _import_ultralytics()
    with _no_cuda_env():
        model = YOLO(YOLO_MODEL_PATH)
    return model

def load_segmentation_model(path: str = SEG_MODEL_PATH):
    os.environ.setdefault("KERAS_BACKEND", "tensorflow")
    import tensorflow as tf
    tf.config.set_visible_devices([], "GPU")
    import keras
    # Load the same file that initialize_cpu_models() existence-checks.
    return keras.models.load_model(path, compile=False)

def load_classification_pipeline():
    pipe = _import_hf_cls()
    return pipe("image-classification", model="Hemg/Wound-classification", token=HF_TOKEN, device="cpu")

def load_embedding_model():
    Emb = _import_embeddings()
    return Emb(model_name="sentence-transformers/all-MiniLM-L6-v2", model_kwargs={"device": "cpu"})
def initialize_cpu_models() -> None:
    if HF_TOKEN:
        try:
            HfApi, HfFolder = _import_hf_hub()
            HfFolder.save_token(HF_TOKEN)
            logging.info("✅ HF token set")
        except Exception as e:
            logging.warning(f"HF token save failed: {e}")
    if "det" not in models_cache:
        try:
            models_cache["det"] = load_yolo_model()
            logging.info("✅ YOLO loaded (CPU; CUDA masked in main)")
        except Exception as e:
            logging.error(f"YOLO load failed: {e}")
    if "seg" not in models_cache:
        try:
            if os.path.exists(SEG_MODEL_PATH):
                m = load_segmentation_model()  # uses global path by default
                models_cache["seg"] = m
                th, tw = _get_model_input_hw(m, default_hw=(224, 224))
                oshape = getattr(m, "output_shape", None)
                logging.info(f"✅ Segmentation model loaded (CPU) | input_hw=({th},{tw}) output_shape={oshape}")
            else:
                models_cache["seg"] = None
                logging.warning("Segmentation model file missing; skipping.")
        except Exception as e:
            models_cache["seg"] = None
            logging.warning(f"Segmentation unavailable: {e}")
    if "cls" not in models_cache:
        try:
            models_cache["cls"] = load_classification_pipeline()
            logging.info("✅ Classifier loaded (CPU)")
        except Exception as e:
            models_cache["cls"] = None
            logging.warning(f"Classifier unavailable: {e}")
    if "embedding_model" not in models_cache:
        try:
            models_cache["embedding_model"] = load_embedding_model()
            logging.info("✅ Embeddings loaded (CPU)")
        except Exception as e:
            models_cache["embedding_model"] = None
            logging.warning(f"Embeddings unavailable: {e}")
def setup_knowledge_base() -> None:
    if "vector_store" in knowledge_base_cache:
        return
    docs: List = []
    try:
        PyPDFLoader = _import_langchain_pdf()
        for pdf in GUIDELINE_PDFS:
            if os.path.exists(pdf):
                try:
                    docs.extend(PyPDFLoader(pdf).load())
                    logging.info(f"Loaded PDF: {pdf}")
                except Exception as e:
                    logging.warning(f"PDF load failed ({pdf}): {e}")
    except Exception as e:
        logging.warning(f"LangChain PDF loader unavailable: {e}")
    if docs and models_cache.get("embedding_model"):
        try:
            from langchain.text_splitter import RecursiveCharacterTextSplitter
            FAISS = _import_langchain_faiss()
            chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100).split_documents(docs)
            knowledge_base_cache["vector_store"] = FAISS.from_documents(chunks, models_cache["embedding_model"])
            logging.info(f"✅ Knowledge base ready ({len(chunks)} chunks)")
        except Exception as e:
            knowledge_base_cache["vector_store"] = None
            logging.warning(f"KB build failed: {e}")
    else:
        knowledge_base_cache["vector_store"] = None
        logging.warning("KB disabled (no docs or embeddings).")

initialize_cpu_models()
setup_knowledge_base()
# ---------- Calibration helpers ----------
def _exif_to_dict(pil_img: Image.Image) -> Dict[str, object]:
    out = {}
    try:
        exif = pil_img.getexif()
        if not exif:
            return out
        for k, v in exif.items():
            tag = TAGS.get(k, k)
            out[tag] = v
    except Exception:
        pass
    return out

def _to_float(val) -> Optional[float]:
    try:
        if val is None:
            return None
        if isinstance(val, tuple) and len(val) == 2:
            num = float(val[0])
            den = float(val[1]) if float(val[1]) != 0 else 1.0
            return num / den
        return float(val)
    except Exception:
        return None

def _estimate_sensor_width_mm(f_mm: Optional[float], f35: Optional[float]) -> Optional[float]:
    if f_mm and f35 and f35 > 0:
        return 36.0 * f_mm / f35
    return None
def estimate_px_per_cm_from_exif(pil_img: Image.Image, default_px_per_cm: float = DEFAULT_PX_PER_CM) -> Tuple[float, Dict]:
    meta = {"used": "default", "f_mm": None, "f35": None, "sensor_w_mm": None, "distance_m": None}
    try:
        exif = _exif_to_dict(pil_img)
        f_mm = _to_float(exif.get("FocalLength"))
        f35 = _to_float(exif.get("FocalLengthIn35mmFilm") or exif.get("FocalLengthIn35mm"))
        subj_dist_m = _to_float(exif.get("SubjectDistance"))
        sensor_w_mm = _estimate_sensor_width_mm(f_mm, f35)
        meta.update({"f_mm": f_mm, "f35": f35, "sensor_w_mm": sensor_w_mm, "distance_m": subj_dist_m})
        if f_mm and sensor_w_mm and subj_dist_m and subj_dist_m > 0:
            w_px = pil_img.width
            field_w_mm = sensor_w_mm * (subj_dist_m * 1000.0) / f_mm
            field_w_cm = field_w_mm / 10.0
            px_per_cm = w_px / max(field_w_cm, 1e-6)
            px_per_cm = float(np.clip(px_per_cm, PX_PER_CM_MIN, PX_PER_CM_MAX))
            meta["used"] = "exif"
            return px_per_cm, meta
        return float(default_px_per_cm), meta
    except Exception:
        return float(default_px_per_cm), meta
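# Worked example (hypothetical EXIF): f=4.25 mm and f35=26 mm give a sensor width
# of 36*4.25/26 ≈ 5.88 mm. At 0.30 m subject distance the field of view is
# 5.88 * 300 / 4.25 ≈ 415 mm ≈ 41.5 cm, so a 4000-px-wide photo calibrates to
# roughly 4000 / 41.5 ≈ 96 px/cm (then clipped to [PX_PER_CM_MIN, PX_PER_CM_MAX]).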
# ---------- Segmentation helpers ----------
def _imagenet_norm(arr: np.ndarray) -> np.ndarray:
    mean = np.array([123.675, 116.28, 103.53], dtype=np.float32)
    std = np.array([58.395, 57.12, 57.375], dtype=np.float32)
    return (arr.astype(np.float32) - mean) / std

def _preprocess_for_seg(bgr_roi: np.ndarray, target_hw: Tuple[int, int]) -> np.ndarray:
    H, W = target_hw
    resized = cv2.resize(bgr_roi, (W, H), interpolation=cv2.INTER_LINEAR)
    if SEG_EXPECTS_RGB:
        resized = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    if SEG_NORM.lower() == "imagenet":
        x = _imagenet_norm(resized)
    else:
        x = resized.astype(np.float32) / 255.0
    x = np.expand_dims(x, axis=0)  # (1,H,W,3)
    return x

def _to_prob(pred: np.ndarray) -> np.ndarray:
    p = np.squeeze(pred)
    pmin, pmax = float(p.min()), float(p.max())
    if pmax > 1.0 or pmin < 0.0:
        # Values outside [0,1] look like logits; squash with a sigmoid.
        p = 1.0 / (1.0 + np.exp(-p))
    return p.astype(np.float32)
# ---- Adaptive threshold + GrabCut grow ----
def _adaptive_prob_threshold(p: np.ndarray) -> float:
    """
    Choose a threshold that avoids tiny blobs while not swallowing skin.
    Try Otsu and the 90th percentile, clamp to [0.25, 0.65], pick by area heuristic.
    """
    p01 = np.clip(p.astype(np.float32), 0, 1)
    p255 = (p01 * 255).astype(np.uint8)
    ret_otsu, _ = cv2.threshold(p255, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    thr_otsu = float(np.clip(ret_otsu / 255.0, 0.25, 0.65))
    thr_pctl = float(np.clip(np.percentile(p01, 90), 0.25, 0.65))

    def area_frac(thr: float) -> float:
        return float((p01 >= thr).sum()) / float(p01.size)

    af_otsu = area_frac(thr_otsu)
    af_pctl = area_frac(thr_pctl)

    def score(af: float) -> float:
        # Prefer a foreground fraction in [3%, 10%]; penalize deviations asymmetrically.
        target_low, target_high = 0.03, 0.10
        if af < target_low:
            return abs(af - target_low) * 3.0
        if af > target_high:
            return abs(af - target_high) * 1.5
        return 0.0

    return thr_otsu if score(af_otsu) <= score(af_pctl) else thr_pctl
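# Behavior sketch on a synthetic probability map (assumed values):
#   p = np.zeros((100, 100), np.float32); p[45:55, 45:55] = 0.9   # 1% hot square
#   thr = _adaptive_prob_threshold(p)
# Both candidate thresholds clamp into [0.25, 0.65]; the score picks whichever
# yields a foreground fraction nearest the 3-10% target band.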
def _grabcut_refine(bgr: np.ndarray, seed01: np.ndarray, iters: int = 3) -> np.ndarray:
    """Grow from a confident core into low-contrast margins."""
    h, w = bgr.shape[:2]
    gc = np.full((h, w), cv2.GC_PR_BGD, np.uint8)
    k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    seed_dil = cv2.dilate(seed01, k, iterations=1)
    # Dilated ring = probable foreground; the confident core = definite foreground
    # (the dilated superset must be labeled first so the core is not overwritten).
    gc[seed_dil.astype(bool)] = cv2.GC_PR_FGD
    gc[seed01.astype(bool)] = cv2.GC_FGD
    # Pin all four image borders as definite background.
    gc[0, :], gc[-1, :], gc[:, 0], gc[:, -1] = cv2.GC_BGD, cv2.GC_BGD, cv2.GC_BGD, cv2.GC_BGD
    bgdModel = np.zeros((1, 65), np.float64)
    fgdModel = np.zeros((1, 65), np.float64)
    cv2.grabCut(bgr, gc, None, bgdModel, fgdModel, iters, cv2.GC_INIT_WITH_MASK)
    return np.where((gc == cv2.GC_FGD) | (gc == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
def _fill_holes(mask01: np.ndarray) -> np.ndarray:
    # Flood-fill the background from the (assumed-background) top-left corner,
    # then OR the inverse back in: anything the fill could not reach is a hole.
    h, w = mask01.shape[:2]
    ff = np.zeros((h + 2, w + 2), np.uint8)
    m = (mask01 * 255).astype(np.uint8).copy()
    cv2.floodFill(m, ff, (0, 0), 255)
    m_inv = cv2.bitwise_not(m)
    out = ((mask01 * 255) | m_inv) // 255
    return out.astype(np.uint8)
def _clean_mask(mask01: np.ndarray) -> np.ndarray:
    """Open → Close → Fill holes → Largest component (no dilation)."""
    mask01 = (mask01 > 0).astype(np.uint8)
    k3 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    k5 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    mask01 = cv2.morphologyEx(mask01, cv2.MORPH_OPEN, k3, iterations=1)
    mask01 = cv2.morphologyEx(mask01, cv2.MORPH_CLOSE, k5, iterations=1)
    mask01 = _fill_holes(mask01)
    # Keep largest component only
    num, labels, stats, _ = cv2.connectedComponentsWithStats(mask01, 8)
    if num > 1:
        areas = stats[1:, cv2.CC_STAT_AREA]
        if areas.size:
            largest_idx = 1 + int(np.argmax(areas))
            mask01 = (labels == largest_idx).astype(np.uint8)
    return (mask01 > 0).astype(np.uint8)
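# Behavior sketch: isolated speckles are opened away, the solid blob is kept whole:
#   noisy = (np.random.rand(128, 128) > 0.995).astype(np.uint8)
#   noisy[40:80, 40:80] = 1
#   cleaned = _clean_mask(noisy)   # a single filled 40x40 component remains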
# Global last debug dict (per-process)
_last_seg_debug: Dict[str, object] = {}

def segment_wound(image_bgr: np.ndarray, ts: str, out_dir: str) -> Tuple[np.ndarray, Dict[str, object]]:
    """
    TF model → adaptive threshold on prob → GrabCut grow → cleanup.
    Fallback: KMeans-Lab.
    Returns (mask_uint8_0_255, debug_dict)
    """
    debug = {"used": None, "reason": None, "positive_fraction": 0.0,
             "thr": None, "heatmap_path": None, "roi_seen_by_model": None}
    seg_model = models_cache.get("seg", None)
    # --- Model path ---
    if seg_model is not None:
        try:
            th, tw = _get_model_input_hw(seg_model, default_hw=(224, 224))
            x = _preprocess_for_seg(image_bgr, (th, tw))
            roi_seen_path = None
            if SMARTHEAL_DEBUG:
                roi_seen_path = os.path.join(out_dir, f"roi_for_seg_{ts}.png")
                cv2.imwrite(roi_seen_path, image_bgr)
            pred = seg_model.predict(x, verbose=0)
            if isinstance(pred, (list, tuple)):
                pred = pred[0]
            p = _to_prob(pred)
            p = cv2.resize(p, (image_bgr.shape[1], image_bgr.shape[0]), interpolation=cv2.INTER_LINEAR)
            heatmap_path = None
            if SMARTHEAL_DEBUG:
                hm = (np.clip(p, 0, 1) * 255).astype(np.uint8)
                heat = cv2.applyColorMap(hm, cv2.COLORMAP_JET)
                heatmap_path = os.path.join(out_dir, f"seg_pred_heatmap_{ts}.png")
                cv2.imwrite(heatmap_path, heat)
            thr = _adaptive_prob_threshold(p)
            core01 = (p >= thr).astype(np.uint8)
            core_frac = float(core01.sum()) / float(core01.size)
            if core_frac < 0.005:
                # Core almost empty: relax the threshold once before giving up.
                thr2 = max(thr - 0.10, 0.15)
                core01 = (p >= thr2).astype(np.uint8)
                thr = thr2
                core_frac = float(core01.sum()) / float(core01.size)
            if core01.any():
                gc01 = _grabcut_refine(image_bgr, core01, iters=3)
                mask01 = _clean_mask(gc01)
            else:
                mask01 = np.zeros(core01.shape, np.uint8)
            pos_frac = float(mask01.sum()) / float(mask01.size)
            logging.info(f"SegModel USED | thr={float(thr):.2f} core_frac={core_frac:.4f} final_frac={pos_frac:.4f}")
            debug.update({
                "used": "tf_model",
                "reason": "ok",
                "positive_fraction": pos_frac,
                "thr": float(thr),
                "heatmap_path": heatmap_path,
                "roi_seen_by_model": roi_seen_path,
            })
            return (mask01 * 255).astype(np.uint8), debug
        except Exception as e:
            logging.warning(f"⚠️ Segmentation model failed → fallback. Reason: {e}")
            debug.update({"used": "fallback_kmeans", "reason": f"model_failed: {e}"})
    # --- Fallback: KMeans in Lab (reddest cluster as wound) ---
    Z = image_bgr.reshape((-1, 3)).astype(np.float32)
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    _, labels, centers = cv2.kmeans(Z, 2, None, criteria, 5, cv2.KMEANS_PP_CENTERS)
    centers_u8 = centers.astype(np.uint8).reshape(1, 2, 3)
    centers_lab = cv2.cvtColor(centers_u8, cv2.COLOR_BGR2LAB)[0]
    wound_idx = int(np.argmax(centers_lab[:, 1]))  # maximize a* (red)
    mask01 = (labels.reshape(image_bgr.shape[:2]) == wound_idx).astype(np.uint8)
    mask01 = _clean_mask(mask01)
    pos_frac = float(mask01.sum()) / float(mask01.size)
    logging.info(f"KMeans USED | final_frac={pos_frac:.4f}")
    debug.update({
        "used": "fallback_kmeans",
        "reason": debug.get("reason") or "no_model",
        "positive_fraction": pos_frac,
        "thr": None,
    })
    return (mask01 * 255).astype(np.uint8), debug
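# Usage sketch (assumes a BGR ROI already cropped by the detector):
#   ts = datetime.now().strftime("%Y%m%d_%H%M%S")
#   mask255, dbg = segment_wound(roi_bgr, ts, "uploads/analysis")
#   # dbg["used"] is "tf_model" or "fallback_kmeans"; mask255 is uint8 in {0, 255}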
# ---------- Measurement + overlay helpers ----------
def largest_component_mask(binary01: np.ndarray, min_area_px: int = 50) -> np.ndarray:
    num, labels, stats, _ = cv2.connectedComponentsWithStats(binary01.astype(np.uint8), connectivity=8)
    if num <= 1:
        return binary01.astype(np.uint8)
    areas = stats[1:, cv2.CC_STAT_AREA]
    if areas.size == 0 or areas.max() < min_area_px:
        return binary01.astype(np.uint8)
    largest_idx = 1 + int(np.argmax(areas))
    return (labels == largest_idx).astype(np.uint8)

def measure_min_area_rect(mask01: np.ndarray, px_per_cm: float) -> Tuple[float, float, Tuple]:
    contours, _ = cv2.findContours(mask01.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return 0.0, 0.0, (None, None)
    cnt = max(contours, key=cv2.contourArea)
    rect = cv2.minAreaRect(cnt)
    (w_px, h_px) = rect[1]
    length_px, breadth_px = max(w_px, h_px), min(w_px, h_px)
    length_cm = round(length_px / max(px_per_cm, 1e-6), 2)
    breadth_cm = round(breadth_px / max(px_per_cm, 1e-6), 2)
    box = cv2.boxPoints(rect).astype(int)
    return length_cm, breadth_cm, (box, rect[0])
def area_cm2_from_contour(mask01: np.ndarray, px_per_cm: float) -> Tuple[float, Optional[np.ndarray]]:
    """Area from largest polygon (sub-pixel); returns (area_cm2, contour)."""
    m = (mask01 > 0).astype(np.uint8)
    contours, _ = cv2.findContours(m, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return 0.0, None
    cnt = max(contours, key=cv2.contourArea)
    poly_area_px2 = float(cv2.contourArea(cnt))
    area_cm2 = round(poly_area_px2 / (max(px_per_cm, 1e-6) ** 2), 2)
    return area_cm2, cnt

def clamp_area_with_minrect(cnt: np.ndarray, px_per_cm: float, area_cm2_poly: float) -> float:
    # Sanity guard: a contour's polygon area can never exceed its min-area
    # rectangle, so this clamp only bites if upstream produced an inconsistent estimate.
    rect = cv2.minAreaRect(cnt)
    (w_px, h_px) = rect[1]
    rect_area_px2 = float(max(w_px, 0.0) * max(h_px, 0.0))
    rect_area_cm2 = rect_area_px2 / (max(px_per_cm, 1e-6) ** 2)
    return round(min(area_cm2_poly, rect_area_cm2 * 1.05), 2)
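# Worked example: a 190x114 px min-area rect at 38 px/cm measures
# 190/38 = 5.0 cm by 114/38 = 3.0 cm, and a filled contour of 10830 px²
# yields 10830 / 38² = 7.5 cm².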
def draw_measurement_overlay(
    base_bgr: np.ndarray,
    mask01: np.ndarray,
    rect_box: np.ndarray,
    length_cm: float,
    breadth_cm: float,
    thickness: int = 2,
) -> np.ndarray:
    """
    1) Strong red mask overlay + white contour
    2) Min-area rectangle
    3) Double-headed arrows labeled Length/Width
    """
    overlay = base_bgr.copy()
    # Mask tint
    mask255 = (mask01 * 255).astype(np.uint8)
    mask3 = cv2.merge([mask255, mask255, mask255])
    red = np.zeros_like(overlay)
    red[:] = (0, 0, 255)
    alpha = 0.55
    tinted = cv2.addWeighted(overlay, 1 - alpha, red, alpha, 0)
    overlay = np.where(mask3 > 0, tinted, overlay)
    # Contour
    cnts, _ = cv2.findContours(mask255, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if cnts:
        cv2.drawContours(overlay, cnts, -1, (255, 255, 255), 2)
    if rect_box is not None:
        cv2.polylines(overlay, [rect_box], True, (255, 255, 255), thickness)
        pts = rect_box.reshape(-1, 2)

        def midpoint(a, b):
            return (int((a[0] + b[0]) / 2), int((a[1] + b[1]) / 2))

        e = [np.linalg.norm(pts[i] - pts[(i + 1) % 4]) for i in range(4)]
        long_edge_idx = int(np.argmax(e))
        mids = [midpoint(pts[i], pts[(i + 1) % 4]) for i in range(4)]
        long_pair = (long_edge_idx, (long_edge_idx + 2) % 4)
        short_pair = ((long_edge_idx + 1) % 4, (long_edge_idx + 3) % 4)

        def draw_double_arrow(img, p1, p2):
            cv2.arrowedLine(img, p1, p2, (0, 0, 0), thickness + 2, tipLength=0.05)
            cv2.arrowedLine(img, p2, p1, (0, 0, 0), thickness + 2, tipLength=0.05)
            cv2.arrowedLine(img, p1, p2, (255, 255, 255), thickness, tipLength=0.05)
            cv2.arrowedLine(img, p2, p1, (255, 255, 255), thickness, tipLength=0.05)

        def put_label(text, anchor):
            org = (anchor[0] + 6, anchor[1] - 6)
            cv2.putText(overlay, text, org, cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 4, cv2.LINE_AA)
            cv2.putText(overlay, text, org, cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA)

        draw_double_arrow(overlay, mids[long_pair[0]], mids[long_pair[1]])
        draw_double_arrow(overlay, mids[short_pair[0]], mids[short_pair[1]])
        put_label(f"Length: {length_cm:.2f} cm", mids[long_pair[0]])
        put_label(f"Width: {breadth_cm:.2f} cm", mids[short_pair[0]])
    return overlay
# ---------- AI PROCESSOR ----------
class AIProcessor:
    def __init__(self):
        self.models_cache = models_cache
        self.knowledge_base_cache = knowledge_base_cache
        self.uploads_dir = UPLOADS_DIR
        self.dataset_id = DATASET_ID
        self.hf_token = HF_TOKEN

    def _ensure_analysis_dir(self) -> str:
        out_dir = os.path.join(self.uploads_dir, "analysis")
        os.makedirs(out_dir, exist_ok=True)
        return out_dir
    def perform_visual_analysis(self, image_pil: Image.Image) -> Dict:
        """
        YOLO detect → crop ROI → segment_wound(ROI) → clean mask →
        minAreaRect measurement (cm) using EXIF px/cm → save outputs.
        """
        try:
            px_per_cm, exif_meta = estimate_px_per_cm_from_exif(image_pil, DEFAULT_PX_PER_CM)
            # Guardrails for calibration to avoid huge area blow-ups
            px_per_cm = float(np.clip(px_per_cm, 20.0, 350.0))
            if (exif_meta or {}).get("used") != "exif":
                logging.warning(f"Calibration fallback used: px_per_cm={px_per_cm:.2f} (default). Prefer ruler/Aruco for accuracy.")
            image_cv = cv2.cvtColor(np.array(image_pil.convert("RGB")), cv2.COLOR_RGB2BGR)
            # --- Detection ---
            det_model = self.models_cache.get("det")
            if det_model is None:
                raise RuntimeError("YOLO model not loaded")
            # Force CPU inference and avoid CUDA touch
            results = det_model.predict(image_cv, verbose=False, device="cpu")
            if (not results) or (not getattr(results[0], "boxes", None)) or (len(results[0].boxes) == 0):
                # Prefer a UI-friendly Gradio error; fall back to RuntimeError when gradio is absent.
                try:
                    import gradio as gr
                except ImportError:
                    raise RuntimeError("No wound could be detected.")
                raise gr.Error("No wound could be detected.")
            box = results[0].boxes[0].xyxy[0].cpu().numpy().astype(int)
            x1, y1, x2, y2 = [int(v) for v in box]
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(image_cv.shape[1], x2), min(image_cv.shape[0], y2)
            roi = image_cv[y1:y2, x1:x2].copy()
            if roi.size == 0:
                try:
                    import gradio as gr
                except ImportError:
                    raise RuntimeError("Detected ROI is empty.")
                raise gr.Error("Detected ROI is empty.")
            out_dir = self._ensure_analysis_dir()
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            # --- Segmentation (model-first + KMeans fallback) ---
            mask_u8_255, seg_debug = segment_wound(roi, ts, out_dir)
            mask01 = (mask_u8_255 > 127).astype(np.uint8)
            if mask01.any():
                mask01 = _clean_mask(mask01)
                logging.debug(f"Mask postproc: px_after={int(mask01.sum())}")
            # --- Measurement (accurate & conservative) ---
            if mask01.any():
                length_cm, breadth_cm, (box_pts, _) = measure_min_area_rect(mask01, px_per_cm)
                area_poly_cm2, largest_cnt = area_cm2_from_contour(mask01, px_per_cm)
                if largest_cnt is not None:
                    surface_area_cm2 = clamp_area_with_minrect(largest_cnt, px_per_cm, area_poly_cm2)
                else:
                    surface_area_cm2 = area_poly_cm2
                anno_roi = draw_measurement_overlay(roi, mask01, box_pts, length_cm, breadth_cm)
                segmentation_empty = False
            else:
                # Fallback if seg failed: use ROI dimensions
                h_px = max(0, y2 - y1)
                w_px = max(0, x2 - x1)
                length_cm = round(max(h_px, w_px) / px_per_cm, 2)
                breadth_cm = round(min(h_px, w_px) / px_per_cm, 2)
                surface_area_cm2 = round((h_px * w_px) / (px_per_cm ** 2), 2)
                anno_roi = roi.copy()
                cv2.rectangle(anno_roi, (2, 2), (anno_roi.shape[1] - 3, anno_roi.shape[0] - 3), (0, 0, 255), 3)
                cv2.line(anno_roi, (0, 0), (anno_roi.shape[1] - 1, anno_roi.shape[0] - 1), (0, 0, 255), 2)
                cv2.line(anno_roi, (anno_roi.shape[1] - 1, 0), (0, anno_roi.shape[0] - 1), (0, 0, 255), 2)
                box_pts = None
                segmentation_empty = True
            # --- Skin tone estimation using ITA (Individual Typology Angle) ---
            try:
                # Convert ROI to LAB for skin tone measurement.
                # OpenCV 8-bit LAB stores L in [0,255] (L* scaled by 2.55) and a,b offset by 128;
                # recover approximate L* and b* below.
                lab_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
                L_chan = lab_roi[:, :, 0] / 2.55  # scale to 0–100
                b_chan = lab_roi[:, :, 2].astype(np.float32) - 128.0
                # Define skin pixels as the non-wound region, or the entire ROI if the mask is empty
                if mask01.any():
                    skin_mask = (mask01 == 0)
                else:
                    skin_mask = np.ones_like(mask01, dtype=bool)
                L_vals = L_chan[skin_mask]
                b_vals = b_chan[skin_mask]
                # Safeguard against empty arrays
                if L_vals.size == 0 or b_vals.size == 0:
                    mean_L = float(np.mean(L_chan))
                    mean_b = float(np.mean(b_chan))
                else:
                    mean_L = float(np.mean(L_vals))
                    mean_b = float(np.mean(b_vals))
                # ITA in degrees; arctan2 handles mean_b == 0 gracefully (gives ±90°)
                ita_deg = float(np.degrees(np.arctan2(mean_L - 50.0, mean_b)))
                # Map ITA to Fitzpatrick skin tone categories (Del Bino ranges)
                if ita_deg > 55:
                    skin_tone_label = "Type I (Very Light)"
                elif 41 < ita_deg <= 55:
                    skin_tone_label = "Type II (Light)"
                elif 28 < ita_deg <= 41:
                    skin_tone_label = "Type III (Intermediate)"
                elif 10 < ita_deg <= 28:
                    skin_tone_label = "Type IV (Tan)"
                elif -30 < ita_deg <= 10:
                    skin_tone_label = "Type V (Brown)"
                else:
                    skin_tone_label = "Type VI (Dark)"
            except Exception as e:
                logging.warning(f"Skin tone estimation failed: {e}")
                ita_deg = 0.0
                skin_tone_label = "Unknown"
            # --- Tissue classification (granulation, slough, necrotic) ---
            try:
                tissue_type = "Unknown"
                if mask01.any():
                    hsv_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
                    mask_bool = mask01.astype(bool)
                    # Mean hue and value over the wound region drive the heuristic
                    h_vals = hsv_roi[:, :, 0][mask_bool]
                    s_vals = hsv_roi[:, :, 1][mask_bool]  # saturation: currently unused
                    v_vals = hsv_roi[:, :, 2][mask_bool]
                    # Safeguard against empty arrays
                    if h_vals.size > 0 and v_vals.size > 0:
                        mean_h = float(np.mean(h_vals))
                        mean_v = float(np.mean(v_vals))
                        # Necrotic (dark) if value is low
                        if mean_v < 50:
                            tissue_type = "Necrotic"
                        # Slough (yellow) if hue between ~10 and 30 on OpenCV scale (0–179)
                        elif 10 <= mean_h <= 30:
                            tissue_type = "Slough"
                        else:
                            tissue_type = "Granulation"
                    else:
                        tissue_type = "Unknown"
            except Exception as e:
                logging.warning(f"Tissue classification failed: {e}")
                tissue_type = "Unknown"
            # --- Save visualizations ---
            original_path = os.path.join(out_dir, f"original_{ts}.png")
            cv2.imwrite(original_path, image_cv)
            det_vis = image_cv.copy()
            cv2.rectangle(det_vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
            detection_path = os.path.join(out_dir, f"detection_{ts}.png")
            cv2.imwrite(detection_path, det_vis)
            roi_mask_path = os.path.join(out_dir, f"roi_mask_{ts}.png")
            cv2.imwrite(roi_mask_path, (mask01 * 255).astype(np.uint8))
            # ROI overlay (mask tint + contour, without arrows)
            mask255 = (mask01 * 255).astype(np.uint8)
            mask3 = cv2.merge([mask255, mask255, mask255])
            red = np.zeros_like(roi)
            red[:] = (0, 0, 255)
            alpha = 0.55
            tinted = cv2.addWeighted(roi, 1 - alpha, red, alpha, 0)
            if mask255.any():
                roi_overlay = np.where(mask3 > 0, tinted, roi)
                cnts, _ = cv2.findContours(mask255, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                cv2.drawContours(roi_overlay, cnts, -1, (255, 255, 255), 2)
            else:
                roi_overlay = anno_roi
            seg_full = image_cv.copy()
            seg_full[y1:y2, x1:x2] = roi_overlay
            segmentation_path = os.path.join(out_dir, f"segmentation_{ts}.png")
            cv2.imwrite(segmentation_path, seg_full)
            segmentation_roi_path = os.path.join(out_dir, f"segmentation_roi_{ts}.png")
            cv2.imwrite(segmentation_roi_path, roi_overlay)
            # Annotated (mask + arrows + labels) in full-frame
            anno_full = image_cv.copy()
            anno_full[y1:y2, x1:x2] = anno_roi
            annotated_seg_path = os.path.join(out_dir, f"segmentation_annotated_{ts}.png")
            cv2.imwrite(annotated_seg_path, anno_full)
            # --- Optional classification ---
            wound_type = "Unknown"
            cls_pipe = self.models_cache.get("cls")
            if cls_pipe is not None:
                try:
                    preds = cls_pipe(Image.fromarray(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)))
                    if preds:
                        wound_type = max(preds, key=lambda x: x.get("score", 0)).get("label", "Unknown")
                except Exception as e:
                    logging.warning(f"Classification failed: {e}")
            # Log end-of-seg summary
            seg_summary = {
                "seg_used": seg_debug.get("used"),
                "seg_reason": seg_debug.get("reason"),
                "positive_fraction": round(float(seg_debug.get("positive_fraction", 0.0)), 6),
                "threshold": seg_debug.get("thr"),
                "segmentation_empty": segmentation_empty,
                "exif_px_per_cm": round(px_per_cm, 3),
            }
            _log_kv("SEG_SUMMARY", seg_summary)
            return {
                "wound_type": wound_type,
                "length_cm": length_cm,
                "breadth_cm": breadth_cm,
                "surface_area_cm2": surface_area_cm2,
                "px_per_cm": round(px_per_cm, 2),
                "calibration_meta": exif_meta,
                "detection_confidence": (
                    float(results[0].boxes.conf[0].cpu().item())
                    if getattr(results[0].boxes, "conf", None) is not None else 0.0
                ),
                "detection_image_path": detection_path,
                "segmentation_image_path": annotated_seg_path,
                "segmentation_annotated_path": annotated_seg_path,
                "segmentation_roi_path": segmentation_roi_path,
                "roi_mask_path": roi_mask_path,
                "segmentation_empty": segmentation_empty,
                "segmentation_debug": seg_debug,
                "original_image_path": original_path,
                # Additional AI insights
                "skin_tone_label": skin_tone_label,
                "ita_degrees": round(float(ita_deg), 2),
                "tissue_type": tissue_type,
            }
        except Exception as e:
            logging.error(f"Visual analysis failed: {e}", exc_info=True)
            raise
    # -------------------------------------------------------------------------
    # Helper: refine measurements from a binary mask
    # -------------------------------------------------------------------------
    def _refine_metrics_from_mask(self, mask: np.ndarray, px_per_cm: float) -> Tuple[float, float, float]:
        """
        Given a binary mask and pixel-per-centimeter calibration, compute length, breadth and area.
        The mask should be a 2D numpy array of dtype uint8 or bool where 1 indicates wound pixels.

        Parameters
        ----------
        mask : np.ndarray
            Binary mask of the wound region, shape (H, W). Non-zero values denote wound pixels.
        px_per_cm : float
            Estimated pixels per centimeter calibration factor.

        Returns
        -------
        tuple[float, float, float]
            (length_cm, breadth_cm, area_cm2)

        Notes
        -----
        This method approximates the wound measurements by computing the axis-aligned bounding box
        around all wound pixels and calculating the longer and shorter sides as length and width.
        The surface area is computed as the number of wound pixels divided by (px_per_cm**2).
        """
        if mask is None or mask.size == 0 or not np.any(mask):
            return 0.0, 0.0, 0.0
        # Ensure binary mask
        mask01 = (mask > 0).astype(np.uint8)
        # Find coordinates of wound pixels
        coords = np.argwhere(mask01)
        y_min, x_min = coords.min(axis=0)
        y_max, x_max = coords.max(axis=0)
        height_px = int(y_max - y_min + 1)
        width_px = int(x_max - x_min + 1)
        # Compute length as the larger dimension
        length_cm = round(max(height_px, width_px) / float(px_per_cm), 2)
        breadth_cm = round(min(height_px, width_px) / float(px_per_cm), 2)
        area_px = int(mask01.sum())
        area_cm2 = round(area_px / (float(px_per_cm) ** 2), 2)
        return length_cm, breadth_cm, area_cm2
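    # Illustrative check: a solid 76x38 px rectangle at 38 px/cm gives
    # (length, breadth, area) = (2.0, 1.0, 2.0), since 76/38 = 2 cm,
    # 38/38 = 1 cm, and 2888 px² / 38² = 2.0 cm².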
    # ---------- Knowledge base + reporting ----------
    def query_guidelines(self, query: str) -> str:
        try:
            vs = self.knowledge_base_cache.get("vector_store")
            if not vs:
                return "Knowledge base is not available."
            retriever = vs.as_retriever(search_kwargs={"k": 5})
            docs = retriever.invoke(query)
            lines: List[str] = []
            for d in docs:
                src = (d.metadata or {}).get("source", "N/A")
                txt = (d.page_content or "")[:300]
                lines.append(f"Source: {src}\nContent: {txt}...")
            return "\n\n".join(lines) if lines else "No relevant guideline snippets found."
        except Exception as e:
            logging.warning(f"Guidelines query failed: {e}")
            return f"Guidelines query failed: {str(e)}"
    def _generate_fallback_report(self, patient_info: str, visual_results: Dict, guideline_context: str) -> str:
        return f"""# 🩺 SmartHeal AI - Comprehensive Wound Analysis Report

## 📋 Patient Information
{patient_info}

## 🔍 Visual Analysis Results
- **Wound Type**: {visual_results.get('wound_type', 'Unknown')}
- **Dimensions**: {visual_results.get('length_cm', 0)} cm × {visual_results.get('breadth_cm', 0)} cm
- **Surface Area**: {visual_results.get('surface_area_cm2', 0)} cm²
- **Detection Confidence**: {visual_results.get('detection_confidence', 0):.1%}
- **Calibration**: {visual_results.get('px_per_cm', '?')} px/cm ({(visual_results.get('calibration_meta') or {}).get('used', 'default')})

## 📊 Analysis Images
- **Original**: {visual_results.get('original_image_path', 'N/A')}
- **Detection**: {visual_results.get('detection_image_path', 'N/A')}
- **Segmentation**: {visual_results.get('segmentation_image_path', 'N/A')}
- **Annotated**: {visual_results.get('segmentation_annotated_path', 'N/A')}

## 🎯 Clinical Summary
Automated analysis provides quantitative measurements; verify via clinical examination.

## 💊 Recommendations
- Cleanse wound gently; select dressing per exudate/infection risk
- Debride necrotic tissue if indicated (clinical decision)
- Document with serial photos and measurements

## 📅 Monitoring
- Daily in week 1, then every 2–3 days (or as indicated)
- Weekly progress review

## 📚 Guideline Context
{(guideline_context or '')[:800]}{"..." if guideline_context and len(guideline_context) > 800 else ''}

**Disclaimer:** Automated, for decision support only. Verify clinically.
"""
    def generate_final_report(
        self,
        patient_info: str,
        visual_results: Dict,
        guideline_context: str,
        image_pil: Image.Image,
        max_new_tokens: Optional[int] = None,
    ) -> str:
        try:
            report = generate_medgemma_report(
                patient_info, visual_results, guideline_context, image_pil, max_new_tokens
            )
            if report and report.strip() and not report.startswith(("⚠️", "❌")):
                return report
            logging.warning("VLM unavailable/invalid; using fallback.")
            return self._generate_fallback_report(patient_info, visual_results, guideline_context)
        except Exception as e:
            logging.error(f"Report generation failed: {e}")
            return self._generate_fallback_report(patient_info, visual_results, guideline_context)

    def save_and_commit_image(self, image_pil: Image.Image) -> str:
        try:
            os.makedirs(self.uploads_dir, exist_ok=True)
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{ts}.png"
            path = os.path.join(self.uploads_dir, filename)
            image_pil.convert("RGB").save(path)
            logging.info(f"✅ Image saved locally: {path}")
            if HF_TOKEN and DATASET_ID:
                try:
                    HfApi, HfFolder = _import_hf_hub()
                    HfFolder.save_token(HF_TOKEN)
                    api = HfApi()
                    api.upload_file(
                        path_or_fileobj=path,
                        path_in_repo=f"images/{filename}",
                        repo_id=DATASET_ID,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        commit_message=f"Upload wound image: {filename}",
                    )
                    logging.info("✅ Image committed to HF dataset")
                except Exception as e:
                    logging.warning(f"HF upload failed: {e}")
            return path
        except Exception as e:
            logging.error(f"Failed to save/commit image: {e}")
            return ""
    def full_analysis_pipeline(self, image_pil: Image.Image, questionnaire_data: Dict) -> Dict:
        try:
            saved_path = self.save_and_commit_image(image_pil)
            visual_results = self.perform_visual_analysis(image_pil)
            pi = questionnaire_data or {}
            patient_info = (
                f"Age: {pi.get('age', 'N/A')}, "
                f"Diabetic: {pi.get('diabetic', 'N/A')}, "
                f"Allergies: {pi.get('allergies', 'N/A')}, "
                f"Date of Wound: {pi.get('date_of_injury', 'N/A')}, "
                f"Professional Care: {pi.get('professional_care', 'N/A')}, "
                f"Oozing/Bleeding: {pi.get('oozing_bleeding', 'N/A')}, "
                f"Infection: {pi.get('infection', 'N/A')}, "
                f"Moisture: {pi.get('moisture', 'N/A')}"
            )
            query = (
                f"best practices for managing a {visual_results.get('wound_type', 'Unknown')} "
                f"with moisture '{pi.get('moisture', 'unknown')}' and infection '{pi.get('infection', 'unknown')}' "
                f"in a diabetic status '{pi.get('diabetic', 'unknown')}'"
            )
            guideline_context = self.query_guidelines(query)
            report = self.generate_final_report(patient_info, visual_results, guideline_context, image_pil)
            return {
                "success": True,
                "visual_analysis": visual_results,
                "report": report,
                "saved_image_path": saved_path,
                "guideline_context": (guideline_context or "")[:500] + (
                    "..." if guideline_context and len(guideline_context) > 500 else ""
                ),
            }
        except Exception as e:
            logging.error(f"Pipeline error: {e}")
            return {
                "success": False,
                "error": str(e),
                "visual_analysis": {},
                "report": f"Analysis failed: {str(e)}",
                "saved_image_path": None,
                "guideline_context": "",
            }
    def analyze_wound(
        self,
        image,
        questionnaire_data: Dict,
        seg_adjust: float = 0.0,
        manual_mask_path: Optional[str] = None,
    ) -> Dict:
        """
        Analyze a wound image and return a dictionary with visual analysis, report and paths.
        """
        try:
            # Normalize input to PIL
            if isinstance(image, str):
                if not os.path.exists(image):
                    raise ValueError(f"Image file not found: {image}")
                image_pil = Image.open(image)
            elif isinstance(image, Image.Image):
                image_pil = image
            elif isinstance(image, np.ndarray):
                image_pil = Image.fromarray(image)
            else:
                raise ValueError(f"Unsupported image type: {type(image)}")
            # Run the standard pipeline
            result = self.full_analysis_pipeline(image_pil, questionnaire_data or {})
            # If neither manual mask nor adjustment specified, return as is
            if (not manual_mask_path) and (abs(seg_adjust) < 1e-5):
                return result
            # Extract visual analysis and calibration from result
            visual = result.get("visual_analysis", {}) or {}
            px_per_cm = float(visual.get("px_per_cm", DEFAULT_PX_PER_CM))
            # Attempt to load a mask
            roi_mask_path = visual.get("roi_mask_path")
            mask_img = None
            if manual_mask_path:
                try:
                    if os.path.exists(manual_mask_path):
                        mask_img = Image.open(manual_mask_path)
                    else:
                        logging.warning(f"Manual mask path does not exist: {manual_mask_path}")
                except Exception as e:
                    logging.warning(f"Failed to load manual mask: {e}")
            elif roi_mask_path and os.path.exists(roi_mask_path):
                try:
                    mask_img = Image.open(roi_mask_path)
                except Exception as e:
                    logging.warning(f"Failed to load ROI mask for adjustment: {e}")
            if mask_img is not None:
                mask_np = np.array(mask_img.convert("L"))
                # If adjustment requested and no manual override
                if (manual_mask_path is None) and (abs(seg_adjust) >= 1e-5):
                    iter_count = max(1, int(round(abs(seg_adjust) / 5)))
                    kernel = np.ones((3, 3), np.uint8)
                    try:
                        if seg_adjust > 0:
                            mask_np = cv2.dilate((mask_np > 127).astype(np.uint8), kernel, iterations=iter_count)
                        else:
                            mask_np = cv2.erode((mask_np > 127).astype(np.uint8), kernel, iterations=iter_count)
                    except Exception as e:
                        logging.warning(f"Segmentation adjustment failed: {e}")
                else:
                    mask_np = (mask_np > 127).astype(np.uint8)
                # Recalculate metrics
                try:
                    length_cm, breadth_cm, area_cm2 = self._refine_metrics_from_mask(mask_np, px_per_cm)
                    visual["length_cm"] = length_cm
                    visual["breadth_cm"] = breadth_cm
                    visual["surface_area_cm2"] = area_cm2
                    visual["segmentation_refined"] = bool(manual_mask_path) or (abs(seg_adjust) >= 1e-5)
                except Exception as e:
                    logging.warning(f"Failed to recalculate metrics from mask: {e}")
                # ------- Manual overlay with wound-only red + ARROWS -------
                if manual_mask_path:
                    try:
                        base_rgb = np.array(image_pil.convert("RGB"))
                        base_bgr = cv2.cvtColor(base_rgb, cv2.COLOR_RGB2BGR)
                        h, w = base_bgr.shape[:2]
                        if mask_np.shape[:2] != (h, w):
                            mask_np = cv2.resize(mask_np.astype(np.uint8), (w, h), interpolation=cv2.INTER_NEAREST)
                        # If mask seems inverted (covers majority), flip it so 1 = wound
                        wound_fraction = float(mask_np.mean())
                        if wound_fraction > 0.5:
                            mask_np = (1 - mask_np).astype(np.uint8)
                        # Output dir
                        out_dir = os.path.dirname(roi_mask_path or result.get("saved_image_path") or manual_mask_path) or os.getcwd()
                        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
                        # Save clean binary manual mask
                        manual_mask_save = os.path.join(out_dir, f"manual_mask_{ts}.png")
                        cv2.imwrite(manual_mask_save, (mask_np * 255).astype(np.uint8))
                        # Base red overlay on wound only
                        red = np.zeros_like(base_bgr)
                        red[:] = (0, 0, 255)
                        alpha = 0.55
                        tinted = cv2.addWeighted(base_bgr, 1 - alpha, red, alpha, 0)
                        mask255 = (mask_np * 255).astype(np.uint8)
                        mask3 = cv2.merge([mask255, mask255, mask255])
                        overlay = np.where(mask3 > 0, tinted, base_bgr)
                        # ---- Draw double-headed arrows + labels for Length & Width ----
                        ys, xs = np.where(mask_np > 0)
                        if xs.size and ys.size:
                            x0, x1 = int(xs.min()), int(xs.max())
                            y0, y1 = int(ys.min()), int(ys.max())
                            w_px = x1 - x0 + 1
                            h_px = y1 - y0 + 1

                            # Compute cm from px (fallback-safe)
                            def _px_to_cm(px):
                                try:
                                    return float(px) / float(px_per_cm if px_per_cm else DEFAULT_PX_PER_CM)
                                except Exception:
                                    return float(px)

                            L_px = max(w_px, h_px)
                            W_px = min(w_px, h_px)
                            L_cm = _px_to_cm(L_px)
                            W_cm = _px_to_cm(W_px)
                            # Horizontal (center y) and vertical (center x) lines
                            cy = (y0 + y1) // 2
                            cx = (x0 + x1) // 2
                            h_start, h_end = (x0, cy), (x1, cy)
                            v_start, v_end = (cx, y0), (cx, y1)

                            # Helper: outlined arrowed line (black underlay + white line)
                            def draw_double_headed(img, p1, p2, color_fg=(255, 255, 255), color_bg=(0, 0, 0), t_fg=3, t_bg=6):
                                cv2.arrowedLine(img, p1, p2, color_bg, t_bg, tipLength=0.03)
                                cv2.arrowedLine(img, p2, p1, color_bg, t_bg, tipLength=0.03)
                                cv2.arrowedLine(img, p1, p2, color_fg, t_fg, tipLength=0.03)
                                cv2.arrowedLine(img, p2, p1, color_fg, t_fg, tipLength=0.03)

                            # Draw both arrows
                            draw_double_headed(overlay, h_start, h_end)
                            draw_double_headed(overlay, v_start, v_end)

                            # Helper: outlined text
                            def put_text_outlined(img, text, org, font=cv2.FONT_HERSHEY_SIMPLEX, scale=0.7,
                                                  color_fg=(255, 255, 255), color_bg=(0, 0, 0), t_fg=2, t_bg=4):
                                cv2.putText(img, text, org, font, scale, color_bg, t_bg, cv2.LINE_AA)
                                cv2.putText(img, text, org, font, scale, color_fg, t_fg, cv2.LINE_AA)

                            # Decide which is Length vs Width for labels
                            if w_px >= h_px:
                                # horizontal is length
                                put_text_outlined(overlay, f"Length: {L_cm:.2f} cm", (x0, max(25, cy - 10)))
                                put_text_outlined(overlay, f"Width: {W_cm:.2f} cm", (max(5, cx + 10), y0 + 25))
                            else:
                                # vertical is length
                                put_text_outlined(overlay, f"Length: {L_cm:.2f} cm", (max(5, cx + 10), cy))
                                put_text_outlined(overlay, f"Width: {W_cm:.2f} cm", (x0, max(25, cy - 10)))
                        # Save overlay with arrows
                        manual_overlay_path = os.path.join(out_dir, f"segmentation_manual_{ts}.png")
                        cv2.imwrite(manual_overlay_path, overlay)
                        # Update paths so UI shows the manual overlay (with arrows)
                        visual["roi_mask_path"] = manual_mask_save
                        visual["segmentation_image_path"] = manual_overlay_path
                        visual["segmentation_roi_path"] = manual_overlay_path
                        visual["segmentation_annotated_path"] = manual_overlay_path
                        visual["segmentation_refined_type"] = "manual"
                        visual["manual_mask_used"] = True
                    except Exception as e:
                        logging.warning(f"Failed to generate manual segmentation overlay: {e}")
                # ----------------------------------------------------------
            result["visual_analysis"] = visual
            return result
        except Exception as e:
            logging.error(f"Wound analysis error: {e}")
            return {
                "success": False,
                "error": str(e),
                "visual_analysis": {},
                "report": f"Analysis initialization failed: {str(e)}",
                "saved_image_path": None,
                "guideline_context": "",
            }
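
if __name__ == "__main__":
    # Minimal smoke test of the pure OpenCV/NumPy helpers (no model downloads,
    # no network access). Note: importing this module already pulls in `spaces`
    # and loads models, so this is only expected to run inside the Spaces runtime.
    demo = np.zeros((120, 160), np.uint8)
    demo[30:90, 40:120] = 1  # synthetic 60x80 px "wound"
    cleaned = _clean_mask(demo)
    length_cm, breadth_cm, (box_pts, _) = measure_min_area_rect(cleaned, DEFAULT_PX_PER_CM)
    area_cm2, _cnt = area_cm2_from_contour(cleaned, DEFAULT_PX_PER_CM)
    print(f"smoke: length={length_cm} cm breadth={breadth_cm} cm area={area_cm2} cm^2")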