|
|
|
|
|
""" |
|
|
BackgroundFX Pro - Main Application Entry Point
|
|
Refactored modular architecture: this module orchestrates the specialised processing components.
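
Illustrative usage sketch (the import path "core.app" and the background name
"office" are assumptions for documentation purposes, not guaranteed by this file):

    from core.app import processor

    processor.load_models()
    out_path, status = processor.process_video(
        video_path="input.mp4",
        background_choice="office",
    )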
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import early_env  # imported first so any environment setup it performs runs before the heavier imports below
|
|
|
|
|
import logging |
|
|
import threading |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple, Dict, Any, Callable |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", |
|
|
) |
|
|
logger = logging.getLogger("core.app") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Defensive patch: gradio_client.utils.get_type() can be handed non-dict JSON
# schema fragments (bools, strings, numbers); map those to sensible type names
# instead of letting schema parsing fail.
try:
|
|
import gradio_client.utils as gc_utils |
|
|
|
|
|
_orig_get_type = gc_utils.get_type |
|
|
|
|
|
def _patched_get_type(schema): |
|
|
if not isinstance(schema, dict): |
|
|
if isinstance(schema, bool): |
|
|
return "boolean" |
|
|
if isinstance(schema, str): |
|
|
return "string" |
|
|
if isinstance(schema, (int, float)): |
|
|
return "number" |
|
|
return "string" |
|
|
return _orig_get_type(schema) |
|
|
|
|
|
gc_utils.get_type = _patched_get_type |
|
|
logger.info("Gradio schema patch applied") |
|
|
except Exception as e: |
|
|
logger.warning(f"Gradio patch failed: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from config.app_config import get_config |
|
|
from core.exceptions import ModelLoadingError, VideoProcessingError |
|
|
from utils.hardware.device_manager import DeviceManager |
|
|
from utils.system.memory_manager import MemoryManager |
|
|
from models.loaders.model_loader import ModelLoader |
|
|
from processing.video.video_processor import CoreVideoProcessor |
|
|
from processing.audio.audio_processor import AudioProcessor |
|
|
from utils.monitoring.progress_tracker import ProgressTracker |
|
|
|
|
|
|
|
|
# Optional two-stage pipeline; fall back gracefully when the module is unavailable.
try:
|
|
from processing.two_stage.two_stage_processor import ( |
|
|
TwoStageProcessor, |
|
|
CHROMA_PRESETS, |
|
|
) |
|
|
|
|
|
TWO_STAGE_AVAILABLE = True |
|
|
except Exception: |
|
|
TWO_STAGE_AVAILABLE = False |
|
|
CHROMA_PRESETS = {"standard": {}} |
|
|
|
|
|
|
|
|
from utils.cv_processing import validate_video_file |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoProcessor: |
|
|
""" |
|
|
    Main orchestrator - coordinates all specialised components.
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self): |
|
|
self.config = get_config() |
|
|
self.device_manager = DeviceManager() |
|
|
self.memory_manager = MemoryManager(self.device_manager.get_optimal_device()) |
|
|
self.model_loader = ModelLoader(self.device_manager, self.memory_manager) |
|
|
|
|
|
self.audio_processor = AudioProcessor() |
|
|
self.core_processor: CoreVideoProcessor | None = None |
|
|
self.two_stage_processor: TwoStageProcessor | None = None |
|
|
|
|
|
self.models_loaded = False |
|
|
self.loading_lock = threading.Lock() |
|
|
self.cancel_event = threading.Event() |
|
|
self.progress_tracker: ProgressTracker | None = None |
|
|
|
|
|
logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _init_progress(self, video_path: str, cb: Optional[Callable] = None): |
|
|
try: |
|
|
import cv2 |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
cap.release() |
|
|
if total <= 0: |
|
|
total = 100 |
|
|
self.progress_tracker = ProgressTracker(total, cb) |
|
|
except Exception as e: |
|
|
logger.warning(f"Progress init failed: {e}") |
|
|
self.progress_tracker = ProgressTracker(100, cb) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_models(self, progress_callback: Optional[Callable] = None) -> str: |
|
|
with self.loading_lock: |
|
|
if self.models_loaded: |
|
|
return "Models already loaded and validated" |
|
|
|
|
|
try: |
|
|
self.cancel_event.clear() |
|
|
if progress_callback: |
|
|
progress_callback( |
|
|
0.0, f"Loading on {self.device_manager.get_optimal_device()}" |
|
|
) |
|
|
|
|
|
sam2_loaded, mat_loaded = self.model_loader.load_all_models( |
|
|
progress_callback=progress_callback, cancel_event=self.cancel_event |
|
|
) |
|
|
|
|
|
if self.cancel_event.is_set(): |
|
|
return "Model loading cancelled" |
|
|
|
|
|
|
|
|
sam2_predictor = sam2_loaded.model if sam2_loaded else None |
|
|
mat_model = mat_loaded.model if mat_loaded else None |
|
|
|
|
|
|
|
|
self.core_processor = CoreVideoProcessor( |
|
|
config=self.config, models=self.model_loader |
|
|
) |
|
|
|
|
|
|
|
|
if TWO_STAGE_AVAILABLE and (sam2_predictor or mat_model): |
|
|
try: |
|
|
self.two_stage_processor = TwoStageProcessor( |
|
|
sam2_predictor=sam2_predictor, matanyone_model=mat_model |
|
|
) |
|
|
logger.info("Two-stage processor initialised") |
|
|
except Exception as e: |
|
|
logger.warning(f"Two-stage init failed: {e}") |
|
|
self.two_stage_processor = None |
|
|
|
|
|
self.models_loaded = True |
|
|
msg = self.model_loader.get_load_summary() |
|
|
msg += ( |
|
|
"\nβ
Two-stage processor ready" |
|
|
if self.two_stage_processor |
|
|
else "\nβ οΈ Two-stage processor not available" |
|
|
) |
|
|
logger.info(msg) |
|
|
return msg |
|
|
|
|
|
except (AttributeError, ModelLoadingError) as e: |
|
|
self.models_loaded = False |
|
|
err = f"Model loading failed: {e}" |
|
|
logger.error(err) |
|
|
return err |
|
|
except Exception as e: |
|
|
self.models_loaded = False |
|
|
err = f"Unexpected error during model loading: {e}" |
|
|
logger.error(err) |
|
|
return err |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_video( |
|
|
self, |
|
|
video_path: str, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str] = None, |
|
|
progress_callback: Optional[Callable] = None, |
|
|
use_two_stage: bool = False, |
|
|
chroma_preset: str = "standard", |
|
|
key_color_mode: str = "auto", |
|
|
preview_mask: bool = False, |
|
|
preview_greenscreen: bool = False, |
|
|
) -> Tuple[Optional[str], str]: |
|
|
""" |
|
|
Dispatch to single-stage or two-stage pipeline. |
|
|
""" |
|
|
if not self.models_loaded or not self.core_processor: |
|
|
return None, "Models not loaded. Please click βLoad Modelsβ first." |
|
|
if self.cancel_event.is_set(): |
|
|
return None, "Processing cancelled" |
|
|
|
|
|
self._init_progress(video_path, progress_callback) |
|
|
|
|
|
ok, why = validate_video_file(video_path) |
|
|
if not ok: |
|
|
return None, f"Invalid video: {why}" |
|
|
|
|
|
try: |
|
|
if use_two_stage: |
|
|
if not TWO_STAGE_AVAILABLE: |
|
|
return None, "Two-stage processing not available on this build" |
|
|
if not self.two_stage_processor: |
|
|
return None, "Two-stage processor not initialised" |
|
|
return self._process_two_stage( |
|
|
video_path, |
|
|
background_choice, |
|
|
custom_background_path, |
|
|
progress_callback, |
|
|
chroma_preset, |
|
|
key_color_mode, |
|
|
) |
|
|
else: |
|
|
return self._process_single_stage( |
|
|
video_path, |
|
|
background_choice, |
|
|
custom_background_path, |
|
|
progress_callback, |
|
|
preview_mask, |
|
|
preview_greenscreen, |
|
|
) |
|
|
|
|
|
except VideoProcessingError as e: |
|
|
logger.error(f"Processing failed: {e}") |
|
|
return None, f"Processing failed: {e}" |
|
|
except Exception as e: |
|
|
logger.error(f"Unexpected processing error: {e}") |
|
|
return None, f"Unexpected error: {e}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_single_stage( |
|
|
self, |
|
|
video_path: str, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str], |
|
|
progress_callback: Optional[Callable], |
|
|
preview_mask: bool, |
|
|
preview_greenscreen: bool, |
|
|
) -> Tuple[Optional[str], str]: |
|
|
import time |
|
|
|
|
|
ts = int(time.time()) |
|
|
out_dir = Path(self.config.output_dir) / "single_stage" |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
out_path = str(out_dir / f"processed_{ts}.mp4") |
|
|
|
|
|
result = self.core_processor.process_video( |
|
|
input_path=video_path, |
|
|
output_path=out_path, |
|
|
bg_config={ |
|
|
"background_choice": background_choice, |
|
|
"custom_path": custom_background_path, |
|
|
}, |
|
|
) |
|
|
if not result: |
|
|
return None, "Video processing failed" |
|
|
|
|
|
if not (preview_mask or preview_greenscreen): |
|
|
final_path = self.audio_processor.add_audio_to_video( |
|
|
original_video=video_path, processed_video=out_path |
|
|
) |
|
|
else: |
|
|
final_path = out_path |
|
|
|
|
|
msg = ( |
|
|
"Processing completed.\n" |
|
|
f"Frames: {result.get('frames', 'unknown')}\n" |
|
|
f"Background: {background_choice}\n" |
|
|
f"Mode: Single-stage\n" |
|
|
f"Device: {self.device_manager.get_optimal_device()}" |
|
|
) |
|
|
return final_path, msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_two_stage( |
|
|
self, |
|
|
video_path: str, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str], |
|
|
progress_callback: Optional[Callable], |
|
|
chroma_preset: str, |
|
|
key_color_mode: str, |
|
|
) -> Tuple[Optional[str], str]: |
|
|
if self.two_stage_processor is None: |
|
|
return None, "Two-stage processor not available" |
|
|
|
|
|
        import cv2
        import time
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
cap.release() |
|
|
|
|
|
background = self.core_processor.prepare_background( |
|
|
background_choice, custom_background_path, w, h |
|
|
) |
|
|
if background is None: |
|
|
return None, "Failed to prepare background" |
|
|
|
|
|
ts = int(time.time()) |
|
|
out_dir = Path(self.config.output_dir) / "two_stage" |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
final_out = str(out_dir / f"final_{ts}.mp4") |
|
|
|
|
|
chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS["standard"]) |
|
|
logger.info(f"Two-stage with preset: {chroma_preset} and key_color_mode={key_color_mode}") |
|
|
|
|
|
result, message = self.two_stage_processor.process_full_pipeline( |
|
|
video_path, |
|
|
background, |
|
|
final_out, |
|
|
key_color_mode=key_color_mode, |
|
|
chroma_settings=chroma_cfg, |
|
|
progress_callback=progress_callback, |
|
|
) |
|
|
if result is None: |
|
|
return None, message |
|
|
|
|
|
msg = ( |
|
|
"Two-stage processing completed.\n" |
|
|
f"Background: {background_choice}\n" |
|
|
f"Chroma Preset: {chroma_preset}\n" |
|
|
f"Device: {self.device_manager.get_optimal_device()}" |
|
|
) |
|
|
return result, msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_status(self) -> Dict[str, Any]: |
|
|
status = { |
|
|
"models_loaded": self.models_loaded, |
|
|
"two_stage_available": TWO_STAGE_AVAILABLE |
|
|
and (self.two_stage_processor is not None), |
|
|
"device": str(self.device_manager.get_optimal_device()), |
|
|
"memory_usage": self.memory_manager.get_memory_usage(), |
|
|
"config": self.config.to_dict(), |
|
|
"core_processor_loaded": self.core_processor is not None, |
|
|
} |
|
|
try: |
|
|
status["sam2_loaded"] = self.model_loader.get_sam2() is not None |
|
|
status["matanyone_loaded"] = ( |
|
|
self.model_loader.get_matanyone() is not None |
|
|
) |
|
|
except Exception: |
|
|
status["sam2_loaded"] = False |
|
|
status["matanyone_loaded"] = False |
|
|
|
|
|
if self.progress_tracker: |
|
|
status["progress"] = self.progress_tracker.get_all_progress() |
|
|
return status |
|
|
|
|
|
def cancel_processing(self): |
|
|
self.cancel_event.set() |
|
|
logger.info("Cancellation requested") |
|
|
|
|
|
def cleanup_resources(self): |
|
|
self.memory_manager.cleanup_aggressive() |
|
|
self.model_loader.cleanup() |
|
|
logger.info("Resources cleaned up") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
processor = VideoProcessor() |
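
# The singleton above is created at import time; the wrapper functions below
# simply delegate to it so callers can use plain functions instead of the class.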
|
|
|
|
|
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str: |
|
|
return processor.load_models(progress_callback) |
|
|
|
|
|
def process_video_fixed( |
|
|
video_path: str, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str], |
|
|
progress_callback: Optional[Callable] = None, |
|
|
use_two_stage: bool = False, |
|
|
chroma_preset: str = "standard", |
|
|
key_color_mode: str = "auto", |
|
|
preview_mask: bool = False, |
|
|
preview_greenscreen: bool = False, |
|
|
) -> Tuple[Optional[str], str]: |
|
|
return processor.process_video( |
|
|
video_path, |
|
|
background_choice, |
|
|
custom_background_path, |
|
|
progress_callback, |
|
|
use_two_stage, |
|
|
chroma_preset, |
|
|
key_color_mode, |
|
|
preview_mask, |
|
|
preview_greenscreen, |
|
|
) |
|
|
|
|
|
def get_model_status() -> Dict[str, Any]: |
|
|
return processor.get_status() |
|
|
|
|
|
def get_cache_status() -> Dict[str, Any]: |
|
|
|
|
|
    # Currently mirrors the overall processor status; there is no separate cache report.
    return processor.get_status()
|
|
|
|
|
PROCESS_CANCELLED = processor.cancel_event |
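
# Illustrative call sequence for the wrappers above (hypothetical caller-side
# code; "office" is a placeholder background name, not one defined in this file):
#
#     load_models_with_validation()
#     out_path, message = process_video_fixed(
#         "input.mp4", "office", None, use_two_stage=False
#     )
#     if out_path is None:
#         logger.error(message)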
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Application entry point: log the runtime environment, build the Gradio
# interface, and launch it; resources are cleaned up on exit.
def main():
|
|
try: |
|
|
logger.info("Starting BackgroundFX Pro") |
|
|
logger.info(f"Device: {processor.device_manager.get_optimal_device()}") |
|
|
logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}") |
|
|
|
|
|
|
|
|
from ui.components import create_interface |
|
|
|
|
|
demo = create_interface() |
|
|
demo.queue().launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
show_error=True, |
|
|
debug=False, |
|
|
) |
|
|
finally: |
|
|
processor.cleanup_resources() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|