"""
cv_processing.py · slim orchestrator layer (self-contained, backward-compatible)
──────────────────────────────────────────────────────────────────────────────

Public API (unchanged):
- segment_person_hq(frame, predictor=None, fallback_enabled=True, **compat)
- segment_person_hq_original(...)
- refine_mask_hq(frame, mask, matanyone=None, fallback_enabled=True, **compat)
- replace_background_hq(frame, mask, background, fallback_enabled=True)
- create_professional_background(key_or_cfg, width, height)
- validate_video_file(video_path) -> (bool, reason)
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import cv2
import numpy as np

logger = logging.getLogger(__name__)


PROFESSIONAL_BACKGROUNDS_LOCAL: Dict[str, Dict[str, Any]] = {
    "office": {"color": (240, 248, 255), "gradient": True},
    "studio": {"color": (32, 32, 32), "gradient": False},
    "nature": {"color": (34, 139, 34), "gradient": True},
    "abstract": {"color": (75, 0, 130), "gradient": True},
    "white": {"color": (255, 255, 255), "gradient": False},
    "black": {"color": (0, 0, 0), "gradient": False},
}
# Back-compat alias kept for callers that import the unsuffixed name.
PROFESSIONAL_BACKGROUNDS = PROFESSIONAL_BACKGROUNDS_LOCAL


def _ensure_rgb(img: np.ndarray) -> np.ndarray:
    if img is None:
        return img
    if img.ndim == 3 and img.shape[2] == 3:
        # OpenCV frames are BGR; SAM-style predictors expect RGB.
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

def _to_mask01(m: np.ndarray) -> Optional[np.ndarray]:
    """Coerce a mask to float32 in [0, 1], collapsing 1- or 3-channel input."""
    if m is None:
        return None
    if m.ndim == 3 and m.shape[2] in (1, 3):
        m = m[..., 0]
    m = m.astype(np.float32)
    if m.max() > 1.0:
        m = m / 255.0
    return np.clip(m, 0.0, 1.0)

def _feather(mask01: np.ndarray, k: int = 2) -> np.ndarray:
    if mask01.ndim == 3:
        mask01 = mask01[..., 0]
    k = max(1, int(k) * 2 + 1)  # GaussianBlur needs an odd, positive kernel size
    m = cv2.GaussianBlur((mask01 * 255.0).astype(np.uint8), (k, k), 0)
    return m.astype(np.float32) / 255.0

def _vertical_gradient(top: Tuple[int, int, int], bottom: Tuple[int, int, int], width: int, height: int) -> np.ndarray:
    bg = np.zeros((height, width, 3), dtype=np.uint8)
    for y in range(height):
        t = y / max(1, height - 1)
        r = int(top[0] * (1 - t) + bottom[0] * t)
        g = int(top[1] * (1 - t) + bottom[1] * t)
        b = int(top[2] * (1 - t) + bottom[2] * t)
        bg[y, :] = (r, g, b)
    return bg

def _looks_like_mask(x: Any) -> bool:
    return (
        isinstance(x, np.ndarray)
        and x.dtype != object
        and (x.ndim == 2 or (x.ndim == 3 and x.shape[2] in (1, 3)))
    )
def create_professional_background(key_or_cfg: Any, width: int, height: int) -> np.ndarray:
    if isinstance(key_or_cfg, str):
        cfg = PROFESSIONAL_BACKGROUNDS_LOCAL.get(key_or_cfg, PROFESSIONAL_BACKGROUNDS_LOCAL["office"])
    elif isinstance(key_or_cfg, dict):
        cfg = key_or_cfg
    else:
        cfg = PROFESSIONAL_BACKGROUNDS_LOCAL["office"]

    color = tuple(int(x) for x in cfg.get("color", (255, 255, 255)))
    use_grad = bool(cfg.get("gradient", False))

    if not use_grad:
        return np.full((height, width, 3), color, dtype=np.uint8)

    # Gradient runs from a darkened copy of the colour at the top to the full colour.
    dark = (int(color[0] * 0.7), int(color[1] * 0.7), int(color[2] * 0.7))
    return _vertical_gradient(dark, color, width, height)
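# Example (sketch): a named preset, or an inline config dict with the same keys.
#
#   bg1 = create_professional_background("abstract", 1920, 1080)
#   bg2 = create_professional_background({"color": (30, 60, 90), "gradient": True}, 1920, 1080)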

def _simple_person_segmentation(frame_bgr: np.ndarray) -> np.ndarray:
    """Heuristic fallback: treat green-screen and bright-white pixels as background."""
    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)

    lower_green = np.array([40, 40, 40], dtype=np.uint8)
    upper_green = np.array([80, 255, 255], dtype=np.uint8)
    green_mask = cv2.inRange(hsv, lower_green, upper_green)

    lower_white = np.array([0, 0, 200], dtype=np.uint8)
    upper_white = np.array([180, 30, 255], dtype=np.uint8)
    white_mask = cv2.inRange(hsv, lower_white, upper_white)

    bg_mask = cv2.bitwise_or(green_mask, white_mask)
    person_mask = cv2.bitwise_not(bg_mask)

    # Close small holes in the person region, then strip speckle noise.
    kernel = np.ones((5, 5), np.uint8)
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_CLOSE, kernel)
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_OPEN, kernel)

    return person_mask.astype(np.float32) / 255.0

def segment_person_hq(
    frame: np.ndarray,
    predictor: Optional[Any] = None,
    fallback_enabled: bool = True,
    use_sam2: Optional[bool] = None,  # legacy flag: False forces the heuristic path
    **_compat_kwargs,
) -> np.ndarray:
    try:
        if use_sam2 is False:
            return _simple_person_segmentation(frame)

        if predictor is not None and hasattr(predictor, "set_image") and hasattr(predictor, "predict"):
            rgb = _ensure_rgb(frame)
            predictor.set_image(rgb)
            h, w = rgb.shape[:2]
            # Prompt with a single positive point at the frame centre.
            center = np.array([[w // 2, h // 2]])
            labels = np.array([1])
            masks, scores, _ = predictor.predict(
                point_coords=center,
                point_labels=labels,
                multimask_output=True,
            )
            m = np.array(masks)
            if m.ndim == 3:
                # Keep the highest-scoring of the candidate masks.
                idx = int(np.argmax(scores)) if scores is not None else 0
                m = m[idx]
            elif m.ndim != 2:
                raise RuntimeError(f"Unexpected SAM2 mask shape: {m.shape}")
            return _to_mask01(m)

    except Exception as e:
        logger.warning("SAM2 segmentation failed: %s", e)

    return _simple_person_segmentation(frame) if fallback_enabled else np.ones(frame.shape[:2], dtype=np.float32)


segment_person_hq_original = segment_person_hq
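
# With a real SAM2 predictor (a sketch; the module and class names follow the
# upstream `sam2` package and may differ between releases - treat them as
# assumptions rather than a guaranteed import path):
#
#   from sam2.build_sam import build_sam2
#   from sam2.sam2_image_predictor import SAM2ImagePredictor
#   predictor = SAM2ImagePredictor(build_sam2(model_cfg, checkpoint_path))
#   mask = segment_person_hq(frame, predictor=predictor)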


def _to_tensor_chw(img_uint8_bgr: np.ndarray) -> "torch.Tensor":
    import torch  # imported lazily so the module loads without torch installed

    rgb = cv2.cvtColor(img_uint8_bgr, cv2.COLOR_BGR2RGB)
    return torch.from_numpy(rgb).permute(2, 0, 1).contiguous().float() / 255.0

def _mask_to_tensor01(mask01: np.ndarray) -> "torch.Tensor":
    import torch

    return torch.from_numpy(mask01.astype(np.float32)).unsqueeze(0).unsqueeze(0)


def _tensor_to_mask01(t: "torch.Tensor") -> np.ndarray:
    if t.ndim == 4:
        t = t[0, 0]
    elif t.ndim == 3:
        t = t[0]
    return np.clip(t.detach().float().cpu().numpy(), 0.0, 1.0)

def _remap_harden(mask01: np.ndarray, inside: float = 0.70, outside: float = 0.35) -> np.ndarray:
    """
    Pull the mask toward {0, 1} to avoid "ghost" translucency.
    Values <= outside map to 0; values >= inside map to 1; linear in between.
    """
    m = mask01.astype(np.float32)
    if inside <= outside:
        return m
    m = (m - outside) / max(1e-6, inside - outside)
    return np.clip(m, 0.0, 1.0)
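
# With the defaults (outside=0.35, inside=0.70): 0.30 maps to 0.0,
# 0.50 maps to (0.50 - 0.35) / 0.35 ≈ 0.43, and 0.80 clips to 1.0.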

def _pad_and_smooth_edges(mask01: np.ndarray, dilate_px: int = 6, edge_blur_px: int = 2) -> np.ndarray:
    m = (mask01 * 255.0).astype(np.uint8)
    if dilate_px > 0:
        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (dilate_px, dilate_px))
        m = cv2.dilate(m, k, iterations=1)
    if edge_blur_px > 0:
        ksize = edge_blur_px * 2 + 1  # odd kernel size for GaussianBlur
        m = cv2.GaussianBlur(m, (ksize, ksize), 0)
    return m.astype(np.float32) / 255.0

def _try_matanyone_refine(
    matanyone: Any,
    frame_bgr: np.ndarray,
    mask01: np.ndarray,
) -> Optional[np.ndarray]:
    """
    Try several MatAnyOne interfaces:
    1) InferenceCore.infer(PIL_image, PIL_mask)
    2) .step(image_tensor=NCHW, mask_tensor=NCHW)
    3) .process(image_np, mask_np)
    4) callable(image_tensor, mask_tensor) -> tensor
    Returns refined mask01 (np.ndarray) or None if not usable.
    """
    try:
        # 1) PIL-based infer()
        if hasattr(matanyone, "infer"):
            try:
                from PIL import Image

                img_pil = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
                m_pil = Image.fromarray((mask01 * 255.0).astype(np.uint8))
                out_pil = matanyone.infer(img_pil, m_pil)
                out_np = np.asarray(out_pil).astype(np.float32)
                return _to_mask01(out_np)
            except Exception as e:
                logger.debug("MatAnyOne.infer path failed: %s", e)

        # 2) Tensor-based step()
        if hasattr(matanyone, "step"):
            import torch

            device = "cuda" if torch.cuda.is_available() else "cpu"
            img_t = _to_tensor_chw(frame_bgr).unsqueeze(0).to(device)
            mask_t = _mask_to_tensor01(mask01).to(device)
            with torch.inference_mode():
                out = matanyone.step(
                    image_tensor=img_t,
                    mask_tensor=mask_t,
                    objects=None,
                    first_frame_pred=True,
                )
            if hasattr(matanyone, "output_prob_to_mask"):
                out = matanyone.output_prob_to_mask(out)
            return _tensor_to_mask01(out)

        # 3) NumPy-based process()
        if hasattr(matanyone, "process"):
            out = matanyone.process(frame_bgr, mask01)
            return _to_mask01(np.asarray(out))

        # 4) Bare callable
        if callable(matanyone):
            import torch

            device = "cuda" if torch.cuda.is_available() else "cpu"
            img_t = _to_tensor_chw(frame_bgr).unsqueeze(0).to(device)
            mask_t = _mask_to_tensor01(mask01).to(device)
            with torch.inference_mode():
                out = matanyone(img_t, mask_t)
            return _tensor_to_mask01(out)

    except Exception as e:
        logger.warning("MatAnyOne refine error: %s", e)

    return None

def refine_mask_hq(
    frame: np.ndarray,
    mask: np.ndarray,
    matanyone: Optional[Any] = None,
    fallback_enabled: bool = True,
    use_matanyone: Optional[bool] = None,  # legacy flag: False skips the model
    **_compat_kwargs,
) -> np.ndarray:
    """
    Refine a single-channel mask with MatAnyOne if available.
    Backward-compat:
    - accepts use_matanyone (False -> skip model)
    - tolerates legacy arg order refine_mask_hq(mask, frame, ...)
    """
    # Legacy call order: if the first arg is 2-D (a mask) and the second is a
    # 3-channel image (a frame), the caller used refine_mask_hq(mask, frame, ...).
    if _looks_like_mask(frame) and frame.ndim == 2 and isinstance(mask, np.ndarray) and mask.ndim == 3 and mask.shape[2] == 3:
        frame, mask = mask, frame

    mask01 = _to_mask01(mask)

    if use_matanyone is not False and matanyone is not None:
        refined = _try_matanyone_refine(matanyone, frame, mask01)
        if refined is not None:
            # Harden against ghost translucency, then pad and smooth the silhouette.
            refined = _remap_harden(refined, inside=0.70, outside=0.35)
            refined = _pad_and_smooth_edges(refined, dilate_px=4, edge_blur_px=1)
            return refined
        logger.warning("MatAnyOne provided but no usable interface found; falling back.")

    # OpenCV-only fallback: blur + edge-preserving filter, then the same hardening.
    m = (mask01 * 255.0).astype(np.uint8)
    m = cv2.GaussianBlur(m, (5, 5), 0)
    m = cv2.bilateralFilter(m, 9, 75, 75)
    m = m.astype(np.float32) / 255.0
    m = _remap_harden(m, inside=0.68, outside=0.40)
    m = _pad_and_smooth_edges(m, dilate_px=3, edge_blur_px=1)
    return m if fallback_enabled else mask01
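
# Forcing the OpenCV-only path (sketch): refine_mask_hq(frame, mask, use_matanyone=False)
# runs the blur + bilateral-filter fallback with the same hardening, no model involved.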

def replace_background_hq(
    frame: np.ndarray,
    mask01: np.ndarray,
    background: np.ndarray,
    fallback_enabled: bool = True,
    **_compat,
) -> np.ndarray:
    try:
        H, W = frame.shape[:2]
        if background.shape[:2] != (H, W):
            background = cv2.resize(background, (W, H), interpolation=cv2.INTER_LANCZOS4)

        m = _to_mask01(mask01)
        m = _feather(m, k=1)  # soften the matte edge slightly before blending
        m3 = np.repeat(m[:, :, None], 3, axis=2)

        # Linear blend: person where the mask is 1, background where it is 0.
        comp = frame.astype(np.float32) * m3 + background.astype(np.float32) * (1.0 - m3)
        return np.clip(comp, 0, 255).astype(np.uint8)
    except Exception as e:
        if fallback_enabled:
            logger.warning("Compositing failed (%s); returning original frame", e)
            return frame
        raise

def validate_video_file(video_path: str) -> Tuple[bool, str]:
    if not video_path or not Path(video_path).exists():
        return False, "Video file not found"

    try:
        size = Path(video_path).stat().st_size
        if size == 0:
            return False, "File is empty"
        if size > 2 * 1024 * 1024 * 1024:
            return False, "File > 2 GB; too large for the Space quota"

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False, "OpenCV cannot read the file"

        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.release()

        if n_frames == 0:
            return False, "No frames detected"
        if fps <= 0 or fps > 120:
            return False, f"Suspicious FPS: {fps}"
        if w <= 0 or h <= 0:
            return False, "Zero resolution"
        if w > 4096 or h > 4096:
            return False, f"Resolution {w}×{h} too high (max 4096×4096)"
        if (n_frames / fps) > 300:
            return False, "Video longer than 5 minutes"

        return True, f"OK: {w}×{h}, {fps:.1f} fps, {n_frames / fps:.1f} s"

    except Exception as e:
        logger.error("validate_video_file: %s", e)
        return False, f"Validation error: {e}"

__all__ = [
    "segment_person_hq",
    "segment_person_hq_original",
    "refine_mask_hq",
    "replace_background_hq",
    "create_professional_background",
    "validate_video_file",
    "PROFESSIONAL_BACKGROUNDS",
]
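

if __name__ == "__main__":
    # Smoke test on a synthetic frame (a sketch; exercises only the heuristic
    # fallbacks, so no SAM2 or MatAnyOne weights are required).
    demo = np.zeros((360, 640, 3), dtype=np.uint8)
    demo[:] = (60, 200, 60)  # green-screen style background (BGR)
    cv2.rectangle(demo, (240, 80), (400, 360), (120, 110, 100), -1)  # stand-in "person"

    m = segment_person_hq(demo)
    m = refine_mask_hq(demo, m)
    bg = create_professional_background("studio", demo.shape[1], demo.shape[0])
    out = replace_background_hq(demo, m, bg)
    print("mask:", m.shape, m.dtype, "| composite:", out.shape, out.dtype)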