Spaces:
Sleeping
Sleeping
Update src/ai_processor.py
Browse files- src/ai_processor.py +193 -253
src/ai_processor.py
CHANGED
|
@@ -766,182 +766,34 @@ class AIProcessor:
|
|
| 766 |
self.dataset_id = DATASET_ID
|
| 767 |
self.hf_token = HF_TOKEN
|
| 768 |
|
| 769 |
-
# ---------------------------------------------------------------------
|
| 770 |
-
# Skin tone estimation helpers
|
| 771 |
-
# ---------------------------------------------------------------------
|
| 772 |
-
def _estimate_skin_tone(self, bgr_roi: np.ndarray, mask01: np.ndarray) -> Tuple[str, float]:
|
| 773 |
-
"""
|
| 774 |
-
Estimate the Fitzpatrick skin tone based on the non-wound region within the ROI.
|
| 775 |
-
We convert to CIELAB, compute the mean L* and b* over non-wound pixels and
|
| 776 |
-
derive the Individual Typology Angle (ITA). The ITA is then mapped to a
|
| 777 |
-
Fitzpatrick skin type using Del Bino et al. ranges【854677944493859†L221-L227】.
|
| 778 |
-
Returns a (label, ita_degrees) tuple.
|
| 779 |
-
"""
|
| 780 |
-
try:
|
| 781 |
-
# Convert ROI to Lab (OpenCV uses L:0-100, a,b:-128..127)
|
| 782 |
-
lab = cv2.cvtColor(bgr_roi, cv2.COLOR_BGR2LAB).astype(np.float32)
|
| 783 |
-
L = lab[..., 0] * (100.0 / 255.0) # scale L to [0,100]
|
| 784 |
-
b_channel = lab[..., 2] - 128.0 # b* in [-128,127]
|
| 785 |
-
# Invert mask to sample non-wound skin
|
| 786 |
-
non_wound = (mask01 == 0)
|
| 787 |
-
if non_wound.sum() == 0:
|
| 788 |
-
# fallback: use entire ROI
|
| 789 |
-
non_wound = np.ones_like(mask01, dtype=bool)
|
| 790 |
-
mean_L = float(L[non_wound].mean())
|
| 791 |
-
mean_b = float(b_channel[non_wound].mean())
|
| 792 |
-
# Compute ITA (Del Bino): arctan((L* - 50)/b*) * 180/pi
|
| 793 |
-
# Avoid division by zero
|
| 794 |
-
ita_rad = np.arctan2((mean_L - 50.0), max(mean_b, 1e-6))
|
| 795 |
-
ita_deg = float(np.degrees(ita_rad))
|
| 796 |
-
# Map ITA to Fitzpatrick type (Del Bino classification)
|
| 797 |
-
# Very light: ITA>55; Light:41-55; Intermediate:28-41; Tan:10-28;
|
| 798 |
-
# Brown:-30 to 10; Dark:<=-30【854677944493859†L221-L227】
|
| 799 |
-
if ita_deg > 55:
|
| 800 |
-
label = "Type I (Very Light)"
|
| 801 |
-
elif ita_deg > 41:
|
| 802 |
-
label = "Type II (Light)"
|
| 803 |
-
elif ita_deg > 28:
|
| 804 |
-
label = "Type III (Intermediate)"
|
| 805 |
-
elif ita_deg > 10:
|
| 806 |
-
label = "Type IV (Tan)"
|
| 807 |
-
elif ita_deg > -30:
|
| 808 |
-
label = "Type V (Brown)"
|
| 809 |
-
else:
|
| 810 |
-
label = "Type VI (Dark)"
|
| 811 |
-
return label, ita_deg
|
| 812 |
-
except Exception:
|
| 813 |
-
return "Unknown", 0.0
|
| 814 |
-
|
| 815 |
-
# ---------------------------------------------------------------------
|
| 816 |
-
# Tissue classification helpers
|
| 817 |
-
# ---------------------------------------------------------------------
|
| 818 |
-
def _classify_tissue_map(self, bgr_roi: np.ndarray, mask01: np.ndarray) -> Tuple[np.ndarray, str]:
|
| 819 |
-
"""
|
| 820 |
-
Classify each pixel in the wound mask into tissue types based on HSV.
|
| 821 |
-
- Necrotic: very dark (V<50)
|
| 822 |
-
- Slough: yellow/green hues (Hue between ~15 and ~40 in OpenCV's 0-179 scale)
|
| 823 |
-
- Granulation: remaining (reds/oranges)
|
| 824 |
-
Returns the classification map (same size as mask) with integer codes
|
| 825 |
-
(0=background/outside wound, 1=granulation, 2=slough, 3=necrotic) and a
|
| 826 |
-
summary label based on majority class.
|
| 827 |
-
"""
|
| 828 |
-
h, w = mask01.shape
|
| 829 |
-
class_map = np.zeros((h, w), dtype=np.uint8)
|
| 830 |
-
try:
|
| 831 |
-
hsv = cv2.cvtColor(bgr_roi, cv2.COLOR_BGR2HSV)
|
| 832 |
-
H, S, V = cv2.split(hsv)
|
| 833 |
-
# Define masks within wound
|
| 834 |
-
wound_pixels = (mask01 == 1)
|
| 835 |
-
# Initialize counts
|
| 836 |
-
counts = {1: 0, 2: 0, 3: 0}
|
| 837 |
-
# Classification thresholds
|
| 838 |
-
# Compute categories using boolean masks for performance
|
| 839 |
-
necrotic_mask = (V < 50) & wound_pixels
|
| 840 |
-
# Hue thresholds for slough (yellow) in OpenCV scale (0-179). Yellow ~ 15-40
|
| 841 |
-
slough_mask = (H >= 15) & (H <= 40) & (V >= 50) & wound_pixels
|
| 842 |
-
# Remaining wound pixels are granulation
|
| 843 |
-
granulation_mask = wound_pixels & (~necrotic_mask) & (~slough_mask)
|
| 844 |
-
class_map[granulation_mask] = 1
|
| 845 |
-
class_map[slough_mask] = 2
|
| 846 |
-
class_map[necrotic_mask] = 3
|
| 847 |
-
counts[1] = int(granulation_mask.sum())
|
| 848 |
-
counts[2] = int(slough_mask.sum())
|
| 849 |
-
counts[3] = int(necrotic_mask.sum())
|
| 850 |
-
total = sum(counts.values())
|
| 851 |
-
# Determine summary type
|
| 852 |
-
if total == 0:
|
| 853 |
-
return class_map, "Unknown"
|
| 854 |
-
# Compute percentages
|
| 855 |
-
percents = {k: v / total for k, v in counts.items()}
|
| 856 |
-
max_class = max(percents, key=percents.get)
|
| 857 |
-
if percents[max_class] < 0.6:
|
| 858 |
-
summary = "Mixed"
|
| 859 |
-
else:
|
| 860 |
-
summary = {1: "Granulation", 2: "Slough", 3: "Necrotic"}.get(max_class, "Unknown")
|
| 861 |
-
return class_map, summary
|
| 862 |
-
except Exception:
|
| 863 |
-
return class_map, "Unknown"
|
| 864 |
-
|
| 865 |
-
def _create_tissue_overlay(self, roi: np.ndarray, mask01: np.ndarray, class_map: np.ndarray) -> np.ndarray:
|
| 866 |
-
"""
|
| 867 |
-
Create a colored overlay for the wound region based on tissue classification.
|
| 868 |
-
Colors: granulation→red, slough→yellow, necrotic→black. Outside the wound
|
| 869 |
-
region retains the original ROI. Blending uses alpha to retain texture.
|
| 870 |
-
"""
|
| 871 |
-
overlay = roi.copy()
|
| 872 |
-
try:
|
| 873 |
-
# Define color mapping BGR
|
| 874 |
-
colors = {
|
| 875 |
-
1: np.array([0, 0, 255], dtype=np.uint8), # granulation: red
|
| 876 |
-
2: np.array([0, 255, 255], dtype=np.uint8), # slough: yellow
|
| 877 |
-
3: np.array([0, 0, 0], dtype=np.uint8), # necrotic: black (eschar)
|
| 878 |
-
}
|
| 879 |
-
alpha = 0.5
|
| 880 |
-
for class_id, color in colors.items():
|
| 881 |
-
mask_cls = (class_map == class_id)
|
| 882 |
-
if mask_cls.any():
|
| 883 |
-
# Create a color array same shape as ROI
|
| 884 |
-
col_img = np.zeros_like(roi)
|
| 885 |
-
col_img[:] = color
|
| 886 |
-
# Blend only for class pixels
|
| 887 |
-
overlay[mask_cls] = cv2.addWeighted(roi[mask_cls], 1 - alpha, col_img[mask_cls], alpha, 0)
|
| 888 |
-
# Draw boundary of wound
|
| 889 |
-
cnts, _ = cv2.findContours((mask01 * 255).astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
| 890 |
-
if cnts:
|
| 891 |
-
cv2.drawContours(overlay, cnts, -1, (255, 255, 255), 2)
|
| 892 |
-
except Exception:
|
| 893 |
-
pass
|
| 894 |
-
return overlay
|
| 895 |
-
|
| 896 |
def _ensure_analysis_dir(self) -> str:
|
| 897 |
out_dir = os.path.join(self.uploads_dir, "analysis")
|
| 898 |
os.makedirs(out_dir, exist_ok=True)
|
| 899 |
return out_dir
|
| 900 |
|
| 901 |
-
def perform_visual_analysis(
|
| 902 |
-
self,
|
| 903 |
-
image_pil: Image.Image,
|
| 904 |
-
manual_mask_path: Optional[str] = None,
|
| 905 |
-
mask_adjustment: int = 0
|
| 906 |
-
) -> Dict:
|
| 907 |
"""
|
| 908 |
-
|
| 909 |
-
|
| 910 |
-
1) Detect the wound ROI using YOLO.
|
| 911 |
-
2) If manual_mask_path is provided, use it as the wound mask; otherwise run
|
| 912 |
-
segmentation (model-first, fallback to KMeans) and clean the result.
|
| 913 |
-
3) Apply optional morphological adjustment (dilation/erosion) to fine-tune
|
| 914 |
-
the mask.
|
| 915 |
-
4) Measure length, width, and area in cm using EXIF-calibrated px/cm.
|
| 916 |
-
5) Compute skin tone and tissue classification within the ROI.
|
| 917 |
-
6) Generate color-coded overlays and save outputs.
|
| 918 |
-
Returns a dictionary with measurements, paths, and classification info.
|
| 919 |
"""
|
| 920 |
try:
|
| 921 |
-
# Calibration via EXIF
|
| 922 |
px_per_cm, exif_meta = estimate_px_per_cm_from_exif(image_pil, DEFAULT_PX_PER_CM)
|
|
|
|
| 923 |
px_per_cm = float(np.clip(px_per_cm, 20.0, 350.0))
|
| 924 |
if (exif_meta or {}).get("used") != "exif":
|
| 925 |
-
logging.warning(
|
| 926 |
-
f"Calibration fallback used: px_per_cm={px_per_cm:.2f} (default). Prefer ruler/Aruco for accuracy."
|
| 927 |
-
)
|
| 928 |
|
| 929 |
-
# Convert full image to BGR
|
| 930 |
image_cv = cv2.cvtColor(np.array(image_pil.convert("RGB")), cv2.COLOR_RGB2BGR)
|
| 931 |
|
| 932 |
# --- Detection ---
|
| 933 |
det_model = self.models_cache.get("det")
|
| 934 |
if det_model is None:
|
| 935 |
raise RuntimeError("YOLO model not loaded")
|
| 936 |
-
# CPU inference
|
| 937 |
results = det_model.predict(image_cv, verbose=False, device="cpu")
|
| 938 |
-
if (
|
| 939 |
-
not results
|
| 940 |
-
or not getattr(results[0], "boxes", None)
|
| 941 |
-
or len(results[0].boxes) == 0
|
| 942 |
-
):
|
| 943 |
try:
|
| 944 |
-
import gradio as gr
|
| 945 |
raise gr.Error("No wound could be detected.")
|
| 946 |
except Exception:
|
| 947 |
raise RuntimeError("No wound could be detected.")
|
|
@@ -953,7 +805,7 @@ class AIProcessor:
|
|
| 953 |
roi = image_cv[y1:y2, x1:x2].copy()
|
| 954 |
if roi.size == 0:
|
| 955 |
try:
|
| 956 |
-
import gradio as gr
|
| 957 |
raise gr.Error("Detected ROI is empty.")
|
| 958 |
except Exception:
|
| 959 |
raise RuntimeError("Detected ROI is empty.")
|
|
@@ -961,49 +813,15 @@ class AIProcessor:
|
|
| 961 |
out_dir = self._ensure_analysis_dir()
|
| 962 |
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| 963 |
|
| 964 |
-
# ---
|
| 965 |
-
seg_debug =
|
| 966 |
-
|
| 967 |
-
|
| 968 |
-
|
| 969 |
-
|
| 970 |
-
|
| 971 |
-
|
| 972 |
-
|
| 973 |
-
# Clean mask
|
| 974 |
-
if mask01.any():
|
| 975 |
-
mask01 = _clean_mask(mask01)
|
| 976 |
-
seg_debug.update({"used": "manual_mask", "reason": "provided_by_user"})
|
| 977 |
-
except Exception as e:
|
| 978 |
-
logging.warning(f"Manual mask load failed: {e}; falling back to auto segmentation")
|
| 979 |
-
manual_mask_path = None
|
| 980 |
-
mask01 = None # to be set by auto segmentation
|
| 981 |
-
else:
|
| 982 |
-
mask01 = None
|
| 983 |
-
|
| 984 |
-
if mask01 is None:
|
| 985 |
-
# Run segmentation
|
| 986 |
-
mask_u8_255, seg_debug = segment_wound(roi, ts, out_dir)
|
| 987 |
-
mask01 = (mask_u8_255 > 127).astype(np.uint8)
|
| 988 |
-
if mask01.any():
|
| 989 |
-
mask01 = _clean_mask(mask01)
|
| 990 |
-
logging.debug(f"Mask postproc: px_after={int(mask01.sum())}")
|
| 991 |
-
else:
|
| 992 |
-
logging.debug("Segmentation produced empty mask")
|
| 993 |
-
|
| 994 |
-
# --- Apply morphological adjustment ---
|
| 995 |
-
if mask01.any() and isinstance(mask_adjustment, (int, float)):
|
| 996 |
-
adj = int(mask_adjustment)
|
| 997 |
-
if adj != 0:
|
| 998 |
-
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
|
| 999 |
-
if adj > 0:
|
| 1000 |
-
mask01 = cv2.dilate(mask01, k, iterations=adj)
|
| 1001 |
-
elif adj < 0:
|
| 1002 |
-
mask01 = cv2.erode(mask01, k, iterations=abs(adj))
|
| 1003 |
-
# Re-clean to keep a single connected region
|
| 1004 |
-
mask01 = _clean_mask(mask01)
|
| 1005 |
-
|
| 1006 |
-
# --- Measurement ---
|
| 1007 |
if mask01.any():
|
| 1008 |
length_cm, breadth_cm, (box_pts, _) = measure_min_area_rect(mask01, px_per_cm)
|
| 1009 |
area_poly_cm2, largest_cnt = area_cm2_from_contour(mask01, px_per_cm)
|
|
@@ -1011,69 +829,62 @@ class AIProcessor:
|
|
| 1011 |
surface_area_cm2 = clamp_area_with_minrect(largest_cnt, px_per_cm, area_poly_cm2)
|
| 1012 |
else:
|
| 1013 |
surface_area_cm2 = area_poly_cm2
|
|
|
|
|
|
|
| 1014 |
segmentation_empty = False
|
| 1015 |
else:
|
| 1016 |
-
#
|
| 1017 |
-
h_px = max(0, y2 - y1)
|
| 1018 |
-
w_px = max(0, x2 - x1)
|
| 1019 |
length_cm = round(max(h_px, w_px) / px_per_cm, 2)
|
| 1020 |
breadth_cm = round(min(h_px, w_px) / px_per_cm, 2)
|
| 1021 |
surface_area_cm2 = round((h_px * w_px) / (px_per_cm ** 2), 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1022 |
box_pts = None
|
| 1023 |
segmentation_empty = True
|
| 1024 |
|
| 1025 |
-
# ---
|
| 1026 |
-
skin_label, ita_deg = self._estimate_skin_tone(roi, mask01)
|
| 1027 |
-
tissue_map, tissue_type = self._classify_tissue_map(roi, mask01)
|
| 1028 |
-
|
| 1029 |
-
# --- Overlays ---
|
| 1030 |
-
# ROI overlay: color-coded by tissue type
|
| 1031 |
-
if mask01.any():
|
| 1032 |
-
roi_overlay = self._create_tissue_overlay(roi, mask01, tissue_map)
|
| 1033 |
-
else:
|
| 1034 |
-
roi_overlay = roi.copy()
|
| 1035 |
-
|
| 1036 |
-
# Annotated ROI with measurement arrows
|
| 1037 |
-
if not segmentation_empty:
|
| 1038 |
-
anno_roi = draw_measurement_overlay(roi, mask01, box_pts, length_cm, breadth_cm)
|
| 1039 |
-
else:
|
| 1040 |
-
# Draw bounding box lines on fallback ROI
|
| 1041 |
-
anno_roi = roi.copy()
|
| 1042 |
-
cv2.rectangle(
|
| 1043 |
-
anno_roi,
|
| 1044 |
-
(2, 2),
|
| 1045 |
-
(anno_roi.shape[1] - 3, anno_roi.shape[0] - 3),
|
| 1046 |
-
(0, 0, 255),
|
| 1047 |
-
3,
|
| 1048 |
-
)
|
| 1049 |
-
cv2.line(anno_roi, (0, 0), (anno_roi.shape[1] - 1, anno_roi.shape[0] - 1), (0, 0, 255), 2)
|
| 1050 |
-
cv2.line(anno_roi, (anno_roi.shape[1] - 1, 0), (0, anno_roi.shape[0] - 1), (0, 0, 255), 2)
|
| 1051 |
-
|
| 1052 |
-
# Save files
|
| 1053 |
original_path = os.path.join(out_dir, f"original_{ts}.png")
|
| 1054 |
cv2.imwrite(original_path, image_cv)
|
| 1055 |
-
|
| 1056 |
det_vis = image_cv.copy()
|
| 1057 |
cv2.rectangle(det_vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
| 1058 |
detection_path = os.path.join(out_dir, f"detection_{ts}.png")
|
| 1059 |
cv2.imwrite(detection_path, det_vis)
|
| 1060 |
-
|
| 1061 |
roi_mask_path = os.path.join(out_dir, f"roi_mask_{ts}.png")
|
| 1062 |
cv2.imwrite(roi_mask_path, (mask01 * 255).astype(np.uint8))
|
| 1063 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1064 |
seg_full = image_cv.copy()
|
| 1065 |
seg_full[y1:y2, x1:x2] = roi_overlay
|
| 1066 |
segmentation_path = os.path.join(out_dir, f"segmentation_{ts}.png")
|
| 1067 |
cv2.imwrite(segmentation_path, seg_full)
|
|
|
|
| 1068 |
segmentation_roi_path = os.path.join(out_dir, f"segmentation_roi_{ts}.png")
|
| 1069 |
cv2.imwrite(segmentation_roi_path, roi_overlay)
|
| 1070 |
-
|
|
|
|
| 1071 |
anno_full = image_cv.copy()
|
| 1072 |
anno_full[y1:y2, x1:x2] = anno_roi
|
| 1073 |
annotated_seg_path = os.path.join(out_dir, f"segmentation_annotated_{ts}.png")
|
| 1074 |
cv2.imwrite(annotated_seg_path, anno_full)
|
| 1075 |
|
| 1076 |
-
# --- Optional
|
| 1077 |
wound_type = "Unknown"
|
| 1078 |
cls_pipe = self.models_cache.get("cls")
|
| 1079 |
if cls_pipe is not None:
|
|
@@ -1084,7 +895,7 @@ class AIProcessor:
|
|
| 1084 |
except Exception as e:
|
| 1085 |
logging.warning(f"Classification failed: {e}")
|
| 1086 |
|
| 1087 |
-
# Log
|
| 1088 |
seg_summary = {
|
| 1089 |
"seg_used": seg_debug.get("used"),
|
| 1090 |
"seg_reason": seg_debug.get("reason"),
|
|
@@ -1102,7 +913,8 @@ class AIProcessor:
|
|
| 1102 |
"surface_area_cm2": surface_area_cm2,
|
| 1103 |
"px_per_cm": round(px_per_cm, 2),
|
| 1104 |
"calibration_meta": exif_meta,
|
| 1105 |
-
|
|
|
|
| 1106 |
"detection_image_path": detection_path,
|
| 1107 |
"segmentation_image_path": annotated_seg_path,
|
| 1108 |
"segmentation_annotated_path": annotated_seg_path,
|
|
@@ -1111,14 +923,55 @@ class AIProcessor:
|
|
| 1111 |
"segmentation_empty": segmentation_empty,
|
| 1112 |
"segmentation_debug": seg_debug,
|
| 1113 |
"original_image_path": original_path,
|
| 1114 |
-
"skin_tone_label": skin_label,
|
| 1115 |
-
"ita_degrees": round(ita_deg, 2),
|
| 1116 |
-
"tissue_type": tissue_type,
|
| 1117 |
}
|
| 1118 |
except Exception as e:
|
| 1119 |
logging.error(f"Visual analysis failed: {e}", exc_info=True)
|
| 1120 |
raise
|
| 1121 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1122 |
# ---------- Knowledge base + reporting ----------
|
| 1123 |
def query_guidelines(self, query: str) -> str:
|
| 1124 |
try:
|
|
@@ -1220,16 +1073,7 @@ Automated analysis provides quantitative measurements; verify via clinical exami
|
|
| 1220 |
def full_analysis_pipeline(self, image_pil: Image.Image, questionnaire_data: Dict) -> Dict:
|
| 1221 |
try:
|
| 1222 |
saved_path = self.save_and_commit_image(image_pil)
|
| 1223 |
-
|
| 1224 |
-
manual_mask_path = None
|
| 1225 |
-
mask_adjustment = 0
|
| 1226 |
-
if isinstance(questionnaire_data, dict):
|
| 1227 |
-
manual_mask_path = questionnaire_data.get("manual_mask_path") or questionnaire_data.get("manual_mask")
|
| 1228 |
-
try:
|
| 1229 |
-
mask_adjustment = int(questionnaire_data.get("mask_adjustment", 0) or 0)
|
| 1230 |
-
except Exception:
|
| 1231 |
-
mask_adjustment = 0
|
| 1232 |
-
visual_results = self.perform_visual_analysis(image_pil, manual_mask_path, mask_adjustment)
|
| 1233 |
|
| 1234 |
pi = questionnaire_data or {}
|
| 1235 |
patient_info = (
|
|
@@ -1272,8 +1116,42 @@ Automated analysis provides quantitative measurements; verify via clinical exami
|
|
| 1272 |
"guideline_context": "",
|
| 1273 |
}
|
| 1274 |
|
| 1275 |
-
def analyze_wound(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1276 |
try:
|
|
|
|
| 1277 |
if isinstance(image, str):
|
| 1278 |
if not os.path.exists(image):
|
| 1279 |
raise ValueError(f"Image file not found: {image}")
|
|
@@ -1285,7 +1163,69 @@ Automated analysis provides quantitative measurements; verify via clinical exami
|
|
| 1285 |
else:
|
| 1286 |
raise ValueError(f"Unsupported image type: {type(image)}")
|
| 1287 |
|
| 1288 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1289 |
except Exception as e:
|
| 1290 |
logging.error(f"Wound analysis error: {e}")
|
| 1291 |
return {
|
|
|
|
| 766 |
self.dataset_id = DATASET_ID
|
| 767 |
self.hf_token = HF_TOKEN
|
| 768 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 769 |
def _ensure_analysis_dir(self) -> str:
|
| 770 |
out_dir = os.path.join(self.uploads_dir, "analysis")
|
| 771 |
os.makedirs(out_dir, exist_ok=True)
|
| 772 |
return out_dir
|
| 773 |
|
| 774 |
+
def perform_visual_analysis(self, image_pil: Image.Image) -> Dict:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 775 |
"""
|
| 776 |
+
YOLO detect → crop ROI → segment_wound(ROI) → clean mask →
|
| 777 |
+
minAreaRect measurement (cm) using EXIF px/cm → save outputs.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 778 |
"""
|
| 779 |
try:
|
|
|
|
| 780 |
px_per_cm, exif_meta = estimate_px_per_cm_from_exif(image_pil, DEFAULT_PX_PER_CM)
|
| 781 |
+
# Guardrails for calibration to avoid huge area blow-ups
|
| 782 |
px_per_cm = float(np.clip(px_per_cm, 20.0, 350.0))
|
| 783 |
if (exif_meta or {}).get("used") != "exif":
|
| 784 |
+
logging.warning(f"Calibration fallback used: px_per_cm={px_per_cm:.2f} (default). Prefer ruler/Aruco for accuracy.")
|
|
|
|
|
|
|
| 785 |
|
|
|
|
| 786 |
image_cv = cv2.cvtColor(np.array(image_pil.convert("RGB")), cv2.COLOR_RGB2BGR)
|
| 787 |
|
| 788 |
# --- Detection ---
|
| 789 |
det_model = self.models_cache.get("det")
|
| 790 |
if det_model is None:
|
| 791 |
raise RuntimeError("YOLO model not loaded")
|
| 792 |
+
# Force CPU inference and avoid CUDA touch
|
| 793 |
results = det_model.predict(image_cv, verbose=False, device="cpu")
|
| 794 |
+
if (not results) or (not getattr(results[0], "boxes", None)) or (len(results[0].boxes) == 0):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 795 |
try:
|
| 796 |
+
import gradio as gr
|
| 797 |
raise gr.Error("No wound could be detected.")
|
| 798 |
except Exception:
|
| 799 |
raise RuntimeError("No wound could be detected.")
|
|
|
|
| 805 |
roi = image_cv[y1:y2, x1:x2].copy()
|
| 806 |
if roi.size == 0:
|
| 807 |
try:
|
| 808 |
+
import gradio as gr
|
| 809 |
raise gr.Error("Detected ROI is empty.")
|
| 810 |
except Exception:
|
| 811 |
raise RuntimeError("Detected ROI is empty.")
|
|
|
|
| 813 |
out_dir = self._ensure_analysis_dir()
|
| 814 |
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| 815 |
|
| 816 |
+
# --- Segmentation (model-first + KMeans fallback) ---
|
| 817 |
+
mask_u8_255, seg_debug = segment_wound(roi, ts, out_dir)
|
| 818 |
+
mask01 = (mask_u8_255 > 127).astype(np.uint8)
|
| 819 |
+
|
| 820 |
+
if mask01.any():
|
| 821 |
+
mask01 = _clean_mask(mask01)
|
| 822 |
+
logging.debug(f"Mask postproc: px_after={int(mask01.sum())}")
|
| 823 |
+
|
| 824 |
+
# --- Measurement (accurate & conservative) ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 825 |
if mask01.any():
|
| 826 |
length_cm, breadth_cm, (box_pts, _) = measure_min_area_rect(mask01, px_per_cm)
|
| 827 |
area_poly_cm2, largest_cnt = area_cm2_from_contour(mask01, px_per_cm)
|
|
|
|
| 829 |
surface_area_cm2 = clamp_area_with_minrect(largest_cnt, px_per_cm, area_poly_cm2)
|
| 830 |
else:
|
| 831 |
surface_area_cm2 = area_poly_cm2
|
| 832 |
+
|
| 833 |
+
anno_roi = draw_measurement_overlay(roi, mask01, box_pts, length_cm, breadth_cm)
|
| 834 |
segmentation_empty = False
|
| 835 |
else:
|
| 836 |
+
# Fallback if seg failed: use ROI dimensions
|
| 837 |
+
h_px = max(0, y2 - y1); w_px = max(0, x2 - x1)
|
|
|
|
| 838 |
length_cm = round(max(h_px, w_px) / px_per_cm, 2)
|
| 839 |
breadth_cm = round(min(h_px, w_px) / px_per_cm, 2)
|
| 840 |
surface_area_cm2 = round((h_px * w_px) / (px_per_cm ** 2), 2)
|
| 841 |
+
anno_roi = roi.copy()
|
| 842 |
+
cv2.rectangle(anno_roi, (2, 2), (anno_roi.shape[1]-3, anno_roi.shape[0]-3), (0, 0, 255), 3)
|
| 843 |
+
cv2.line(anno_roi, (0, 0), (anno_roi.shape[1]-1, anno_roi.shape[0]-1), (0, 0, 255), 2)
|
| 844 |
+
cv2.line(anno_roi, (anno_roi.shape[1]-1, 0), (0, anno_roi.shape[0]-1), (0, 0, 255), 2)
|
| 845 |
box_pts = None
|
| 846 |
segmentation_empty = True
|
| 847 |
|
| 848 |
+
# --- Save visualizations ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 849 |
original_path = os.path.join(out_dir, f"original_{ts}.png")
|
| 850 |
cv2.imwrite(original_path, image_cv)
|
| 851 |
+
|
| 852 |
det_vis = image_cv.copy()
|
| 853 |
cv2.rectangle(det_vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
| 854 |
detection_path = os.path.join(out_dir, f"detection_{ts}.png")
|
| 855 |
cv2.imwrite(detection_path, det_vis)
|
| 856 |
+
|
| 857 |
roi_mask_path = os.path.join(out_dir, f"roi_mask_{ts}.png")
|
| 858 |
cv2.imwrite(roi_mask_path, (mask01 * 255).astype(np.uint8))
|
| 859 |
+
|
| 860 |
+
# ROI overlay (mask tint + contour, without arrows)
|
| 861 |
+
mask255 = (mask01 * 255).astype(np.uint8)
|
| 862 |
+
mask3 = cv2.merge([mask255, mask255, mask255])
|
| 863 |
+
red = np.zeros_like(roi); red[:] = (0, 0, 255)
|
| 864 |
+
alpha = 0.55
|
| 865 |
+
tinted = cv2.addWeighted(roi, 1 - alpha, red, alpha, 0)
|
| 866 |
+
if mask255.any():
|
| 867 |
+
roi_overlay = np.where(mask3 > 0, tinted, roi)
|
| 868 |
+
cnts, _ = cv2.findContours(mask255, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
| 869 |
+
cv2.drawContours(roi_overlay, cnts, -1, (255, 255, 255), 2)
|
| 870 |
+
else:
|
| 871 |
+
roi_overlay = anno_roi
|
| 872 |
+
|
| 873 |
seg_full = image_cv.copy()
|
| 874 |
seg_full[y1:y2, x1:x2] = roi_overlay
|
| 875 |
segmentation_path = os.path.join(out_dir, f"segmentation_{ts}.png")
|
| 876 |
cv2.imwrite(segmentation_path, seg_full)
|
| 877 |
+
|
| 878 |
segmentation_roi_path = os.path.join(out_dir, f"segmentation_roi_{ts}.png")
|
| 879 |
cv2.imwrite(segmentation_roi_path, roi_overlay)
|
| 880 |
+
|
| 881 |
+
# Annotated (mask + arrows + labels) in full-frame
|
| 882 |
anno_full = image_cv.copy()
|
| 883 |
anno_full[y1:y2, x1:x2] = anno_roi
|
| 884 |
annotated_seg_path = os.path.join(out_dir, f"segmentation_annotated_{ts}.png")
|
| 885 |
cv2.imwrite(annotated_seg_path, anno_full)
|
| 886 |
|
| 887 |
+
# --- Optional classification ---
|
| 888 |
wound_type = "Unknown"
|
| 889 |
cls_pipe = self.models_cache.get("cls")
|
| 890 |
if cls_pipe is not None:
|
|
|
|
| 895 |
except Exception as e:
|
| 896 |
logging.warning(f"Classification failed: {e}")
|
| 897 |
|
| 898 |
+
# Log end-of-seg summary
|
| 899 |
seg_summary = {
|
| 900 |
"seg_used": seg_debug.get("used"),
|
| 901 |
"seg_reason": seg_debug.get("reason"),
|
|
|
|
| 913 |
"surface_area_cm2": surface_area_cm2,
|
| 914 |
"px_per_cm": round(px_per_cm, 2),
|
| 915 |
"calibration_meta": exif_meta,
|
| 916 |
+
"detection_confidence": float(results[0].boxes.conf[0].cpu().item())
|
| 917 |
+
if getattr(results[0].boxes, "conf", None) is not None else 0.0,
|
| 918 |
"detection_image_path": detection_path,
|
| 919 |
"segmentation_image_path": annotated_seg_path,
|
| 920 |
"segmentation_annotated_path": annotated_seg_path,
|
|
|
|
| 923 |
"segmentation_empty": segmentation_empty,
|
| 924 |
"segmentation_debug": seg_debug,
|
| 925 |
"original_image_path": original_path,
|
|
|
|
|
|
|
|
|
|
| 926 |
}
|
| 927 |
except Exception as e:
|
| 928 |
logging.error(f"Visual analysis failed: {e}", exc_info=True)
|
| 929 |
raise
|
| 930 |
|
| 931 |
+
# -------------------------------------------------------------------------
|
| 932 |
+
# Helper: refine measurements from a binary mask
|
| 933 |
+
# -------------------------------------------------------------------------
|
| 934 |
+
def _refine_metrics_from_mask(self, mask: np.ndarray, px_per_cm: float) -> Tuple[float, float, float]:
|
| 935 |
+
"""
|
| 936 |
+
Given a binary mask and pixel‑per‑centimeter calibration, compute length, breadth and area.
|
| 937 |
+
|
| 938 |
+
The mask should be a 2D numpy array of dtype uint8 or bool where 1 indicates wound pixels.
|
| 939 |
+
|
| 940 |
+
Parameters
|
| 941 |
+
----------
|
| 942 |
+
mask : np.ndarray
|
| 943 |
+
Binary mask of the wound region, shape (H, W). Non‑zero values denote wound pixels.
|
| 944 |
+
px_per_cm : float
|
| 945 |
+
Estimated pixels per centimeter calibration factor.
|
| 946 |
+
|
| 947 |
+
Returns
|
| 948 |
+
-------
|
| 949 |
+
tuple[float, float, float]
|
| 950 |
+
(length_cm, breadth_cm, area_cm2)
|
| 951 |
+
|
| 952 |
+
Notes
|
| 953 |
+
-----
|
| 954 |
+
This method approximates the wound measurements by computing the axis‑aligned bounding box
|
| 955 |
+
around all wound pixels and calculating the longer and shorter sides as length and width.
|
| 956 |
+
The surface area is computed as the number of wound pixels divided by (px_per_cm**2).
|
| 957 |
+
"""
|
| 958 |
+
if mask is None or mask.size == 0 or not np.any(mask):
|
| 959 |
+
return 0.0, 0.0, 0.0
|
| 960 |
+
# Ensure binary mask
|
| 961 |
+
mask01 = (mask > 0).astype(np.uint8)
|
| 962 |
+
# Find coordinates of wound pixels
|
| 963 |
+
coords = np.argwhere(mask01)
|
| 964 |
+
y_min, x_min = coords.min(axis=0)
|
| 965 |
+
y_max, x_max = coords.max(axis=0)
|
| 966 |
+
height_px = int(y_max - y_min + 1)
|
| 967 |
+
width_px = int(x_max - x_min + 1)
|
| 968 |
+
# Compute length as the larger dimension
|
| 969 |
+
length_cm = round(max(height_px, width_px) / float(px_per_cm), 2)
|
| 970 |
+
breadth_cm = round(min(height_px, width_px) / float(px_per_cm), 2)
|
| 971 |
+
area_px = int(mask01.sum())
|
| 972 |
+
area_cm2 = round(area_px / (float(px_per_cm) ** 2), 2)
|
| 973 |
+
return length_cm, breadth_cm, area_cm2
|
| 974 |
+
|
| 975 |
# ---------- Knowledge base + reporting ----------
|
| 976 |
def query_guidelines(self, query: str) -> str:
|
| 977 |
try:
|
|
|
|
| 1073 |
def full_analysis_pipeline(self, image_pil: Image.Image, questionnaire_data: Dict) -> Dict:
|
| 1074 |
try:
|
| 1075 |
saved_path = self.save_and_commit_image(image_pil)
|
| 1076 |
+
visual_results = self.perform_visual_analysis(image_pil)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1077 |
|
| 1078 |
pi = questionnaire_data or {}
|
| 1079 |
patient_info = (
|
|
|
|
| 1116 |
"guideline_context": "",
|
| 1117 |
}
|
| 1118 |
|
| 1119 |
+
def analyze_wound(
|
| 1120 |
+
self,
|
| 1121 |
+
image,
|
| 1122 |
+
questionnaire_data: Dict,
|
| 1123 |
+
seg_adjust: float = 0.0,
|
| 1124 |
+
manual_mask_path: Optional[str] = None,
|
| 1125 |
+
) -> Dict:
|
| 1126 |
+
"""
|
| 1127 |
+
Analyze a wound image and return a dictionary with visual analysis, report and paths.
|
| 1128 |
+
|
| 1129 |
+
Parameters
|
| 1130 |
+
----------
|
| 1131 |
+
image : str | PIL.Image.Image | np.ndarray
|
| 1132 |
+
The input wound image. May be a filepath, PIL image or numpy array.
|
| 1133 |
+
questionnaire_data : Dict
|
| 1134 |
+
Auxiliary questionnaire information.
|
| 1135 |
+
seg_adjust : float, optional
|
| 1136 |
+
Percentage adjustment to apply to the automatically generated segmentation mask. A positive
|
| 1137 |
+
value will dilate (expand) the wound mask, while a negative value will erode (shrink)
|
| 1138 |
+
the mask. Defaults to 0.0 (no adjustment).
|
| 1139 |
+
manual_mask_path : Optional[str], optional
|
| 1140 |
+
Filepath to a user‑provided mask image. If provided and valid, this mask will
|
| 1141 |
+
override the automatically generated segmentation for length, width and area
|
| 1142 |
+
measurement. The mask should be a binary image where non‑zero values correspond
|
| 1143 |
+
to wound pixels. Defaults to None.
|
| 1144 |
+
|
| 1145 |
+
Returns
|
| 1146 |
+
-------
|
| 1147 |
+
Dict
|
| 1148 |
+
A dictionary containing keys such as ``success``, ``visual_analysis``, ``report`` and
|
| 1149 |
+
``saved_image_path``. If either ``seg_adjust`` or ``manual_mask_path`` is provided and
|
| 1150 |
+
metrics can be recalculated, the returned ``visual_analysis`` will reflect the
|
| 1151 |
+
adjusted measurements.
|
| 1152 |
+
"""
|
| 1153 |
try:
|
| 1154 |
+
# Normalize input to PIL
|
| 1155 |
if isinstance(image, str):
|
| 1156 |
if not os.path.exists(image):
|
| 1157 |
raise ValueError(f"Image file not found: {image}")
|
|
|
|
| 1163 |
else:
|
| 1164 |
raise ValueError(f"Unsupported image type: {type(image)}")
|
| 1165 |
|
| 1166 |
+
# Run the standard pipeline
|
| 1167 |
+
result = self.full_analysis_pipeline(image_pil, questionnaire_data or {})
|
| 1168 |
+
|
| 1169 |
+
# If neither manual mask nor adjustment specified, return as is
|
| 1170 |
+
if (not manual_mask_path) and (abs(seg_adjust) < 1e-5):
|
| 1171 |
+
return result
|
| 1172 |
+
|
| 1173 |
+
# Extract visual analysis and calibration from result
|
| 1174 |
+
visual = result.get("visual_analysis", {})
|
| 1175 |
+
px_per_cm = float(visual.get("px_per_cm", DEFAULT_PX_PER_CM))
|
| 1176 |
+
# Attempt to load the ROI mask generated by the pipeline
|
| 1177 |
+
roi_mask_path = visual.get("roi_mask_path")
|
| 1178 |
+
mask_img = None
|
| 1179 |
+
|
| 1180 |
+
# Use manual mask if provided
|
| 1181 |
+
if manual_mask_path:
|
| 1182 |
+
try:
|
| 1183 |
+
if os.path.exists(manual_mask_path):
|
| 1184 |
+
mask_img = Image.open(manual_mask_path)
|
| 1185 |
+
else:
|
| 1186 |
+
logging.warning(f"Manual mask path does not exist: {manual_mask_path}")
|
| 1187 |
+
except Exception as e:
|
| 1188 |
+
logging.warning(f"Failed to load manual mask: {e}")
|
| 1189 |
+
elif roi_mask_path and os.path.exists(roi_mask_path):
|
| 1190 |
+
# Otherwise load the automatically generated ROI mask
|
| 1191 |
+
try:
|
| 1192 |
+
mask_img = Image.open(roi_mask_path)
|
| 1193 |
+
except Exception as e:
|
| 1194 |
+
logging.warning(f"Failed to load ROI mask for adjustment: {e}")
|
| 1195 |
+
|
| 1196 |
+
if mask_img is not None:
|
| 1197 |
+
# Convert to numpy for processing
|
| 1198 |
+
mask_np = np.array(mask_img.convert("L"))
|
| 1199 |
+
# If adjustment is requested and no manual mask override
|
| 1200 |
+
if (manual_mask_path is None) and (abs(seg_adjust) >= 1e-5):
|
| 1201 |
+
# Determine the number of iterations based on percentage; roughly 5% increments
|
| 1202 |
+
iter_count = max(1, int(round(abs(seg_adjust) / 5)))
|
| 1203 |
+
kernel = np.ones((3, 3), np.uint8)
|
| 1204 |
+
try:
|
| 1205 |
+
if seg_adjust > 0:
|
| 1206 |
+
# Dilate (expand) mask
|
| 1207 |
+
mask_np = cv2.dilate((mask_np > 127).astype(np.uint8), kernel, iterations=iter_count)
|
| 1208 |
+
else:
|
| 1209 |
+
# Erode (shrink) mask
|
| 1210 |
+
mask_np = cv2.erode((mask_np > 127).astype(np.uint8), kernel, iterations=iter_count)
|
| 1211 |
+
except Exception as e:
|
| 1212 |
+
logging.warning(f"Segmentation adjustment failed: {e}")
|
| 1213 |
+
else:
|
| 1214 |
+
# If manual mask provided, binarize it directly
|
| 1215 |
+
mask_np = (mask_np > 127).astype(np.uint8)
|
| 1216 |
+
|
| 1217 |
+
# Recalculate length, width and area using the adjusted or manual mask
|
| 1218 |
+
try:
|
| 1219 |
+
length_cm, breadth_cm, area_cm2 = self._refine_metrics_from_mask(mask_np, px_per_cm)
|
| 1220 |
+
visual["length_cm"] = length_cm
|
| 1221 |
+
visual["breadth_cm"] = breadth_cm
|
| 1222 |
+
visual["surface_area_cm2"] = area_cm2
|
| 1223 |
+
# Indicate that segmentation was refined manually or adjusted
|
| 1224 |
+
visual["segmentation_refined"] = bool(manual_mask_path) or (abs(seg_adjust) >= 1e-5)
|
| 1225 |
+
except Exception as e:
|
| 1226 |
+
logging.warning(f"Failed to recalculate metrics from mask: {e}")
|
| 1227 |
+
result["visual_analysis"] = visual
|
| 1228 |
+
return result
|
| 1229 |
except Exception as e:
|
| 1230 |
logging.error(f"Wound analysis error: {e}")
|
| 1231 |
return {
|