"""
Core Video Processor for BackgroundFX Pro

- Minimal, safe implementation used by core/app.py
- Works with split/legacy loaders
- Keeps exact behavior you shared; only fixes typing imports + integrates
  prepare_background() and related helpers.

NOTE:
- Requires utils.cv_processing helpers already present in your project:
    segment_person_hq, refine_mask_hq, replace_background_hq,
    create_professional_background, PROFESSIONAL_BACKGROUNDS,
    _create_gradient_background_local, validate_video_file
"""

from __future__ import annotations

import os
import time
import shutil
import logging
import threading
from typing import Optional, Any, Dict, Callable, Tuple, List

import numpy as np
import cv2

# Logging: prefer the project logger, fall back to stdlib logging.
try:
    from utils.logger import get_logger
    _log = get_logger("processing.video.video_processor")
except Exception:
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger("processing.video.video_processor")

# Config: degrade to Any so annotations stay valid without the config package.
try:
    from config.processor_config import ProcessorConfig
except Exception:
    ProcessorConfig = Any

# Environment helpers, with local fallbacks.
try:
    from utils.system.env_utils import env_bool as _env_bool
    from utils.system.env_utils import env_int as _env_int
except Exception:
    def _env_bool(name: str, default: bool = False) -> bool:
        v = os.environ.get(name)
        if v is None:
            return bool(default)
        return str(v).strip().lower() in ("1", "true", "yes", "y", "on")

    def _env_int(name: str, default: int = 0) -> int:
        try:
            return int(os.environ.get(name, default))
        except Exception:
            return int(default)
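
# Environment overrides read by this module (config-derived defaults in parentheses):
#   MATANYONE_WINDOWED (config.use_windowed)    - window-based MatAnyone processing
#   MATANYONE_WINDOW   (config.window_size)     - frames per window
#   MAX_MODEL_SIZE     (config.max_model_size)  - longest frame side fed to the models
#   MATANYONE_CHUNK    (12)                     - frames per chunk before cache cleanup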

from utils.cv_processing import (
    segment_person_hq,
    refine_mask_hq,
    replace_background_hq,
    create_professional_background,
    PROFESSIONAL_BACKGROUNDS,
    validate_video_file,
)

try:
    from utils.cv_processing import _create_gradient_background_local
except Exception:
    _create_gradient_background_local = None

try:
    from utils.video.ffmpeg_pipe import FFmpegPipe as _FFmpegPipe
except Exception:
    _FFmpegPipe = None


class CoreVideoProcessor:
    """
    Minimal, safe implementation used by core/app.py.
    Orchestrates SAM2 → MatAnyone refinement → background compositing,
    with robust fallbacks (OpenCV writer when FFmpeg/NVENC is unavailable).
    """

    def __init__(self, config: Optional[ProcessorConfig] = None, models: Optional[Any] = None):
        self.log = _log
        self.config = config or ProcessorConfig()
        self.models = models
        if self.models is None:
            self.log.warning("CoreVideoProcessor initialized without a models provider; will use fallbacks.")
        self._ffmpeg = shutil.which("ffmpeg")

        self._use_windowed = _env_bool(
            "MATANYONE_WINDOWED",
            bool(getattr(self.config, "use_windowed", False)),
        )
        self._window_size = max(1, _env_int("MATANYONE_WINDOW", int(getattr(self.config, "window_size", 8))))
        # Cap on the longest frame side fed to the models; a zero/unset value disables the downscale.
        self._max_model_size = int(os.environ.get("MAX_MODEL_SIZE", getattr(self.config, "max_model_size", 1280) or 0)) or None

        self._prev_mask: Optional[np.ndarray] = None

        try:
            self._chunk_size = max(1, int(os.environ.get("MATANYONE_CHUNK", "12")))
        except Exception:
            self._chunk_size = 12
        self._chunk_idx = 0

    def prepare_background(self, background_choice: str, custom_background_path: Optional[str], width: int, height: int) -> np.ndarray:
        """
        Prepares a background image for compositing.
        If a valid custom background path is given, loads and resizes it. Otherwise, uses a preset.
        Returns: np.ndarray RGB (H, W, 3) uint8
        """
        from utils.cv_processing import create_professional_background

        if custom_background_path:
            try:
                img = cv2.imread(custom_background_path, cv2.IMREAD_COLOR)
                if img is not None:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, (width, height), interpolation=cv2.INTER_LANCZOS4)
                    return img
                else:
                    self.log.warning(f"Failed to load custom background from '{custom_background_path}', using preset.")
            except Exception as e:
                self.log.warning(f"Exception loading custom background: {e}, using preset.")

        return create_professional_background(background_choice, width, height)
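
    # Example (illustrative): prepare_background("office", None, 1920, 1080) returns a
    # 1920x1080 RGB preset; passing a readable image path instead of None loads and
    # resizes that image with Lanczos interpolation.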

    def _iou(self, a: np.ndarray, b: np.ndarray, thr: float = 0.5) -> float:
        a_bin = (a >= thr).astype(np.uint8)
        b_bin = (b >= thr).astype(np.uint8)
        inter = np.count_nonzero(cv2.bitwise_and(a_bin, b_bin))
        union = np.count_nonzero(cv2.bitwise_or(a_bin, b_bin))
        return (inter / union) if union else 0.0

    def _harden(self, m: np.ndarray) -> np.ndarray:
        # Gamma curve: values below 1.0 lift mid-tones toward foreground.
        g = float(getattr(self.config, "mask_gamma", 0.90))
        if abs(g - 1.0) > 1e-6:
            m = np.clip(m, 0, 1) ** g

        # Soft threshold: remap [hard_low, hard_high] onto [0, 1] to sharpen the matte.
        lo = float(getattr(self.config, "hard_low", 0.35))
        hi = float(getattr(self.config, "hard_high", 0.70))
        if hi > lo + 1e-6:
            m = (m - lo) / (hi - lo)
            m = np.clip(m, 0.0, 1.0)

        # Dilate the mask slightly, then feather the edge with a small blur.
        k = int(getattr(self.config, "dilate_px", 6))
        if k > 0:
            se = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * k + 1, 2 * k + 1))
            m = cv2.dilate(m, se, iterations=1)

        eb = int(getattr(self.config, "edge_blur_px", 1))
        if eb > 0:
            m = cv2.GaussianBlur(m, (2 * eb + 1, 2 * eb + 1), 0)

        return np.clip(m, 0.0, 1.0)

    def _stabilize(self, m: np.ndarray) -> np.ndarray:
        if self._prev_mask is None:
            self._prev_mask = m
            return m

        # Reject masks that disagree too much with the previous frame (likely a
        # segmentation glitch); otherwise smooth with an exponential moving average:
        # m_ema = a * prev + (1 - a) * new, with a = temporal_ema_alpha.
        thr = float(getattr(self.config, "min_iou_to_accept", 0.05))
        if self._iou(self._prev_mask, m, 0.5) < thr:
            return self._prev_mask

        a = float(getattr(self.config, "temporal_ema_alpha", 0.75))
        m_ema = a * self._prev_mask + (1.0 - a) * m
        self._prev_mask = m_ema
        return m_ema

    def process_frame(self, frame_bgr: np.ndarray, background_rgb: np.ndarray) -> Dict[str, Any]:
        H, W = frame_bgr.shape[:2]
        max_side = max(H, W)
        scale = 1.0
        proc_frame_bgr = frame_bgr

        # Model-only downscale: run segmentation/matting on a smaller frame,
        # then composite at the original resolution.
        mms = self._max_model_size
        if mms and max_side > mms:
            scale = mms / float(max_side)
            newW = int(round(W * scale))
            newH = int(round(H * scale))
            proc_frame_bgr = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA)
            self.log.debug(f"Model-only downscale: {W}x{H} -> {newW}x{newH} (scale={scale:.3f})")

        proc_frame_rgb = cv2.cvtColor(proc_frame_bgr, cv2.COLOR_BGR2RGB)

        predictor = None
        try:
            if self.models and hasattr(self.models, "get_sam2"):
                predictor = self.models.get_sam2()
        except Exception as e:
            self.log.warning(f"SAM2 predictor unavailable: {e}")

        mask_small = segment_person_hq(proc_frame_rgb, predictor, use_sam2=True)

        matanyone = None
        try:
            if self.models and hasattr(self.models, "get_matanyone"):
                matanyone = self.models.get_matanyone()
        except Exception as e:
            self.log.warning(f"MatAnyOne unavailable: {e}")

        if matanyone is not None and hasattr(matanyone, "reset") and self._chunk_idx == 0:
            try:
                matanyone.reset()
            except Exception:
                pass

        mask_small_ref = refine_mask_hq(
            proc_frame_rgb,
            mask_small,
            matanyone=matanyone,
            use_matanyone=True,
            frame_idx=self._chunk_idx,
        )

        # Advance the chunk counter; the CUDA cache is emptied at each chunk boundary
        # to bound memory growth on long clips.
        self._chunk_idx = (self._chunk_idx + 1) % max(1, self._chunk_size)
        if self._chunk_idx == 0:
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except Exception:
                pass

        mask_small_ref = np.clip(mask_small_ref.astype(np.float32), 0.0, 1.0)
        mask_stable = self._stabilize(mask_small_ref)
        mask_stable = self._harden(mask_stable)

        if scale != 1.0:
            mask_full = cv2.resize(mask_stable, (W, H), interpolation=cv2.INTER_LINEAR)
        else:
            mask_full = mask_stable

        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        out_rgb = replace_background_hq(frame_rgb, mask_full, background_rgb)

        out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
        return {"frame": out_bgr, "mask": mask_full}

    def _prepare_background_from_config(
        self, bg_config: Optional[Dict[str, Any]], width: int, height: int
    ) -> np.ndarray:
        # Resolution order: custom image path → gradient spec → named preset ("office" as final fallback).
        if bg_config and bg_config.get("custom_path"):
            path = bg_config["custom_path"]
            img_bgr = cv2.imread(path, cv2.IMREAD_COLOR)
            if img_bgr is None:
                self.log.warning(f"Custom background at '{path}' could not be read. Falling back to preset.")
            else:
                img_bgr = cv2.resize(img_bgr, (width, height), interpolation=cv2.INTER_LANCZOS4)
                return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        if bg_config and isinstance(bg_config.get("gradient"), dict) and _create_gradient_background_local:
            try:
                return _create_gradient_background_local(bg_config["gradient"], width, height)
            except Exception as e:
                self.log.warning(f"Gradient generation failed: {e}. Falling back to preset.")

        choice = None
        if bg_config and "background_choice" in bg_config:
            choice = bg_config["background_choice"]
        if not choice:
            choice = getattr(self.config, "background_preset", "office")

        if choice not in PROFESSIONAL_BACKGROUNDS:
            self.log.warning(f"Unknown background preset '{choice}'; using 'office'.")
            choice = "office"

        return create_professional_background(choice, width, height)

    def _model_downscale(self, frame_bgr: np.ndarray) -> Tuple[np.ndarray, float]:
        H, W = frame_bgr.shape[:2]
        max_side = max(H, W)
        mms = self._max_model_size
        if mms and max_side > mms:
            s = mms / float(max_side)
            newW = int(round(W * s))
            newH = int(round(H * s))
            small = cv2.resize(frame_bgr, (newW, newH), interpolation=cv2.INTER_AREA)
            return small, s
        return frame_bgr, 1.0

    def _prepare_sam2_gpu(self, predictor):
        # Best-effort move of the SAM2 predictor to the GPU; failures are ignored.
        try:
            import torch
            if predictor is None or not torch.cuda.is_available():
                return
            if hasattr(predictor, "to"):
                try:
                    predictor.to("cuda")
                    return
                except Exception:
                    pass
            if hasattr(predictor, "model") and hasattr(predictor.model, "to"):
                try:
                    predictor.model.to("cuda")
                except Exception:
                    pass
        except Exception:
            pass

    def _release_sam2_gpu(self, predictor):
        # Best-effort release of SAM2 GPU state between windows: drop cached image
        # state, move the predictor back to CPU, then empty the CUDA cache.
        try:
            if predictor is None:
                return
            for name in ("reset_image", "release_image", "clear_image", "clear_state"):
                if hasattr(predictor, name) and callable(getattr(predictor, name)):
                    try:
                        getattr(predictor, name)()
                    except Exception:
                        pass
            for name in ("to", "cpu"):
                if hasattr(predictor, name):
                    try:
                        if name == "to":
                            predictor.to("cpu")
                        else:
                            predictor.cpu()
                    except Exception:
                        pass
        except Exception:
            pass
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception:
            pass

    def process_video(
        self,
        input_path: str,
        output_path: str,
        bg_config: Optional[Dict[str, Any]] = None,
        progress_callback: Optional[Callable[[int, int, float], None]] = None,
        stop_event: Optional[threading.Event] = None
    ) -> Dict[str, Any]:
        ok, msg = validate_video_file(input_path)
        if not ok:
            raise ValueError(f"Invalid or unreadable video: {msg}")

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {input_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fps_out = getattr(self.config, "write_fps", None) or (fps if fps and fps > 0 else 25.0)

        background_rgb = self._prepare_background_from_config(bg_config, width, height)

        self._prev_mask = None

        ffmpeg_pipe: _FFmpegPipe | None = None
        writer: cv2.VideoWriter | None = None
        ffmpeg_failed_reason = None

        if getattr(self.config, "use_nvenc", True) and shutil.which("ffmpeg") and _FFmpegPipe is not None:
            try:
                ffmpeg_pipe = _FFmpegPipe(width, height, float(fps_out), output_path, self.config, log=self.log)
            except Exception as e:
                ffmpeg_failed_reason = str(e)
                self.log.warning("FFmpeg NVENC pipeline unavailable. Falling back to OpenCV. Reason: %s", e)

        if ffmpeg_pipe is None:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
            if not writer.isOpened():
                cap.release()
                raise RuntimeError(f"Could not open VideoWriter for: {output_path}")

        predictor = None
        matanyone = None
        try:
            if self.models and hasattr(self.models, "get_sam2"):
                predictor = self.models.get_sam2()
        except Exception as e:
            self.log.warning(f"SAM2 predictor unavailable: {e}")

        try:
            if self.models and hasattr(self.models, "get_matanyone"):
                matanyone = self.models.get_matanyone()
        except Exception as e:
            self.log.warning(f"MatAnyOne unavailable: {e}")

        use_windowed = bool(self._use_windowed and predictor is not None and matanyone is not None)

        frame_count = 0
        start_time = time.time()

        try:
            if not use_windowed:
                while True:
                    ret, frame_bgr = cap.read()
                    if not ret:
                        break
                    if stop_event is not None and stop_event.is_set():
                        self.log.info("Processing stopped by user request.")
                        break

                    result = self.process_frame(frame_bgr, background_rgb)
                    out_bgr = np.ascontiguousarray(result["frame"])

                    if ffmpeg_pipe is not None:
                        try:
                            ffmpeg_pipe.write(out_bgr)
                        except Exception as e:
                            self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e)
                            try:
                                ffmpeg_pipe.close()
                            except Exception:
                                pass
                            ffmpeg_pipe = None
                            if writer is None:
                                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                if not writer.isOpened():
                                    raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                            writer.write(out_bgr)
                    else:
                        writer.write(out_bgr)

                    frame_count += 1
                    if progress_callback:
                        elapsed = time.time() - start_time
                        fps_live = frame_count / elapsed if elapsed > 0 else 0.0
                        try:
                            progress_callback(frame_count, total_frames, fps_live)
                        except Exception:
                            pass

            else:
                WINDOW = max(1, int(self._window_size))

                while True:
                    frames_bgr: List[np.ndarray] = []
                    for _ in range(WINDOW):
                        ret, fr = cap.read()
                        if not ret:
                            break
                        frames_bgr.append(fr)

                    if not frames_bgr:
                        break

                    if stop_event is not None and stop_event.is_set():
                        self.log.info("Processing stopped by user request.")
                        break

                    frames_small_bgr: List[np.ndarray] = []
                    scales: List[float] = []
                    for fr in frames_bgr:
                        fr_small, s = self._model_downscale(fr)
                        frames_small_bgr.append(fr_small)
                        scales.append(s)
                    scale = scales[0] if scales else 1.0

                    frames_small_rgb = [cv2.cvtColor(fb, cv2.COLOR_BGR2RGB) for fb in frames_small_bgr]

                    # Seed the window with a SAM2 mask on its first frame; MatAnyone
                    # then propagates the matte across the remaining frames.
                    self._prepare_sam2_gpu(predictor)
                    try:
                        mask_small = segment_person_hq(frames_small_rgb[0], predictor, use_sam2=True)
                    except Exception as e:
                        self.log.warning(f"SAM2 segmentation error on window start: {e}")
                        mask_small = segment_person_hq(frames_small_rgb[0], None, use_sam2=False)

                    self._release_sam2_gpu(predictor)

                    if hasattr(matanyone, "reset"):
                        try:
                            matanyone.reset()
                        except Exception:
                            pass

                    for j, fr_rgb_small in enumerate(frames_small_rgb):
                        try:
                            if j == 0:
                                m2d = mask_small
                                if m2d.ndim == 3:
                                    m2d = m2d[..., 0]
                                alpha_small = matanyone(fr_rgb_small, m2d)
                            else:
                                alpha_small = matanyone(fr_rgb_small)

                            alpha_small = np.clip(alpha_small.astype(np.float32), 0.0, 1.0)
                            alpha_stable = self._stabilize(alpha_small)
                            alpha_harden = self._harden(alpha_stable)

                            if scale != 1.0:
                                H, W = frames_bgr[j].shape[:2]
                                alpha_full = cv2.resize(alpha_harden, (W, H), interpolation=cv2.INTER_LINEAR)
                            else:
                                alpha_full = alpha_harden

                            frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB)
                            out_rgb = replace_background_hq(frame_rgb_full, alpha_full, background_rgb)
                            out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
                            out_bgr = np.ascontiguousarray(out_bgr)

                            if ffmpeg_pipe is not None:
                                try:
                                    ffmpeg_pipe.write(out_bgr)
                                except Exception as e:
                                    self.log.warning("Switching to OpenCV writer after FFmpeg error at frame %d: %s", frame_count, e)
                                    try:
                                        ffmpeg_pipe.close()
                                    except Exception:
                                        pass
                                    ffmpeg_pipe = None
                                    if writer is None:
                                        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                        writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                        if not writer.isOpened():
                                            raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                                    writer.write(out_bgr)
                            else:
                                writer.write(out_bgr)

                            frame_count += 1
                        except Exception as e:
                            self.log.warning(f"MatAnyone failed at window frame {j}: {e}")
                            if j == 0:
                                alpha_small_fb = np.clip(mask_small.astype(np.float32), 0.0, 1.0)
                            else:
                                # Fall back to the previous stabilized mask, or to an empty
                                # mask sized to the model-resolution frame.
                                alpha_small_fb = self._prev_mask if self._prev_mask is not None else np.zeros(fr_rgb_small.shape[:2], dtype=np.float32)

                            if scale != 1.0:
                                H, W = frames_bgr[j].shape[:2]
                                alpha_full_fb = cv2.resize(alpha_small_fb, (W, H), interpolation=cv2.INTER_LINEAR)
                            else:
                                alpha_full_fb = alpha_small_fb

                            frame_rgb_full = cv2.cvtColor(frames_bgr[j], cv2.COLOR_BGR2RGB)
                            out_rgb_fb = replace_background_hq(frame_rgb_full, alpha_full_fb, background_rgb)
                            out_bgr_fb = cv2.cvtColor(out_rgb_fb, cv2.COLOR_RGB2BGR)

                            if ffmpeg_pipe is not None:
                                try:
                                    ffmpeg_pipe.write(np.ascontiguousarray(out_bgr_fb))
                                except Exception:
                                    try:
                                        ffmpeg_pipe.close()
                                    except Exception:
                                        pass
                                    ffmpeg_pipe = None
                                    if writer is None:
                                        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                                        writer = cv2.VideoWriter(output_path, fourcc, float(fps_out), (width, height))
                                        if not writer.isOpened():
                                            raise RuntimeError(f"FFmpeg failed and VideoWriter could not open: {output_path}")
                                    writer.write(np.ascontiguousarray(out_bgr_fb))
                            else:
                                writer.write(np.ascontiguousarray(out_bgr_fb))
                            frame_count += 1

                        if progress_callback:
                            elapsed = time.time() - start_time
                            fps_live = frame_count / elapsed if elapsed > 0 else 0.0
                            try:
                                progress_callback(frame_count, total_frames, fps_live)
                            except Exception:
                                pass

                    del frames_bgr, frames_small_bgr, frames_small_rgb, mask_small
                    try:
                        import torch
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
                    except Exception:
                        pass

        finally:
            cap.release()
            if writer is not None:
                writer.release()
            if ffmpeg_pipe is not None:
                try:
                    ffmpeg_pipe.close()
                except Exception:
                    pass

        if ffmpeg_failed_reason:
            self.log.info("Completed via OpenCV writer (FFmpeg initially failed): %s", ffmpeg_failed_reason)

        self.log.info("Processed %d frames → %s", frame_count, output_path)
        return {
            "frames": frame_count,
            "width": width,
            "height": height,
            "fps_out": float(fps_out),
            "output_path": output_path,
        }
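

if __name__ == "__main__":
    # Illustrative smoke test: assumes the project root is on sys.path (so utils/
    # config import) and that config.processor_config.ProcessorConfig is available;
    # with models=None the processor relies on its fallback paths.
    import sys

    if len(sys.argv) < 3:
        print("usage: video_processor.py <input.mp4> <output.mp4>")
        sys.exit(1)

    def _print_progress(done: int, total: int, fps_live: float) -> None:
        # Matches the progress_callback signature: (frames done, total frames, live fps).
        print(f"\r{done}/{total} frames ({fps_live:.1f} fps)", end="", flush=True)

    processor = CoreVideoProcessor(config=None, models=None)
    stats = processor.process_video(
        sys.argv[1],
        sys.argv[2],
        bg_config={"background_choice": "office"},
        progress_callback=_print_progress,
    )
    print()
    print(stats)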