#!/usr/bin/env python3
"""
cv_processing.py · slim orchestrator layer (self-contained, backward-compatible)
──────────────────────────────────────────────────────────────────────────────
Public API (unchanged):
- segment_person_hq(frame, predictor=None, fallback_enabled=True, **compat)
- segment_person_hq_original(...)
- refine_mask_hq(frame, mask, matanyone=None, fallback_enabled=True, **compat)
- replace_background_hq(frame, mask, background, fallback_enabled=True)
- create_professional_background(key_or_cfg, width, height)
- validate_video_file(video_path) -> (bool, reason)
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import cv2
import numpy as np

logger = logging.getLogger(__name__)

# ----------------------------------------------------------------------------
# Background presets (local copy; safe defaults)
# ----------------------------------------------------------------------------
PROFESSIONAL_BACKGROUNDS_LOCAL: Dict[str, Dict[str, Any]] = {
    "office": {"color": (240, 248, 255), "gradient": True},
    "studio": {"color": (32, 32, 32), "gradient": False},
    "nature": {"color": (34, 139, 34), "gradient": True},
    "abstract": {"color": (75, 0, 130), "gradient": True},
    "white": {"color": (255, 255, 255), "gradient": False},
    "black": {"color": (0, 0, 0), "gradient": False},
}
PROFESSIONAL_BACKGROUNDS = PROFESSIONAL_BACKGROUNDS_LOCAL  # alias for callers


# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------
def _ensure_rgb(img: np.ndarray) -> np.ndarray:
    """Swap the channel order of a 3-channel image (BGR → RGB).

    ``None`` and arrays that are not H×W×3 are returned unchanged.
    """
    if img is None:
        return img
    if img.ndim == 3 and img.shape[2] == 3:
        # Assume OpenCV BGR → convert to RGB
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img


def _to_mask01(m: np.ndarray) -> np.ndarray:
    """Normalise a mask to a float32 array clipped to ``[0, 1]``.

    * ``None`` is passed through.
    * For H×W×1 / H×W×3 input only the first channel is kept.
    * If the maximum value exceeds 1.0 the mask is treated as 0..255 and
      divided by 255.
    """
    if m is None:
        return None
    m = np.asarray(m)
    if m.ndim == 3 and m.shape[2] in (1, 3):
        m = m[..., 0]
    m = m.astype(np.float32)
    # `m.size` guard: max() of an empty array raises ValueError.
    if m.size and m.max() > 1.0:
        m = m / 255.0
    return np.clip(m, 0.0, 1.0)


def _feather(mask01: np.ndarray, k: int = 2) -> np.ndarray:
    """Gaussian-blur a 0..1 mask with an odd kernel of size ``2*k + 1``.

    The blur runs on an 8-bit copy of the mask; the result is float32 in
    ``[0, 1]``. A 3-D mask is reduced to its first channel first.
    """
    if mask01.ndim == 3:
        mask01 = mask01[..., 0]
    k = max(1, int(k) * 2 + 1)
    m = cv2.GaussianBlur((mask01 * 255.0).astype(np.uint8), (k, k), 0)
    return m.astype(np.float32) / 255.0


def _vertical_gradient(
    top: Tuple[int, int, int],
    bottom: Tuple[int, int, int],
    width: int,
    height: int,
) -> np.ndarray:
    """Build a ``(height, width, 3)`` uint8 image fading from *top* to *bottom*.

    Row ``y`` is the linear blend ``top * (1 - t) + bottom * t`` with
    ``t = y / max(1, height - 1)``, truncated to integers. All rows are
    computed in one vectorised expression instead of a Python loop.
    """
    t = np.arange(height, dtype=np.float64) / max(1, height - 1)
    top_rgb = np.asarray(top[:3], dtype=np.float64)
    bottom_rgb = np.asarray(bottom[:3], dtype=np.float64)
    # (height, 3): one colour per row.
    rows = top_rgb * (1.0 - t)[:, None] + bottom_rgb * t[:, None]
    # Clip before the cast so out-of-range inputs cannot wrap around.
    rows = np.clip(rows, 0, 255).astype(np.uint8)
    # Replicate each row colour across the full width.
    return np.repeat(rows[:, None, :], width, axis=1)


def _looks_like_mask(x: Any) -> bool:
    """Return True for a non-object ndarray shaped H×W, H×W×1 or H×W×3."""
    if not isinstance(x, np.ndarray) or x.dtype == object:
        return False
    return x.ndim == 2 or (x.ndim == 3 and x.shape[2] in (1, 3))


def _is_single_channel(x: np.ndarray) -> bool:
    """Return True when *x* is H×W or H×W×1 (cannot be a colour frame)."""
    return x.ndim == 2 or (x.ndim == 3 and x.shape[2] == 1)


# ----------------------------------------------------------------------------
# Background creation (RGB)
# ----------------------------------------------------------------------------
def create_professional_background(key_or_cfg: Any, width: int, height: int) -> np.ndarray:
    """Create a solid or vertically graded background image.

    Args:
        key_or_cfg: Either a preset name from ``PROFESSIONAL_BACKGROUNDS_LOCAL``
            or a dict with optional ``"color"`` (3 channel values) and
            ``"gradient"`` (bool) entries. Unknown names and any other type
            fall back to the ``"office"`` preset.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        ``(height, width, 3)`` uint8 array. With ``gradient`` enabled the image
        fades from 70 % of the colour at the top to the full colour at the
        bottom; otherwise it is filled with the colour.
    """
    default_cfg = PROFESSIONAL_BACKGROUNDS_LOCAL["office"]
    if isinstance(key_or_cfg, str):
        cfg = PROFESSIONAL_BACKGROUNDS_LOCAL.get(key_or_cfg, default_cfg)
    elif isinstance(key_or_cfg, dict):
        cfg = key_or_cfg
    else:
        cfg = default_cfg

    # Clamp to the uint8 range: a custom cfg with e.g. 300 would otherwise
    # overflow when written into the uint8 buffer.
    color = tuple(min(255, max(0, int(x))) for x in cfg.get("color", (255, 255, 255)))
    use_grad = bool(cfg.get("gradient", False))

    if not use_grad:
        return np.full((height, width, 3), color, dtype=np.uint8)

    dark = (int(color[0] * 0.7), int(color[1] * 0.7), int(color[2] * 0.7))
    return _vertical_gradient(dark, color, width, height)


# ----------------------------------------------------------------------------
# Segmentation
# ----------------------------------------------------------------------------
def _simple_person_segmentation(frame_bgr: np.ndarray) -> np.ndarray:
    """Colour-key fallback: everything that is not green/white is "person".

    Pixels inside the green HSV range or the low-saturation/high-value
    ("white") range are treated as background; the inverse is cleaned with a
    5×5 morphological close followed by an open.

    Returns:
        float32 mask in ``[0, 1]`` with the frame's height and width.
    """
    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)

    lower_green = np.array([40, 40, 40], dtype=np.uint8)
    upper_green = np.array([80, 255, 255], dtype=np.uint8)
    green_mask = cv2.inRange(hsv, lower_green, upper_green)

    lower_white = np.array([0, 0, 200], dtype=np.uint8)
    upper_white = np.array([180, 30, 255], dtype=np.uint8)
    white_mask = cv2.inRange(hsv, lower_white, upper_white)

    bg_mask = cv2.bitwise_or(green_mask, white_mask)
    person_mask = cv2.bitwise_not(bg_mask)

    kernel = np.ones((5, 5), np.uint8)
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_CLOSE, kernel)
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_OPEN, kernel)
    return person_mask.astype(np.float32) / 255.0


def segment_person_hq(
    frame: np.ndarray,
    predictor: Optional[Any] = None,
    fallback_enabled: bool = True,
    # backward-compat shim:
    use_sam2: Optional[bool] = None,
    **_compat_kwargs,
) -> np.ndarray:
    """Segment the person in *frame*, preferring a SAM2-style predictor.

    Args:
        frame: Input image (converted BGR → RGB before being handed to the
            predictor).
        predictor: Object exposing ``set_image`` and ``predict``; it is
            prompted with a single positive point at the image centre and the
            highest-scoring mask is used.
        fallback_enabled: When the predictor is missing or fails, return the
            colour-key segmentation (True) or an all-ones mask (False).
        use_sam2: Legacy switch; ``False`` forces the colour-key segmentation.
        **_compat_kwargs: Ignored; accepted for backward compatibility.

    Returns:
        float32 mask in ``[0, 1]``.
    """
    try:
        if use_sam2 is False:
            return _simple_person_segmentation(frame)

        if predictor is not None and hasattr(predictor, "set_image") and hasattr(predictor, "predict"):
            rgb = _ensure_rgb(frame)
            predictor.set_image(rgb)
            h, w = rgb.shape[:2]
            center = np.array([[w // 2, h // 2]])
            labels = np.array([1])
            masks, scores, _ = predictor.predict(
                point_coords=center,
                point_labels=labels,
                multimask_output=True,
            )
            m = np.array(masks)
            if m.ndim == 3:
                # Several candidate masks: keep the best-scoring one.
                idx = int(np.argmax(scores)) if scores is not None else 0
                m = m[idx]
            elif m.ndim != 2:
                raise RuntimeError(f"Unexpected SAM2 mask shape: {m.shape}")
            return _to_mask01(m)
    except Exception as e:
        logger.warning("SAM2 segmentation failed: %s", e)

    if fallback_enabled:
        return _simple_person_segmentation(frame)
    return np.ones(frame.shape[:2], dtype=np.float32)


segment_person_hq_original = segment_person_hq  # back-compat alias


# ----------------------------------------------------------------------------
# MatAnyOne helpers
# ----------------------------------------------------------------------------
def _to_tensor_chw(img_uint8_bgr: np.ndarray) -> "torch.Tensor":
    """Convert a BGR image to an RGB float tensor scaled by 1/255, shape (3,H,W)."""
    import torch

    rgb = cv2.cvtColor(img_uint8_bgr, cv2.COLOR_BGR2RGB)
    return torch.from_numpy(rgb).permute(2, 0, 1).contiguous().float() / 255.0  # (3,H,W)


def _mask_to_tensor01(mask01: np.ndarray) -> "torch.Tensor":
    """Wrap a 2-D mask as a float32 tensor of shape (1,1,H,W)."""
    import torch

    return torch.from_numpy(mask01.astype(np.float32)).unsqueeze(0).unsqueeze(0)  # (1,1,H,W)


def _tensor_to_mask01(t: "torch.Tensor") -> np.ndarray:
    """Convert a model output tensor to a 2-D numpy mask clipped to ``[0, 1]``.

    4-D input keeps element ``[0, 0]``, 3-D input keeps element ``[0]``.
    """
    if t.ndim == 4:
        t = t[0, 0]
    elif t.ndim == 3:
        t = t[0]
    return np.clip(t.detach().float().cpu().numpy(), 0.0, 1.0)


def _remap_harden(mask01: np.ndarray, inside: float = 0.70, outside: float = 0.35) -> np.ndarray:
    """
    Pull the mask toward {0,1} to avoid 'ghost' translucency.
    Values <= outside -> 0; >= inside -> 1; linear in between.
    The mask is returned unchanged (as float32) when ``inside <= outside``.
    """
    m = mask01.astype(np.float32)
    if inside <= outside:
        return m
    m = (m - outside) / max(1e-6, (inside - outside))
    return np.clip(m, 0.0, 1.0)


def _pad_and_smooth_edges(mask01: np.ndarray, dilate_px: int = 6, edge_blur_px: int = 2) -> np.ndarray:
    """Grow the mask with an elliptical dilation, then soften its edge.

    Args:
        mask01: Mask in ``[0, 1]``.
        dilate_px: Side length of the elliptical structuring element;
            ``<= 0`` skips the dilation.
        edge_blur_px: Gaussian kernel size is ``2*edge_blur_px + 1``;
            ``<= 0`` skips the blur.

    Returns:
        float32 mask in ``[0, 1]``.
    """
    m = (mask01 * 255.0).astype(np.uint8)
    if dilate_px > 0:
        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dilate_px, dilate_px))
        m = cv2.dilate(m, k, iterations=1)
    if edge_blur_px > 0:
        ksize = edge_blur_px * 2 + 1
        m = cv2.GaussianBlur(m, (ksize, ksize), 0)
    return m.astype(np.float32) / 255.0


def _frame_and_mask_tensors(
    frame_bgr: np.ndarray, mask01: np.ndarray
) -> Tuple["torch.Tensor", "torch.Tensor"]:
    """Return ``(image (1,3,H,W), mask (1,1,H,W))`` on CUDA if available, else CPU."""
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"
    img_t = _to_tensor_chw(frame_bgr).unsqueeze(0).to(device)  # (1,3,H,W)
    mask_t = _mask_to_tensor01(mask01).to(device)  # (1,1,H,W)
    return img_t, mask_t


def _try_matanyone_refine(
    matanyone: Any,
    frame_bgr: np.ndarray,
    mask01: np.ndarray,
) -> Optional[np.ndarray]:
    """
    Try several MatAnyOne interfaces:
    1) InferenceCore.infer(PIL_image, PIL_mask)
    2) .step(image_tensor=NCHW, mask_tensor=NCHW)
    3) .process(image_np, mask_np)
    4) callable(image_tensor, mask_tensor) → tensor
    Returns refined mask01 (np.ndarray) or None if not usable.

    A failure in path (1) is logged at debug level and the remaining paths are
    still tried; a failure in any later path is logged as a warning and ends
    the attempt with ``None``.
    """
    try:
        # --- (1) PIL infer path ------------------------------------------------
        if hasattr(matanyone, "infer"):
            try:
                from PIL import Image

                img_pil = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
                m_pil = Image.fromarray((mask01 * 255.0).astype(np.uint8))
                out_pil = matanyone.infer(img_pil, m_pil)
                out_np = np.asarray(out_pil).astype(np.float32)
                return _to_mask01(out_np)
            except Exception as e:
                logger.debug("MatAnyOne.infer path failed: %s", e)

        # --- (2) tensor .step path --------------------------------------------
        if hasattr(matanyone, "step"):
            import torch

            img_t, mask_t = _frame_and_mask_tensors(frame_bgr, mask01)
            with torch.inference_mode():
                out = matanyone.step(
                    image_tensor=img_t,
                    mask_tensor=mask_t,
                    objects=None,
                    first_frame_pred=True,
                )
                if hasattr(matanyone, "output_prob_to_mask"):
                    out = matanyone.output_prob_to_mask(out)
            return _tensor_to_mask01(out)

        # --- (3) numpy .process path ------------------------------------------
        if hasattr(matanyone, "process"):
            out = matanyone.process(frame_bgr, mask01)
            return _to_mask01(np.asarray(out))

        # --- (4) callable / nn.Module path ------------------------------------
        if callable(matanyone):
            import torch

            img_t, mask_t = _frame_and_mask_tensors(frame_bgr, mask01)
            with torch.inference_mode():
                out = matanyone(img_t, mask_t)
            return _tensor_to_mask01(out)
    except Exception as e:
        logger.warning("MatAnyOne refine error: %s", e)
    return None


# ----------------------------------------------------------------------------
# Refinement (MatAnyOne)
# ----------------------------------------------------------------------------
def refine_mask_hq(
    frame: np.ndarray,
    mask: np.ndarray,
    matanyone: Optional[Any] = None,
    fallback_enabled: bool = True,
    # backward-compat shims:
    use_matanyone: Optional[bool] = None,
    **_compat_kwargs,
) -> np.ndarray:
    """
    Refine single-channel mask with MatAnyOne if available.

    Backward-compat:
    - accepts use_matanyone (False → skip model)
    - tolerates legacy arg order refine_mask_hq(mask, frame, ...)

    Args:
        frame: Colour frame the mask belongs to.
        mask: Person mask (H×W, H×W×1 or H×W×3; 0..1 or 0..255).
        matanyone: Optional model object; see ``_try_matanyone_refine`` for the
            interfaces that are probed.
        fallback_enabled: When the model is unavailable or unusable, return the
            blur-based refinement (True) or the normalised input mask (False).
        use_matanyone: Legacy switch; ``False`` skips the model.
        **_compat_kwargs: Ignored; accepted for backward compatibility.

    Returns:
        float32 mask in ``[0, 1]``.
    """
    # Tolerate legacy order: refine_mask_hq(mask, frame, ...).
    # Swap only when the first argument cannot be a colour frame (it is
    # single-channel) while the second one is 3-channel. Requiring only a
    # 3-channel `mask` would wrongly swap a correctly ordered call whose mask
    # happens to be stored as H×W×3.
    if (
        _looks_like_mask(frame)
        and _looks_like_mask(mask)
        and _is_single_channel(frame)
        and mask.ndim == 3
        and mask.shape[2] == 3
    ):
        frame, mask = mask, frame  # swap

    mask01 = _to_mask01(mask)

    # Use MatAnyOne when possible
    if use_matanyone is not False and matanyone is not None:
        refined = _try_matanyone_refine(matanyone, frame, mask01)
        if refined is not None:
            # Hardening + edge handling to avoid translucent body/halo
            refined = _remap_harden(refined, inside=0.70, outside=0.35)
            refined = _pad_and_smooth_edges(refined, dilate_px=4, edge_blur_px=1)
            return refined
        logger.warning("MatAnyOne provided but no usable interface found; falling back.")

    # Without the fallback the caller gets the normalised mask back; skip the
    # filtering work whose result would be thrown away.
    if not fallback_enabled:
        return mask01

    # Simple refinement fallback
    m = (mask01 * 255.0).astype(np.uint8)
    m = cv2.GaussianBlur(m, (5, 5), 0)
    m = cv2.bilateralFilter(m, 9, 75, 75)
    m = m.astype(np.float32) / 255.0
    m = _remap_harden(m, inside=0.68, outside=0.40)
    m = _pad_and_smooth_edges(m, dilate_px=3, edge_blur_px=1)
    return m


# ----------------------------------------------------------------------------
# Compositing
# ----------------------------------------------------------------------------
def replace_background_hq(
    frame: np.ndarray,
    mask01: np.ndarray,
    background: np.ndarray,
    fallback_enabled: bool = True,
    **_compat,
) -> np.ndarray:
    """Alpha-composite *frame* over *background* using the person mask.

    Args:
        frame: Foreground image, H×W×3.
        mask01: Person mask (normalised through ``_to_mask01``); 1 keeps the
            frame pixel, 0 shows the background.
        background: Replacement background; resized to the frame size when the
            resolutions differ.
        fallback_enabled: On any error, log a warning and return *frame*
            unchanged (True) or re-raise (False).
        **_compat: Ignored; accepted for backward compatibility.

    Returns:
        uint8 composite with the frame's height and width.
    """
    try:
        H, W = frame.shape[:2]
        if background.shape[:2] != (H, W):
            background = cv2.resize(background, (W, H), interpolation=cv2.INTER_LANCZOS4)

        m = _to_mask01(mask01)
        # Bring the mask to the frame resolution as well; a mismatch would
        # otherwise fail in the blend below and discard the replacement.
        if m.shape[:2] != (H, W):
            m = cv2.resize(m, (W, H), interpolation=cv2.INTER_LINEAR)
        # Very light feather to hide stair-steps; most shaping already done
        m = _feather(m, k=1)

        m3 = np.repeat(m[:, :, None], 3, axis=2)
        comp = frame.astype(np.float32) * m3 + background.astype(np.float32) * (1.0 - m3)
        return np.clip(comp, 0, 255).astype(np.uint8)
    except Exception as e:
        if fallback_enabled:
            logger.warning("Compositing failed (%s) – returning original frame", e)
            return frame
        raise


# ----------------------------------------------------------------------------
# Video validation
# ----------------------------------------------------------------------------
def validate_video_file(video_path: str) -> Tuple[bool, str]:
    """Check that a video exists, opens with OpenCV and is within limits.

    Limits: non-empty file of at most 2 GB, at least one frame, FPS in
    ``(0, 120]``, non-zero resolution of at most 4096 px per side, duration of
    at most 300 s.

    Args:
        video_path: Path of the video file.

    Returns:
        ``(True, summary)`` when all checks pass, otherwise
        ``(False, reason)``. Unexpected errors are logged and reported as
        ``(False, "Validation error: ...")`` rather than raised.
    """
    if not video_path or not Path(video_path).exists():
        return False, "Video file not found"

    try:
        size = Path(video_path).stat().st_size
        if size == 0:
            return False, "File is empty"
        if size > 2 * 1024 * 1024 * 1024:
            return False, "File > 2 GB — too large for the Space quota"

        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                return False, "OpenCV cannot read the file"
            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            # Always free the decoder handle, including on the early return
            # above and when reading a property raises.
            cap.release()

        if n_frames == 0:
            return False, "No frames detected"
        # NaN compares False against every bound, so test for it explicitly.
        if not np.isfinite(fps) or fps <= 0 or fps > 120:
            return False, f"Suspicious FPS: {fps}"
        if w <= 0 or h <= 0:
            return False, "Zero resolution"
        if w > 4096 or h > 4096:
            return False, f"Resolution {w}×{h} too high (max 4 096²)"

        duration_s = n_frames / fps
        if duration_s > 300:
            return False, "Video longer than 5 minutes"

        return True, f"OK → {w}×{h}, {fps:.1f} fps, {duration_s:.1f} s"
    except Exception as e:
        logger.error("validate_video_file: %s", e)
        return False, f"Validation error: {e}"


# ----------------------------------------------------------------------------
# Public symbols
# ----------------------------------------------------------------------------
__all__ = [
    "segment_person_hq",
    "segment_person_hq_original",
    "refine_mask_hq",
    "replace_background_hq",
    "create_professional_background",
    "validate_video_file",
    "PROFESSIONAL_BACKGROUNDS",
]