VideoBackgroundReplacer / utils /cv_processing.py
MogensR's picture
Update utils/cv_processing.py
04ca462
raw
history blame
15.6 kB
#!/usr/bin/env python3
"""
cv_processing.py Β· slim orchestrator layer (self-contained, backward-compatible)
──────────────────────────────────────────────────────────────────────────────
Public API (unchanged):
- segment_person_hq(frame, predictor=None, fallback_enabled=True, **compat)
- segment_person_hq_original(...)
- refine_mask_hq(frame, mask, matanyone=None, fallback_enabled=True, **compat)
- replace_background_hq(frame, mask, background, fallback_enabled=True)
- create_professional_background(key_or_cfg, width, height)
- validate_video_file(video_path) -> (bool, reason)
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import cv2
import numpy as np
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# Background presets (local copy; safe defaults)
# ----------------------------------------------------------------------------
PROFESSIONAL_BACKGROUNDS_LOCAL: Dict[str, Dict[str, Any]] = {
"office": {"color": (240, 248, 255), "gradient": True},
"studio": {"color": (32, 32, 32), "gradient": False},
"nature": {"color": (34, 139, 34), "gradient": True},
"abstract": {"color": (75, 0, 130), "gradient": True},
"white": {"color": (255, 255, 255), "gradient": False},
"black": {"color": (0, 0, 0), "gradient": False},
}
PROFESSIONAL_BACKGROUNDS = PROFESSIONAL_BACKGROUNDS_LOCAL # alias for callers
# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------
def _ensure_rgb(img: np.ndarray) -> np.ndarray:
if img is None:
return img
if img.ndim == 3 and img.shape[2] == 3:
# Assume OpenCV BGR β†’ convert to RGB
return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
return img
def _to_mask01(m: np.ndarray) -> np.ndarray:
if m is None:
return None
if m.ndim == 3 and m.shape[2] in (1, 3):
m = m[..., 0]
m = m.astype(np.float32)
if m.max() > 1.0:
m = m / 255.0
return np.clip(m, 0.0, 1.0)
def _feather(mask01: np.ndarray, k: int = 2) -> np.ndarray:
if mask01.ndim == 3:
mask01 = mask01[..., 0]
k = max(1, int(k) * 2 + 1)
m = cv2.GaussianBlur((mask01 * 255.0).astype(np.uint8), (k, k), 0)
return (m.astype(np.float32) / 255.0)
def _vertical_gradient(top: Tuple[int,int,int], bottom: Tuple[int,int,int], width: int, height: int) -> np.ndarray:
bg = np.zeros((height, width, 3), dtype=np.uint8)
for y in range(height):
t = y / max(1, height - 1)
r = int(top[0] * (1 - t) + bottom[0] * t)
g = int(top[1] * (1 - t) + bottom[1] * t)
b = int(top[2] * (1 - t) + bottom[2] * t)
bg[y, :] = (r, g, b)
return bg
def _looks_like_mask(x: Any) -> bool:
return (
isinstance(x, np.ndarray)
and x.ndim in (2, 3)
and (x.ndim == 2 or (x.ndim == 3 and x.shape[2] in (1, 3)))
and x.dtype != object
)
# ----------------------------------------------------------------------------
# Background creation (RGB)
# ----------------------------------------------------------------------------
def create_professional_background(key_or_cfg: Any, width: int, height: int) -> np.ndarray:
if isinstance(key_or_cfg, str):
cfg = PROFESSIONAL_BACKGROUNDS_LOCAL.get(key_or_cfg, PROFESSIONAL_BACKGROUNDS_LOCAL["office"])
elif isinstance(key_or_cfg, dict):
cfg = key_or_cfg
else:
cfg = PROFESSIONAL_BACKGROUNDS_LOCAL["office"]
color = tuple(int(x) for x in cfg.get("color", (255, 255, 255)))
use_grad = bool(cfg.get("gradient", False))
if not use_grad:
return np.full((height, width, 3), color, dtype=np.uint8)
dark = (int(color[0]*0.7), int(color[1]*0.7), int(color[2]*0.7))
return _vertical_gradient(dark, color, width, height)
# ----------------------------------------------------------------------------
# Segmentation
# ----------------------------------------------------------------------------
def _simple_person_segmentation(frame_bgr: np.ndarray) -> np.ndarray:
hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
lower_green = np.array([40, 40, 40], dtype=np.uint8)
upper_green = np.array([80, 255, 255], dtype=np.uint8)
green_mask = cv2.inRange(hsv, lower_green, upper_green)
lower_white = np.array([0, 0, 200], dtype=np.uint8)
upper_white = np.array([180, 30, 255], dtype=np.uint8)
white_mask = cv2.inRange(hsv, lower_white, upper_white)
bg_mask = cv2.bitwise_or(green_mask, white_mask)
person_mask = cv2.bitwise_not(bg_mask)
kernel = np.ones((5, 5), np.uint8)
person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_CLOSE, kernel)
person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_OPEN, kernel)
return (person_mask.astype(np.float32) / 255.0)
def segment_person_hq(
frame: np.ndarray,
predictor: Optional[Any] = None,
fallback_enabled: bool = True,
# backward-compat shim:
use_sam2: Optional[bool] = None,
**_compat_kwargs,
) -> np.ndarray:
try:
if use_sam2 is False:
return _simple_person_segmentation(frame)
if predictor is not None and hasattr(predictor, "set_image") and hasattr(predictor, "predict"):
rgb = _ensure_rgb(frame)
predictor.set_image(rgb)
h, w = rgb.shape[:2]
center = np.array([[w // 2, h // 2]])
labels = np.array([1])
masks, scores, _ = predictor.predict(
point_coords=center,
point_labels=labels,
multimask_output=True
)
m = np.array(masks)
if m.ndim == 3:
idx = int(np.argmax(scores)) if scores is not None else 0
m = m[idx]
elif m.ndim != 2:
raise RuntimeError(f"Unexpected SAM2 mask shape: {m.shape}")
return _to_mask01(m)
except Exception as e:
logger.warning("SAM2 segmentation failed: %s", e)
return _simple_person_segmentation(frame) if fallback_enabled else np.ones(frame.shape[:2], dtype=np.float32)
segment_person_hq_original = segment_person_hq # back-compat alias
# ----------------------------------------------------------------------------
# MatAnyOne helpers
# ----------------------------------------------------------------------------
def _to_tensor_chw(img_uint8_bgr: np.ndarray) -> "torch.Tensor":
import torch
rgb = cv2.cvtColor(img_uint8_bgr, cv2.COLOR_BGR2RGB)
return torch.from_numpy(rgb).permute(2, 0, 1).contiguous().float() / 255.0 # (3,H,W)
def _mask_to_tensor01(mask01: np.ndarray) -> "torch.Tensor":
import torch
return torch.from_numpy(mask01.astype(np.float32)).unsqueeze(0).unsqueeze(0) # (1,1,H,W)
def _tensor_to_mask01(t: "torch.Tensor") -> np.ndarray:
import torch
if t.ndim == 4:
t = t[0, 0]
elif t.ndim == 3:
t = t[0]
return np.clip(t.detach().float().cpu().numpy(), 0.0, 1.0)
def _remap_harden(mask01: np.ndarray, inside: float = 0.70, outside: float = 0.35) -> np.ndarray:
"""
Pull the mask toward {0,1} to avoid 'ghost' translucency.
Values <= outside -> 0; >= inside -> 1; linear in between.
"""
m = mask01.astype(np.float32)
if inside <= outside:
return m
m = (m - outside) / max(1e-6, (inside - outside))
return np.clip(m, 0.0, 1.0)
def _pad_and_smooth_edges(mask01: np.ndarray, dilate_px: int = 6, edge_blur_px: int = 2) -> np.ndarray:
m = (mask01 * 255.0).astype(np.uint8)
if dilate_px > 0:
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dilate_px, dilate_px))
m = cv2.dilate(m, k, iterations=1)
if edge_blur_px > 0:
ksize = edge_blur_px * 2 + 1
m = cv2.GaussianBlur(m, (ksize, ksize), 0)
return (m.astype(np.float32) / 255.0)
def _try_matanyone_refine(
matanyone: Any,
frame_bgr: np.ndarray,
mask01: np.ndarray
) -> Optional[np.ndarray]:
"""
Try several MatAnyOne interfaces:
1) InferenceCore.infer(PIL_image, PIL_mask)
2) .step(image_tensor=NCHW, mask_tensor=NCHW)
3) .process(image_np, mask_np)
4) callable(image_tensor, mask_tensor) β†’ tensor
Returns refined mask01 (np.ndarray) or None if not usable.
"""
try:
# --- (1) PIL infer path ------------------------------------------------
if hasattr(matanyone, "infer"):
try:
from PIL import Image
img_pil = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
m_pil = Image.fromarray((mask01 * 255.0).astype(np.uint8))
out_pil = matanyone.infer(img_pil, m_pil)
out_np = np.asarray(out_pil).astype(np.float32)
return _to_mask01(out_np)
except Exception as e:
logger.debug("MatAnyOne.infer path failed: %s", e)
# --- (2) tensor .step path --------------------------------------------
if hasattr(matanyone, "step"):
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
img_t = _to_tensor_chw(frame_bgr).unsqueeze(0).to(device) # (1,3,H,W)
mask_t = _mask_to_tensor01(mask01).to(device) # (1,1,H,W)
with torch.inference_mode():
out = matanyone.step(
image_tensor=img_t,
mask_tensor=mask_t,
objects=None,
first_frame_pred=True
)
if hasattr(matanyone, "output_prob_to_mask"):
out = matanyone.output_prob_to_mask(out)
return _tensor_to_mask01(out)
# --- (3) numpy .process path ------------------------------------------
if hasattr(matanyone, "process"):
out = matanyone.process(frame_bgr, mask01)
return _to_mask01(np.asarray(out))
# --- (4) callable / nn.Module path ------------------------------------
if callable(matanyone):
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
img_t = _to_tensor_chw(frame_bgr).unsqueeze(0).to(device)
mask_t = _mask_to_tensor01(mask01).to(device)
with torch.inference_mode():
out = matanyone(img_t, mask_t)
return _tensor_to_mask01(out)
except Exception as e:
logger.warning("MatAnyOne refine error: %s", e)
return None
# ----------------------------------------------------------------------------
# Refinement (MatAnyOne)
# ----------------------------------------------------------------------------
def refine_mask_hq(
frame: np.ndarray,
mask: np.ndarray,
matanyone: Optional[Any] = None,
fallback_enabled: bool = True,
# backward-compat shims:
use_matanyone: Optional[bool] = None,
**_compat_kwargs,
) -> np.ndarray:
"""
Refine single-channel mask with MatAnyOne if available.
Backward-compat:
- accepts use_matanyone (False β†’ skip model)
- tolerates legacy arg order refine_mask_hq(mask, frame, ...)
"""
# tolerate legacy order: refine_mask_hq(mask, frame, ...)
if _looks_like_mask(frame) and _looks_like_mask(mask) and mask.ndim == 3 and mask.shape[2] == 3:
frame, mask = mask, frame # swap
mask01 = _to_mask01(mask)
# Use MatAnyOne when possible
if use_matanyone is not False and matanyone is not None:
refined = _try_matanyone_refine(matanyone, frame, mask01)
if refined is not None:
# Hardening + edge handling to avoid translucent body/halo
refined = _remap_harden(refined, inside=0.70, outside=0.35)
refined = _pad_and_smooth_edges(refined, dilate_px=4, edge_blur_px=1)
return refined
else:
logger.warning("MatAnyOne provided but no usable interface found; falling back.")
# Simple refinement fallback
m = (mask01 * 255.0).astype(np.uint8)
m = cv2.GaussianBlur(m, (5, 5), 0)
m = cv2.bilateralFilter(m, 9, 75, 75)
m = (m.astype(np.float32) / 255.0)
m = _remap_harden(m, inside=0.68, outside=0.40)
m = _pad_and_smooth_edges(m, dilate_px=3, edge_blur_px=1)
return m if fallback_enabled else mask01
# ----------------------------------------------------------------------------
# Compositing
# ----------------------------------------------------------------------------
def replace_background_hq(
frame: np.ndarray,
mask01: np.ndarray,
background: np.ndarray,
fallback_enabled: bool = True,
**_compat,
) -> np.ndarray:
try:
H, W = frame.shape[:2]
if background.shape[:2] != (H, W):
background = cv2.resize(background, (W, H), interpolation=cv2.INTER_LANCZOS4)
m = _to_mask01(mask01)
# Very light feather to hide stair-steps; most shaping already done
m = _feather(m, k=1)
m3 = np.repeat(m[:, :, None], 3, axis=2)
comp = frame.astype(np.float32) * m3 + background.astype(np.float32) * (1.0 - m3)
return np.clip(comp, 0, 255).astype(np.uint8)
except Exception as e:
if fallback_enabled:
logger.warning("Compositing failed (%s) – returning original frame", e)
return frame
raise
# ----------------------------------------------------------------------------
# Video validation
# ----------------------------------------------------------------------------
def validate_video_file(video_path: str) -> Tuple[bool, str]:
if not video_path or not Path(video_path).exists():
return False, "Video file not found"
try:
size = Path(video_path).stat().st_size
if size == 0:
return False, "File is empty"
if size > 2 * 1024 * 1024 * 1024:
return False, "File > 2 GB β€” too large for the Space quota"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return False, "OpenCV cannot read the file"
n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
if n_frames == 0:
return False, "No frames detected"
if fps <= 0 or fps > 120:
return False, f"Suspicious FPS: {fps}"
if w <= 0 or h <= 0:
return False, "Zero resolution"
if w > 4096 or h > 4096:
return False, f"Resolution {w}Γ—{h} too high (max 4 096Β²)"
if (n_frames / fps) > 300:
return False, "Video longer than 5 minutes"
return True, f"OK β†’ {w}Γ—{h}, {fps:.1f} fps, {n_frames/fps:.1f} s"
except Exception as e:
logger.error(f"validate_video_file: {e}")
return False, f"Validation error: {e}"
# ----------------------------------------------------------------------------
# Public symbols
# ----------------------------------------------------------------------------
__all__ = [
"segment_person_hq",
"segment_person_hq_original",
"refine_mask_hq",
"replace_background_hq",
"create_professional_background",
"validate_video_file",
"PROFESSIONAL_BACKGROUNDS",
]