Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import logging | |
| from ultralytics import YOLO | |
| import numpy as np | |
| import cv2 | |
| from io import BytesIO | |
| from PIL import Image | |
| import base64 | |
| import os | |
# Configure logging once, at import time, so every later message is formatted
# with a timestamp and level.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

app = FastAPI(title="Damage Detection API")

# Refuse to start when the weights file is absent: the service cannot do
# anything useful without it, so fail loudly during import.
model_file = "damage_general_model.pt"
if not os.path.exists(model_file):
    logger.error(f"Model file missing: {model_file}")
    raise RuntimeError(f"Model file missing: {model_file}")
logger.info(f"Model file found: {model_file}")

# Load the YOLO weights a single time; request handlers reuse `damage_model`.
try:
    logger.info("Loading damage model...")
    damage_model = YOLO(model_file)
    logger.info("Damage model loaded successfully")
except Exception as load_error:
    failure = f"Failed to load model: {str(load_error)}"
    logger.error(failure)
    raise RuntimeError(failure)
def image_to_base64(img: np.ndarray) -> str:
    """Encode an image array as a base64 PNG string.

    Args:
        img: Image array handed to ``cv2.imencode``. OpenCV interprets
            three-channel input as BGR.

    Returns:
        The PNG bytes produced by OpenCV, base64-encoded and returned as a
        UTF-8 ``str``.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding did not succeed.
        Exception: Any other error from OpenCV or base64 is logged and
            re-raised unchanged.
    """
    try:
        # imencode returns (success_flag, buffer); the flag must be checked,
        # otherwise a failed encode would be base64-encoded as if it were valid.
        encoded_ok, buffer = cv2.imencode(".png", img)
        if not encoded_ok:
            raise ValueError("cv2.imencode failed to encode image as PNG")
        return base64.b64encode(buffer).decode("utf-8")
    except Exception as e:
        logger.error(f"Error encoding image to base64: {str(e)}")
        raise
async def predict(file: UploadFile = File(...)):
    """Upload an image and get damage detection results.

    The upload is decoded to RGB, converted to BGR, and run through the
    module-level ``damage_model``. The response is JSON with two keys:
    ``damage_image`` (base64 PNG of the annotated image, or a flat grey image
    of the same size when nothing was detected or detection failed) and
    ``damage_text`` (one line per detection, or a no-detections/error note).

    Raises:
        HTTPException: 400 when the upload cannot be decoded as an image;
            500 for any other failure outside the detection step.
    """
    # NOTE(review): no route decorator is visible on this handler in the
    # reviewed text -- confirm it is registered with `app`.
    logger.info("Received image upload")
    try:
        contents = await file.read()
        try:
            image = Image.open(BytesIO(contents)).convert("RGB")
        except (OSError, ValueError) as e:
            # An undecodable upload is a client error, not a server fault.
            logger.error(f"Invalid image upload: {str(e)}")
            raise HTTPException(status_code=400, detail=f"Invalid image file: {str(e)}") from e

        img = np.array(image)
        logger.info(f"Image loaded: shape={img.shape}")

        # Ultralytics documents numpy inputs as BGR, while PIL yields RGB;
        # convert so the model does not see red and blue swapped.
        bgr_img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

        # Default payload: a mid-grey image the same size as the upload.
        damage_img = np.full((img.shape[0], img.shape[1], 3), 128, dtype=np.uint8)
        damage_text = "Damage: No detections"

        # Detection is best-effort: a failure here is reported in the text
        # field rather than failing the whole request.
        try:
            logger.info("Running damage detection...")
            damage_results = damage_model(bgr_img)[0]
            boxes = damage_results.boxes
            if boxes is not None and len(boxes) > 0:
                # plot() returns a BGR array (per Ultralytics docs), which is
                # the channel order cv2.imencode expects, so no flip is needed.
                damage_img = damage_results.plot()
                damage_text = "Damage:\n" + "\n".join(
                    f"- {damage_results.names[int(cls)]} ({conf:.2f})"
                    for conf, cls in zip(boxes.conf, boxes.cls)
                )
            logger.info("Damage detection completed")
        except Exception as e:
            damage_text = f"Damage: Error: {str(e)}"
            logger.error(f"Damage detection error: {str(e)}")

        damage_img_base64 = image_to_base64(damage_img)
        logger.info("Returning prediction results")
        return JSONResponse({
            "damage_image": damage_img_base64,
            "damage_text": damage_text,
        })
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 400 above) pass through
        # instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.error(f"Inference error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Inference error: {str(e)}") from e
async def root():
    """Report that the service is up.

    Logs the access and returns a fixed JSON-serialisable status message.
    """
    logger.info("Health check accessed")
    status_payload = {"message": "Damage Detection API is running"}
    return status_payload