import logging
import cv2
import numpy as np
from PIL import Image
import json
from datetime import datetime
import os

# Try to import AI libraries (graceful fallback if not available)
try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    logging.warning("Transformers not available")

try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except ImportError:
    YOLO_AVAILABLE = False
    logging.warning("YOLO not available")

try:
    import tensorflow as tf
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False
    logging.warning("TensorFlow not available")


class WoundAnalyzer:
    """AI-powered wound analysis system"""

    def __init__(self, config):
        """Initialize wound analyzer with configuration"""
        self.config = config
        self.models_loaded = False
        self.load_models()

    def load_models(self):
        """Load AI models for wound analysis"""
        try:
            # Load models if libraries are available
            if TRANSFORMERS_AVAILABLE:
                try:
                    # Load a general image classification model
                    self.image_classifier = pipeline(
                        "image-classification",
                        model="google/vit-base-patch16-224",
                        token=self.config.HF_TOKEN
                    )
                    logging.info("✅ Image classification model loaded")
                except Exception as e:
                    logging.warning(f"Could not load image classifier: {e}")
                    self.image_classifier = None

            if YOLO_AVAILABLE:
                try:
                    # Try to load YOLO model (will download if not present)
                    self.yolo_model = YOLO('yolov8n.pt')
                    logging.info("✅ YOLO model loaded")
                except Exception as e:
                    logging.warning(f"Could not load YOLO model: {e}")
                    self.yolo_model = None

            self.models_loaded = True
            logging.info("✅ Wound analyzer initialized")
        except Exception as e:
            logging.error(f"Error loading models: {e}")
            self.models_loaded = False

    def analyze_wound(self, image, questionnaire_id):
        """Analyze wound image and return comprehensive results"""
        start_time = datetime.now()
        try:
            if image is None:
                return self._create_error_result("No image provided")

            # Get questionnaire data for context
            questionnaire_data = self._get_questionnaire_data(questionnaire_id)

            # Convert image to various formats for analysis
            cv_image = self._pil_to_cv2(image)
            np_image = np.array(image)

            # Perform basic image analysis
            basic_analysis = self._basic_image_analysis(cv_image, np_image)

            # Perform AI analysis if models are available
            ai_analysis = self._ai_image_analysis(image)

            # Use enhanced AI processor if available
            try:
                from .ai_processor import AIProcessor
                ai_processor = AIProcessor()

                # Perform visual analysis using AI processor
                visual_results = ai_processor.perform_visual_analysis(image)

                # Query clinical guidelines
                query = f"wound care {questionnaire_data.get('wound_location', '')} {questionnaire_data.get('diabetic_status', '')}"
                guideline_context = ai_processor.query_guidelines(query)

                # Generate comprehensive report
                comprehensive_report = ai_processor.generate_final_report(
                    questionnaire_data, visual_results, guideline_context, image
                )

                # Merge AI processor results with basic analysis
                ai_analysis.update({
                    'visual_analysis': visual_results,
                    'clinical_guidelines': guideline_context,
                    'comprehensive_report': comprehensive_report
                })
                logging.info("Enhanced AI analysis completed")
            except Exception as e:
                logging.warning(f"Enhanced AI processor not available: {e}")

            # Combine results
            analysis_result = self._combine_analysis_results(
                basic_analysis,
                ai_analysis,
                questionnaire_id
            )

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            analysis_result['processing_time'] = processing_time

            logging.info(f"Wound analysis completed in {processing_time:.2f} seconds")
            return analysis_result
        except Exception as e:
            logging.error(f"Wound analysis error: {e}")
            return self._create_error_result(f"Analysis failed: {str(e)}")

    def _get_questionnaire_data(self, questionnaire_id):
        """Get questionnaire data for analysis context"""
        try:
            # This should connect to the database to get questionnaire data
            # For now, return empty dict as fallback
            return {}
        except Exception as e:
            logging.warning(f"Could not fetch questionnaire data: {e}")
            return {}
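
    # A possible implementation of the lookup above, left here as a hedged,
    # commented-out sketch only: it assumes a hypothetical get_db_connection()
    # helper and a `questionnaires` table with a JSON `data` column, neither
    # of which is defined in this module.
    #
    # def _get_questionnaire_data(self, questionnaire_id):
    #     """Fetch questionnaire answers from the application database."""
    #     try:
    #         conn = get_db_connection()  # hypothetical helper
    #         row = conn.execute(
    #             "SELECT data FROM questionnaires WHERE id = ?",
    #             (questionnaire_id,)
    #         ).fetchone()
    #         return json.loads(row[0]) if row else {}
    #     except Exception as e:
    #         logging.warning(f"Could not fetch questionnaire data: {e}")
    #         return {}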

    def _pil_to_cv2(self, pil_image):
        """Convert PIL image to OpenCV format"""
        try:
            # Convert PIL to RGB if not already
            if pil_image.mode != 'RGB':
                pil_image = pil_image.convert('RGB')

            # Convert to numpy array and then to OpenCV (BGR) format
            np_array = np.array(pil_image)
            cv_image = cv2.cvtColor(np_array, cv2.COLOR_RGB2BGR)
            return cv_image
        except Exception as e:
            logging.error(f"Error converting PIL to CV2: {e}")
            return None

    def _basic_image_analysis(self, cv_image, np_image):
        """Perform basic image analysis using OpenCV"""
        try:
            analysis = {}
            if cv_image is not None:
                # Image properties (pixel dimensions of the photo, not the wound itself)
                height, width = cv_image.shape[:2]
                analysis['dimensions'] = f"{width}x{height}"
                analysis['image_quality'] = self._assess_image_quality(cv_image)

                # Color analysis
                analysis['color_analysis'] = self._analyze_colors(cv_image)

                # Texture analysis
                analysis['texture_analysis'] = self._analyze_texture(cv_image)

                # Edge detection for wound boundaries
                analysis['edge_analysis'] = self._analyze_edges(cv_image)
            return analysis
        except Exception as e:
            logging.error(f"Basic image analysis error: {e}")
            return {}

    def _assess_image_quality(self, cv_image):
        """Assess image quality metrics"""
        try:
            # Calculate sharpness using Laplacian variance
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
            sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()

            # Calculate brightness
            brightness = np.mean(cv_image)

            # Calculate contrast
            contrast = np.std(cv_image)

            # Determine quality rating
            if sharpness > 500 and 50 < brightness < 200 and contrast > 30:
                quality = "Good"
            elif sharpness > 100 and 30 < brightness < 230 and contrast > 15:
                quality = "Fair"
            else:
                quality = "Poor"

            return {
                'sharpness': float(sharpness),
                'brightness': float(brightness),
                'contrast': float(contrast),
                'overall_quality': quality
            }
        except Exception as e:
            logging.error(f"Image quality assessment error: {e}")
            return {'overall_quality': 'Unknown'}

    def _analyze_colors(self, cv_image):
        """Analyze color properties of the wound"""
        try:
            # Convert to HSV for better color analysis
            hsv = cv2.cvtColor(cv_image, cv2.COLOR_BGR2HSV)

            # Calculate color statistics
            mean_hue = np.mean(hsv[:, :, 0])
            mean_saturation = np.mean(hsv[:, :, 1])
            mean_value = np.mean(hsv[:, :, 2])

            # Detect dominant colors
            dominant_colors = self._get_dominant_colors(cv_image)

            return {
                'mean_hue': float(mean_hue),
                'mean_saturation': float(mean_saturation),
                'mean_value': float(mean_value),
                'dominant_colors': dominant_colors
            }
        except Exception as e:
            logging.error(f"Color analysis error: {e}")
            return {}

    def _get_dominant_colors(self, cv_image, k=3):
        """Get dominant colors in the image"""
        try:
            # Reshape image to be a list of pixels
            data = cv_image.reshape((-1, 3))
            data = np.float32(data)

            # Apply k-means clustering
            criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
            _, labels, centers = cv2.kmeans(data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)

            # Convert back to uint8 and get color names
            centers = np.uint8(centers)
            dominant_colors = []
            for center in centers:
                # cv_image is BGR, so flip each cluster center to RGB before classifying
                rgb = center[::-1]
                color_name = self._classify_color(rgb)
                dominant_colors.append({
                    'rgb': rgb.tolist(),
                    'name': color_name
                })
            return dominant_colors
        except Exception as e:
            logging.error(f"Dominant colors error: {e}")
            return []

    def _classify_color(self, rgb_color):
        """Classify RGB color into medical color categories"""
        r, g, b = rgb_color

        # Simple color classification for wound assessment
        if r > 150 and g < 100 and b < 100:
            return "Red/Inflammatory"
        elif r > 150 and g > 150 and b < 100:
            return "Yellow/Exudate"
        elif r < 100 and g < 100 and b < 100:
            return "Dark/Necrotic"
        elif r > 200 and g > 200 and b > 200:
            return "White/Pale"
        elif r > 100 and g > 50 and b < 100:
            return "Pink/Healthy"
        else:
            return "Mixed/Other"

    def _analyze_texture(self, cv_image):
        """Analyze texture properties"""
        try:
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)

            # Simple texture statistics (grayscale variance and mean),
            # used as a rough proxy for surface irregularity
            texture_variance = np.var(gray)
            texture_mean = np.mean(gray)

            # Determine texture category
            if texture_variance > 1000:
                texture_type = "Rough/Irregular"
            elif texture_variance > 500:
                texture_type = "Moderate"
            else:
                texture_type = "Smooth"

            return {
                'variance': float(texture_variance),
                'mean': float(texture_mean),
                'type': texture_type
            }
        except Exception as e:
            logging.error(f"Texture analysis error: {e}")
            return {}

    def _analyze_edges(self, cv_image):
        """Analyze edges for wound boundary detection"""
        try:
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)

            # Apply Canny edge detection
            edges = cv2.Canny(gray, 50, 150)

            # Count edge pixels
            edge_count = np.sum(edges > 0)
            total_pixels = edges.shape[0] * edges.shape[1]
            edge_ratio = edge_count / total_pixels

            # Determine wound boundary clarity
            if edge_ratio > 0.1:
                boundary_clarity = "Well-defined"
            elif edge_ratio > 0.05:
                boundary_clarity = "Moderately-defined"
            else:
                boundary_clarity = "Poorly-defined"

            return {
                'edge_count': int(edge_count),
                'edge_ratio': float(edge_ratio),
                'boundary_clarity': boundary_clarity
            }
        except Exception as e:
            logging.error(f"Edge analysis error: {e}")
            return {}

    def _ai_image_analysis(self, image):
        """Perform AI-based image analysis"""
        try:
            ai_results = {}

            # Use image classifier if available
            if hasattr(self, 'image_classifier') and self.image_classifier:
                try:
                    classification = self.image_classifier(image)
                    ai_results['classification'] = classification[:3]  # Top 3 results
                except Exception as e:
                    logging.warning(f"Image classification failed: {e}")

            # Use YOLO for object detection if available
            if hasattr(self, 'yolo_model') and self.yolo_model:
                try:
                    detection_results = self.yolo_model(image)
                    ai_results['object_detection'] = self._process_yolo_results(detection_results)
                except Exception as e:
                    logging.warning(f"YOLO detection failed: {e}")

            return ai_results
        except Exception as e:
            logging.error(f"AI image analysis error: {e}")
            return {}

    def _process_yolo_results(self, results):
        """Process YOLO detection results"""
        try:
            processed_results = []
            for result in results:
                if hasattr(result, 'boxes') and result.boxes:
                    for box in result.boxes:
                        processed_results.append({
                            'confidence': float(box.conf.item()) if hasattr(box, 'conf') else 0.0,
                            'class_name': result.names.get(int(box.cls.item()), 'unknown') if hasattr(box, 'cls') else 'unknown'
                        })
            return processed_results
        except Exception as e:
            logging.error(f"YOLO results processing error: {e}")
            return []

    def _combine_analysis_results(self, basic_analysis, ai_analysis, questionnaire_id):
        """Combine all analysis results into a comprehensive report"""
        try:
            # Create comprehensive analysis result
            result = {
                'questionnaire_id': questionnaire_id,
                'basic_analysis': basic_analysis,
                'ai_analysis': ai_analysis,
                'model_version': 'SmartHeal-v1.0'
            }

            # Generate summary
            result['summary'] = self._generate_summary(basic_analysis, ai_analysis)

            # Generate recommendations
            result['recommendations'] = self._generate_recommendations(basic_analysis, ai_analysis)

            # Calculate risk assessment
            result['risk_assessment'] = self._calculate_risk_assessment(basic_analysis, ai_analysis)
            result['risk_level'] = result['risk_assessment']['level']
            result['risk_score'] = result['risk_assessment']['score']

            # Determine wound type
            result['wound_type'] = self._determine_wound_type(basic_analysis, ai_analysis)

            # Extract wound dimensions
            result['wound_dimensions'] = basic_analysis.get('dimensions', 'Unknown')

            return result
        except Exception as e:
            logging.error(f"Results combination error: {e}")
            return self._create_error_result("Failed to combine analysis results")

    def _generate_summary(self, basic_analysis, ai_analysis):
        """Generate analysis summary"""
        try:
            summary_parts = []

            # Image quality assessment
            if 'image_quality' in basic_analysis:
                quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
                summary_parts.append(f"Image quality: {quality}")

            # Color analysis summary
            if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
                colors = basic_analysis['color_analysis']['dominant_colors']
                if colors:
                    color_names = [color['name'] for color in colors[:2]]
                    summary_parts.append(f"Dominant colors: {', '.join(color_names)}")

            # Texture summary
            if 'texture_analysis' in basic_analysis:
                texture_type = basic_analysis['texture_analysis'].get('type', 'Unknown')
                summary_parts.append(f"Texture: {texture_type}")

            # Boundary clarity
            if 'edge_analysis' in basic_analysis:
                boundary = basic_analysis['edge_analysis'].get('boundary_clarity', 'Unknown')
                summary_parts.append(f"Wound boundaries: {boundary}")

            # AI classification summary
            if 'classification' in ai_analysis and ai_analysis['classification']:
                top_class = ai_analysis['classification'][0]
                summary_parts.append(f"AI classification: {top_class.get('label', 'Unknown')}")

            if summary_parts:
                summary = "Wound Analysis Summary: " + "; ".join(summary_parts)
            else:
                summary = "Basic wound analysis completed."
            return summary
        except Exception as e:
            logging.error(f"Summary generation error: {e}")
            return "Wound analysis completed with limited information due to processing constraints."

    def _generate_recommendations(self, basic_analysis, ai_analysis):
        """Generate treatment recommendations based on analysis"""
        try:
            recommendations = []

            # Image quality recommendations
            if 'image_quality' in basic_analysis:
                quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
                if quality == 'Poor':
                    recommendations.append("Consider retaking the image with better lighting and focus for more accurate analysis.")

            # Color-based recommendations
            if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
                colors = basic_analysis['color_analysis']['dominant_colors']
                for color in colors:
                    color_name = color.get('name', '')
                    if 'Red/Inflammatory' in color_name:
                        recommendations.append("Red coloration may indicate inflammation. Monitor for infection signs.")
                    elif 'Yellow/Exudate' in color_name:
                        recommendations.append("Yellow areas suggest possible exudate. Consider wound cleansing.")
                    elif 'Dark/Necrotic' in color_name:
                        recommendations.append("Dark areas may indicate necrotic tissue. Consult for debridement evaluation.")
                    elif 'Pink/Healthy' in color_name:
                        recommendations.append("Pink coloration suggests healthy granulation tissue - positive healing sign.")

            # Texture-based recommendations
            if 'texture_analysis' in basic_analysis:
                texture_type = basic_analysis['texture_analysis'].get('type', '')
                if 'Rough/Irregular' in texture_type:
                    recommendations.append("Irregular texture may require specialized wound care approach.")

            # Boundary-based recommendations
            if 'edge_analysis' in basic_analysis:
                boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '')
                if 'Poorly-defined' in boundary:
                    recommendations.append("Poorly defined wound edges may indicate ongoing tissue breakdown.")

            # General recommendations
            recommendations.extend([
                "Continue regular wound monitoring and documentation.",
                "Maintain appropriate wound hygiene and dressing protocols.",
                "Consult healthcare provider for persistent or worsening symptoms.",
                "Follow established wound care guidelines for optimal healing."
            ])

            return "; ".join(recommendations) if recommendations else "Standard wound care protocols recommended."
        except Exception as e:
            logging.error(f"Recommendations generation error: {e}")
            return "Consult healthcare provider for appropriate wound care recommendations."

    def _calculate_risk_assessment(self, basic_analysis, ai_analysis):
        """Calculate risk assessment based on analysis"""
        try:
            risk_score = 0
            risk_factors = []

            # Image quality factor
            if 'image_quality' in basic_analysis:
                quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
                if quality == 'Poor':
                    risk_score += 10
                    risk_factors.append("Poor image quality")

            # Color-based risk factors
            if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
                colors = basic_analysis['color_analysis']['dominant_colors']
                for color in colors:
                    color_name = color.get('name', '')
                    if 'Dark/Necrotic' in color_name:
                        risk_score += 30
                        risk_factors.append("Possible necrotic tissue")
                    elif 'Red/Inflammatory' in color_name:
                        risk_score += 20
                        risk_factors.append("Signs of inflammation")
                    elif 'Yellow/Exudate' in color_name:
                        risk_score += 15
                        risk_factors.append("Possible exudate")

            # Texture risk factors
            if 'texture_analysis' in basic_analysis:
                texture_type = basic_analysis['texture_analysis'].get('type', '')
                if 'Rough/Irregular' in texture_type:
                    risk_score += 10
                    risk_factors.append("Irregular texture")

            # Boundary risk factors
            if 'edge_analysis' in basic_analysis:
                boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '')
                if 'Poorly-defined' in boundary:
                    risk_score += 15
                    risk_factors.append("Poorly defined boundaries")

            # Determine risk level
            if risk_score >= 50:
                risk_level = "High"
            elif risk_score >= 25:
                risk_level = "Moderate"
            elif risk_score >= 10:
                risk_level = "Low"
            else:
                risk_level = "Minimal"

            return {
                'score': min(risk_score, 100),  # Cap at 100
                'level': risk_level,
                'factors': risk_factors
            }
        except Exception as e:
            logging.error(f"Risk assessment error: {e}")
            return {
                'score': 0,
                'level': 'Unknown',
                'factors': ['Assessment error']
            }

    def _determine_wound_type(self, basic_analysis, ai_analysis):
        """Determine wound type based on analysis"""
        try:
            # This is a simplified wound type determination
            # In a real system, this would use more sophisticated ML models
            wound_characteristics = []

            # Color-based characteristics
            if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
                colors = basic_analysis['color_analysis']['dominant_colors']
                for color in colors:
                    color_name = color.get('name', '')
                    if 'Red/Inflammatory' in color_name:
                        wound_characteristics.append("inflammatory")
                    elif 'Pink/Healthy' in color_name:
                        wound_characteristics.append("granulating")
                    elif 'Yellow/Exudate' in color_name:
                        wound_characteristics.append("exudative")
                    elif 'Dark/Necrotic' in color_name:
                        wound_characteristics.append("necrotic")

            # Determine primary wound type
            if "necrotic" in wound_characteristics:
                return "Necrotic wound"
            elif "inflammatory" in wound_characteristics and "exudative" in wound_characteristics:
                return "Infected/Inflammatory wound"
            elif "granulating" in wound_characteristics:
                return "Healing/Granulating wound"
            elif "exudative" in wound_characteristics:
                return "Exudative wound"
            else:
                return "Acute wound"
        except Exception as e:
            logging.error(f"Wound type determination error: {e}")
            return "Undetermined wound type"

    def _create_error_result(self, error_message):
        """Create error result structure"""
        return {
            'error': True,
            'summary': f"Analysis Error: {error_message}",
            'recommendations': "Please ensure image quality is adequate and try again. Consult healthcare provider if issues persist.",
            'risk_level': 'Unknown',
            'risk_score': 0,
            'wound_type': 'Unknown',
            'wound_dimensions': 'Unknown',
            'processing_time': 0.0,
            'model_version': 'SmartHeal-v1.0'
        }
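

# A minimal usage sketch, assuming a simple config object exposing an HF_TOKEN
# attribute (read by load_models above) and a local test image "wound.jpg";
# neither is part of this module, and the real application is expected to
# supply both.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    # Hypothetical config: only HF_TOKEN is read by this module.
    config = SimpleNamespace(HF_TOKEN=os.environ.get("HF_TOKEN"))

    analyzer = WoundAnalyzer(config)
    image = Image.open("wound.jpg")  # placeholder test image path
    result = analyzer.analyze_wound(image, questionnaire_id=1)
    print(json.dumps(result, indent=2, default=str))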