import logging
import time
import timeout_decorator
import io
import os
import zipfile
import json

import cv2
import torch
import numpy as np
from PIL import Image

from registry import get_model
from core.describe_scene import describe_scene
from utils.helpers import generate_session_id, log_runtime
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b"
}
SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}
DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}
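# The maps above translate UI display names into registry keys; the Gradio
# wiring that performs the lookup lives outside this module. A minimal sketch
# of the intended translation (illustrative, not from this file):
#
#   registry_key = DETECTION_MODEL_MAP["YOLOv8-Nano"]   # -> "yolov8n"
#   model = get_model("detection", registry_key, device="cpu")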
def process_video(
    video_path: str,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Reads each frame from `video_path`, runs `process_image()` on it,
    and writes a new MP4 to outputs/processed_<name>.mp4.

    Returns:
        (None, scene_json: dict, output_video_path: str)
    """
| logger.info(f"Starting video processing for {video_path}") | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f"Cannot open video: {video_path}") | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | |
| os.makedirs("outputs", exist_ok=True) | |
| base = os.path.basename(video_path) | |
| out_path = os.path.join("outputs", f"processed_{base}") | |
| writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height)) | |
| if not writer.isOpened(): | |
| cap.release() | |
| raise RuntimeError(f"Cannot write to: {out_path}") | |
| frame_idx = 0 | |
| scene_info = {"video": base, "frames_processed": 0} | |
    while True:
        ret, frame_bgr = cap.read()
        if not ret:
            break
        # BGR→RGB→PIL
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        pil_frame = Image.fromarray(frame_rgb)
        # Run the single-image pipeline on this frame
        try:
            processed_img, _, _ = process_image(
                image=pil_frame,
                run_det=run_det,
                det_model=det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=seg_model,
                run_depth=run_depth,
                depth_model=depth_model,
                blend=blend
            )
        except Exception as e:
            cap.release()
            writer.release()
            raise RuntimeError(f"Error on frame {frame_idx}: {e}") from e
        # process_image traps pipeline errors internally and returns None for the
        # image; treat that as a hard failure here rather than writing a bad frame.
        if processed_img is None:
            cap.release()
            writer.release()
            raise RuntimeError(f"process_image failed on frame {frame_idx}")
        # PIL→BGR numpy
        out_bgr = cv2.cvtColor(np.array(processed_img), cv2.COLOR_RGB2BGR)
        writer.write(out_bgr)
        frame_idx += 1
        scene_info["frames_processed"] = frame_idx
    cap.release()
    writer.release()
    logger.info(f"Finished video. Wrote {frame_idx} frames to {out_path}")
    # Minimal JSON summary
    scene_json = {
        "video": scene_info["video"],
        "frames_processed": scene_info["frames_processed"]
    }
    return None, scene_json, out_path
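# A minimal usage sketch for process_video, assuming a local clip at the
# hypothetical path "inputs/clip.mp4". Whether the model arguments take UI
# labels or registry keys depends on get_model; registry keys are shown here.
#
#   _, summary, out_path = process_video(
#       "inputs/clip.mp4",
#       run_det=True, det_model="yolov8n", det_confidence=0.25,
#       run_seg=False, seg_model="segformer_b0",
#       run_depth=False, depth_model="midas_v21_small_256",
#       blend=0.5,
#   )
#   # summary -> {"video": "clip.mp4", "frames_processed": <N>}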
#@timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.

    Args:
        image (PIL.Image): Input image.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).

    Returns:
        Tuple[Image.Image, dict, str]: Final image, scene JSON, and path to the results ZIP.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    combined_np = np.array(image)
    try:
        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", det_model, device="cpu")
            model.load_model()
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            combined_np = np.array(overlay)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["detection.png"] = buf.getvalue()
            scene["detection"] = boxes
        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", seg_model, device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            # Blend the segmentation overlay into the running composite
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["segmentation.png"] = buf.getvalue()
            scene["segmentation"] = mask.tolist()
        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", depth_model, device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            # Normalize to 0-255 for display; the epsilon guards against a constant
            # depth map, and np.ptp() avoids ndarray.ptp(), which NumPy 2.0 removed.
            norm_dmap = ((dmap - dmap.min()) / (np.ptp(dmap) + 1e-8) * 255).astype(np.uint8)
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            buf = io.BytesIO()
            d_pil.save(buf, format="PNG")
            outputs["depth_map.png"] = buf.getvalue()
            scene["depth"] = dmap.tolist()
        # Final image overlay
        final_img = Image.fromarray(combined_np)
        buf = io.BytesIO()
        final_img.save(buf, format="PNG")
        outputs["scene_blueprint.png"] = buf.getvalue()
        # Scene description
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}
        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")
        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)
        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        # return final_img, scene_json, ("uvis_results.zip", zip_buf.getvalue())
        # Save ZIP to disk for Gradio file output
        os.makedirs("outputs", exist_ok=True)
        zip_path = os.path.join("outputs", "uvis_results.zip")
        with open(zip_path, "wb") as f:
            f.write(zip_buf.getvalue())
        return final_img, scene_json, zip_path
    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None
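# Hedged smoke test: everything below is illustrative. It assumes the registry
# can serve "yolov8n" on CPU and feeds a synthetic image, so the output is only
# meaningful as a wiring check, not as a real detection result.
if __name__ == "__main__":
    demo_img = Image.new("RGB", (640, 480), color=(32, 32, 32))  # blank synthetic frame
    img, scene_json, zip_path = process_image(
        image=demo_img,
        run_det=True, det_model="yolov8n", det_confidence=0.25,
        run_seg=False, seg_model="segformer_b0",
        run_depth=False, depth_model="midas_v21_small_256",
        blend=0.5,
    )
    logger.info(f"Demo run complete. zip={zip_path}, keys={list(scene_json)}")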