# MogensR's picture
# Update processing/video/video_processor.py
# ee9591a
# raw
# history blame
# 23 kB
#!/usr/bin/env python3
"""
Compatibility shim: CoreVideoProcessor (stabilized + crisper edges)
- Accepts background configs:
{"custom_path": "/path/to/image.png"}
{"background_choice": "<preset_key>"}
{"gradient": {type, start, end, angle_deg}}
- Model-only downscale (max_model_size) for speed, full-res render.
- FFmpeg pipe writer with encoder fallbacks and stderr surfacing; falls back
to OpenCV VideoWriter if FFmpeg isn't available or fails mid-run.
- Temporal smoothing + mask hardening to avoid flicker/ghosting.
Requirements for the models provider:
- get_sam2() -> predictor or None
- get_matanyone() -> processor or None
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, Any, Callable
import time
import threading
import shutil
import subprocess
import shlex
import cv2
import numpy as np
# Try project logger; fall back to std logging
try:
from utils.logging_setup import make_logger
_log = make_logger(__name__)
except Exception:
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s")
_log = logging.getLogger(__name__)
# Import directly from utils.cv_processing to avoid circular imports via utils/__init__.py
from utils.cv_processing import (
segment_person_hq,
refine_mask_hq,
replace_background_hq,
create_professional_background,
validate_video_file,
PROFESSIONAL_BACKGROUNDS,
)
# ---------- local gradient helper (no extra imports needed) ----------
def _to_rgb(c):
    """
    Coerce a colour spec into an ``(r, g, b)`` tuple of ints in 0..255.

    Accepted forms:
      - list/tuple of three numbers: each is converted with ``int()`` and
        clamped to 0..255 so it can be stored in a uint8 image safely
      - ``"#rrggbb"`` hex string
      - ``"#rgb"`` shorthand hex string (each digit is doubled)

    Anything else (wrong type, wrong length, no leading ``#``) yields white.

    Raises:
        ValueError: a ``#``-prefixed string of a supported length contains
            non-hexadecimal characters (propagated from ``int(..., 16)``).
    """
    if isinstance(c, (list, tuple)) and len(c) == 3:
        # Clamp: values outside 0..255 cannot be represented in a uint8 pixel.
        return tuple(min(255, max(0, int(x))) for x in c)
    if isinstance(c, str) and c.startswith("#"):
        digits = c[1:]
        if len(digits) == 3:
            # "#f80" -> "ff8800"
            digits = "".join(ch * 2 for ch in digits)
        if len(digits) == 6:
            return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))
    return (255, 255, 255)
def _create_gradient_background_local(spec: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """
    Minimal linear gradient generator for backgrounds.

    spec = {"type": ..., "start": (r,g,b)|"#rrggbb", "end": ..., "angle_deg": float}
      - "type" is not read; the gradient is always linear.
      - "start"/"end" are parsed by ``_to_rgb`` (defaults "#222222" / "#888888").
      - "angle_deg" (default 0): 0 runs start (top) -> end (bottom). Positive
        angles turn that direction counter-clockwise on screen, so 90 runs
        left -> right. This matches the sign convention of
        ``cv2.getRotationMatrix2D`` used by the previous implementation.

    The ramp is computed analytically by projecting every pixel onto the
    gradient direction and normalising over the image corners, so ``start`` and
    ``end`` are reached exactly at the two extreme corners. The earlier approach
    (rotate a vertical ramp with ``warpAffine`` + ``BORDER_REFLECT_101``)
    mirrored the ramp back into the corners for any non-zero angle.

    Returns RGB np.uint8 array of shape (height, width, 3).
    """
    start = _to_rgb(spec.get("start", "#222222"))
    end = _to_rgb(spec.get("end", "#888888"))
    angle = float(spec.get("angle_deg", 0))

    if abs(angle) % 360 < 1e-6:
        # Exact axis-aligned direction keeps the angle-0 output bit-identical
        # to a plain top-to-bottom ramp (no sin/cos rounding noise).
        dir_x, dir_y = 0.0, 1.0
    else:
        theta = np.deg2rad(angle)
        dir_x, dir_y = float(np.sin(theta)), float(np.cos(theta))

    xs = np.arange(width, dtype=np.float64)
    ys = np.arange(height, dtype=np.float64)
    # Signed distance of every pixel along the gradient direction, shape (H, W).
    proj = ys[:, None] * dir_y + xs[None, :] * dir_x

    # The extremes of a linear function over a rectangle sit on its corners.
    far_x = max(width - 1, 0) * dir_x
    far_y = max(height - 1, 0) * dir_y
    corner_vals = (0.0, far_x, far_y, far_x + far_y)
    lo, hi = min(corner_vals), max(corner_vals)
    span = hi - lo
    if span > 1e-9:
        t = np.clip((proj - lo) / span, 0.0, 1.0)
    else:
        # Degenerate 1-pixel extent along the direction: use the start colour.
        t = np.zeros_like(proj)

    inv_t = 1.0 - t
    bg = np.empty((height, width, 3), dtype=np.uint8)
    # Per-channel blend keeps peak memory at a few (H, W) float planes.
    for ch in range(3):
        blended = start[ch] * inv_t + end[ch] * t
        # astype truncates toward zero, same as int() on a non-negative float.
        bg[..., ch] = np.clip(blended, 0.0, 255.0).astype(np.uint8)
    return bg
@dataclass
class ProcessorConfig:
    """
    Settings bundle consumed by CoreVideoProcessor and the FFmpeg pipe writer.

    All fields have defaults, so ``ProcessorConfig()`` is a usable configuration.
    """

    # ---- output ----
    background_preset: str = "office"        # default key into PROFESSIONAL_BACKGROUNDS
    write_fps: Optional[float] = None        # None keeps the source frame rate

    # ---- model input size ----
    # Frames whose longest side exceeds this are downscaled for the models only;
    # the rendered output stays at full resolution.
    max_model_size: Optional[int] = 1280

    # ---- FFmpeg pipe / NVENC ----
    # When disabled or unavailable the OpenCV writer is used instead.
    use_nvenc: bool = True
    nvenc_codec: str = "h264"                # "h264" or "hevc"
    nvenc_preset: str = "p5"                 # passed to ffmpeg as -preset
    nvenc_cq: int = 18                       # constant quality; lower means higher quality
    nvenc_tune_hq: bool = True               # adds "-tune hq"
    nvenc_pix_fmt: str = "yuv420p"           # browser-safe pixel format

    # ---- libx264 fallback ----
    x264_preset: str = "medium"
    x264_crf: int = 18
    x264_pix_fmt: str = "yuv420p"
    movflags_faststart: bool = True          # adds "-movflags +faststart"

    # ---- temporal stability ----
    temporal_ema_alpha: float = 0.75         # weight of the previous mask; higher is calmer (0.6-0.85 typical)
    min_iou_to_accept: float = 0.05          # masks overlapping the previous one less than this are rejected

    # ---- edge treatment ----
    dilate_px: int = 6                       # grow the mask to keep hair / ears / shoulders
    edge_blur_px: int = 1                    # small blur to calm edge shimmer

    # ---- hardening: remap a soft mask towards 0/1 ----
    hard_low: float = 0.35                   # values below map to 0
    hard_high: float = 0.70                  # values above map to 1
    mask_gamma: float = 0.90                 # exponent applied first; < 1 lifts mid-tones slightly
class _FFmpegPipe:
"""
Wrapper around an FFmpeg stdin pipe with encoder fallbacks and good error messages.
"""
def __init__(self, width: int, height: int, fps: float, out_path: str, cfg: ProcessorConfig, log=_log):
self.width = int(width)
self.height = int(height)
self.fps = float(fps) if fps and fps > 0 else 25.0
self.out_path = out_path
self.cfg = cfg
self.log = log
self.proc: Optional[subprocess.Popen] = None
self.encoder_used: Optional[str] = None
self._stderr: bytes | None = None
self._ffmpeg = shutil.which("ffmpeg")
if not self._ffmpeg:
raise RuntimeError("ffmpeg not found on PATH")
self._start_with_fallbacks()
def _cmd_for_encoder(self, encoder: str) -> list[str]:
base = [
self._ffmpeg,
"-hide_banner", "-loglevel", "error",
"-y",
# rawvideo input from stdin
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{self.width}x{self.height}",
"-r", f"{self.fps}",
"-i", "-", # stdin
"-an", # no audio here
]
if self.cfg.movflags_faststart:
base += ["-movflags", "+faststart"]
if encoder == "h264_nvenc":
base += [
"-c:v", "h264_nvenc",
"-preset", self.cfg.nvenc_preset,
"-cq", str(int(self.cfg.nvenc_cq)),
"-pix_fmt", self.cfg.nvenc_pix_fmt,
]
if self.cfg.nvenc_tune_hq:
base += ["-tune", "hq"]
elif encoder == "hevc_nvenc":
base += [
"-c:v", "hevc_nvenc",
"-preset", self.cfg.nvenc_preset,
"-cq", str(int(self.cfg.nvenc_cq)),
"-pix_fmt", self.cfg.nvenc_pix_fmt,
]
if self.cfg.nvenc_tune_hq:
base += ["-tune", "hq"]
elif encoder == "libx264":
base += [
"-c:v", "libx264",
"-preset", self.cfg.x264_preset,
"-crf", str(int(self.cfg.x264_crf)),
"-pix_fmt", self.cfg.x264_pix_fmt,
]
elif encoder == "mpeg4":
base += [
"-c:v", "mpeg4",
"-q:v", "2",
"-pix_fmt", "yuv420p",
]
else:
base += ["-c:v", "libx264", "-preset", self.cfg.x264_preset, "-crf", str(int(self.cfg.x264_crf)), "-pix_fmt", self.cfg.x264_pix_fmt]
base += [self.out_path]
return base
def _try_start(self, enc: str) -> bool:
cmd = self._cmd_for_encoder(enc)
try:
self.proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**7,
)
self.encoder_used = enc
self.log.info("FFmpeg started: %s", " ".join(shlex.quote(c) for c in cmd))
# quick poll: if ffmpeg dies immediately, fail fast
time.sleep(0.05)
if self.proc.poll() is not None:
self._stderr = self.proc.stderr.read() if self.proc.stderr else b""
self.log.warning("FFmpeg exited on start with %s: %s", enc, (self._stderr or b"").decode(errors="ignore"))
self.proc = None
return False
return True
except Exception as e:
self.log.warning("Failed to start FFmpeg with %s: %s", enc, e)
self.proc = None
return False
def _start_with_fallbacks(self):
encoders = []
if self.cfg.use_nvenc:
encoders += ["h264_nvenc"] if self.cfg.nvenc_codec.lower() == "h264" else ["hevc_nvenc"]
encoders += ["libx264", "mpeg4"]
for enc in encoders:
if self._try_start(enc):
return
msg = "Could not start FFmpeg with any encoder (nvenc/libx264/mpeg4). Is ffmpeg present and codecs available?"
if self._stderr:
msg += f" Stderr: {(self._stderr or b'').decode(errors='ignore')[:500]}"
raise RuntimeError(msg)
def write(self, frame_bgr: np.ndarray):
if self.proc is None or self.proc.stdin is None:
raise RuntimeError("FFmpeg process is not running (stdin is None).")
if not isinstance(frame_bgr, np.ndarray) or frame_bgr.dtype != np.uint8:
raise ValueError("Frame must be a np.ndarray of dtype uint8.")
if frame_bgr.ndim != 3 or frame_bgr.shape[2] != 3:
raise ValueError("Frame must have shape (H, W, 3).")
if frame_bgr.shape[0] != self.height or frame_bgr.shape[1] != self.width:
raise ValueError(f"Frame size mismatch. Expected {self.width}x{self.height}, got {frame_bgr.shape[1]}x{frame_bgr.shape[0]}.")
# ensure contiguous for tobytes()
frame_bgr = np.ascontiguousarray(frame_bgr)
try:
self.proc.stdin.write(frame_bgr.tobytes())
except Exception as e:
# collect stderr for diagnostics
stderr = b""
try:
if self.proc and self.proc.stderr:
stderr = self.proc.stderr.read()
except Exception:
pass
msg = f"FFmpeg pipe write failed: {e}"
if stderr:
msg += f"\nffmpeg stderr: {(stderr or b'').decode(errors='ignore')[:1000]}"
raise BrokenPipeError(msg)
def close(self):
if self.proc is None:
return
try:
if self.proc.stdin:
try:
self.proc.stdin.flush()
except Exception:
pass
try:
self.proc.stdin.close()
except Exception:
pass
# drain a bit of stderr for logs
if self.proc.stderr:
try:
err = self.proc.stderr.read()
if err:
self.log.debug("FFmpeg stderr (tail): %s", err.decode(errors="ignore")[-2000:])
except Exception:
pass
self.proc.wait(timeout=10)
except Exception:
try:
self.proc.kill()
except Exception:
pass
finally:
self.proc = None
class CoreVideoProcessor:
    """
    Minimal, safe implementation used by core/app.py.

    It relies on a models provider (e.g., ModelLoader) that implements:
      - get_sam2()
      - get_matanyone()
    and uses utils.cv_processing for the pipeline.

    Supports progress callback and cancellation via stop_event.
    """

    # Upper bound on consecutive frames whose mask may be discarded as an
    # outlier. Without a bound the reference mask is never refreshed, so a scene
    # cut would freeze the old mask for the rest of the video.
    _MAX_REJECTED_IN_A_ROW = 5

    # Class-level default so instances created without __init__ still work.
    _rejected_in_a_row = 0

    def __init__(self, config: Optional[ProcessorConfig] = None, models: Optional[Any] = None):
        self.log = _log
        self.config = config or ProcessorConfig()
        self.models = models  # do NOT load here; core/app handles loading
        if self.models is None:
            self.log.warning("CoreVideoProcessor initialized without a models provider; will use fallbacks.")
        self._ffmpeg = shutil.which("ffmpeg")

        # state for temporal smoothing
        self._prev_mask: Optional[np.ndarray] = None
        self._rejected_in_a_row = 0

    # ---------- mask post-processing (stability + crispness) ----------
    def _iou(self, a: np.ndarray, b: np.ndarray, thr: float = 0.5) -> float:
        """Intersection-over-union of the two masks binarised at *thr*; 0.0 when both are empty."""
        a_fg = a >= thr
        b_fg = b >= thr
        union = np.count_nonzero(a_fg | b_fg)
        if not union:
            return 0.0
        return np.count_nonzero(a_fg & b_fg) / union

    def _harden(self, m: np.ndarray) -> np.ndarray:
        """
        Turn a soft mask into a crisper one.

        Steps, each driven by the config: gamma (``mask_gamma``), linear remap of
        [``hard_low``, ``hard_high``] onto [0, 1], dilation by ``dilate_px``,
        Gaussian blur with radius ``edge_blur_px``. The result is clipped to [0, 1].
        """
        # optional gamma
        g = float(self.config.mask_gamma)
        if abs(g - 1.0) > 1e-6:
            m = np.clip(m, 0, 1) ** g

        lo = float(self.config.hard_low)
        hi = float(self.config.hard_high)
        if hi > lo + 1e-6:
            m = (m - lo) / (hi - lo)
            m = np.clip(m, 0.0, 1.0)

        # pad edges then tiny blur
        k = int(self.config.dilate_px)
        if k > 0:
            se = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * k + 1, 2 * k + 1))
            m = cv2.dilate(m, se, iterations=1)

        eb = int(self.config.edge_blur_px)
        if eb > 0:
            m = cv2.GaussianBlur(m, (2 * eb + 1, 2 * eb + 1), 0)

        return np.clip(m, 0.0, 1.0)

    def _stabilize(self, m: np.ndarray) -> np.ndarray:
        """
        Temporally smooth mask *m* against the previous frame's mask.

        - First mask of a video (or a mask whose shape differs from the stored
          one) seeds the state and is returned unchanged.
        - A mask whose IoU with the previous mask is below
          ``config.min_iou_to_accept`` is treated as an outlier and the previous
          mask is returned instead, for at most ``_MAX_REJECTED_IN_A_ROW``
          consecutive frames; after that the new mask re-seeds the state.
        - An empty previous mask never triggers rejection (its IoU with anything
          is 0, which would otherwise reject every later frame).
        - Otherwise an exponential moving average with weight
          ``config.temporal_ema_alpha`` on the previous mask is returned.
        """
        prev = self._prev_mask
        if prev is None or prev.shape != m.shape:
            self._prev_mask = m
            self._rejected_in_a_row = 0
            return m

        # outlier rejection (only meaningful when the reference has foreground)
        prev_has_fg = bool(np.any(prev >= 0.5))
        if prev_has_fg and self._iou(prev, m, 0.5) < float(self.config.min_iou_to_accept):
            streak = self._rejected_in_a_row + 1
            if streak <= self._MAX_REJECTED_IN_A_ROW:
                # ignore this frame's mask -> keep previous
                self._rejected_in_a_row = streak
                return prev
            # The "outlier" persists: accept it as the new reference.
            self._prev_mask = m
            self._rejected_in_a_row = 0
            return m

        self._rejected_in_a_row = 0

        # EMA
        a = float(self.config.temporal_ema_alpha)
        m_ema = a * prev + (1.0 - a) * m
        self._prev_mask = m_ema
        return m_ema

    def _fetch_model(self, getter_name: str, label: str):
        """Return ``self.models.<getter_name>()``, or None if absent; failures are logged, not raised."""
        provider = self.models
        try:
            if provider is not None and hasattr(provider, getter_name):
                return getattr(provider, getter_name)()
        except Exception as e:
            self.log.warning(f"{label} unavailable: {e}")
        return None

    # ---------- Single frame ----------
    def process_frame(self, frame_bgr: np.ndarray, background_rgb: np.ndarray) -> Dict[str, Any]:
        """
        Process one frame:
          - optionally downscale for model work,
          - segment + refine,
          - temporal stabilize + harden,
          - upsample mask,
          - composite full-res.

        Returns dict with composited frame ("frame", BGR for writer) and the
        full-resolution mask ("mask", float).

        Note: this updates the temporal-smoothing state, so frames must be fed
        in video order.
        """
        H, W = frame_bgr.shape[:2]
        max_side = max(H, W)
        proc_frame_bgr = frame_bgr

        # Model-only downscale
        if self.config.max_model_size and max_side > self.config.max_model_size:
            scale = self.config.max_model_size / float(max_side)
            # max(1, ...) keeps extreme aspect ratios from rounding a side to 0
            newW = max(1, int(round(W * scale)))
            newH = max(1, int(round(H * scale)))
            proc_frame_bgr = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA)
            self.log.debug(f"Model-only downscale: {W}x{H} -> {newW}x{newH} (scale={scale:.3f})")

        # RGB for models
        proc_frame_rgb = cv2.cvtColor(proc_frame_bgr, cv2.COLOR_BGR2RGB)

        # 1) segmentation (with internal fallbacks)
        predictor = self._fetch_model("get_sam2", "SAM2 predictor")
        mask_small = segment_person_hq(proc_frame_rgb, predictor, use_sam2=True)

        # 2) refinement (MatAnyOne if available)
        matanyone = self._fetch_model("get_matanyone", "MatAnyOne")
        # IMPORTANT: call order is (frame, mask, matanyone=...)
        mask_small_ref = refine_mask_hq(proc_frame_rgb, mask_small, matanyone=matanyone, use_matanyone=True)

        # Stabilize + harden at model scale
        mask_small_ref = np.asarray(mask_small_ref)
        rescale_8bit = (
            np.issubdtype(mask_small_ref.dtype, np.integer)
            and mask_small_ref.size > 0
            and int(mask_small_ref.max()) > 1
        )
        mask_small_ref = mask_small_ref.astype(np.float32)
        if rescale_8bit:
            # NOTE(review): assumes an integer mask with values above 1 is on a
            # 0..255 scale - confirm against refine_mask_hq. Clipping it directly
            # would collapse every non-zero value to 1.
            mask_small_ref = mask_small_ref / 255.0
        mask_small_ref = np.clip(mask_small_ref, 0.0, 1.0)
        mask_stable = self._stabilize(mask_small_ref)
        mask_stable = self._harden(mask_stable)

        # Upsample mask back to full-res whenever its size differs from the frame
        if mask_stable.shape[:2] != (H, W):
            mask_full = cv2.resize(mask_stable, (W, H), interpolation=cv2.INTER_LINEAR)
        else:
            mask_full = mask_stable

        # 3) compositing (helpers expect RGB inputs; return RGB)
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        out_rgb = replace_background_hq(frame_rgb, mask_full, background_rgb)

        # Convert to BGR for writer
        out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
        return {"frame": out_bgr, "mask": mask_full}

    # ---------- Build background once per video ----------
    def _prepare_background_from_config(
        self,
        bg_config: Optional[Dict[str, Any]],
        width: int,
        height: int
    ) -> np.ndarray:
        """
        Accepts either:
          - {"custom_path": "/path/to/image.png"} -> load image, stretched to (width, height)
          - {"background_choice": "office"}       -> preset
          - {"gradient": {type,start,end,angle_deg}} -> generated gradient
        Checked in that order; an unreadable image or a failing gradient falls
        through to the preset, and an unknown preset falls back to 'office'.

        Returns RGB np.uint8
        """
        # 1) custom image?
        if bg_config and bg_config.get("custom_path"):
            path = bg_config["custom_path"]
            img_bgr = cv2.imread(path, cv2.IMREAD_COLOR)
            if img_bgr is None:
                self.log.warning(f"Custom background at '{path}' could not be read. Falling back to preset.")
            else:
                img_bgr = cv2.resize(img_bgr, (width, height), interpolation=cv2.INTER_LANCZOS4)
                return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        # 2) gradient?
        if bg_config and isinstance(bg_config.get("gradient"), dict):
            try:
                return _create_gradient_background_local(bg_config["gradient"], width, height)
            except Exception as e:
                self.log.warning(f"Gradient generation failed: {e}. Falling back to preset.")

        # 3) preset (explicit choice or default)
        choice = None
        if bg_config and "background_choice" in bg_config:
            choice = bg_config["background_choice"]
        if not choice:
            choice = self.config.background_preset

        if choice not in PROFESSIONAL_BACKGROUNDS:
            self.log.warning(f"Unknown background preset '{choice}'; using 'office'.")
            choice = "office"

        return create_professional_background(choice, width, height)  # RGB

    @staticmethod
    def _open_cv_writer(output_path: str, fps_out: float, width: int, height: int):
        """Open an OpenCV 'mp4v' writer for *output_path*; return None if it cannot be opened."""
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
        if not writer.isOpened():
            writer.release()
            return None
        return writer

    # ---------- Full video ----------
    def process_video(
        self,
        input_path: str,
        output_path: str,
        bg_config: Optional[Dict[str, Any]] = None,
        progress_callback: Optional[Callable[[int, int, float], None]] = None,
        stop_event: Optional[threading.Event] = None
    ) -> Dict[str, Any]:
        """
        Process a full video with live progress and optional cancel.
        progress_callback(current_frame, total_frames, fps_live)

        Frames are written through the FFmpeg pipe when ``config.use_nvenc`` is
        set and ffmpeg is on PATH, otherwise through an OpenCV 'mp4v' writer.
        If the FFmpeg pipe fails after frames were already sent, the input is
        reopened and processing restarts from the first frame with the OpenCV
        writer: that writer re-creates the output file, so continuing in place
        would silently drop every frame written so far. The progress callback
        then starts counting from 1 again.

        Returns:
            dict with "frames", "width", "height", "fps_out", "output_path".

        Raises:
            ValueError: the input fails ``validate_video_file``.
            RuntimeError: the input cannot be opened, or no writer can be opened.
        """
        ok, msg = validate_video_file(input_path)
        if not ok:
            raise ValueError(f"Invalid or unreadable video: {msg}")

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise RuntimeError(f"Could not open video: {input_path}")

        ffmpeg_pipe: _FFmpegPipe | None = None
        writer: cv2.VideoWriter | None = None
        ffmpeg_failed_reason = None
        frame_count = 0

        # Everything after the capture is opened runs under one try/finally so
        # the capture and writers are released on every exit path.
        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps_out = self.config.write_fps or (fps if fps and fps > 0 else 25.0)

            # Background once (RGB)
            background_rgb = self._prepare_background_from_config(bg_config, width, height)

            # reset temporal state for a new video
            self._prev_mask = None
            self._rejected_in_a_row = 0

            # Writer selection
            if self.config.use_nvenc and self._ffmpeg:
                try:
                    ffmpeg_pipe = _FFmpegPipe(width, height, float(fps_out), output_path, self.config, log=self.log)
                except Exception as e:
                    ffmpeg_failed_reason = str(e)
                    self.log.warning("FFmpeg NVENC pipeline unavailable. Falling back to OpenCV. Reason: %s", e)

            if ffmpeg_pipe is None:
                writer = self._open_cv_writer(output_path, fps_out, width, height)
                if writer is None:
                    raise RuntimeError(f"Could not open VideoWriter for: {output_path}")

            start_time = time.time()
            while True:
                ret, frame_bgr = cap.read()
                if not ret:
                    break

                if stop_event is not None and stop_event.is_set():
                    self.log.info("Processing stopped by user request.")
                    break

                # Process single frame
                result = self.process_frame(frame_bgr, background_rgb)
                out_bgr = np.ascontiguousarray(result["frame"])  # ensure contiguous for tobytes()

                # Write
                if ffmpeg_pipe is not None:
                    try:
                        ffmpeg_pipe.write(out_bgr)
                    except Exception as e:
                        self.log.warning(
                            "Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e
                        )
                        try:
                            ffmpeg_pipe.close()
                        except Exception:
                            pass
                        ffmpeg_pipe = None

                        if writer is None:
                            writer = self._open_cv_writer(output_path, fps_out, width, height)
                            if writer is None:
                                raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")

                        if frame_count > 0:
                            # Earlier frames lived only in the FFmpeg output that
                            # the new writer just replaced: start over.
                            self.log.warning(
                                "Restarting from the first frame so the OpenCV output is complete "
                                "(%d frames were already sent to FFmpeg).", frame_count
                            )
                            cap.release()
                            cap = cv2.VideoCapture(input_path)
                            if not cap.isOpened():
                                raise RuntimeError(f"Could not open video: {input_path}")
                            self._prev_mask = None
                            self._rejected_in_a_row = 0
                            frame_count = 0
                            start_time = time.time()
                            continue

                        writer.write(out_bgr)
                else:
                    writer.write(out_bgr)

                frame_count += 1

                # Progress
                if progress_callback:
                    elapsed = time.time() - start_time
                    fps_live = frame_count / elapsed if elapsed > 0 else 0.0
                    try:
                        progress_callback(frame_count, total_frames, fps_live)
                    except Exception as cb_err:
                        # A faulty callback must not abort the render.
                        self.log.debug("progress_callback raised; ignoring: %s", cb_err)
        finally:
            cap.release()
            if writer is not None:
                writer.release()
            if ffmpeg_pipe is not None:
                try:
                    ffmpeg_pipe.close()
                except Exception:
                    pass

        if ffmpeg_failed_reason:
            self.log.info("Completed via OpenCV writer (FFmpeg initially failed): %s", ffmpeg_failed_reason)

        self.log.info("Processed %d frames → %s", frame_count, output_path)
        return {
            "frames": frame_count,
            "width": width,
            "height": height,
            "fps_out": float(fps_out),
            "output_path": output_path,
        }
# Backward-compat alias used elsewhere: binds the name VideoProcessor to the
# very same class object as CoreVideoProcessor (no subclass or wrapper).
VideoProcessor = CoreVideoProcessor