#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core Video Processor for BackgroundFX Pro
- Minimal, safe implementation used by core/app.py
- Works with split/legacy loaders
- Keeps exact behavior you shared; only fixes typing imports + integrates
prepare_background() and related helpers.
NOTE:
- Requires utils.cv_processing helpers already present in your project:
segment_person_hq, refine_mask_hq, replace_background_hq,
create_professional_background, PROFESSIONAL_BACKGROUNDS,
_create_gradient_background_local, validate_video_file
"""
from __future__ import annotations
import os
import time
import shutil
import logging
import threading
from typing import Optional, Any, Dict, Callable, Tuple, List
import numpy as np
import cv2
# ---------------------------------------------------------------------
# Project logger (non-fatal fallback to std logging)
# ---------------------------------------------------------------------
try:
    from utils.logger import get_logger
    _log = get_logger("processing.video.video_processor")
except Exception:
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger("processing.video.video_processor")
# ---------------------------------------------------------------------
# Config type (import if available; otherwise annotations are postponed)
# ---------------------------------------------------------------------
try:
    from config.processor_config import ProcessorConfig  # your project config
except Exception:
    # Fallback must be instantiable: __init__ does `config or ProcessorConfig()`,
    # and typing.Any() raises TypeError, so use a minimal attribute-less stub
    # (all reads go through getattr(..., default) anyway).
    class ProcessorConfig:  # type: ignore
        pass
# ---------------------------------------------------------------------
# Small env helpers (use project ones if you have them)
# ---------------------------------------------------------------------
try:
    from utils.system.env_utils import env_bool as _env_bool  # type: ignore
    from utils.system.env_utils import env_int as _env_int  # type: ignore
except Exception:
    def _env_bool(name: str, default: bool = False) -> bool:
        v = os.environ.get(name)
        if v is None:
            return bool(default)
        return str(v).strip().lower() in ("1", "true", "yes", "y", "on")

    def _env_int(name: str, default: int = 0) -> int:
        try:
            return int(os.environ.get(name, default))
        except Exception:
            return int(default)
# ---------------------------------------------------------------------
# CV helpers from your utils module
# ---------------------------------------------------------------------
from utils.cv_processing import (
    segment_person_hq,
    refine_mask_hq,
    replace_background_hq,
    create_professional_background,
    PROFESSIONAL_BACKGROUNDS,
    validate_video_file,
)
# Optional local gradient helper (present in some layouts)
try:
    from utils.cv_processing import _create_gradient_background_local  # type: ignore
except Exception:
    _create_gradient_background_local = None  # type: ignore
# ---------------------------------------------------------------------
# Optional FFmpeg pipe; code falls back to OpenCV if unavailable
# ---------------------------------------------------------------------
try:
    from utils.video.ffmpeg_pipe import FFmpegPipe as _FFmpegPipe  # type: ignore
except Exception:
    _FFmpegPipe = None  # type: ignore


class CoreVideoProcessor:
    """
    Minimal, safe implementation used by core/app.py.
    Orchestrates SAM2 → MatAnyone refinement → background compositing,
    with robust fallbacks (OpenCV writer when FFmpeg/NVENC unavailable).
    """

    def __init__(self, config: Optional[ProcessorConfig] = None, models: Optional[Any] = None):
        self.log = _log
        self.config = config or ProcessorConfig()
        self.models = models
        if self.models is None:
            self.log.warning("CoreVideoProcessor initialized without a models provider; will use fallbacks.")
        self._ffmpeg = shutil.which("ffmpeg")

        # -------- Back-compat safe config flags (do not require attrs on user config)
        self._use_windowed = _env_bool(
            "MATANYONE_WINDOWED",
            bool(getattr(self.config, "use_windowed", False)),
        )
        self._window_size = max(1, _env_int("MATANYONE_WINDOW", int(getattr(self.config, "window_size", 8))))
        self._max_model_size = int(os.environ.get("MAX_MODEL_SIZE", getattr(self.config, "max_model_size", 1280) or 0)) or None

        # state for temporal smoothing
        self._prev_mask: Optional[np.ndarray] = None

        # Legacy per-frame stateful chunking (used only if windowed=False)
        try:
            self._chunk_size = max(1, int(os.environ.get("MATANYONE_CHUNK", "12")))
        except Exception:
            self._chunk_size = 12
        self._chunk_idx = 0
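
    # Environment knobs read in __init__ (code defaults shown):
    #   MATANYONE_WINDOWED -> bool, enable the windowed two-phase path
    #   MATANYONE_WINDOW   -> int, frames per window (default 8)
    #   MAX_MODEL_SIZE     -> int, long-side cap fed to the models
    #                         (default 1280; 0 disables the cap)
    #   MATANYONE_CHUNK    -> int, legacy per-frame chunk length (default 12)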

    # ---------------- ADDED METHOD ----------------
    def prepare_background(self, background_choice: str, custom_background_path: Optional[str], width: int, height: int) -> np.ndarray:
        """
        Prepares a background image for compositing.
        If a valid custom background path is given, loads and resizes it. Otherwise, uses a preset.
        Returns: np.ndarray RGB (H, W, 3) uint8
        """
        if custom_background_path:
            try:
                img = cv2.imread(custom_background_path, cv2.IMREAD_COLOR)
                if img is not None:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, (width, height), interpolation=cv2.INTER_LANCZOS4)
                    return img
                self.log.warning(f"Failed to load custom background from '{custom_background_path}', using preset.")
            except Exception as e:
                self.log.warning(f"Exception loading custom background: {e}, using preset.")
        # fallback to preset (uses the module-level import of create_professional_background)
        return create_professional_background(background_choice, width, height)
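
    # Hedged usage sketch (paths and sizes here are hypothetical):
    #   proc = CoreVideoProcessor()
    #   bg = proc.prepare_background("office", None, 1280, 720)           # preset
    #   bg = proc.prepare_background("office", "/tmp/bg.jpg", 1280, 720)  # custom file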

    # ---------- mask post-processing (stability + crispness) ----------
    def _iou(self, a: np.ndarray, b: np.ndarray, thr: float = 0.5) -> float:
        a_bin = (a >= thr).astype(np.uint8)
        b_bin = (b >= thr).astype(np.uint8)
        inter = np.count_nonzero(cv2.bitwise_and(a_bin, b_bin))
        union = np.count_nonzero(cv2.bitwise_or(a_bin, b_bin))
        return (inter / union) if union else 0.0
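
    # Worked example: masks [[1, 1], [0, 0]] and [[1, 0], [1, 0]] share one
    # pixel and occupy three in total, so IoU = 1 / 3 ≈ 0.33.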

    def _harden(self, m: np.ndarray) -> np.ndarray:
        g = float(getattr(self.config, "mask_gamma", 0.90))
        if abs(g - 1.0) > 1e-6:
            m = np.clip(m, 0, 1) ** g
        lo = float(getattr(self.config, "hard_low", 0.35))
        hi = float(getattr(self.config, "hard_high", 0.70))
        if hi > lo + 1e-6:
            m = (m - lo) / (hi - lo)
            m = np.clip(m, 0.0, 1.0)
        k = int(getattr(self.config, "dilate_px", 6))
        if k > 0:
            se = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * k + 1, 2 * k + 1))
            m = cv2.dilate(m, se, iterations=1)
        eb = int(getattr(self.config, "edge_blur_px", 1))
        if eb > 0:
            m = cv2.GaussianBlur(m, (2 * eb + 1, 2 * eb + 1), 0)
        return np.clip(m, 0.0, 1.0)
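
    # With the defaults above (gamma=0.90, lo=0.35, hi=0.70), an alpha of 0.5
    # maps to 0.5**0.90 ≈ 0.536 after gamma, then ≈ 0.53 after the linear
    # stretch, before dilation/blur — mid confidences get pushed toward opaque.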

    def _stabilize(self, m: np.ndarray) -> np.ndarray:
        if self._prev_mask is None:
            self._prev_mask = m
            return m
        thr = float(getattr(self.config, "min_iou_to_accept", 0.05))
        if self._iou(self._prev_mask, m, 0.5) < thr:
            return self._prev_mask
        a = float(getattr(self.config, "temporal_ema_alpha", 0.75))
        m_ema = a * self._prev_mask + (1.0 - a) * m
        self._prev_mask = m_ema
        return m_ema
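
    # The EMA (alpha=0.75 by default) keeps 75% of the previous mask each
    # frame, and a new mask whose IoU with the previous one falls below 0.05
    # is rejected outright — a cheap guard against single-frame flicker.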

    # ---------- Single frame (fallback path) ----------
    def process_frame(self, frame_bgr: np.ndarray, background_rgb: np.ndarray) -> Dict[str, Any]:
        H, W = frame_bgr.shape[:2]
        max_side = max(H, W)
        scale = 1.0
        proc_frame_bgr = frame_bgr

        # Model-only downscale
        mms = self._max_model_size
        if mms and max_side > mms:
            scale = mms / float(max_side)
            newW = int(round(W * scale))
            newH = int(round(H * scale))
            proc_frame_bgr = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA)
            self.log.debug(f"Model-only downscale: {W}x{H} -> {newW}x{newH} (scale={scale:.3f})")
        proc_frame_rgb = cv2.cvtColor(proc_frame_bgr, cv2.COLOR_BGR2RGB)

        predictor = None
        try:
            if self.models and hasattr(self.models, "get_sam2"):
                predictor = self.models.get_sam2()
        except Exception as e:
            self.log.warning(f"SAM2 predictor unavailable: {e}")
        mask_small = segment_person_hq(proc_frame_rgb, predictor, use_sam2=True)

        matanyone = None
        try:
            if self.models and hasattr(self.models, "get_matanyone"):
                matanyone = self.models.get_matanyone()
        except Exception as e:
            self.log.warning(f"MatAnyOne unavailable: {e}")
        if matanyone is not None and hasattr(matanyone, "reset") and self._chunk_idx == 0:
            try:
                matanyone.reset()
            except Exception:
                pass
        mask_small_ref = refine_mask_hq(
            proc_frame_rgb,
            mask_small,
            matanyone=matanyone,
            use_matanyone=True,
            frame_idx=self._chunk_idx,
        )
        self._chunk_idx = (self._chunk_idx + 1) % max(1, self._chunk_size)
        if self._chunk_idx == 0:
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except Exception:
                pass

        mask_small_ref = np.clip(mask_small_ref.astype(np.float32), 0.0, 1.0)
        mask_stable = self._stabilize(mask_small_ref)
        mask_stable = self._harden(mask_stable)
        if scale != 1.0:
            mask_full = cv2.resize(mask_stable, (W, H), interpolation=cv2.INTER_LINEAR)
        else:
            mask_full = mask_stable

        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        out_rgb = replace_background_hq(frame_rgb, mask_full, background_rgb)
        out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
        return {"frame": out_bgr, "mask": mask_full}

    # ---------- Build background once per video ----------
    def _prepare_background_from_config(
        self, bg_config: Optional[Dict[str, Any]], width: int, height: int
    ) -> np.ndarray:
        if bg_config and bg_config.get("custom_path"):
            path = bg_config["custom_path"]
            img_bgr = cv2.imread(path, cv2.IMREAD_COLOR)
            if img_bgr is None:
                self.log.warning(f"Custom background at '{path}' could not be read. Falling back to preset.")
            else:
                img_bgr = cv2.resize(img_bgr, (width, height), interpolation=cv2.INTER_LANCZOS4)
                return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        if bg_config and isinstance(bg_config.get("gradient"), dict) and _create_gradient_background_local:
            try:
                return _create_gradient_background_local(bg_config["gradient"], width, height)
            except Exception as e:
                self.log.warning(f"Gradient generation failed: {e}. Falling back to preset.")
        choice = None
        if bg_config and "background_choice" in bg_config:
            choice = bg_config["background_choice"]
        if not choice:
            choice = getattr(self.config, "background_preset", "office")
        if choice not in PROFESSIONAL_BACKGROUNDS:
            self.log.warning(f"Unknown background preset '{choice}'; using 'office'.")
            choice = "office"
        return create_professional_background(choice, width, height)  # RGB
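
    # bg_config keys recognized above, in priority order (all optional):
    #   {"custom_path": "/path/to/image.jpg"}   # file on disk, resized to fit
    #   {"gradient": {...}}                     # only if the local helper imported
    #   {"background_choice": "office"}         # must be in PROFESSIONAL_BACKGROUNDS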

    # ---------- Windowed two-phase helpers ----------
    def _model_downscale(self, frame_bgr: np.ndarray) -> Tuple[np.ndarray, float]:
        H, W = frame_bgr.shape[:2]
        max_side = max(H, W)
        mms = self._max_model_size
        if mms and max_side > mms:
            s = mms / float(max_side)
            newW = int(round(W * s))
            newH = int(round(H * s))
            small = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA)
            return small, s
        return frame_bgr, 1.0
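
    # e.g. a 1920x1080 frame with MAX_MODEL_SIZE=1280 becomes 1280x720
    # (scale ≈ 0.667); masks are resized back to full size for compositing.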

    def _prepare_sam2_gpu(self, predictor):
        try:
            import torch
            if predictor is None or not torch.cuda.is_available():
                return
            if hasattr(predictor, "to"):
                try:
                    predictor.to("cuda")  # type: ignore[attr-defined]
                    return
                except Exception:
                    pass
            if hasattr(predictor, "model") and hasattr(predictor.model, "to"):
                try:
                    predictor.model.to("cuda")  # type: ignore[attr-defined]
                except Exception:
                    pass
        except Exception:
            pass

    def _release_sam2_gpu(self, predictor):
        try:
            if predictor is None:
                return
            for name in ("reset_image", "release_image", "clear_image", "clear_state"):
                if hasattr(predictor, name) and callable(getattr(predictor, name)):
                    try:
                        getattr(predictor, name)()
                    except Exception:
                        pass
            for name in ("to", "cpu"):
                if hasattr(predictor, name):
                    try:
                        if name == "to":
                            predictor.to("cpu")  # type: ignore[attr-defined]
                        else:
                            predictor.cpu()  # type: ignore[attr-defined]
                    except Exception:
                        pass
        except Exception:
            pass
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception:
            pass
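
    # Pattern: SAM2 is moved onto the GPU only to seed each window, then pushed
    # back to CPU with the CUDA cache cleared — presumably to leave MatAnyone
    # the VRAM headroom during per-frame refinement.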

    # ---------- Full video ----------
    def process_video(
        self,
        input_path: str,
        output_path: str,
        bg_config: Optional[Dict[str, Any]] = None,
        progress_callback: Optional[Callable[[int, int, float], None]] = None,
        stop_event: Optional[threading.Event] = None,
    ) -> Dict[str, Any]:
        ok, msg = validate_video_file(input_path)
        if not ok:
            raise ValueError(f"Invalid or unreadable video: {msg}")

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {input_path}")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps_out = getattr(self.config, "write_fps", None) or (fps if fps and fps > 0 else 25.0)

        background_rgb = self._prepare_background_from_config(bg_config, width, height)
        self._prev_mask = None

        ffmpeg_pipe: _FFmpegPipe | None = None  # type: ignore
        writer: cv2.VideoWriter | None = None
        ffmpeg_failed_reason = None
        if getattr(self.config, "use_nvenc", True) and shutil.which("ffmpeg") and _FFmpegPipe is not None:
            try:
                ffmpeg_pipe = _FFmpegPipe(width, height, float(fps_out), output_path, self.config, log=self.log)  # type: ignore
            except Exception as e:
                ffmpeg_failed_reason = str(e)
                self.log.warning("FFmpeg NVENC pipeline unavailable. Falling back to OpenCV. Reason: %s", e)
        if ffmpeg_pipe is None:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
            if not writer.isOpened():
                cap.release()
                raise RuntimeError(f"Could not open VideoWriter for: {output_path}")

        predictor = None
        matanyone = None
        try:
            if self.models and hasattr(self.models, "get_sam2"):
                predictor = self.models.get_sam2()
        except Exception as e:
            self.log.warning(f"SAM2 predictor unavailable: {e}")
        try:
            if self.models and hasattr(self.models, "get_matanyone"):
                matanyone = self.models.get_matanyone()
        except Exception as e:
            self.log.warning(f"MatAnyOne unavailable: {e}")
        use_windowed = bool(self._use_windowed and predictor is not None and matanyone is not None)

        frame_count = 0
        start_time = time.time()
        try:
            if not use_windowed:
                while True:
                    ret, frame_bgr = cap.read()
                    if not ret:
                        break
                    if stop_event is not None and stop_event.is_set():
                        self.log.info("Processing stopped by user request.")
                        break
                    result = self.process_frame(frame_bgr, background_rgb)
                    out_bgr = np.ascontiguousarray(result["frame"])
                    if ffmpeg_pipe is not None:
                        try:
                            ffmpeg_pipe.write(out_bgr)  # type: ignore[attr-defined]
                        except Exception as e:
                            self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e)
                            try:
                                ffmpeg_pipe.close()  # type: ignore[attr-defined]
                            except Exception:
                                pass
                            ffmpeg_pipe = None
                            if writer is None:
                                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                if not writer.isOpened():
                                    raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                            writer.write(out_bgr)
                    else:
                        writer.write(out_bgr)
                    frame_count += 1
                    if progress_callback:
                        elapsed = time.time() - start_time
                        fps_live = frame_count / elapsed if elapsed > 0 else 0.0
                        try:
                            progress_callback(frame_count, total_frames, fps_live)
                        except Exception:
                            pass
            else:
                WINDOW = max(1, int(self._window_size))
                while True:
                    frames_bgr: List[np.ndarray] = []
                    for _ in range(WINDOW):
                        ret, fr = cap.read()
                        if not ret:
                            break
                        frames_bgr.append(fr)
                    if not frames_bgr:
                        break
                    if stop_event is not None and stop_event.is_set():
                        self.log.info("Processing stopped by user request.")
                        break

                    frames_small_bgr: List[np.ndarray] = []
                    scales: List[float] = []
                    for fr in frames_bgr:
                        fr_small, s = self._model_downscale(fr)
                        frames_small_bgr.append(fr_small)
                        scales.append(s)
                    scale = scales[0] if scales else 1.0
                    frames_small_rgb = [cv2.cvtColor(fb, cv2.COLOR_BGR2RGB) for fb in frames_small_bgr]

                    self._prepare_sam2_gpu(predictor)
                    try:
                        mask_small = segment_person_hq(frames_small_rgb[0], predictor, use_sam2=True)
                    except Exception as e:
                        self.log.warning(f"SAM2 segmentation error on window start: {e}")
                        mask_small = segment_person_hq(frames_small_rgb[0], None, use_sam2=False)
                    self._release_sam2_gpu(predictor)

                    if hasattr(matanyone, "reset"):
                        try:
                            matanyone.reset()
                        except Exception:
                            pass
                    for j, fr_rgb_small in enumerate(frames_small_rgb):
                        try:
                            if j == 0:
                                m2d = mask_small
                                if m2d.ndim == 3:
                                    m2d = m2d[..., 0]
                                alpha_small = matanyone(fr_rgb_small, m2d)
                            else:
                                alpha_small = matanyone(fr_rgb_small)
                            alpha_small = np.clip(alpha_small.astype(np.float32), 0.0, 1.0)
                            alpha_stable = self._stabilize(alpha_small)
                            alpha_harden = self._harden(alpha_stable)
                            if scale != 1.0:
                                H, W = frames_bgr[j].shape[:2]
                                alpha_full = cv2.resize(alpha_harden, (W, H), interpolation=cv2.INTER_LINEAR)
                            else:
                                alpha_full = alpha_harden
                            frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB)
                            out_rgb = replace_background_hq(frame_rgb_full, alpha_full, background_rgb)
                            out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
                            out_bgr = np.ascontiguousarray(out_bgr)
                            if ffmpeg_pipe is not None:
                                try:
                                    ffmpeg_pipe.write(out_bgr)  # type: ignore[attr-defined]
                                except Exception as e:
                                    self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e)
                                    try:
                                        ffmpeg_pipe.close()  # type: ignore[attr-defined]
                                    except Exception:
                                        pass
                                    ffmpeg_pipe = None
                                    if writer is None:
                                        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                        writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                        if not writer.isOpened():
                                            raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                                    writer.write(out_bgr)
                            else:
                                writer.write(out_bgr)
                            frame_count += 1
                        except Exception as e:
                            self.log.warning(f"MatAnyone failed at window frame {j}: {e}")
                            if j == 0:
                                alpha_small_fb = np.clip(mask_small.astype(np.float32), 0.0, 1.0)
                            else:
                                # alpha_small may be unbound if matanyone itself raised,
                                # so size the empty fallback from the model-resolution frame
                                alpha_small_fb = (
                                    self._prev_mask
                                    if self._prev_mask is not None
                                    else np.zeros(fr_rgb_small.shape[:2], dtype=np.float32)
                                )
                            if scale != 1.0:
                                H, W = frames_bgr[j].shape[:2]
                                alpha_full_fb = cv2.resize(alpha_small_fb, (W, H), interpolation=cv2.INTER_LINEAR)
                            else:
                                alpha_full_fb = alpha_small_fb
                            frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB)
                            out_rgb_fb = replace_background_hq(frame_rgb_full, alpha_full_fb, background_rgb)
                            out_bgr_fb = cv2.cvtColor(out_rgb_fb, cv2.COLOR_RGB2BGR)
                            if ffmpeg_pipe is not None:
                                try:
                                    ffmpeg_pipe.write(np.ascontiguousarray(out_bgr_fb))  # type: ignore[attr-defined]
                                except Exception:
                                    try:
                                        ffmpeg_pipe.close()  # type: ignore[attr-defined]
                                    except Exception:
                                        pass
                                    ffmpeg_pipe = None
                                    if writer is None:
                                        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                        writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                        if not writer.isOpened():
                                            raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                                    writer.write(np.ascontiguousarray(out_bgr_fb))
                            else:
                                writer.write(np.ascontiguousarray(out_bgr_fb))
                            frame_count += 1
                        if progress_callback:
                            elapsed = time.time() - start_time
                            fps_live = frame_count / elapsed if elapsed > 0 else 0.0
                            try:
                                progress_callback(frame_count, total_frames, fps_live)
                            except Exception:
                                pass

                    del frames_bgr, frames_small_bgr, frames_small_rgb, mask_small
                    try:
                        import torch
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
                    except Exception:
                        pass
        finally:
            cap.release()
            if writer is not None:
                writer.release()
            if ffmpeg_pipe is not None:
                try:
                    ffmpeg_pipe.close()  # type: ignore[attr-defined]
                except Exception:
                    pass

        if ffmpeg_failed_reason:
            self.log.info("Completed via OpenCV writer (FFmpeg initially failed): %s", ffmpeg_failed_reason)
        self.log.info("Processed %d frames → %s", frame_count, output_path)
        return {
            "frames": frame_count,
            "width": width,
            "height": height,
            "fps_out": float(fps_out),
            "output_path": output_path,
        }
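

# ---------------------------------------------------------------------
# Hedged usage sketch: how a caller such as core/app.py might drive this
# class. The CLI wrapper, paths, and the bare (model-less) construction
# below are illustrative assumptions, not the production entry point.
# ---------------------------------------------------------------------
if __name__ == "__main__":
    import sys

    def _progress(done: int, total: int, fps_live: float) -> None:
        # total can be 0 when OpenCV cannot determine the frame count
        pct = (100.0 * done / total) if total else 0.0
        print(f"\r{done}/{total} ({pct:.1f}%) @ {fps_live:.1f} fps", end="", flush=True)

    if len(sys.argv) >= 3:
        proc = CoreVideoProcessor()  # no models provider: fallback paths only
        stats = proc.process_video(
            sys.argv[1],
            sys.argv[2],
            bg_config={"background_choice": "office"},
            progress_callback=_progress,
        )
        print(f"\nDone: {stats}")
    else:
        print("usage: video_processor.py <input.mp4> <output.mp4>")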