# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""
import time
import logging
import traceback
import tempfile
import shutil
import os

import gradio as gr
from PIL import Image
import cv2
import timeout_decorator
import spaces

from registry import get_model
from core.describe_scene import describe_scene
from core.process import process_image, process_video
from core.input_handler import resolve_input, validate_video, validate_image
from utils.helpers import format_error, generate_session_id
from huggingface_hub import hf_hub_download
try:
    shutil.rmtree(os.path.expanduser("~/.cache/huggingface"), ignore_errors=True)
    shutil.rmtree("/home/user/.cache/huggingface", ignore_errors=True)
    print("🔥 Nuked HF model cache from runtime.")
except Exception as e:
    print("🚫 Failed to nuke cache:", e)
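# The hub cache may live elsewhere if HF_HOME or HF_HUB_CACHE is set (both are
# standard huggingface_hub environment variables); an optional sketch for
# sweeping those locations too, left commented out since HF_HOME also holds
# non-cache data such as tokens:
# for env_var in ("HF_HUB_CACHE", "HF_HOME"):
#     cache_dir = os.environ.get(env_var)
#     if cache_dir:
#         shutil.rmtree(cache_dir, ignore_errors=True)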
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
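# NOTE (assumed helper): `log_runtime` is called later in `handle` but was never
# defined or imported in this script; a minimal sketch so the name resolves.
# If utils.helpers already provides one, import that instead.
def log_runtime(start_time):
    """Log elapsed wall-clock time since `start_time` (seconds)."""
    logger.info(f"Total runtime: {time.time() - start_time:.2f}s")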
# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b"
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}
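# Each map translates a human-readable dropdown label into the registry key
# consumed by get_model(); unknown labels fall through unchanged (see the
# .get(label, label) calls in `handle`), so raw keys also work, e.g.:
#   DETECTION_MODEL_MAP.get("YOLOv8-Nano", "YOLOv8-Nano")  # -> "yolov8n"
#   DETECTION_MODEL_MAP.get("yolov8n", "yolov8n")          # -> "yolov8n"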
| # # Resource Limits | |
| # MAX_IMAGE_MB = 15 | |
| # MAX_IMAGE_RES = (1920, 1080) | |
| # MAX_VIDEO_MB = 50 | |
| # MAX_VIDEO_DURATION = 15 # seconds | |
| # def preload_models(): | |
| # """ | |
| # This function is needed to activate ZeroGPU. It must be decorated with @spaces.GPU. | |
| # It can be used to warm up models or load them into memory. | |
| # """ | |
| # from registry import get_model | |
| # print("Warming up models for ZeroGPU...") | |
| # get_model("detection", "yolov8n", device="cpu") | |
| # get_model("segmentation", "deeplabv3_resnet50", device="cpu") | |
| # get_model("depth", "midas_v21_small_256", device="cpu") | |
def handle(mode, media_upload, url,
           run_det, det_model, det_confidence,
           run_seg, seg_model,
           run_depth, depth_model,
           blend):
    """
    Master handler for resolving input and processing.
    Returns: (img_out, vid_out, json_out, zip_out)
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()

    media = resolve_input(mode, media_upload, url)
    if not media:
        return (
            gr.update(visible=False),
            gr.update(visible=False),
            format_error("No valid input provided. Please check your upload or URL."),
            None
        )

    first_input = media[0]

    # 🧠 Resolve dropdown labels to model keys
    resolved_det_model = DETECTION_MODEL_MAP.get(det_model, det_model)
    resolved_seg_model = SEGMENTATION_MODEL_MAP.get(seg_model, seg_model)
    resolved_depth_model = DEPTH_MODEL_MAP.get(depth_model, depth_model)
    # --- VIDEO PATH ---
    if isinstance(first_input, str) and first_input.lower().endswith((".mp4", ".mov", ".avi")):
        valid, err = validate_video(first_input)
        if not valid:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(err),
                None
            )
        try:
            _, msg, output_video_path = process_video(
                video_path=first_input,
                run_det=run_det,
                det_model=resolved_det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=resolved_seg_model,
                run_depth=run_depth,
                depth_model=resolved_depth_model,
                blend=blend
            )
            return (
                gr.update(visible=False),                          # hide image
                gr.update(value=output_video_path, visible=True),  # show video
                msg,
                output_video_path  # for download
            )
        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(str(e)),
                None
            )
    # --- IMAGE PATH ---
    elif isinstance(first_input, Image.Image):
        valid, err = validate_image(first_input)
        if not valid:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(err),
                None
            )
        try:
            result_img, msg, output_zip = process_image(
                image=first_input,
                run_det=run_det,
                det_model=resolved_det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=resolved_seg_model,
                run_depth=run_depth,
                depth_model=resolved_depth_model,
                blend=blend
            )
            return (
                gr.update(value=result_img, visible=True),  # show image
                gr.update(visible=False),                   # hide video
                msg,
                output_zip
            )
        except timeout_decorator.TimeoutError:
            logger.error("Image processing timed out.")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error("Processing timed out. Try a smaller image or simpler model."),
                None
            )
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Image processing failed: {e}")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(str(e)),
                None
            )
    # --- UNSUPPORTED INPUT ---
    logger.warning("Unsupported media type resolved.")
    log_runtime(start_time)
    return (
        gr.update(visible=False),
        gr.update(visible=False),
        format_error("Unsupported input type."),
        None
    )
def show_preview_from_upload(files):
    if not files:
        return gr.update(visible=False), gr.update(visible=False)

    file = files[0]
    filename = file.name.lower()

    if filename.endswith((".png", ".jpg", ".jpeg", ".webp")):
        img = Image.open(file.name).convert("RGB")
        return gr.update(value=img, visible=True), gr.update(visible=False)

    elif filename.endswith((".mp4", ".mov", ".avi")):
        # Copy the uploaded video to a known temp location. Copying by path is
        # safer than file.read(), since Gradio may hand over an already-closed
        # temp-file wrapper.
        temp_dir = tempfile.mkdtemp()
        ext = os.path.splitext(filename)[-1]
        safe_path = os.path.join(temp_dir, f"uploaded_video{ext}")
        shutil.copy(file.name, safe_path)
        return gr.update(visible=False), gr.update(value=safe_path, visible=True)

    return gr.update(visible=False), gr.update(visible=False)
def show_preview_from_url(url_input):
    if not url_input:
        return gr.update(visible=False), gr.update(visible=False)

    path = url_input.strip().lower()
    if path.endswith((".png", ".jpg", ".jpeg", ".webp")):
        return gr.update(value=url_input, visible=True), gr.update(visible=False)
    elif path.endswith((".mp4", ".mov", ".avi")):
        return gr.update(visible=False), gr.update(value=url_input, visible=True)
    return gr.update(visible=False), gr.update(visible=False)
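# Note: the suffix checks above miss URLs that carry query strings
# (e.g. ".../clip.mp4?token=abc"). A hypothetical stdlib-only helper, not
# wired in, that inspects only the path component:
def _url_extension(url_input):
    """Return the lowercase file extension of a URL, ignoring query/fragment."""
    from urllib.parse import urlparse
    return os.path.splitext(urlparse(url_input).path)[-1].lower()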
def clear_model_cache():
    """
    Deletes all model weight folders so they are redownloaded fresh.
    """
    folders = [
        "models/detection/weights",
        "models/segmentation/weights",
        "models/depth/weights"
    ]
    for folder in folders:
        shutil.rmtree(folder, ignore_errors=True)
        logger.info(f"Cleared: {folder}")
    return "Model cache cleared. Models will be reloaded on next run."
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")

    with gr.Row():
        # Left panel
        with gr.Column(scale=2):
            # Input Mode Toggle
            mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")

            # File upload: accepts multiple images or one video (user chooses wisely)
            media_upload = gr.File(
                label="Upload Images (1–5) or 1 Video",
                file_types=["image", ".mp4", ".mov", ".avi"],
                file_count="multiple",
                visible=True
            )

            # URL input
            url = gr.Textbox(label="URL (Image/Video)", visible=False)

            # Toggle input visibility; must return exactly one update per
            # component listed in `outputs` below (the previews are wired
            # separately via media_upload.change / url.submit).
            def toggle_inputs(selected_mode):
                return [
                    gr.update(visible=(selected_mode == "Upload")),  # media_upload
                    gr.update(visible=(selected_mode == "URL"))      # url
                ]

            mode.change(toggle_inputs, inputs=mode, outputs=[media_upload, url])
            # Visibility logic function
            def toggle_visibility(checked):
                return gr.update(visible=checked)

            # def toggle_det_visibility(checked):
            #     return [gr.update(visible=checked), gr.update(visible=checked)]

            run_det = gr.Checkbox(label="Object Detection")
            run_seg = gr.Checkbox(label="Semantic Segmentation")
            run_depth = gr.Checkbox(label="Depth Estimation")

            with gr.Row():
                with gr.Column(visible=False) as OD_Settings:
                    with gr.Accordion("Object Detection Settings", open=True):
                        det_model = gr.Dropdown(choices=list(DETECTION_MODEL_MAP), label="Detection Model")
                        det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold")
                        nms_thresh = gr.Slider(0.1, 1.0, 0.45, label="NMS Threshold")
                        max_det = gr.Slider(1, 100, 20, step=1, label="Max Detections")
                        iou_thresh = gr.Slider(0.1, 1.0, 0.5, label="IoU Threshold")
                        class_filter = gr.CheckboxGroup(["Person", "Car", "Dog"], label="Class Filter")

                with gr.Column(visible=False) as SS_Settings:
                    with gr.Accordion("Semantic Segmentation Settings", open=True):
                        seg_model = gr.Dropdown(choices=list(SEGMENTATION_MODEL_MAP), label="Segmentation Model")
                        resize_strategy = gr.Dropdown(["Crop", "Pad", "Scale"], label="Resize Strategy")
                        overlay_alpha = gr.Slider(0.0, 1.0, 0.5, label="Overlay Opacity")
                        seg_classes = gr.CheckboxGroup(["Road", "Sky", "Building"], label="Target Classes")
                        enable_crf = gr.Checkbox(label="Postprocessing (CRF)")

                with gr.Column(visible=False) as DE_Settings:
                    with gr.Accordion("Depth Estimation Settings", open=True):
                        depth_model = gr.Dropdown(choices=list(DEPTH_MODEL_MAP), label="Depth Model")
                        output_type = gr.Dropdown(["Raw", "Disparity", "Scaled"], label="Output Type")
                        colormap = gr.Dropdown(["Jet", "Viridis", "Plasma"], label="Colormap")
                        # Renamed from `blend` so it no longer shadows the shared
                        # Overlay Blend slider below (currently unused by `handle`).
                        depth_blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")
                        normalize = gr.Checkbox(label="Normalize Depth")
                        max_depth = gr.Slider(0.1, 10.0, 5.0, label="Max Depth (meters)")

            # Attach Visibility Logic
            run_det.change(fn=toggle_visibility, inputs=[run_det], outputs=[OD_Settings])
            run_seg.change(fn=toggle_visibility, inputs=[run_seg], outputs=[SS_Settings])
            run_depth.change(fn=toggle_visibility, inputs=[run_depth], outputs=[DE_Settings])

            # Shared overlay blend passed to `handle` for all overlays
            blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")
            # Run Button
            run = gr.Button("Run Analysis")

        # Right panel
        with gr.Column(scale=1):
            # single_img_preview = gr.Image(label="Preview (Image)", visible=False)
            # gallery_preview = gr.Gallery(label="Preview (Gallery)", columns=3, height="auto", visible=False)
            # video_preview = gr.Video(label="Preview (Video)", visible=False)

            # Only one is shown at a time: image or video
            img_out = gr.Image(label="Preview / Processed Output", visible=False)
            vid_out = gr.Video(label="Preview / Processed Video", visible=False, streaming=True, autoplay=True)
            json_out = gr.JSON(label="Scene JSON")
            zip_out = gr.File(label="Download Results")

            clear_button = gr.Button("🧹 Clear Model Cache")
            status_box = gr.Textbox(label="Status", interactive=False)
            clear_button.click(fn=clear_model_cache, inputs=[], outputs=[status_box])

    media_upload.change(show_preview_from_upload, inputs=media_upload, outputs=[img_out, vid_out])
    url.submit(show_preview_from_url, inputs=url, outputs=[img_out, vid_out])
    # Unified run click: switch visibility based on image or video output.
    # NOTE: currently unused; `handle` returns gr.update objects directly.
    def route_output(image_output, json_output, zip_file):
        # Show img_out if an image was returned, else show the video
        if isinstance(image_output, Image.Image):
            return gr.update(value=image_output, visible=True), gr.update(visible=False), json_output, zip_file
        elif isinstance(zip_file, str) and zip_file.endswith(".mp4"):
            return gr.update(visible=False), gr.update(value=zip_file, visible=True), json_output, zip_file
        else:
            return gr.update(visible=False), gr.update(visible=False), json_output, zip_file

    # # Output Tabs
    # with gr.Tab("Scene JSON"):
    #     json_out = gr.JSON()
    # with gr.Tab("Scene Blueprint"):
    #     img_out = gr.Image()
    # with gr.Tab("Download"):
    #     zip_out = gr.File()
    # Button Click Event
    run.click(
        fn=handle,
        inputs=[
            mode, media_upload, url,
            run_det, det_model, det_confidence,
            run_seg, seg_model,
            run_depth, depth_model,
            blend
        ],
        outputs=[
            img_out,   # visible only if the result is an image
            vid_out,   # visible only if the result is a video
            json_out,
            zip_out
        ]
    )
    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
        <div style='text-align: center; font-size: 14px;'>
        Built by <b>Durga Deepak Valluri</b><br>
        <a href="https://github.com/DurgaDeepakValluri" target="_blank">GitHub</a> |
        <a href="https://deecoded.io" target="_blank">Website</a> |
        <a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
        </div>
        """
    )

# Launch the Gradio App
demo.launch()