|
|
|
|
|
""" |
|
|
BackgroundFX Pro - Main Application Entry Point |
|
|
Refactored modular architecture - orchestrates specialized components |
|
|
""" |
|
|
|
|
|
import early_env |
|
|
|
|
|
import os |
|
|
import logging |
|
|
import threading |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple, Dict, Any, Callable |
|
|
|
|
|
|
|
|
# Shared log line layout for the whole application.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Best-effort monkey-patch: some gradio_client versions choke when a JSON
# schema node is a bare bool/str/number instead of a dict. Wrap get_type so
# non-dict schemas are coerced to a sensible type name; any failure (e.g.
# gradio_client not installed) is logged and otherwise ignored.
try:
    import gradio_client.utils as gc_utils

    original_get_type = gc_utils.get_type

    def patched_get_type(schema):
        """Return a JSON type name; defer real dict schemas to gradio."""
        if isinstance(schema, dict):
            return original_get_type(schema)
        # bool must be checked before int/float: bool is an int subclass.
        if isinstance(schema, bool):
            return "boolean"
        if isinstance(schema, str):
            return "string"
        if isinstance(schema, (int, float)):
            return "number"
        # Unknown scalar shape - fall back to string.
        return "string"

    gc_utils.get_type = patched_get_type
    logger.info("Gradio schema patch applied successfully")
except Exception as e:
    logger.error(f"Gradio patch failed: {e}")
|
|
|
|
|
|
|
|
from processing.video.video_processor import ProcessorConfig |
|
|
from config.app_config import get_config |
|
|
|
|
|
|
|
|
from core.exceptions import ModelLoadingError, VideoProcessingError |
|
|
from utils.hardware.device_manager import DeviceManager |
|
|
from utils.system.memory_manager import MemoryManager |
|
|
from models.loaders.model_loader import ModelLoader |
|
|
from processing.video.video_processor import CoreVideoProcessor |
|
|
from processing.audio.audio_processor import AudioProcessor |
|
|
from utils.monitoring.progress_tracker import ProgressTracker |
|
|
|
|
|
|
|
|
from utilities import ( |
|
|
segment_person_hq, |
|
|
refine_mask_hq, |
|
|
replace_background_hq, |
|
|
create_professional_background, |
|
|
PROFESSIONAL_BACKGROUNDS, |
|
|
validate_video_file |
|
|
) |
|
|
|
|
|
|
|
|
# Optional two-stage pipeline: assume unavailable, then flip the flag if the
# module imports cleanly. CHROMA_PRESETS always exists so lookups downstream
# never need to guard against a missing name.
TWO_STAGE_AVAILABLE = False
CHROMA_PRESETS = {'standard': {}}
try:
    from processing.two_stage.two_stage_processor import TwoStageProcessor, CHROMA_PRESETS
    TWO_STAGE_AVAILABLE = True
except ImportError:
    # Keep the fallback defaults set above.
    pass
|
|
|
|
|
|
|
|
class VideoProcessor:
    """Main video processing orchestrator - coordinates all specialized components.

    Facade over device/memory management, AI model loading, the single-stage
    core pipeline, the optional two-stage pipeline, and audio muxing.

    Concurrency notes:
      * ``loading_lock`` serializes ``load_models`` across threads.
      * ``cancel_event`` is a cooperative cancellation flag checked by both
        loading and processing paths (and exposed at module level as
        ``PROCESS_CANCELLED`` for legacy callers).
    """

    def __init__(self):
        """Initialize the video processor with all required components."""
        self.config = get_config()
        self.device_manager = DeviceManager()
        # Memory manager is bound to the device selected once at startup.
        self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())

        self.model_loader = ModelLoader(self.device_manager, self.memory_manager)

        self.audio_processor = AudioProcessor()
        # Built lazily, per video, in _initialize_progress_tracker().
        self.progress_tracker = None

        # Both processors are created only after load_models() succeeds.
        self.core_processor = None
        self.two_stage_processor = None

        self.models_loaded = False
        self.loading_lock = threading.Lock()
        self.cancel_event = threading.Event()

        logger.info(f"VideoProcessor initialized on device: {self.device_manager.get_optimal_device()}")

    def _initialize_progress_tracker(self, video_path: str, progress_callback: Optional[Callable] = None) -> None:
        """Initialize progress tracker with video frame count.

        Degrades gracefully: if the frame count cannot be read (bad file,
        missing cv2, container without metadata) a nominal 100-step tracker
        is installed so progress reporting still works.
        """
        try:
            import cv2
            cap = cv2.VideoCapture(video_path)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()

            # Some containers report 0 or negative counts; use a stand-in.
            if total_frames <= 0:
                total_frames = 100

            self.progress_tracker = ProgressTracker(total_frames, progress_callback)
            logger.info(f"Progress tracker initialized for {total_frames} frames")
        except Exception as e:
            logger.warning(f"Could not initialize progress tracker: {e}")

            self.progress_tracker = ProgressTracker(100, progress_callback)

    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        """Load and validate all AI models.

        Returns a human-readable status message; errors are reported via the
        return value rather than raised to the caller. On success this also
        constructs the core processor and, when possible, the two-stage
        processor. Idempotent and thread-safe (serialized by loading_lock).
        """
        with self.loading_lock:
            if self.models_loaded:
                return "Models already loaded and validated"

            try:
                # Fresh run: clear any stale cancellation request.
                self.cancel_event.clear()

                if progress_callback:
                    progress_callback(0.0, f"Starting model loading on {self.device_manager.get_optimal_device()}")

                try:
                    sam2_result, matanyone_result = self.model_loader.load_all_models(
                        progress_callback=progress_callback,
                        cancel_event=self.cancel_event
                    )

                except IndexError as e:
                    # Diagnostic path for a known failure mode inside the
                    # loader: log the full stack frame-by-frame, then
                    # re-raise as a domain error handled below.
                    import traceback
                    logger.error(f"IndexError in load_all_models: {e}")
                    logger.error(f"Full traceback:\n{traceback.format_exc()}")

                    tb = traceback.extract_tb(e.__traceback__)
                    for frame in tb:
                        logger.error(f"  File: {frame.filename}, Line: {frame.lineno}, Function: {frame.name}")
                        logger.error(f"  Code: {frame.line}")

                    raise ModelLoadingError(f"Model loading failed with IndexError at line {tb[-1].lineno}: {e}")

                except Exception as e:
                    import traceback
                    logger.error(f"Unexpected error in load_all_models: {e}")
                    logger.error(f"Error type: {type(e).__name__}")
                    logger.error(f"Full traceback:\n{traceback.format_exc()}")
                    raise

                if self.cancel_event.is_set():
                    return "Model loading cancelled"

                # Loader returns result wrappers; .model holds the usable
                # object (None result means that model failed to load).
                sam2_predictor = sam2_result.model if sam2_result else None
                matanyone_model = matanyone_result.model if matanyone_result else None

                # At least one model must be available to proceed.
                success = sam2_predictor is not None or matanyone_model is not None

                if not success:
                    return "Model loading failed - check logs for details"

                self.core_processor = CoreVideoProcessor(
                    config=self.config,
                    models=self.model_loader
                )

                # Optional two-stage pipeline: needs the module import to
                # have succeeded AND at least one model to be present.
                if TWO_STAGE_AVAILABLE:
                    if sam2_predictor is not None or matanyone_model is not None:
                        try:
                            self.two_stage_processor = TwoStageProcessor(
                                sam2_predictor=sam2_predictor,
                                matanyone_model=matanyone_model
                            )
                            logger.info("✅ Two-stage processor initialized with AI models")
                        except Exception as e:
                            # Best-effort: fall back to single-stage only.
                            logger.warning(f"Two-stage processor init failed: {e}")
                            self.two_stage_processor = None
                    else:
                        logger.warning("Two-stage processor not initialized - models not available")
                        if sam2_predictor is None:
                            logger.warning("  - SAM2 predictor is None")
                        if matanyone_model is None:
                            logger.warning("  - MatAnyone model is None")

                self.models_loaded = True
                message = self.model_loader.get_load_summary()

                if self.two_stage_processor is not None:
                    message += "\n✅ Two-stage processor ready with AI models"
                else:
                    message += "\n⚠️ Two-stage processor not available"

                logger.info(message)
                return message

            except AttributeError as e:
                # Typically an API mismatch with the loader implementation.
                self.models_loaded = False
                error_msg = f"Model loading failed - method not found: {str(e)}"
                logger.error(error_msg)
                return error_msg
            except ModelLoadingError as e:
                self.models_loaded = False
                error_msg = f"Model loading failed: {str(e)}"
                logger.error(error_msg)
                return error_msg
            except Exception as e:
                self.models_loaded = False
                error_msg = f"Unexpected error during model loading: {str(e)}"
                logger.error(error_msg)
                return error_msg

    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        use_two_stage: bool = False,
        chroma_preset: str = "standard",
        preview_mask: bool = False,
        preview_greenscreen: bool = False
    ) -> Tuple[Optional[str], str]:
        """Process video with the specified parameters.

        Args:
            video_path: Input video file path.
            background_choice: Named background (or custom) to composite in.
            custom_background_path: Image path used when the choice is custom.
            progress_callback: Optional (fraction, message) progress hook.
            use_two_stage: Route through the two-stage pipeline if available.
            chroma_preset: Key into CHROMA_PRESETS (two-stage only).
            preview_mask / preview_greenscreen: Preview modes; audio muxing
                is skipped for previews in the single-stage path.

        Returns:
            (output_path, message); output_path is None on failure.
        """

        # Guard clauses: models ready, not cancelled, valid input file.
        if not self.models_loaded or not self.core_processor:
            return None, "Models not loaded. Please load models first."

        if self.cancel_event.is_set():
            return None, "Processing cancelled"

        self._initialize_progress_tracker(video_path, progress_callback)

        is_valid, validation_msg = validate_video_file(video_path)
        if not is_valid:
            return None, f"Invalid video: {validation_msg}"

        try:
            if use_two_stage:
                if not TWO_STAGE_AVAILABLE:
                    return None, "Two-stage processing not available - module not found"

                if self.two_stage_processor is None:
                    return None, "Two-stage processor not initialized - models may not be loaded properly"

                logger.info("Using two-stage processing pipeline with AI models")
                return self._process_two_stage(
                    video_path, background_choice, custom_background_path,
                    progress_callback, chroma_preset
                )
            else:
                logger.info("Using single-stage processing pipeline")
                return self._process_single_stage(
                    video_path, background_choice, custom_background_path,
                    progress_callback, preview_mask, preview_greenscreen
                )

        except VideoProcessingError as e:
            logger.error(f"Video processing failed: {e}")
            return None, f"Processing failed: {str(e)}"
        except Exception as e:
            logger.error(f"Unexpected error during video processing: {e}")
            return None, f"Unexpected error: {str(e)}"

    def _process_single_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        preview_mask: bool,
        preview_greenscreen: bool
    ) -> Tuple[Optional[str], str]:
        """Process video using single-stage pipeline.

        Writes to <output_dir>/single_stage/processed_<epoch>.mp4 and, for
        non-preview runs, muxes the original audio back into the result.
        """

        # Timestamped output path keeps successive runs from colliding.
        import time
        timestamp = int(time.time())
        output_dir = Path(self.config.output_dir) / "single_stage"
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = str(output_dir / f"processed_{timestamp}.mp4")

        result = self.core_processor.process_video(
            input_path=video_path,
            output_path=output_path,
            bg_config={'background_choice': background_choice, 'custom_path': custom_background_path}
        )

        if not result:
            return None, "Video processing failed"

        # Previews skip audio muxing; full runs restore the source audio.
        if not (preview_mask or preview_greenscreen):
            final_video_path = self.audio_processor.add_audio_to_video(
                original_video=video_path,
                processed_video=output_path
            )
        else:
            final_video_path = output_path

        # NOTE(review): result is assumed to be a dict-like with a 'frames'
        # key (see .get below) - defined by CoreVideoProcessor, not visible here.
        success_msg = (
            f"Processing completed successfully!\n"
            f"Frames processed: {result.get('frames', 'unknown')}\n"
            f"Background: {background_choice}\n"
            f"Mode: Single-stage\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )

        return final_video_path, success_msg

    def _process_two_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        chroma_preset: str
    ) -> Tuple[Optional[str], str]:
        """Process video using two-stage pipeline.

        Prepares a background sized to the source frames, then delegates to
        TwoStageProcessor.process_full_pipeline. Unknown chroma presets fall
        back to the 'standard' preset.
        """

        if self.two_stage_processor is None:
            return None, "Two-stage processor not available"

        # Probe the source dimensions so the background can be sized to fit.
        import cv2
        cap = cv2.VideoCapture(video_path)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.release()

        background = self.core_processor.prepare_background(
            background_choice, custom_background_path, frame_width, frame_height
        )
        if background is None:
            return None, "Failed to prepare background"

        # Timestamped output path keeps successive runs from colliding.
        import time
        timestamp = int(time.time())
        output_dir = Path(self.config.output_dir) / "two_stage"
        output_dir.mkdir(parents=True, exist_ok=True)
        final_output = str(output_dir / f"final_{timestamp}.mp4")

        chroma_settings = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS['standard'])

        logger.info(f"Starting two-stage processing with chroma preset: {chroma_preset}")
        result, message = self.two_stage_processor.process_full_pipeline(
            video_path,
            background,
            final_output,
            chroma_settings=chroma_settings,
            progress_callback=progress_callback
        )

        if result is None:
            return None, message

        success_msg = (
            f"Two-stage processing completed!\n"
            f"Background: {background_choice}\n"
            f"Chroma Preset: {chroma_preset}\n"
            f"Quality: Cinema-grade with AI models\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )

        return result, success_msg

    def get_status(self) -> Dict[str, Any]:
        """Get comprehensive status of all components.

        Returns a flat dict of flags and sub-status blobs. Note: the
        'core_processor_loaded' key is only present once the core processor
        exists, whereas 'two_stage_processor_ready' is always present.
        """
        base_status = {
            'models_loaded': self.models_loaded,
            'two_stage_available': TWO_STAGE_AVAILABLE and self.two_stage_processor is not None,
            'device': str(self.device_manager.get_optimal_device()),
            'memory_usage': self.memory_manager.get_memory_usage(),
            'config': self.config.to_dict()
        }

        if self.model_loader:
            base_status['model_loader_available'] = True
            # Older loader versions may lack get_sam2/get_matanyone; report
            # the models as not loaded rather than failing the status call.
            try:
                base_status['sam2_loaded'] = self.model_loader.get_sam2() is not None
                base_status['matanyone_loaded'] = self.model_loader.get_matanyone() is not None
            except AttributeError:
                base_status['sam2_loaded'] = False
                base_status['matanyone_loaded'] = False

        if self.core_processor:
            base_status['core_processor_loaded'] = True

        if self.two_stage_processor:
            base_status['two_stage_processor_ready'] = True
        else:
            base_status['two_stage_processor_ready'] = False

        if self.progress_tracker:
            base_status['progress'] = self.progress_tracker.get_all_progress()

        return base_status

    def cancel_processing(self) -> None:
        """Request cooperative cancellation of any ongoing loading/processing."""
        self.cancel_event.set()
        logger.info("Processing cancellation requested")

    def cleanup_resources(self) -> None:
        """Release memory and unload models (safe to call multiple times)."""
        self.memory_manager.cleanup_aggressive()
        if self.model_loader:
            self.model_loader.cleanup()
        logger.info("Resources cleaned up")
|
|
|
|
|
|
|
|
|
|
|
processor = VideoProcessor() |
|
|
|
|
|
|
|
|
|
|
|
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    """Load and validate all AI models.

    Backward-compatibility wrapper that delegates to the module-level
    ``processor`` singleton and returns its status message.
    """
    return processor.load_models(progress_callback=progress_callback)
|
|
|
|
|
|
|
|
def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    preview_mask: bool = False,
    preview_greenscreen: bool = False
) -> Tuple[Optional[str], str]:
    """Process a video (backward-compatibility wrapper).

    Forwards every argument, by keyword, to the module-level ``processor``
    singleton's ``process_video`` and returns its (path, message) result.
    """
    return processor.process_video(
        video_path=video_path,
        background_choice=background_choice,
        custom_background_path=custom_background_path,
        progress_callback=progress_callback,
        use_two_stage=use_two_stage,
        chroma_preset=chroma_preset,
        preview_mask=preview_mask,
        preview_greenscreen=preview_greenscreen
    )
|
|
|
|
|
|
|
|
def get_model_status() -> Dict[str, Any]:
    """Return the combined component status (backward-compatibility wrapper)."""
    status = processor.get_status()
    return status
|
|
|
|
|
|
|
|
def get_cache_status() -> Dict[str, Any]:
    """Return cache status (backward-compatibility wrapper).

    Historically an alias for the full status report; delegates to the
    module-level ``processor`` singleton.
    """
    status = processor.get_status()
    return status
|
|
|
|
|
|
|
|
|
|
|
PROCESS_CANCELLED = processor.cancel_event |
|
|
|
|
|
|
|
|
def main():
    """Main application entry point.

    Builds the Gradio interface and launches the web server. The bind
    address and port keep their historical defaults ("0.0.0.0", 7860) but
    can now be overridden via the standard GRADIO_SERVER_NAME /
    GRADIO_SERVER_PORT environment variables, which makes containerized
    deployments configurable without code changes. Resources are always
    released on shutdown, even if startup fails.

    Raises:
        Exception: re-raises any startup failure after logging it.
    """
    try:
        logger.info("Starting Video Background Replacement application")
        logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
        logger.info(f"Two-stage module available: {TWO_STAGE_AVAILABLE}")
        logger.info("Modular architecture loaded successfully")

        # Imported lazily so UI construction cost (and any UI import errors)
        # stay out of module import time.
        from ui_components import create_interface
        demo = create_interface()

        # Same env vars gradio itself honors; explicit values preserve the
        # original behavior when the variables are unset.
        server_name = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0")
        server_port = int(os.environ.get("GRADIO_SERVER_PORT", "7860"))

        demo.queue().launch(
            server_name=server_name,
            server_port=server_port,
            show_error=True,
            debug=False
        )

    except Exception as e:
        logger.error(f"Application startup failed: {e}")
        raise
    finally:
        # Release models/GPU memory even when launch fails or is interrupted.
        processor.cleanup_resources()
|
|
|
|
|
|
|
|
# Script entry point: only launch the app when run directly, not on import.
if __name__ == "__main__":
    main()
|
|
|