MogensR's picture
Update core/app.py
062de48
raw
history blame
20.9 kB
#!/usr/bin/env python3
"""
BackgroundFX Pro – Main Application Entry Point
Refactored modular architecture – orchestrates specialised components
"""
# ─────────────────────────────────────────────────────────────────────────────
# 0) Early env/threading hygiene (must run first)
# ─────────────────────────────────────────────────────────────────────────────
import early_env # sets OMP/MKL/OPENBLAS + torch threads safely
import logging
import threading
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1) Logging
# ─────────────────────────────────────────────────────────────────────────────
# Configure the root logger once at import time, then expose this module's
# own named logger for everything below.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("core.app")
# ─────────────────────────────────────────────────────────────────────────────
# 2) Patch Gradio schema early (HF Spaces quirk)
# ─────────────────────────────────────────────────────────────────────────────
try:
    import gradio_client.utils as gc_utils

    # Keep a handle on the stock implementation so dict schemas still go
    # through Gradio's own logic.
    _orig_get_type = gc_utils.get_type

    def _patched_get_type(schema):
        """Return a type name for ``schema``, tolerating non-dict values.

        Dict schemas are delegated to the original ``get_type``. Any other
        value is mapped directly: bools to ``"boolean"``, ints/floats to
        ``"number"``, and everything else (strings included) to ``"string"``.
        """
        if isinstance(schema, dict):
            return _orig_get_type(schema)
        # bool must be tested before the numeric check: bool is an int subclass.
        if isinstance(schema, bool):
            return "boolean"
        if isinstance(schema, (int, float)):
            return "number"
        return "string"

    gc_utils.get_type = _patched_get_type
    logger.info("Gradio schema patch applied")
except Exception as patch_error:
    # Best-effort: the app still starts if the patch cannot be installed.
    logger.warning(f"Gradio patch failed: {patch_error}")
# ─────────────────────────────────────────────────────────────────────────────
# 3) Core config + components
# ─────────────────────────────────────────────────────────────────────────────
from config.app_config import get_config
from core.exceptions import ModelLoadingError, VideoProcessingError
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor
from processing.audio.audio_processor import AudioProcessor
from utils.monitoring.progress_tracker import ProgressTracker
# Optional two-stage processor
try:
    from processing.two_stage.two_stage_processor import (
        TwoStageProcessor,
        CHROMA_PRESETS,
    )
except Exception as exc:
    # The two-stage pipeline is optional, so a failed import must not stop the
    # app. Record the reason, though: otherwise a genuine bug in that module
    # is indistinguishable from "feature not installed".
    logger.warning(f"Two-stage processor unavailable: {exc}")
    # Bind the name so later references do not raise NameError.
    TwoStageProcessor = None
    TWO_STAGE_AVAILABLE = False
    # Minimal preset table so lookups of the "standard" preset still succeed.
    CHROMA_PRESETS = {"standard": {}}
else:
    TWO_STAGE_AVAILABLE = True
# Validation helper
from utils.cv_processing import validate_video_file
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                           VideoProcessor class                           ║
# ╚══════════════════════════════════════════════════════════════════════════╝
class VideoProcessor:
    """
    Main orchestrator – coordinates all specialised components.

    Owns the device/memory managers, the model loader, the audio processor
    and (once ``load_models`` has succeeded) the single-stage core processor
    and the optional two-stage processor. Public methods report failures as
    status strings rather than raising.
    """

    # ─────────────────────────────────────────────────────────────────────
    # Init
    # ─────────────────────────────────────────────────────────────────────
    def __init__(self):
        """Create the helper components; the processors are built by ``load_models``."""
        self.config = get_config()
        self.device_manager = DeviceManager()
        self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
        self.model_loader = ModelLoader(self.device_manager, self.memory_manager)
        self.audio_processor = AudioProcessor()
        # Both processors stay None until load_models() has run successfully.
        self.core_processor: CoreVideoProcessor | None = None
        self.two_stage_processor: TwoStageProcessor | None = None
        self.models_loaded = False
        self.loading_lock = threading.Lock()  # serialises load_models()
        self.cancel_event = threading.Event()  # set by cancel_processing()
        self.progress_tracker: ProgressTracker | None = None
        logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}")

    # ─────────────────────────────────────────────────────────────────────
    # Progress helper
    # ─────────────────────────────────────────────────────────────────────
    def _init_progress(self, video_path: str, cb: Optional[Callable] = None):
        """Create ``self.progress_tracker`` sized to the video's frame count.

        Falls back to a total of 100 when the frame count is unavailable or
        anything goes wrong while probing the file.

        Args:
            video_path: Path of the video to probe.
            cb: Optional progress callback handed to ``ProgressTracker``.
        """
        try:
            import cv2

            cap = cv2.VideoCapture(video_path)
            try:
                total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            finally:
                # Release the capture handle even if reading the property fails.
                cap.release()
            if total <= 0:
                total = 100
            self.progress_tracker = ProgressTracker(total, cb)
        except Exception as e:
            logger.warning(f"Progress init failed: {e}")
            self.progress_tracker = ProgressTracker(100, cb)

    # ─────────────────────────────────────────────────────────────────────
    # Model loading
    # ─────────────────────────────────────────────────────────────────────
    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        """Load all models and build the processors.

        Runs under ``loading_lock``; returns immediately when models are
        already loaded. Exceptions are caught and reported in the returned
        string.

        Args:
            progress_callback: Optional callable invoked as
                ``progress_callback(fraction, message)`` and also forwarded
                to the model loader.

        Returns:
            A human-readable status/summary message.
        """
        with self.loading_lock:
            if self.models_loaded:
                return "Models already loaded and validated"
            try:
                # A fresh load starts with a clean cancellation flag.
                self.cancel_event.clear()
                if progress_callback:
                    progress_callback(
                        0.0, f"Loading on {self.device_manager.get_optimal_device()}"
                    )
                sam2_loaded, mat_loaded = self.model_loader.load_all_models(
                    progress_callback=progress_callback, cancel_event=self.cancel_event
                )
                if self.cancel_event.is_set():
                    return "Model loading cancelled"

                # Unwrap actual predictor / model objects
                sam2_predictor = sam2_loaded.model if sam2_loaded else None
                mat_model = mat_loaded.model if mat_loaded else None

                # Core single-stage processor
                self.core_processor = CoreVideoProcessor(
                    config=self.config, models=self.model_loader
                )

                # Two-stage processor (optional); a failure here is not fatal.
                if TWO_STAGE_AVAILABLE and (sam2_predictor or mat_model):
                    try:
                        self.two_stage_processor = TwoStageProcessor(
                            sam2_predictor=sam2_predictor, matanyone_model=mat_model
                        )
                        logger.info("Two-stage processor initialised")
                    except Exception as e:
                        logger.warning(f"Two-stage init failed: {e}")
                        self.two_stage_processor = None

                self.models_loaded = True
                msg = self.model_loader.get_load_summary()
                msg += (
                    "\n✅ Two-stage processor ready"
                    if self.two_stage_processor
                    else "\n⚠️ Two-stage processor not available"
                )
                logger.info(msg)
                return msg
            except (AttributeError, ModelLoadingError) as e:
                self.models_loaded = False
                err = f"Model loading failed: {e}"
                logger.error(err)
                return err
            except Exception as e:
                self.models_loaded = False
                err = f"Unexpected error during model loading: {e}"
                # Keep the traceback: this branch means something unplanned happened.
                logger.exception(err)
                return err

    # ─────────────────────────────────────────────────────────────────────
    # Public entry – process video
    # ─────────────────────────────────────────────────────────────────────
    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        use_two_stage: bool = False,
        chroma_preset: str = "standard",
        key_color_mode: str = "auto",  # NEW
        preview_mask: bool = False,
        preview_greenscreen: bool = False,
    ) -> Tuple[Optional[str], str]:
        """
        Dispatch to single-stage or two-stage pipeline.

        Args:
            video_path: Input video file.
            background_choice: Background selection passed to the pipeline.
            custom_background_path: Optional path of a custom background.
            progress_callback: Optional progress callable.
            use_two_stage: Use the two-stage pipeline instead of single-stage.
            chroma_preset: Key into ``CHROMA_PRESETS`` (two-stage only).
            key_color_mode: Forwarded to the two-stage pipeline only.
            preview_mask: Single-stage only; when set, audio is not added back.
            preview_greenscreen: Single-stage only; when set, audio is not
                added back.

        Returns:
            ``(output_path, message)``; ``output_path`` is ``None`` on failure.
        """
        if not self.models_loaded or not self.core_processor:
            return None, "Models not loaded. Please click “Load Models” first."
        # NOTE(review): within this class the event is only cleared by
        # load_models(); presumably callers reset PROCESS_CANCELLED between
        # runs — confirm.
        if self.cancel_event.is_set():
            return None, "Processing cancelled"

        # Validate first so an unusable file is never probed with OpenCV.
        ok, why = validate_video_file(video_path)
        if not ok:
            return None, f"Invalid video: {why}"
        self._init_progress(video_path, progress_callback)

        try:
            if use_two_stage:
                if not TWO_STAGE_AVAILABLE:
                    return None, "Two-stage processing not available on this build"
                if not self.two_stage_processor:
                    return None, "Two-stage processor not initialised"
                return self._process_two_stage(
                    video_path,
                    background_choice,
                    custom_background_path,
                    progress_callback,
                    chroma_preset,
                    key_color_mode,  # NEW
                )
            else:
                return self._process_single_stage(
                    video_path,
                    background_choice,
                    custom_background_path,
                    progress_callback,
                    preview_mask,
                    preview_greenscreen,
                )
        except VideoProcessingError as e:
            logger.error(f"Processing failed: {e}")
            return None, f"Processing failed: {e}"
        except Exception as e:
            # Keep the traceback for failures nobody anticipated.
            logger.exception(f"Unexpected processing error: {e}")
            return None, f"Unexpected error: {e}"

    # ─────────────────────────────────────────────────────────────────────
    # Private – single-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_single_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        preview_mask: bool,
        preview_greenscreen: bool,
    ) -> Tuple[Optional[str], str]:
        """Run the single-stage pipeline and write ``processed_<ts>.mp4``.

        Output goes to ``<config.output_dir>/single_stage``. Audio from the
        original video is added back unless one of the preview flags is set.
        ``progress_callback`` is accepted for signature parity but is not
        used here.

        Returns:
            ``(final_path, message)``, or ``(None, message)`` when the core
            processor returns a falsy result.
        """
        import time

        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "single_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = str(out_dir / f"processed_{ts}.mp4")

        result = self.core_processor.process_video(
            input_path=video_path,
            output_path=out_path,
            bg_config={
                "background_choice": background_choice,
                "custom_path": custom_background_path,
            },
        )
        if not result:
            return None, "Video processing failed"

        if not (preview_mask or preview_greenscreen):
            final_path = self.audio_processor.add_audio_to_video(
                original_video=video_path, processed_video=out_path
            )
        else:
            # Preview output is returned as written, without audio.
            final_path = out_path

        msg = (
            "Processing completed.\n"
            f"Frames: {result.get('frames', 'unknown')}\n"
            f"Background: {background_choice}\n"
            f"Mode: Single-stage\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return final_path, msg

    # ─────────────────────────────────────────────────────────────────────
    # Private – two-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_two_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        chroma_preset: str,
        key_color_mode: str,  # NEW
    ) -> Tuple[Optional[str], str]:
        """Run the two-stage pipeline and write ``final_<ts>.mp4``.

        Reads the frame size from the video, prepares a matching background
        via the core processor, then calls
        ``two_stage_processor.process_full_pipeline``. Output goes to
        ``<config.output_dir>/two_stage``. An unknown ``chroma_preset`` falls
        back to the ``"standard"`` preset.

        Returns:
            ``(result, message)``; ``result`` is ``None`` on failure.
        """
        if self.two_stage_processor is None:
            return None, "Two-stage processor not available"

        import cv2
        import time

        cap = cv2.VideoCapture(video_path)
        try:
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            # Release the capture handle even if reading a property fails.
            cap.release()
        if w <= 0 or h <= 0:
            # Without real dimensions a background cannot be sized sensibly.
            return None, "Could not read video dimensions"

        background = self.core_processor.prepare_background(
            background_choice, custom_background_path, w, h
        )
        if background is None:
            return None, "Failed to prepare background"

        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "two_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        final_out = str(out_dir / f"final_{ts}.mp4")

        chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS["standard"])
        logger.info(f"Two-stage with preset: {chroma_preset} and key_color_mode={key_color_mode}")

        result, message = self.two_stage_processor.process_full_pipeline(
            video_path,
            background,
            final_out,
            key_color_mode=key_color_mode,  # NEW
            chroma_settings=chroma_cfg,
            progress_callback=progress_callback,
        )
        if result is None:
            return None, message

        msg = (
            "Two-stage processing completed.\n"
            f"Background: {background_choice}\n"
            f"Chroma Preset: {chroma_preset}\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return result, msg

    # ─────────────────────────────────────────────────────────────────────
    # Status helpers
    # ─────────────────────────────────────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot dict of model, device, memory and config state.

        ``sam2_loaded`` / ``matanyone_loaded`` are reported as ``False`` if
        querying the model loader raises. A ``progress`` entry is included
        only when a progress tracker exists.
        """
        status = {
            "models_loaded": self.models_loaded,
            "two_stage_available": TWO_STAGE_AVAILABLE
            and (self.two_stage_processor is not None),
            "device": str(self.device_manager.get_optimal_device()),
            "memory_usage": self.memory_manager.get_memory_usage(),
            "config": self.config.to_dict(),
            "core_processor_loaded": self.core_processor is not None,
        }
        try:
            status["sam2_loaded"] = self.model_loader.get_sam2() is not None
            status["matanyone_loaded"] = (
                self.model_loader.get_matanyone() is not None
            )
        except Exception:
            status["sam2_loaded"] = False
            status["matanyone_loaded"] = False
        if self.progress_tracker:
            status["progress"] = self.progress_tracker.get_all_progress()
        return status

    def cancel_processing(self):
        """Set the shared cancel event."""
        self.cancel_event.set()
        logger.info("Cancellation requested")

    def cleanup_resources(self):
        """Run memory cleanup, then model-loader cleanup.

        The model loader is cleaned up even if the memory cleanup raises;
        in that case the exception still propagates to the caller.
        """
        try:
            self.memory_manager.cleanup_aggressive()
        finally:
            self.model_loader.cleanup()
        logger.info("Resources cleaned up")
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                      Singleton instance + wrappers                       ║
# ╚══════════════════════════════════════════════════════════════════════════╝
# Single shared VideoProcessor, constructed when this module is imported;
# the module-level wrapper functions delegate to it.
processor = VideoProcessor()
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    """Load models on the shared processor and return its status message."""
    summary = processor.load_models(progress_callback=progress_callback)
    return summary
def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    key_color_mode: str = "auto",  # NEW
    preview_mask: bool = False,
    preview_greenscreen: bool = False,
) -> Tuple[Optional[str], str]:
    """Process a video on the shared processor.

    Every argument is forwarded unchanged to ``processor.process_video``;
    the ``(output_path, message)`` tuple it returns is passed straight back.
    """
    outcome = processor.process_video(
        video_path=video_path,
        background_choice=background_choice,
        custom_background_path=custom_background_path,
        progress_callback=progress_callback,
        use_two_stage=use_two_stage,
        chroma_preset=chroma_preset,
        key_color_mode=key_color_mode,  # NEW
        preview_mask=preview_mask,
        preview_greenscreen=preview_greenscreen,
    )
    return outcome
def get_model_status() -> Dict[str, Any]:
    """Return the shared processor's status dictionary."""
    status = processor.get_status()
    return status
def get_cache_status() -> Dict[str, Any]:
    """Return cache status; currently the same payload as ``processor.get_status()``."""
    # Placeholder – could expose FS cache size, etc.
    status = processor.get_status()
    return status
# Module-level alias for the shared processor's cancel event: this is the same
# threading.Event object, so setting or clearing it here affects the processor.
PROCESS_CANCELLED = processor.cancel_event
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                                   CLI                                    ║
# ╚══════════════════════════════════════════════════════════════════════════╝
def main():
    """Start the Gradio interface; processor resources are cleaned up on exit.

    Cleanup runs in a ``finally`` clause, so it happens whether the server
    returns normally or an exception escapes.
    """
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "debug": False,
    }
    try:
        logger.info("Starting BackgroundFX Pro")
        device = processor.device_manager.get_optimal_device()
        logger.info(f"Device: {device}")
        logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}")

        # UI lives in ui/components.py
        from ui.components import create_interface

        interface = create_interface()
        interface.queue().launch(**launch_options)
    finally:
        processor.cleanup_resources()


# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()