# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""
import gradio as gr
from PIL import Image
import numpy as np
import os
import io
import zipfile
import json
import tempfile
import logging
import cv2
import requests
from urllib.parse import urlparse
from registry import get_model
from core.describe_scene import describe_scene
import uuid
import time
import timeout_decorator
import socket
import ipaddress
from huggingface_hub import hf_hub_download
import spaces

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv5-Nano": "yolov5n-seg",
    "YOLOv5-Small": "yolov5s-seg",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "RT-DETR": "rtdetr"  # For future support
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "nvidia/segformer-b0-finetuned-ade-512-512",
    "SegFormer-B5": "nvidia/segformer-b5-finetuned-ade-512-512",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}
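
# The registry resolves a (task, key) pair to a loaded model; the same call
# signature is used throughout this script, e.g.:
#   model = get_model("detection", DETECTION_MODEL_MAP["YOLOv5-Nano"], device="cpu")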

# Resource Limits
MAX_IMAGE_MB = 5
MAX_IMAGE_RES = (1920, 1080)
MAX_VIDEO_MB = 50
MAX_VIDEO_DURATION = 30  # seconds

@spaces.GPU
def preload_models():
    """
    Warm-up hook needed to activate ZeroGPU; it must be decorated with
    @spaces.GPU (added above). Loads the default models into memory so the
    first inference is faster.
    """
    from registry import get_model
    print("Warming up models for ZeroGPU...")
    get_model("detection", "yolov5n-seg", device="cpu")
    get_model("segmentation", "deeplabv3_resnet50", device="cpu")
    get_model("depth", "midas_v21_small_256", device="cpu")

# Utility Functions
def format_error(message):
    """Formats error messages for consistent user feedback."""
    return {"error": message}

def toggle_visibility(show, n_components=1):
    """
    Returns visibility updates for n_components Gradio components.
    Note: Gradio event handlers receive only the input values (here, the
    checkbox state), so the output count must be passed explicitly rather
    than inferred from a *components varargs, which would always be empty.
    """
    return [gr.update(visible=show) for _ in range(n_components)]

def generate_session_id():
    """Generates a unique session ID for tracking inputs."""
    return str(uuid.uuid4())

def log_runtime(start_time):
    """Logs the runtime of a process."""
    elapsed_time = time.time() - start_time
    logger.info(f"Process completed in {elapsed_time:.2f} seconds.")
    return elapsed_time

def is_public_ip(url):
    """
    Checks whether the resolved IP address of a URL is public (non-local).
    Prevents SSRF by blocking internal addresses like 127.0.0.1 or 192.168.x.x.
    """
    try:
        hostname = urlparse(url).hostname
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        return ip_obj.is_global  # Only allow globally routable IPs
    except Exception as e:
        logger.warning(f"URL IP validation failed: {e}")
        return False
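
# Illustrative behavior (actual results depend on DNS resolution):
#   is_public_ip("http://127.0.0.1/admin")        -> False (loopback)
#   is_public_ip("http://192.168.1.10/cam.jpg")   -> False (private range)
#   is_public_ip("https://example.com/photo.png") -> True  (globally routable)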

def fetch_media_from_url(url):
    """
    Downloads media from a URL. Supports images and videos.
    Returns PIL.Image or video file path.
    """
    logger.info(f"Fetching media from URL: {url}")
    if not is_public_ip(url):
        logger.warning("Blocked non-public URL request (possible SSRF).")
        return None
    try:
        parsed_url = urlparse(url)
        ext = os.path.splitext(parsed_url.path)[-1].lower()
        headers = {"User-Agent": "Mozilla/5.0"}
        r = requests.get(url, headers=headers, timeout=10)
        # Reject failed requests and cap downloads at 50 MB
        if r.status_code != 200 or len(r.content) > 50 * 1024 * 1024:
            logger.warning("Download failed or file too large.")
            return None
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
        tmp_file.write(r.content)
        tmp_file.close()
        if ext in [".jpg", ".jpeg", ".png"]:
            return Image.open(tmp_file.name).convert("RGB")
        elif ext in [".mp4", ".avi", ".mov"]:
            return tmp_file.name
        else:
            logger.warning("Unsupported file type from URL.")
            return None
    except Exception as e:
        logger.error(f"URL fetch failed: {e}")
        return None
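
# Illustrative call (return type depends on the URL's file extension):
#   media = fetch_media_from_url("https://example.com/street.jpg")
#   -> PIL.Image for .jpg/.jpeg/.png, a temp-file path for .mp4/.avi/.mov,
#      None for blocked, oversized, or unsupported URLs.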

# Input Validation Functions
def validate_image(img):
    """
    Validates the uploaded image based on size and resolution limits.
    Args:
        img (PIL.Image.Image): Image to validate.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info("Validating uploaded image.")
    try:
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        size_mb = len(buffer.getvalue()) / (1024 * 1024)
        if size_mb > MAX_IMAGE_MB:
            logger.warning(f"Image exceeds size limit of {MAX_IMAGE_MB}MB.")
            return False, f"Image exceeds {MAX_IMAGE_MB}MB limit."
        if img.width > MAX_IMAGE_RES[0] or img.height > MAX_IMAGE_RES[1]:
            logger.warning(f"Image resolution exceeds {MAX_IMAGE_RES[0]}x{MAX_IMAGE_RES[1]}.")
            return False, f"Image resolution exceeds {MAX_IMAGE_RES[0]}x{MAX_IMAGE_RES[1]}."
        logger.info("Image validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False, str(e)

def validate_video(path):
    """
    Validates the uploaded video based on size and duration limits.
    Args:
        path (str): Path to the video file.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info(f"Validating video file at: {path}")
    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_MB:
            logger.warning(f"Video exceeds size limit of {MAX_VIDEO_MB}MB.")
            return False, f"Video exceeds {MAX_VIDEO_MB}MB limit."
        cap = cv2.VideoCapture(path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        # Duration = frame count / frame rate, e.g. 900 frames at 30 fps = 30 s.
        duration = frames / fps if fps else 0
        cap.release()
        if duration > MAX_VIDEO_DURATION:
            logger.warning(f"Video exceeds {MAX_VIDEO_DURATION} seconds duration limit.")
            return False, f"Video exceeds {MAX_VIDEO_DURATION} seconds duration limit."
        logger.info("Video validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating video: {e}")
        return False, str(e)

# Input Resolution
def resolve_input(mode, uploaded_files, url):
    """
    Resolves the media input based on selected mode.
    - If mode is 'Upload', accepts either:
        * 1–5 images (returned as PIL.Image)
        * OR 1 video file (file path as string)
    - If mode is 'URL', fetches remote image or video.
    Args:
        mode (str): 'Upload' or 'URL'
        uploaded_files (List): Uploads from gr.File (paths or file objects)
        url (str): URL to image or video
    Returns:
        List[Union[PIL.Image.Image, str]] or None
    """
    try:
        logger.info(f"Resolving input for mode: {mode}")
        if mode == "Upload":
            if not uploaded_files:
                logger.warning("No upload detected.")
                return None
            # gr.File yields file paths (or objects exposing .name), not PIL
            # images, so classify by extension and open images explicitly.
            paths = [f if isinstance(f, str) else getattr(f, "name", "") for f in uploaded_files]
            image_paths = [p for p in paths if p.lower().endswith((".jpg", ".jpeg", ".png"))]
            video_paths = [p for p in paths if p.lower().endswith((".mp4", ".mov", ".avi"))]
            if image_paths and video_paths:
                logger.warning("Mixed media upload not supported (images + video).")
                return None
            if image_paths:
                if 1 <= len(image_paths) <= 5:
                    logger.info(f"Accepted {len(image_paths)} image(s).")
                    return [Image.open(p).convert("RGB") for p in image_paths]
                logger.warning("Invalid number of images. Must be 1 to 5.")
                return None
            if video_paths:
                if len(video_paths) == 1:
                    logger.info("Accepted single video upload.")
                    return video_paths
                logger.warning("Only one video allowed.")
                return None
            logger.warning("Unsupported upload type.")
            return None
        elif mode == "URL":
            if not url:
                logger.warning("URL mode selected but URL is empty.")
                return None
            media = fetch_media_from_url(url)
            if media:
                logger.info("Media successfully fetched from URL.")
                return [media]
            else:
                logger.warning("Failed to resolve media from URL.")
                return None
        else:
            logger.error(f"Invalid mode selected: {mode}")
            return None
    except Exception as e:
        logger.error(f"Exception in resolve_input(): {e}")
        return None
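
# Illustrative outcomes (paths are hypothetical):
#   resolve_input("Upload", ["a.jpg", "b.png"], None)         -> [PIL.Image, PIL.Image]
#   resolve_input("Upload", ["clip.mp4"], None)               -> ["clip.mp4"]
#   resolve_input("Upload", ["a.jpg", "clip.mp4"], None)      -> None (mixed media)
#   resolve_input("URL", None, "https://example.com/cat.png") -> [PIL.Image]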

@timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image; signals don't work off the main thread
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.
    Args:
        image (PIL.Image): Input image.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).
    Returns:
        Tuple[Image, dict, str]: Final image, scene JSON, and path to a downloadable ZIP.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    combined_np = np.array(image)
    try:
        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", DETECTION_MODEL_MAP[det_model], device="cpu")
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            combined_np = np.array(overlay)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["detection.png"] = buf.getvalue()
            scene["detection"] = boxes
        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", SEGMENTATION_MODEL_MAP[seg_model], device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            # Alpha blend: result = (1 - blend) * base + blend * overlay
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["segmentation.png"] = buf.getvalue()
            scene["segmentation"] = mask.tolist()
        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", DEPTH_MODEL_MAP[depth_model], device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            # Min-max normalize to 0-255 for display. np.ptp (peak-to-peak) is
            # max - min; the epsilon guards against division by zero on flat
            # maps, and the ndarray.ptp() method was removed in NumPy 2.0.
            norm_dmap = ((dmap - dmap.min()) / (np.ptp(dmap) + 1e-8) * 255).astype(np.uint8)
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            buf = io.BytesIO()
            d_pil.save(buf, format="PNG")
            outputs["depth_map.png"] = buf.getvalue()
            scene["depth"] = dmap.tolist()
        # Final image overlay
        final_img = Image.fromarray(combined_np)
        buf = io.BytesIO()
        final_img.save(buf, format="PNG")
        outputs["scene_blueprint.png"] = buf.getvalue()

        # Scene description
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}
        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation: write to a temp file so gr.File can serve a path
        # (a (name, bytes) tuple is not a valid gr.File output value)
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)
        zip_path = os.path.join(tempfile.mkdtemp(), "uvis_results.zip")
        with open(zip_path, "wb") as f:
            f.write(zip_buf.getvalue())

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        return final_img, scene_json, zip_path
    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None


# Main Handler
def handle(mode, uploaded_files, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend):
    """
    Master handler for resolving input and processing.
    Returns outputs for the Gradio interface.
    Note: processing returns on the first valid item; batching over multiple
    images is not yet implemented.
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()
    media = resolve_input(mode, uploaded_files, url)
    if not media:
        return None, format_error("No valid input provided. Please check your upload or URL."), None

    for single_media in media:
        if isinstance(single_media, str):  # Video file: validate, then use its first frame
            valid, err = validate_video(single_media)
            if not valid:
                return None, format_error(err), None
            cap = cv2.VideoCapture(single_media)
            ret, frame = cap.read()
            cap.release()
            if not ret:
                return None, format_error("Failed to read video frame."), None
            single_media = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if isinstance(single_media, Image.Image):
            valid, err = validate_image(single_media)
            if not valid:
                return None, format_error(err), None
            try:
                return process_image(single_media, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend)
            except timeout_decorator.TimeoutError:
                logger.error("Image processing timed out.")
                return None, format_error("Processing timed out. Try a smaller image or simpler model."), None
    logger.warning("Unsupported media type resolved.")
    log_runtime(start_time)
    return None, format_error("Invalid input. Please check your upload or URL."), None

# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")

    # Input Mode Toggle
    mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")

    # File upload: accepts multiple images or one video (user chooses wisely)
    media_upload = gr.File(
        label="Upload Images (1–5) or 1 Video",
        file_types=["image", ".mp4", ".mov", ".avi"],
        file_count="multiple"
    )

    # URL input
    url = gr.Textbox(label="URL (Image/Video)", visible=False)

    # Toggle visibility between the upload and URL inputs
    def toggle_inputs(selected_mode):
        return [
            gr.update(visible=(selected_mode == "Upload")),  # media_upload
            gr.update(visible=(selected_mode == "URL"))      # url
        ]
    mode.change(toggle_inputs, inputs=mode, outputs=[media_upload, url])

    # Task Selection with parameters
    with gr.Accordion("Object Detection Settings", open=False):
        run_det = gr.Checkbox(label="Enable Object Detection")
        det_model = gr.Dropdown(list(DETECTION_MODEL_MAP), label="Detection Model", visible=False)
        det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold", visible=False)
    with gr.Accordion("Semantic Segmentation Settings", open=False):
        run_seg = gr.Checkbox(label="Enable Segmentation")
        seg_model = gr.Dropdown(list(SEGMENTATION_MODEL_MAP), label="Segmentation Model", visible=False)
    with gr.Accordion("Depth Estimation Settings", open=False):
        run_depth = gr.Checkbox(label="Enable Depth Estimation")
        depth_model = gr.Dropdown(list(DEPTH_MODEL_MAP), label="Depth Model", visible=False)
    blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")

    # Run Button
    run = gr.Button("Run Analysis")

    # Output Tabs
    with gr.Tab("Scene JSON"):
        json_out = gr.JSON()
    with gr.Tab("Scene Blueprint"):
        img_out = gr.Image()
    with gr.Tab("Download"):
        zip_out = gr.File()

    # Attach Visibility Logic (the event supplies only the checkbox value,
    # so the component count is passed explicitly)
    run_det.change(lambda show: toggle_visibility(show, 2), run_det, [det_model, det_confidence])
    run_seg.change(lambda show: toggle_visibility(show, 1), run_seg, [seg_model])
    run_depth.change(lambda show: toggle_visibility(show, 1), run_depth, [depth_model])

    # Button Click Event
    run.click(
        handle,
        inputs=[mode, media_upload, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend],
        outputs=[img_out, json_out, zip_out]
    )
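
    # Illustrative end-to-end flow for a single uploaded image:
    #   upload a.jpg -> handle() -> resolve_input() -> validate_image()
    #   -> process_image() -> (blueprint PNG, scene JSON, results ZIP path)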

    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
        <div style='text-align: center; font-size: 14px;'>
        Built by <b>Durga Deepak Valluri</b><br>
        <a href="https://github.com/DurgaDeepakValluri/UVIS" target="_blank">GitHub</a> |
        <a href="https://deecoded.io" target="_blank">Website</a> |
        <a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
        </div>
        """,
    )

# Launch the Gradio App
demo.launch()