|
|
|
|
|
""" |
|
|
Compatibility shim: CoreVideoProcessor

Bridges the legacy import `from processing.video.video_processor import CoreVideoProcessor`
to the modern pipeline functions living in `utils.cv_processing` and models in `core.models`.
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from dataclasses import dataclass |
|
|
from typing import Optional, Dict, Any, Tuple, Callable |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import time |
|
|
import threading |
|
|
|
|
|
from utils.logger import get_logger |
|
|
from core.models import ModelManager |
|
|
|
|
|
|
|
|
from utils.cv_processing import ( |
|
|
segment_person_hq, |
|
|
refine_mask_hq, |
|
|
replace_background_hq, |
|
|
create_professional_background, |
|
|
validate_video_file, |
|
|
) |
|
|
|
|
|
@dataclass
class ProcessorConfig:
    """Tunable settings consumed by ``CoreVideoProcessor``."""

    # Name of the background preset looked up in PROFESSIONAL_BACKGROUNDS when
    # process_video() is called without an explicit bg_config.
    background_preset: str = "minimalist"

    # Frame rate for the written video. None (or any falsy value) means
    # "use the source video's FPS", with 25.0 as the last-resort default.
    write_fps: Optional[float] = None
|
|
|
|
|
class CoreVideoProcessor:
    """
    Minimal, safe implementation used by core/app.py.

    It relies on ModelManager (SAM2 + MatAnyone) and the cv_processing helpers.
    Supports live progress reporting and cooperative cancel/stop.
    """

    def __init__(self, config: Optional[ProcessorConfig] = None, models: Optional[ModelManager] = None):
        """
        Build the processor and eagerly try to load the models.

        Args:
            config: Processing settings; a default ProcessorConfig is used when omitted.
            models: Model manager to use; a new ModelManager is created when omitted.

        A failure in ``models.load_all()`` is logged as a warning and does not
        propagate, so construction still completes.
        """
        self.log = get_logger(f"{__name__}.CoreVideoProcessor")
        self.config = config or ProcessorConfig()
        self.models = models or ModelManager()
        try:
            self.models.load_all()
        except Exception as e:
            # Best effort by design: the frame helpers are called with
            # fallback_enabled=True, so a failed load is not fatal here.
            self.log.warning(f"Model load issue (will use fallbacks if needed): {e}")

    def _resolve_sam2_predictor(self) -> Any:
        """Return a usable SAM2 predictor object, or None when none can be obtained."""
        predictor = None
        try:
            sam2_model = self.models.get_sam2()
            if sam2_model is not None:
                if hasattr(sam2_model, "predictor"):
                    # Wrapper object that exposes the predictor as an attribute.
                    predictor = sam2_model.predictor
                elif hasattr(sam2_model, "set_image"):
                    # The object itself offers the predictor interface.
                    predictor = sam2_model
                elif isinstance(sam2_model, dict) and "model" in sam2_model:
                    self.log.warning("SAM2 loaded as dict format, not directly usable")
            if predictor is None:
                self.log.debug("SAM2 predictor not available, will use fallback")
        except Exception as e:
            self.log.warning(f"SAM2 predictor unavailable: {e}")
        return predictor

    def _resolve_matanyone(self) -> Any:
        """Return the MatAnyone model from the model manager, or None if unavailable."""
        try:
            return self.models.get_matanyone()
        except Exception as e:
            self.log.warning(f"MatAnyone unavailable: {e}")
            return None

    def process_frame(self, frame: np.ndarray, background: np.ndarray) -> Dict[str, Any]:
        """
        Segment, refine and composite a single frame onto ``background``.

        Model lookup failures are logged and replaced by ``None`` so that the
        cv_processing helpers (all called with ``fallback_enabled=True``) can
        take their fallback paths.

        Args:
            frame: Input frame to process.
            background: Replacement background image.

        Returns:
            Dict with ``"frame"`` (the composited output) and ``"mask"``
            (the refined mask).
        """
        predictor = self._resolve_sam2_predictor()
        mask = segment_person_hq(frame, predictor, fallback_enabled=True)

        matanyone = self._resolve_matanyone()
        mask_refined = refine_mask_hq(frame, mask, matanyone, fallback_enabled=True)

        out = replace_background_hq(frame, mask_refined, background, fallback_enabled=True)
        return {"frame": out, "mask": mask_refined}

    def process_video(
        self,
        input_path: str,
        output_path: str,
        bg_config: Optional[Dict[str, Any]] = None,
        progress_callback: Optional[Callable[[int, int, float], None]] = None,
        stop_event: Optional[threading.Event] = None,
    ) -> Dict[str, Any]:
        """
        Process a full video with live progress and optional stop.

        Args:
            input_path: Path of the video to read.
            output_path: Path of the video to write (encoded with the "mp4v" fourcc).
            bg_config: Background description passed to
                ``create_professional_background``; when omitted (or empty) the
                preset named by ``config.background_preset`` is used, falling
                back to the "minimalist" preset.
            progress_callback: ``function(current_frame, total_frames, fps)``
                invoked after every written frame. ``total_frames`` is whatever
                the capture reports for CAP_PROP_FRAME_COUNT.
            stop_event: ``threading.Event``; once set, processing stops before
                the next frame is processed.

        Returns:
            Dict with ``"frames"``, ``"width"``, ``"height"`` and ``"fps_out"``.

        Raises:
            ValueError: ``validate_video_file`` rejected the input.
            RuntimeError: The input could not be opened for reading or the
                output could not be opened for writing.
        """
        ok, msg = validate_video_file(input_path)
        if not ok:
            raise ValueError(f"Invalid video: {msg}")
        self.log.info(f"Video validation: {msg}")

        cap = cv2.VideoCapture(input_path)
        writer = None
        frame_count = 0
        # Everything after the capture is created runs under try/finally so the
        # capture and writer are released even if setup (writer creation,
        # background generation, ...) raises before the frame loop starts.
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Could not open video: {input_path}")

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Preference order: positive configured FPS, positive source FPS, 25.0.
            write_fps = self.config.write_fps
            if write_fps and write_fps > 0:
                fps_out = write_fps
            elif fps and fps > 0:
                fps_out = fps
            else:
                fps_out = 25.0

            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(output_path, fourcc, fps_out, (width, height))
            if not writer.isOpened():
                # Without this check a writer that failed to open silently
                # discards every frame and the call would report success.
                raise RuntimeError(f"Could not open video writer: {output_path}")

            # Imported lazily, as in the original implementation.
            from utils.cv_processing import PROFESSIONAL_BACKGROUNDS

            preset = self.config.background_preset
            cfg = bg_config or PROFESSIONAL_BACKGROUNDS.get(
                preset, PROFESSIONAL_BACKGROUNDS["minimalist"]
            )
            background = create_professional_background(cfg, width, height)

            # Monotonic clock: the live FPS figure must not be affected by
            # wall-clock adjustments.
            start_time = time.perf_counter()
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if stop_event is not None and stop_event.is_set():
                    self.log.info("Processing stopped by user request")
                    break

                res = self.process_frame(frame, background)
                writer.write(res["frame"])
                frame_count += 1

                if progress_callback:
                    elapsed = time.perf_counter() - start_time
                    fps_live = frame_count / elapsed if elapsed > 0 else 0
                    progress_callback(frame_count, total_frames, fps_live)
        finally:
            cap.release()
            if writer is not None:
                writer.release()

        self.log.info(f"Processed {frame_count} frames → {output_path}")
        return {
            "frames": frame_count,
            "width": width,
            "height": height,
            "fps_out": fps_out,
        }
|
|
|
|
|
|
|
|
# Alias: exposes the same class object under the shorter name `VideoProcessor`.
VideoProcessor = CoreVideoProcessor
|
|
|