#!/usr/bin/env python3
"""
BackgroundFX Pro – Main Application Entry Point
Refactored modular architecture – orchestrates specialised components
"""

# ─────────────────────────────────────────────────────────────────────────────
# 0) Early env/threading hygiene (must run first)
# ─────────────────────────────────────────────────────────────────────────────
import early_env  # sets OMP/MKL/OPENBLAS + torch threads safely

import logging
import threading
import time
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, Callable

# ─────────────────────────────────────────────────────────────────────────────
# 1) Logging
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("core.app")

# ─────────────────────────────────────────────────────────────────────────────
# 2) Patch Gradio schema early (HF Spaces quirk)
# ─────────────────────────────────────────────────────────────────────────────
try:
    import gradio_client.utils as gc_utils

    _orig_get_type = gc_utils.get_type

    def _patched_get_type(schema):
        """Return a type name for *schema*, tolerating non-dict schemas.

        Non-dict values are mapped to "boolean", "string" or "number"
        (falling back to "string"); dict schemas are delegated to the
        original ``gradio_client.utils.get_type``.
        """
        if not isinstance(schema, dict):
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(schema, bool):
                return "boolean"
            if isinstance(schema, str):
                return "string"
            if isinstance(schema, (int, float)):
                return "number"
            return "string"
        return _orig_get_type(schema)

    gc_utils.get_type = _patched_get_type
    logger.info("Gradio schema patch applied")
except Exception as e:
    # Best-effort patch: the app still starts when it cannot be applied.
    logger.warning(f"Gradio patch failed: {e}")

# ─────────────────────────────────────────────────────────────────────────────
# 3) Core config + components
# ─────────────────────────────────────────────────────────────────────────────
from config.app_config import get_config
from core.exceptions import ModelLoadingError, VideoProcessingError
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor
from processing.audio.audio_processor import AudioProcessor
from utils.monitoring.progress_tracker import ProgressTracker

# Optional two-stage processor
try:
    from processing.two_stage.two_stage_processor import (
        TwoStageProcessor,
        CHROMA_PRESETS,
    )
    TWO_STAGE_AVAILABLE = True
except Exception as e:
    # Record why the optional pipeline is missing instead of hiding it.
    logger.warning("Two-stage processor unavailable: %s", e)
    TWO_STAGE_AVAILABLE = False
    TwoStageProcessor = None  # keeps the name defined for later references
    CHROMA_PRESETS = {"standard": {}}

# Validation helper
from utils.cv_processing import validate_video_file


# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                          VideoProcessor class                            ║
# ╚══════════════════════════════════════════════════════════════════════════╝
class VideoProcessor:
    """
    Main orchestrator – coordinates all specialised components.
    """

    # ─────────────────────────────────────────────────────────────────────
    # Init
    # ─────────────────────────────────────────────────────────────────────
    def __init__(self):
        """Build the component graph; models are loaded later via load_models()."""
        self.config = get_config()
        self.device_manager = DeviceManager()
        self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
        self.model_loader = ModelLoader(self.device_manager, self.memory_manager)
        self.audio_processor = AudioProcessor()

        # Both processors are created by load_models().
        self.core_processor: Optional[CoreVideoProcessor] = None
        self.two_stage_processor: Optional["TwoStageProcessor"] = None

        self.models_loaded = False
        self.loading_lock = threading.Lock()  # serialises load_models()
        self.cancel_event = threading.Event()
        self.progress_tracker: Optional[ProgressTracker] = None

        logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}")

    # ─────────────────────────────────────────────────────────────────────
    # Progress helper
    # ─────────────────────────────────────────────────────────────────────
    def _init_progress(self, video_path: str, cb: Optional[Callable] = None):
        """Create ``self.progress_tracker`` sized to the video's frame count.

        Falls back to a total of 100 when the frame count is not positive
        or when anything in the probing step raises.
        """
        try:
            import cv2

            cap = cv2.VideoCapture(video_path)
            try:
                total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            finally:
                # Release the handle even when reading the property fails.
                cap.release()
            if total <= 0:
                total = 100
            self.progress_tracker = ProgressTracker(total, cb)
        except Exception as e:
            logger.warning(f"Progress init failed: {e}")
            self.progress_tracker = ProgressTracker(100, cb)

    # ─────────────────────────────────────────────────────────────────────
    # Model loading
    # ─────────────────────────────────────────────────────────────────────
    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        """Load all models and build the processors.

        Args:
            progress_callback: Optional callable; invoked here as
                ``progress_callback(0.0, message)`` and also forwarded to
                the model loader.

        Returns:
            A human-readable status string (success summary, cancellation
            notice, or error description). Errors are reported through the
            return value rather than raised.
        """
        with self.loading_lock:
            if self.models_loaded:
                return "Models already loaded and validated"

            try:
                self.cancel_event.clear()
                if progress_callback:
                    progress_callback(
                        0.0, f"Loading on {self.device_manager.get_optimal_device()}"
                    )

                sam2_loaded, mat_loaded = self.model_loader.load_all_models(
                    progress_callback=progress_callback,
                    cancel_event=self.cancel_event,
                )

                if self.cancel_event.is_set():
                    return "Model loading cancelled"

                # Unwrap actual predictor / model objects
                sam2_predictor = sam2_loaded.model if sam2_loaded else None
                mat_model = mat_loaded.model if mat_loaded else None

                # Core single-stage processor
                self.core_processor = CoreVideoProcessor(
                    config=self.config, models=self.model_loader
                )

                # Two-stage processor (optional). Compare against None so the
                # decision never depends on a model object's own truthiness.
                if TWO_STAGE_AVAILABLE and (
                    sam2_predictor is not None or mat_model is not None
                ):
                    try:
                        self.two_stage_processor = TwoStageProcessor(
                            sam2_predictor=sam2_predictor,
                            matanyone_model=mat_model,
                        )
                        logger.info("Two-stage processor initialised")
                    except Exception as e:
                        logger.warning(f"Two-stage init failed: {e}")
                        self.two_stage_processor = None

                self.models_loaded = True
                msg = self.model_loader.get_load_summary()
                msg += (
                    "\n✅ Two-stage processor ready"
                    if self.two_stage_processor is not None
                    else "\n⚠️ Two-stage processor not available"
                )
                logger.info(msg)
                return msg

            except (AttributeError, ModelLoadingError) as e:
                self.models_loaded = False
                err = f"Model loading failed: {e}"
                logger.error(err)
                return err
            except Exception as e:
                self.models_loaded = False
                err = f"Unexpected error during model loading: {e}"
                # Keep the traceback: this branch is for unanticipated failures.
                logger.exception(err)
                return err

    # ─────────────────────────────────────────────────────────────────────
    # Public entry – process video
    # ─────────────────────────────────────────────────────────────────────
    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        use_two_stage: bool = False,
        chroma_preset: str = "standard",
        key_color_mode: str = "auto",  # NEW
        preview_mask: bool = False,
        preview_greenscreen: bool = False,
    ) -> Tuple[Optional[str], str]:
        """
        Dispatch to single-stage or two-stage pipeline.

        Returns:
            ``(output_path, message)``; ``output_path`` is ``None`` when
            processing did not produce a file. Failures are reported via
            the message rather than raised.
        """
        if not self.models_loaded or self.core_processor is None:
            return None, "Models not loaded. Please click “Load Models” first."

        if self.cancel_event.is_set():
            # Consume the request. Once models are loaded nothing else in
            # this module clears the event, so leaving it set would reject
            # every subsequent job.
            self.cancel_event.clear()
            return None, "Processing cancelled"

        # Validate before touching the file for progress tracking.
        ok, why = validate_video_file(video_path)
        if not ok:
            return None, f"Invalid video: {why}"

        self._init_progress(video_path, progress_callback)

        try:
            if use_two_stage:
                if not TWO_STAGE_AVAILABLE:
                    return None, "Two-stage processing not available on this build"
                if self.two_stage_processor is None:
                    return None, "Two-stage processor not initialised"
                return self._process_two_stage(
                    video_path,
                    background_choice,
                    custom_background_path,
                    progress_callback,
                    chroma_preset,
                    key_color_mode,  # NEW
                )
            return self._process_single_stage(
                video_path,
                background_choice,
                custom_background_path,
                progress_callback,
                preview_mask,
                preview_greenscreen,
            )
        except VideoProcessingError as e:
            logger.error(f"Processing failed: {e}")
            return None, f"Processing failed: {e}"
        except Exception as e:
            # Unanticipated failure: log with traceback for diagnosis.
            logger.exception(f"Unexpected processing error: {e}")
            return None, f"Unexpected error: {e}"

    # ─────────────────────────────────────────────────────────────────────
    # Private – single-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_single_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        preview_mask: bool,
        preview_greenscreen: bool,
    ) -> Tuple[Optional[str], str]:
        """Run the core processor and, unless previewing, re-attach audio.

        The output is written to ``<output_dir>/single_stage/processed_<ts>.mp4``
        where ``ts`` is the current Unix time in whole seconds.

        NOTE(review): ``progress_callback`` and the preview flags are not
        forwarded to the core processor here; the flags only decide whether
        the audio step runs.
        """
        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "single_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = str(out_dir / f"processed_{ts}.mp4")

        result = self.core_processor.process_video(
            input_path=video_path,
            output_path=out_path,
            bg_config={
                "background_choice": background_choice,
                "custom_path": custom_background_path,
            },
        )
        if not result:
            return None, "Video processing failed"

        if not (preview_mask or preview_greenscreen):
            final_path = self.audio_processor.add_audio_to_video(
                original_video=video_path, processed_video=out_path
            )
        else:
            # Preview outputs are returned without the audio step.
            final_path = out_path

        msg = (
            "Processing completed.\n"
            f"Frames: {result.get('frames', 'unknown')}\n"
            f"Background: {background_choice}\n"
            f"Mode: Single-stage\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return final_path, msg

    # ─────────────────────────────────────────────────────────────────────
    # Private – two-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_two_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        chroma_preset: str,
        key_color_mode: str,  # NEW
    ) -> Tuple[Optional[str], str]:
        """Run the two-stage pipeline on ``video_path``.

        Reads the frame size, asks the core processor for a matching
        background, then calls ``process_full_pipeline``. The output path
        is ``<output_dir>/two_stage/final_<ts>.mp4``. An unknown
        ``chroma_preset`` falls back to the "standard" preset.

        Returns:
            ``(output_path, message)``; ``output_path`` is ``None`` on failure.
        """
        if self.two_stage_processor is None:
            return None, "Two-stage processor not available"

        import cv2

        cap = cv2.VideoCapture(video_path)
        try:
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            # Release the handle even when reading the properties fails.
            cap.release()

        if w <= 0 or h <= 0:
            # A background cannot be sized without usable frame dimensions.
            return None, "Could not read video dimensions"

        background = self.core_processor.prepare_background(
            background_choice, custom_background_path, w, h
        )
        if background is None:
            return None, "Failed to prepare background"

        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "two_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        final_out = str(out_dir / f"final_{ts}.mp4")

        chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS["standard"])
        logger.info(f"Two-stage with preset: {chroma_preset} and key_color_mode={key_color_mode}")

        result, message = self.two_stage_processor.process_full_pipeline(
            video_path,
            background,
            final_out,
            key_color_mode=key_color_mode,  # NEW
            chroma_settings=chroma_cfg,
            progress_callback=progress_callback,
        )
        if result is None:
            return None, message

        msg = (
            "Two-stage processing completed.\n"
            f"Background: {background_choice}\n"
            f"Chroma Preset: {chroma_preset}\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return result, msg

    # ─────────────────────────────────────────────────────────────────────
    # Status helpers
    # ─────────────────────────────────────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot dict describing models, device, memory and config.

        ``sam2_loaded`` / ``matanyone_loaded`` are reported as ``False`` when
        querying the model loader raises. A ``progress`` entry is added only
        when a progress tracker exists.
        """
        status = {
            "models_loaded": self.models_loaded,
            "two_stage_available": TWO_STAGE_AVAILABLE
            and (self.two_stage_processor is not None),
            "device": str(self.device_manager.get_optimal_device()),
            "memory_usage": self.memory_manager.get_memory_usage(),
            "config": self.config.to_dict(),
            "core_processor_loaded": self.core_processor is not None,
        }

        try:
            status["sam2_loaded"] = self.model_loader.get_sam2() is not None
            status["matanyone_loaded"] = (
                self.model_loader.get_matanyone() is not None
            )
        except Exception:
            status["sam2_loaded"] = False
            status["matanyone_loaded"] = False

        if self.progress_tracker:
            status["progress"] = self.progress_tracker.get_all_progress()
        return status

    def cancel_processing(self):
        """Set the shared cancel event and log the request."""
        self.cancel_event.set()
        logger.info("Cancellation requested")

    def cleanup_resources(self):
        """Ask the memory manager and model loader to release what they hold."""
        self.memory_manager.cleanup_aggressive()
        self.model_loader.cleanup()
        logger.info("Resources cleaned up")


# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                     Singleton instance + wrappers                        ║
# ╚══════════════════════════════════════════════════════════════════════════╝
processor = VideoProcessor()


def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    """Module-level wrapper around ``processor.load_models``."""
    return processor.load_models(progress_callback)


def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    key_color_mode: str = "auto",  # NEW
    preview_mask: bool = False,
    preview_greenscreen: bool = False,
) -> Tuple[Optional[str], str]:
    """Module-level wrapper around ``processor.process_video``."""
    return processor.process_video(
        video_path,
        background_choice,
        custom_background_path,
        progress_callback,
        use_two_stage,
        chroma_preset,
        key_color_mode,  # NEW
        preview_mask,
        preview_greenscreen,
    )


def get_model_status() -> Dict[str, Any]:
    """Return ``processor.get_status()``."""
    return processor.get_status()


def get_cache_status() -> Dict[str, Any]:
    """Return ``processor.get_status()`` (currently identical to get_model_status)."""
    # Placeholder – could expose FS cache size, etc.
    return processor.get_status()


# Same Event object as processor.cancel_event, exposed under a module-level name.
PROCESS_CANCELLED = processor.cancel_event


# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                                  CLI                                     ║
# ╚══════════════════════════════════════════════════════════════════════════╝
def main():
    """Launch the Gradio UI and always clean up resources on exit."""
    try:
        logger.info("Starting BackgroundFX Pro")
        logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
        logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}")

        # UI lives in ui/components.py
        from ui.components import create_interface

        demo = create_interface()
        demo.queue().launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            debug=False,
        )
    finally:
        processor.cleanup_resources()


if __name__ == "__main__":
    main()