import os
import logging
import cv2
import numpy as np
from PIL import Image
from datetime import datetime
import gradio as gr
import spaces
import torch
from huggingface_hub import HfApi, HfFolder
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

# =============== LOGGING SETUP ===============
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# =============== CONFIGURATION ===============
UPLOADS_DIR = "uploads"
if not os.path.exists(UPLOADS_DIR):
    os.makedirs(UPLOADS_DIR)
    logging.info(f"Created uploads directory: {UPLOADS_DIR}")

HF_TOKEN = os.getenv("HF_TOKEN")
YOLO_MODEL_PATH = "best.pt"
SEG_MODEL_PATH = "segmentation_model.h5"
GUIDELINE_PDFS = ["eHealth in Wound Care.pdf", "IWGDF Guideline.pdf", "evaluation.pdf"]
DATASET_ID = "SmartHeal/wound-image-uploads"
MAX_NEW_TOKENS = 2048
PIXELS_PER_CM = 38
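# NOTE (assumption): PIXELS_PER_CM is a fixed calibration constant, so the cm
# measurements derived below are only as accurate as the capture distance it was
# tuned for; there is no per-image scale reference (e.g. a ruler) in the pipeline.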

# =============== GLOBAL CACHES ===============
models_cache = {}
knowledge_base_cache = {}

# =============== LAZY LOADING FUNCTIONS (CPU-SAFE) ===============
def load_yolo_model(yolo_model_path):
    """Lazy import and load YOLO model to avoid CUDA initialization."""
    from ultralytics import YOLO
    return YOLO(yolo_model_path)


def load_segmentation_model(seg_model_path):
    """Lazy import and load segmentation model."""
    import tensorflow as tf
    tf.config.set_visible_devices([], 'GPU')  # Force CPU for TensorFlow
    from tensorflow.keras.models import load_model
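    # compile=False skips restoring the training configuration (optimizer/loss);
    # the model is only used for inference here, and this avoids errors if it was
    # trained with custom objects that are not installed in this Space (assumption).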
    return load_model(seg_model_path, compile=False)

def load_classification_pipeline(hf_token):
    """Lazy import and load classification pipeline (CPU only)."""
    from transformers import pipeline
    return pipeline(
        "image-classification",
        model="Hemg/Wound-classification",
        token=hf_token,
        device="cpu"
    )


def load_embedding_model():
    """Load embedding model for knowledge base."""
    return HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={"device": "cpu"}
    )

# =============== MODEL INITIALIZATION ===============
def initialize_cpu_models():
    """Initialize all CPU-only models once."""
    global models_cache
    if HF_TOKEN:
        HfFolder.save_token(HF_TOKEN)
        logging.info("✅ HuggingFace token set")
    if "det" not in models_cache:
        try:
            models_cache["det"] = load_yolo_model(YOLO_MODEL_PATH)
            logging.info("✅ YOLO model loaded (CPU only)")
        except Exception as e:
            logging.error(f"YOLO load failed: {e}")
    if "seg" not in models_cache:
        try:
            models_cache["seg"] = load_segmentation_model(SEG_MODEL_PATH)
            logging.info("✅ Segmentation model loaded (CPU)")
        except Exception as e:
            logging.warning(f"Segmentation model not available: {e}")
    if "cls" not in models_cache:
        try:
            models_cache["cls"] = load_classification_pipeline(HF_TOKEN)
            logging.info("✅ Classification pipeline loaded (CPU)")
        except Exception as e:
            logging.warning(f"Classification pipeline not available: {e}")
    if "embedding_model" not in models_cache:
        try:
            models_cache["embedding_model"] = load_embedding_model()
            logging.info("✅ Embedding model loaded (CPU)")
        except Exception as e:
            logging.warning(f"Embedding model not available: {e}")

def setup_knowledge_base():
    """Load PDF documents and create FAISS vector store."""
    global knowledge_base_cache
    if "vector_store" in knowledge_base_cache:
        return
    docs = []
    for pdf_path in GUIDELINE_PDFS:
        if os.path.exists(pdf_path):
            try:
                loader = PyPDFLoader(pdf_path)
                docs.extend(loader.load())
                logging.info(f"Loaded PDF: {pdf_path}")
            except Exception as e:
                logging.warning(f"Failed to load PDF {pdf_path}: {e}")
    if docs and "embedding_model" in models_cache:
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = splitter.split_documents(docs)
        knowledge_base_cache["vector_store"] = FAISS.from_documents(chunks, models_cache["embedding_model"])
        logging.info(f"✅ Knowledge base ready with {len(chunks)} chunks")
    else:
        knowledge_base_cache["vector_store"] = None
        logging.warning("Knowledge base unavailable")


# Initialize models on app startup
initialize_cpu_models()
setup_knowledge_base()
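# Both caches are populated at import time, so every Gradio request (and the
# GPU-decorated report function below) reuses the same CPU models and FAISS
# index instead of reloading them on each call.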

# =============== GPU-DECORATED MEDGEMMA FUNCTION ===============
@spaces.GPU
def generate_medgemma_report(
    patient_info,
    visual_results,
    guideline_context,
    image_pil,
    max_new_tokens=None,
):
    """GPU-only function for MedGemma report generation - EXACTLY like working reference."""
    from transformers import pipeline

    # Lazy-load MedGemma pipeline on GPU - EXACTLY like working reference
    if not hasattr(generate_medgemma_report, "_pipe"):
        try:
            generate_medgemma_report._pipe = pipeline(
                "image-text-to-text",
                model="google/medgemma-4b-it",
                torch_dtype=torch.bfloat16,
                device_map="auto",
                token=HF_TOKEN
            )
            logging.info("✅ MedGemma pipeline loaded on GPU")
        except Exception as e:
            logging.warning(f"MedGemma pipeline load failed: {e}")
            return None
    pipe = generate_medgemma_report._pipe
    # Use the EXACT prompt format from the working reference
    prompt = f"""
🩺 You are SmartHeal-AI Agent, a world-class wound care AI specialist trained in clinical wound assessment and guideline-based treatment planning.
Your task is to process the following structured inputs (patient data, wound measurements, clinical guidelines, and image) and perform **clinical reasoning and decision-making** to generate a complete wound care report.
---
**YOUR PROCESS - FOLLOW STRICTLY:**
### Step 1: Clinical Reasoning (Chain-of-Thought)
Use the provided information to think step-by-step about:
- Patient's risk factors (e.g. diabetes, age, healing limitations)
- Wound characteristics (size, tissue appearance, moisture, infection signs)
- Visual clues from the image (location, granulation, maceration, inflammation, surrounding skin)
- Clinical guidelines provided - selectively choose the ones most relevant to this case
Do NOT list all guidelines verbatim. Use judgment: apply them where relevant. Explain why or why not.
Also assess whether this wound appears:
- Acute vs chronic
- Surgical vs traumatic
- Inflammatory vs proliferative healing phase
---
### Step 2: Structured Clinical Report
Generate the following report sections using markdown and medical terminology:
#### **1. Clinical Summary**
- Describe wound appearance and tissue types (e.g., slough, necrotic, granulating, epithelializing)
- Include size, wound bed condition, peri-wound skin, and signs of infection or biofilm
- Mention inferred location (e.g., heel, forefoot) if image allows
- Summarize patient's systemic risk profile
#### **2. Medicinal & Dressing Recommendations**
Based on your analysis:
- Recommend specific **wound care dressings** (e.g., hydrocolloid, alginate, foam, antimicrobial silver, etc.) suitable to wound moisture level and infection risk
- Propose **topical or systemic agents** ONLY if relevant - include name classes (e.g., antiseptic: povidone iodine, antibiotic ointments, enzymatic debriders)
- Mention **techniques** (e.g., sharp debridement, NPWT, moisture balance, pressure offloading, dressing frequency)
- Avoid repeating guidelines - **apply them**
#### **3. Key Risk Factors**
Explain how the patient's condition (e.g., diabetic, poor circulation, advanced age, poor hygiene) may affect wound healing
#### **4. Prognosis & Monitoring Advice**
- Mention how often wound should be reassessed
- Indicate signs to monitor for deterioration or improvement
- Include when escalation to specialist is necessary
#### **5. Disclaimer**
This is an AI-generated summary based on available data. It is not a substitute for clinical evaluation by a wound care professional.
**Note:** Every dressing change is a chance for wound reassessment. Always perform a thorough wound evaluation at each dressing change.
---
🧾 **INPUT DATA**
**Patient Info:**
{patient_info}
**Wound Details:**
- Type: {visual_results['wound_type']}
- Size: {visual_results['length_cm']} × {visual_results['breadth_cm']} cm
- Area: {visual_results['surface_area_cm2']} cm²
**Clinical Guideline Evidence:**
{guideline_context}
You may now begin your analysis and generate the two-part report.
"""
    # Use EXACT message format from working reference
    messages = [
        {
            "role": "system",
            "content": [{"type": "text", "text": "You are a world-class medical AI assistant. Follow the user's instructions precisely to perform a two-step analysis and generate a structured report."}],
        },
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image_pil},
                {"type": "text", "text": prompt},
            ]
        }
    ]
    try:
        output = pipe(
            text=messages,
            max_new_tokens=max_new_tokens or MAX_NEW_TOKENS,
            do_sample=False,
        )
        # The chat-style pipeline returns the full message list; the last entry is
        # the newly generated assistant turn, whose "content" holds the report text.
        result = output[0]["generated_text"][-1].get("content", "").strip()
        return result if result else "⚠️ No content generated. Try reducing max tokens or input size."
    except Exception as e:
        logging.error(f"Failed to generate MedGemma report: {e}", exc_info=True)
        return f"❌ An error occurred while generating the report: {e}"

# =============== AI PROCESSOR CLASS ===============
class AIProcessor:
    def __init__(self):
        self.models_cache = models_cache
        self.knowledge_base_cache = knowledge_base_cache
        self.px_per_cm = PIXELS_PER_CM
        self.uploads_dir = UPLOADS_DIR
        self.dataset_id = DATASET_ID
        self.hf_token = HF_TOKEN

    def perform_visual_analysis(self, image_pil: Image.Image) -> dict:
        """Performs the full visual analysis pipeline - EXACTLY like working reference."""
        try:
            # Convert PIL to OpenCV format
            image_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
            # YOLO Detection - EXACTLY like working reference
            results = self.models_cache["det"].predict(image_cv, verbose=False, device="cpu")
            if not results or not results[0].boxes:
                raise ValueError("No wound could be detected.")
            box = results[0].boxes[0].xyxy[0].cpu().numpy().astype(int)
            detected_region_cv = image_cv[box[1]:box[3], box[0]:box[2]]
            # Segmentation - EXACTLY like working reference
            input_size = self.models_cache["seg"].input_shape[1:3]
            resized = cv2.resize(detected_region_cv, (input_size[1], input_size[0]))
            mask_pred = self.models_cache["seg"].predict(np.expand_dims(resized / 255.0, 0), verbose=0)[0]
            mask_np = (mask_pred[:, :, 0] > 0.5).astype(np.uint8)
            # Calculate measurements - EXACTLY like working reference
            contours, _ = cv2.findContours(mask_np, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            length, breadth, area = (0, 0, 0)
            if contours:
                cnt = max(contours, key=cv2.contourArea)
                x, y, w, h = cv2.boundingRect(cnt)
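                # Pixel-to-cm conversion: linear sizes divide by PIXELS_PER_CM and the
                # contour area divides by its square (e.g. 190 px ≈ 5.0 cm at 38 px/cm).
                # The mask is measured at the segmentation model's input resolution,
                # so these values are estimates rather than calibrated measurements.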
                length, breadth, area = round(h / self.px_per_cm, 2), round(w / self.px_per_cm, 2), round(cv2.contourArea(cnt) / (self.px_per_cm ** 2), 2)
            # Classification - EXACTLY like working reference
            detected_image_pil = Image.fromarray(cv2.cvtColor(detected_region_cv, cv2.COLOR_BGR2RGB))
            wound_type = max(self.models_cache["cls"](detected_image_pil), key=lambda x: x["score"])["label"]
            # Save detection visualization
            det_vis = image_cv.copy()
            cv2.rectangle(det_vis, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), 2)
            os.makedirs(f"{self.uploads_dir}/analysis", exist_ok=True)
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            det_path = f"{self.uploads_dir}/analysis/detection_{ts}.png"
            cv2.imwrite(det_path, det_vis)
            # Save original image for reference
            original_path = f"{self.uploads_dir}/analysis/original_{ts}.png"
            cv2.imwrite(original_path, image_cv)
            # Save segmentation visualization if available
            seg_path = None
            if contours:
                mask_resized = cv2.resize(mask_np * 255, (detected_region_cv.shape[1], detected_region_cv.shape[0]), interpolation=cv2.INTER_NEAREST)
                overlay = detected_region_cv.copy()
                overlay[mask_resized > 127] = [0, 0, 255]  # Red overlay for wound area
                seg_vis = cv2.addWeighted(detected_region_cv, 0.7, overlay, 0.3, 0)
                seg_path = f"{self.uploads_dir}/analysis/segmentation_{ts}.png"
                cv2.imwrite(seg_path, seg_vis)
            visual_results = {
                "wound_type": wound_type,
                "length_cm": length,
                "breadth_cm": breadth,
                "surface_area_cm2": area,
                "detection_confidence": float(results[0].boxes.conf[0].cpu().item()) if results[0].boxes.conf is not None else 0.0,
                "detection_image_path": det_path,
                "segmentation_image_path": seg_path,
                "original_image_path": original_path
            }
            return visual_results
        except Exception as e:
            logging.error(f"Visual analysis failed: {e}")
            raise e

    def query_guidelines(self, query: str) -> str:
        """Query the knowledge base for relevant information."""
        try:
            vector_store = self.knowledge_base_cache.get("vector_store")
            if not vector_store:
                return "Knowledge base is not available."
            retriever = vector_store.as_retriever(search_kwargs={"k": 10})
            docs = retriever.invoke(query)
            return "\n\n".join([f"Source: {doc.metadata.get('source', 'N/A')}, Page: {doc.metadata.get('page', 'N/A')}\nContent: {doc.page_content}" for doc in docs])
        except Exception as e:
            logging.error(f"Guidelines query failed: {e}")
            return f"Guidelines query failed: {str(e)}"

    def generate_final_report(
        self, patient_info: str, visual_results: dict, guideline_context: str,
        image_pil: Image.Image, max_new_tokens: int = None
    ) -> str:
        """Generate final report using MedGemma GPU pipeline - EXACTLY like working reference."""
        try:
            report = generate_medgemma_report(
                patient_info, visual_results, guideline_context, image_pil, max_new_tokens
            )
            if report and report.strip() and not report.startswith("❌") and not report.startswith("⚠️"):
                return report
            else:
                logging.warning("MedGemma returned empty or error response, using fallback")
                return self._generate_fallback_report(patient_info, visual_results, guideline_context)
        except Exception as e:
            logging.error(f"MedGemma report generation failed: {e}")
            return self._generate_fallback_report(patient_info, visual_results, guideline_context)

    def _generate_fallback_report(
        self, patient_info: str, visual_results: dict, guideline_context: str
    ) -> str:
        """Generate fallback report if MedGemma fails."""
        report = f"""# 🩺 SmartHeal AI - Wound Analysis Report
## Patient Information
{patient_info}
## Visual Analysis Results
- **Wound Type**: {visual_results.get('wound_type', 'Unknown')}
- **Dimensions**: {visual_results.get('length_cm', 0)} cm × {visual_results.get('breadth_cm', 0)} cm
- **Surface Area**: {visual_results.get('surface_area_cm2', 0)} cm²
- **Detection Confidence**: {visual_results.get('detection_confidence', 0):.2f}
## Analysis Images
- **Detection Image**: {visual_results.get('detection_image_path', 'N/A')}
- **Segmentation Image**: {visual_results.get('segmentation_image_path', 'N/A')}
## Clinical Guidelines Context
{guideline_context[:1000]}{'...' if len(guideline_context) > 1000 else ''}
## 🎯 Assessment Summary
Based on the automated visual analysis, the wound has been classified as **{visual_results.get('wound_type', 'Unknown')}** with measurable dimensions. The detection confidence indicates the reliability of the automated assessment.
### Clinical Observations
- **Wound Classification**: {visual_results.get('wound_type', 'Unspecified')}
- **Approximate Size**: {visual_results.get('length_cm', 0)} × {visual_results.get('breadth_cm', 0)} cm
- **Calculated Area**: {visual_results.get('surface_area_cm2', 0)} cm²
## General Recommendations
1. **Clinical Evaluation**: This automated analysis should be supplemented with professional clinical assessment
2. **Documentation**: Regular monitoring and documentation of wound progression is recommended
3. **Treatment Planning**: Develop an appropriate treatment protocol based on wound characteristics and patient factors
4. **Follow-up**: Schedule appropriate follow-up intervals based on wound severity and healing progress
## ⚠️ Important Clinical Notes
- This is an automated analysis and should not replace professional medical judgment
- All measurements are estimates based on computer vision algorithms
- Clinical correlation is essential for proper diagnosis and treatment planning
- Consider patient-specific factors not captured in this automated assessment
## Next Steps
1. **Professional Assessment**: Consult with a qualified wound care specialist
2. **Comprehensive Evaluation**: Consider the patient's overall health status and comorbidities
3. **Treatment Protocol**: Develop an individualized care plan based on clinical findings
4. **Monitoring Plan**: Establish a regular assessment schedule
## Disclaimer
This automated analysis is provided for informational purposes only and does not constitute medical advice. Always consult with qualified healthcare professionals for proper diagnosis and treatment. This AI-generated report should be used as a supplementary tool alongside professional clinical assessment.
---
*Generated by SmartHeal AI - Advanced Wound Care Analysis System*
*Report Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
        return report

    def save_and_commit_image(self, image_pil: Image.Image) -> str:
        """Save image locally and optionally commit to HF dataset."""
        try:
            os.makedirs(self.uploads_dir, exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{timestamp}.png"
            path = os.path.join(self.uploads_dir, filename)
            # Save image
            image_pil.convert("RGB").save(path)
            logging.info(f"✅ Image saved locally: {path}")
            # Upload to HuggingFace dataset if configured
            if self.hf_token and self.dataset_id:
                try:
                    api = HfApi()
                    api.upload_file(
                        path_or_fileobj=path,
                        path_in_repo=f"images/{filename}",
                        repo_id=self.dataset_id,
                        repo_type="dataset",
                        token=self.hf_token,
                        commit_message=f"Upload wound image: {filename}"
                    )
                    logging.info("✅ Image committed to HF dataset")
                except Exception as e:
                    logging.warning(f"HF upload failed: {e}")
            return path
        except Exception as e:
            logging.error(f"Failed to save image: {e}")
            return ""

    def full_analysis_pipeline(self, image_pil: Image.Image, questionnaire_data: dict) -> dict:
        """Run full analysis pipeline - EXACTLY like working reference."""
        try:
            # Save image first
            saved_path = self.save_and_commit_image(image_pil)
            logging.info(f"Image saved: {saved_path}")
            # Perform visual analysis
            visual_results = self.perform_visual_analysis(image_pil)
            logging.info(f"Visual analysis completed: {visual_results}")
            # Process questionnaire data - EXACTLY like working reference
            patient_info = f"Age: {questionnaire_data.get('age', 'N/A')}, Diabetic: {questionnaire_data.get('diabetic', 'N/A')}, Allergies: {questionnaire_data.get('allergies', 'N/A')}, Date of Wound Sustained: {questionnaire_data.get('date_of_injury', 'N/A')}, Professional Care: {questionnaire_data.get('professional_care', 'N/A')}, Oozing/Bleeding: {questionnaire_data.get('oozing_bleeding', 'N/A')}, Infection: {questionnaire_data.get('infection', 'N/A')}, Moisture: {questionnaire_data.get('moisture', 'N/A')}"
            # Query guidelines - EXACTLY like working reference
            query = f"best practices for managing a {visual_results['wound_type']} with moisture level '{questionnaire_data.get('moisture', 'unknown')}' and signs of infection '{questionnaire_data.get('infection', 'unknown')}' in a patient who is diabetic '{questionnaire_data.get('diabetic', 'unknown')}'"
            guideline_context = self.query_guidelines(query)
            logging.info("Guidelines queried successfully")
            # Generate final report
            report = self.generate_final_report(patient_info, visual_results, guideline_context, image_pil)
            logging.info("Report generated successfully")
            return {
                'success': True,
                'visual_analysis': visual_results,
                'report': report,
                'saved_image_path': saved_path,
                'guideline_context': guideline_context[:500] + "..." if len(guideline_context) > 500 else guideline_context
            }
        except Exception as e:
            logging.error(f"Pipeline error: {e}")
            return {
                'success': False,
                'error': str(e),
                'visual_analysis': {},
                'report': f"Analysis failed: {str(e)}",
                'saved_image_path': None,
                'guideline_context': ""
            }

    def analyze_wound(self, image, questionnaire_data: dict) -> dict:
        """Main analysis entry point - maintains original function name."""
        try:
            # Handle different image input formats
            if isinstance(image, str):
                if os.path.exists(image):
                    image_pil = Image.open(image)
                else:
                    raise ValueError(f"Image file not found: {image}")
            elif isinstance(image, Image.Image):
                image_pil = image
            elif isinstance(image, np.ndarray):
                image_pil = Image.fromarray(image)
            else:
                raise ValueError(f"Unsupported image type: {type(image)}")
            return self.full_analysis_pipeline(image_pil, questionnaire_data)
        except Exception as e:
            logging.error(f"Wound analysis error: {e}")
            return {
                'success': False,
                'error': str(e),
                'visual_analysis': {},
                'report': f"Analysis initialization failed: {str(e)}",
                'saved_image_path': None,
                'guideline_context': ""
            }

    def _assess_risk_legacy(self, questionnaire_data: dict) -> dict:
        """Legacy risk assessment function - maintains original function name."""
        risk_factors = []
        risk_score = 0
        try:
            # Age assessment
            age = questionnaire_data.get('age', 0)
            if isinstance(age, str):
                try:
                    age = int(age)
                except ValueError:
                    age = 0
            if age > 65:
                risk_factors.append("Advanced age (>65)")
                risk_score += 2
            elif age > 50:
                risk_factors.append("Older adult (50-65)")
                risk_score += 1
            # Wound duration assessment
            duration = str(questionnaire_data.get('wound_duration', '')).lower()
            if any(term in duration for term in ['month', 'months', 'year', 'years']):
                risk_factors.append("Chronic wound (>4 weeks)")
                risk_score += 3
            elif any(term in duration for term in ['week', 'weeks']):
                # Try to extract number of weeks
                import re
                weeks_match = re.search(r'(\d+)\s*week', duration)
                if weeks_match and int(weeks_match.group(1)) > 4:
                    risk_factors.append("Chronic wound (>4 weeks)")
                    risk_score += 3
            # Pain level assessment
            pain = questionnaire_data.get('pain_level', 0)
            if isinstance(pain, str):
                try:
                    pain = float(pain)
                except ValueError:
                    pain = 0
            if pain >= 7:
                risk_factors.append("High pain level (≥7/10)")
                risk_score += 2
            elif pain >= 5:
                risk_factors.append("Moderate pain level (5-6/10)")
                risk_score += 1
            # Medical history assessment
            medical_history = str(questionnaire_data.get('medical_history', '')).lower()
            diabetic_status = str(questionnaire_data.get('diabetic', '')).lower()
            if 'diabetes' in medical_history or 'yes' in diabetic_status:
                risk_factors.append("Diabetes mellitus")
                risk_score += 3
            if any(term in medical_history for term in ['vascular', 'circulation', 'arterial', 'venous']):
                risk_factors.append("Vascular disease")
                risk_score += 2
            if any(term in medical_history for term in ['immune', 'immunocompromised', 'steroid', 'chemotherapy']):
                risk_factors.append("Immune system compromise")
                risk_score += 2
            if any(term in medical_history for term in ['smoking', 'smoker', 'tobacco']):
                risk_factors.append("Smoking history")
                risk_score += 2
            # Infection signs
            infection_signs = str(questionnaire_data.get('infection', '')).lower()
            if 'yes' in infection_signs:
                risk_factors.append("Signs of infection present")
                risk_score += 3
            # Moisture level
            moisture = str(questionnaire_data.get('moisture', '')).lower()
            if any(term in moisture for term in ['wet', 'heavy', 'excessive']):
                risk_factors.append("Excessive wound exudate")
                risk_score += 1
            # Determine risk level
            if risk_score >= 8:
                risk_level = "Very High"
            elif risk_score >= 6:
                risk_level = "High"
            elif risk_score >= 3:
                risk_level = "Moderate"
            else:
                risk_level = "Low"
            return {
                'risk_score': risk_score,
                'risk_level': risk_level,
                'risk_factors': risk_factors,
                'recommendations': self._get_risk_recommendations(risk_level, risk_factors)
            }
        except Exception as e:
            logging.error(f"Risk assessment error: {e}")
            return {
                'risk_score': 0,
                'risk_level': 'Unknown',
                'risk_factors': [],
                'recommendations': ["Unable to assess risk due to data processing error"]
            }

    def _get_risk_recommendations(self, risk_level: str, risk_factors: list) -> list:
        """Generate risk-based recommendations."""
        recommendations = []
        if risk_level in ["High", "Very High"]:
            recommendations.append("Urgent referral to wound care specialist recommended")
            recommendations.append("Consider daily wound monitoring")
            recommendations.append("Implement aggressive wound care protocol")
        elif risk_level == "Moderate":
            recommendations.append("Regular wound care follow-up every 2-3 days")
            recommendations.append("Monitor for signs of deterioration")
        else:
            recommendations.append("Standard wound care monitoring")
            recommendations.append("Weekly assessment recommended")
        # Specific recommendations based on risk factors
        if "Diabetes mellitus" in risk_factors:
            recommendations.append("Strict glycemic control essential")
            recommendations.append("Monitor for diabetic complications")
        if "Signs of infection present" in risk_factors:
            recommendations.append("Consider antibiotic therapy")
            recommendations.append("Increase wound cleaning frequency")
        if "Excessive wound exudate" in risk_factors:
            recommendations.append("Use high-absorption dressings")
            recommendations.append("More frequent dressing changes may be needed")
        return recommendations
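
# Hypothetical standalone usage of AIProcessor (outside the Gradio UI), for reference:
#   processor = AIProcessor()
#   result = processor.analyze_wound("wound.jpg", {"age": 67, "diabetic": "Yes", "moisture": "moderate"})
#   print(result["report"])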

# =============== STANDALONE SAVE AND COMMIT FUNCTION ===============
def save_and_commit_image(image_to_save):
    """Saves an image locally and commits it to the separate HF Dataset repository - EXACTLY like working reference."""
    if not image_to_save:
        return
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = f"{timestamp}.png"
    local_save_path = os.path.join(UPLOADS_DIR, filename)
    image_to_save.convert("RGB").save(local_save_path)
    logging.info(f"✅ Image saved to temporary local storage: {local_save_path}")
    if DATASET_ID and HF_TOKEN:
        try:
            api = HfApi()
            repo_path = f"images/{filename}"
            logging.info(f"Attempting to commit {local_save_path} to DATASET {DATASET_ID}...")
            api.upload_file(
                path_or_fileobj=local_save_path,
                path_in_repo=repo_path,
                repo_id=DATASET_ID,
                repo_type="dataset",
                commit_message=f"Upload wound image: {filename}"
            )
            logging.info("✅ Image successfully committed to dataset.")
        except Exception as e:
            logging.error(f"❌ FAILED TO COMMIT IMAGE TO DATASET: {e}")
    else:
        logging.warning("DATASET_ID or HF_TOKEN not set. Skipping file commit.")

# =============== MAIN ANALYSIS FUNCTION (with @spaces.GPU) - EXACTLY LIKE WORKING REFERENCE ===============
@spaces.GPU
def analyze(image, age, diabetic, allergies, date_of_injury, professional_care, oozing_bleeding, infection, moisture):
    """Main analysis function with GPU decorator - EXACTLY like working reference."""
    try:
        yield None, None, "⏳ Initializing... Loading AI models..."
        # Load all models - using global cache
        if "medgemma_pipe" not in models_cache:
            from transformers import pipeline
            models_cache["medgemma_pipe"] = pipeline(
                "image-text-to-text",
                model="google/medgemma-4b-it",
                torch_dtype=torch.bfloat16,
                device_map="auto",
                token=HF_TOKEN
            )
        logging.info("✅ All models loaded.")
| yield None, None, "β³ Setting up knowledge base from guidelines..." | |
| # Save image | |
| save_and_commit_image(image) | |
| # Create processor instance | |
| processor = AIProcessor() | |
| yield None, None, "β³ Performing visual analysis..." | |
| # Perform visual analysis - EXACTLY like working reference | |
| image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
| results = models_cache["det"].predict(image_cv, verbose=False, device="cpu") | |
| if not results or not results[0].boxes: | |
| raise ValueError("No wound could be detected.") | |
| box = results[0].boxes[0].xyxy[0].cpu().numpy().astype(int) | |
| detected_region_cv = image_cv[box[1]:box[3], box[0]:box[2]] | |
| input_size = models_cache["seg"].input_shape[1:3] | |
| resized = cv2.resize(detected_region_cv, (input_size[1], input_size[0])) | |
| mask_pred = models_cache["seg"].predict(np.expand_dims(resized / 255.0, 0), verbose=0)[0] | |
| mask_np = (mask_pred[:, :, 0] > 0.5).astype(np.uint8) | |
| contours, _ = cv2.findContours(mask_np, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| length, breadth, area = (0, 0, 0) | |
| if contours: | |
| cnt = max(contours, key=cv2.contourArea) | |
| x, y, w, h = cv2.boundingRect(cnt) | |
| length, breadth, area = round(h / PIXELS_PER_CM, 2), round(w / PIXELS_PER_CM, 2), round(cv2.contourArea(cnt) / (PIXELS_PER_CM ** 2), 2) | |
| detected_image_pil = Image.fromarray(cv2.cvtColor(detected_region_cv, cv2.COLOR_BGR2RGB)) | |
| wound_type = max(models_cache["cls"](detected_image_pil), key=lambda x: x["score"])["label"] | |
| visual_results = {"wound_type": wound_type, "length_cm": length, "breadth_cm": breadth, "surface_area_cm2": area} | |
| # Create visualization images | |
| segmented_mask = Image.fromarray(cv2.resize(mask_np * 255, (detected_region_cv.shape[1], detected_region_cv.shape[0]), interpolation=cv2.INTER_NEAREST)) | |
| yield detected_image_pil, segmented_mask, f"β Visual analysis complete. Detected: {visual_results['wound_type']}. Querying guidelines..." | |
| # Query guidelines | |
| patient_info = f"Age: {age}, Diabetic: {diabetic}, Allergies: {allergies}, Date of Wound Sustained: {date_of_injury}, Professional Care: {professional_care}, Oozing/Bleeding: {oozing_bleeding}, Infection: {infection}, Moisture: {moisture}" | |
| query = f"best practices for managing a {visual_results['wound_type']} with moisture level '{moisture}' and signs of infection '{infection}' in a patient who is diabetic '{diabetic}'" | |
| guideline_context = processor.query_guidelines(query) | |
| yield detected_image_pil, segmented_mask, "β Guidelines queried. Generating final report..." | |
| # Generate final report using MedGemma | |
| final_report = generate_medgemma_report( | |
| patient_info, | |
| visual_results, | |
| guideline_context, | |
| image_pil=image | |
| ) | |
| visual_summary = f"""## π Programmatic Visual Analysis | |
| | Metric | Result | | |
| | :--- | :--- | | |
| | **Detected Wound Type** | {visual_results['wound_type']} | | |
| | **Estimated Dimensions** | {visual_results['length_cm']}cm x {visual_results['breadth_cm']}cm (Area: {visual_results['surface_area_cm2']}cmΒ²) | | |
| --- | |
| """ | |
| final_output_text = visual_summary + "## π©Ί MedHeal-AI Clinical Assessment\n" + final_report | |
| yield detected_image_pil, segmented_mask, final_output_text | |
| except Exception as e: | |
| logging.error(f"An error occurred during analysis: {e}", exc_info=True) | |
| yield None, None, f"β **An error occurred:** {e}" |