|
|
|
|
|
""" |
|
|
Video Background Replacement - Main Application Entry Point |
|
|
Refactored modular architecture - orchestrates specialized components |
|
|
|
|
|
This file has been refactored from a monolithic 600+ line structure into |
|
|
a clean orchestration layer that coordinates specialized modules: |
|
|
- config: Application configuration and environment variables |
|
|
- device_manager: Hardware detection and optimization |
|
|
- memory_manager: Memory and GPU resource management |
|
|
- model_loader: AI model loading and validation |
|
|
- video_processor: Core video processing pipeline |
|
|
- audio_processor: Audio track handling and FFmpeg operations |
|
|
- progress_tracker: Progress monitoring and ETA calculations |
|
|
- exceptions: Custom exception classes for better error handling |
|
|
""" |
|
|
|
|
|
import os |
|
|
import logging |
|
|
import threading |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple, Dict, Any, Callable |
|
|
|
|
|
|
|
|
# Root logging configuration for the application: INFO level, with timestamp,
# logger name and level on every record.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Best-effort monkey patch of gradio_client's schema type lookup so that it
# accepts schema values that are not dicts. Any failure (including
# gradio_client not being importable) is logged and otherwise ignored.
# NOTE(review): presumably this works around get_type() failing on bare
# boolean schemas in some gradio_client releases - confirm against the
# installed version.
try:
    import gradio_client.utils as gc_utils

    original_get_type = gc_utils.get_type

    def patched_get_type(schema):
        """Return a type name for *schema*, tolerating non-dict values.

        Dict schemas are delegated to the original gradio_client
        implementation; anything else is mapped to a fixed type name.
        """
        if isinstance(schema, dict):
            return original_get_type(schema)
        # bool must be tested before the numeric check: bool subclasses int.
        if isinstance(schema, bool):
            return "boolean"
        if isinstance(schema, (int, float)):
            return "number"
        # Strings and every other non-dict value are reported as "string".
        return "string"

    gc_utils.get_type = patched_get_type
    logger.info("Gradio schema patch applied successfully")
except Exception as e:
    logger.error(f"Gradio patch failed: {e}")
|
|
|
|
|
|
|
|
from app_config import ProcessingConfig |
|
|
from device_manager import DeviceManager |
|
|
from memory_manager import MemoryManager |
|
|
from model_loader import ModelLoader |
|
|
from video_processor import CoreVideoProcessor |
|
|
from audio_processor import AudioProcessor |
|
|
from progress_tracker import ProgressTracker |
|
|
from exceptions import VideoProcessingError, ModelLoadingError, DeviceError |
|
|
|
|
|
|
|
|
from utilities import ( |
|
|
segment_person_hq, |
|
|
refine_mask_hq, |
|
|
replace_background_hq, |
|
|
create_professional_background, |
|
|
PROFESSIONAL_BACKGROUNDS, |
|
|
validate_video_file |
|
|
) |
|
|
|
|
|
|
|
|
# Optional two-stage pipeline. When the module cannot be imported the feature
# flag is turned off and a minimal preset table is supplied so that
# CHROMA_PRESETS['standard'] lookups keep working.
try:
    from two_stage_processor import TwoStageProcessor, CHROMA_PRESETS
except ImportError:
    TWO_STAGE_AVAILABLE = False
    CHROMA_PRESETS = {'standard': {}}
else:
    TWO_STAGE_AVAILABLE = True
|
|
|
|
|
class VideoProcessor:
    """
    Main video processing orchestrator - coordinates all specialized components.

    The instance owns the configuration, device/memory managers, model loader
    and audio processor. The processing back-ends (``core_processor`` and the
    optional ``two_stage_processor``) are created by :meth:`load_models`.

    Cancellation is signalled through ``cancel_event``. A cancellation request
    applies to a single operation: :meth:`process_video` clears the event once
    it has honoured it, so a cancelled run does not block later runs.
    """

    def __init__(self):
        """Initialize the video processor with all required components"""
        self.config = ProcessingConfig()
        self.device_manager = DeviceManager()

        # Resolve the device once so every component is built for the same one.
        device = self.device_manager.get_optimal_device()
        self.memory_manager = MemoryManager(device)
        self.model_loader = ModelLoader(device)
        self.audio_processor = AudioProcessor()

        # Processing back-ends; populated by load_models().
        self.core_processor = None
        self.two_stage_processor = None

        # State shared between the loading and processing entry points.
        self.models_loaded = False
        self.loading_lock = threading.Lock()
        self.cancel_event = threading.Event()

        logger.info(f"VideoProcessor initialized on device: {device}")

    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        """Load and validate all AI models.

        Loading is serialized with ``loading_lock``; a call made after a
        successful load returns immediately.

        Args:
            progress_callback: Optional callable invoked here as
                ``progress_callback(fraction, message)`` and also handed to
                the model loader.

        Returns:
            A human-readable status string: the loader's summary on success,
            or a message describing the cancellation or failure. Errors are
            reported through the return value, never raised.
        """
        with self.loading_lock:
            if self.models_loaded:
                return "Models already loaded and validated"

            try:
                # A fresh load must not be aborted by a stale cancel request.
                self.cancel_event.clear()

                if progress_callback:
                    progress_callback(0.0, f"Starting model loading on {self.device_manager.get_optimal_device()}")

                sam2_predictor, matanyone_model = self.model_loader.load_all_models(
                    progress_callback=progress_callback,
                    cancel_event=self.cancel_event
                )

                if self.cancel_event.is_set():
                    return "Model loading cancelled"

                self.core_processor = CoreVideoProcessor(
                    sam2_predictor=sam2_predictor,
                    matanyone_model=matanyone_model,
                    config=self.config,
                    memory_manager=self.memory_manager
                )

                # The two-stage pipeline is optional: a failure here is logged
                # and the single-stage pipeline remains usable.
                if TWO_STAGE_AVAILABLE and sam2_predictor and matanyone_model:
                    try:
                        self.two_stage_processor = TwoStageProcessor(sam2_predictor, matanyone_model)
                        logger.info("Two-stage processor initialized")
                    except Exception as e:
                        logger.warning(f"Two-stage processor init failed: {e}")

                self.models_loaded = True
                message = self.model_loader.get_load_summary()
                logger.info(message)
                return message

            except ModelLoadingError as e:
                self.models_loaded = False
                error_msg = f"Model loading failed: {str(e)}"
                logger.error(error_msg)
                return error_msg
            except Exception as e:
                self.models_loaded = False
                error_msg = f"Unexpected error during model loading: {str(e)}"
                # logger.exception keeps the traceback for unexpected failures.
                logger.exception(error_msg)
                return error_msg

    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        use_two_stage: bool = False,
        chroma_preset: str = "standard",
        preview_mask: bool = False,
        preview_greenscreen: bool = False
    ) -> Tuple[Optional[str], str]:
        """Process video with the specified parameters.

        Args:
            video_path: Path of the input video.
            background_choice: Background selection forwarded to the pipeline.
            custom_background_path: Optional path of a user-supplied background.
            progress_callback: Optional progress callable forwarded to the
                pipeline.
            use_two_stage: Use the two-stage pipeline when it is available;
                otherwise the single-stage pipeline is used.
            chroma_preset: Key into ``CHROMA_PRESETS`` (two-stage only).
            preview_mask: Single-stage preview flag forwarded to the core
                processor.
            preview_greenscreen: Single-stage preview flag forwarded to the
                core processor.

        Returns:
            ``(output_path, message)``. ``output_path`` is ``None`` when the
            request was rejected, cancelled or failed; ``message`` explains
            why. Errors are reported through the return value, never raised.
        """
        if not self.models_loaded or not self.core_processor:
            return None, "Models not loaded. Please load models first."

        if self.cancel_event.is_set():
            # Honour the pending cancel request, then consume it. Without the
            # clear() every later call would be rejected as well, because
            # nothing else resets the event once models are loaded.
            self.cancel_event.clear()
            return None, "Processing cancelled"

        is_valid, validation_msg = validate_video_file(video_path)
        if not is_valid:
            return None, f"Invalid video: {validation_msg}"

        try:
            if use_two_stage and TWO_STAGE_AVAILABLE and self.two_stage_processor:
                return self._process_two_stage(
                    video_path, background_choice, custom_background_path,
                    progress_callback, chroma_preset
                )
            return self._process_single_stage(
                video_path, background_choice, custom_background_path,
                progress_callback, preview_mask, preview_greenscreen
            )

        except VideoProcessingError as e:
            logger.error(f"Video processing failed: {e}")
            return None, f"Processing failed: {str(e)}"
        except Exception as e:
            # logger.exception keeps the traceback for unexpected failures.
            logger.exception(f"Unexpected error during video processing: {e}")
            return None, f"Unexpected error: {str(e)}"
        finally:
            # A cancel issued during this run applies to this run only.
            self.cancel_event.clear()

    def _process_single_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        preview_mask: bool,
        preview_greenscreen: bool
    ) -> Tuple[Optional[str], str]:
        """Process video using single-stage pipeline.

        Runs the core processor and, unless a preview mode is requested,
        passes the result to the audio processor together with the original
        video.

        Returns:
            ``(output_path, message)``; ``output_path`` is ``None`` when the
            core processor produced no output.
        """
        processed_video_path, process_message = self.core_processor.process_video(
            video_path=video_path,
            background_choice=background_choice,
            custom_background_path=custom_background_path,
            progress_callback=progress_callback,
            cancel_event=self.cancel_event,
            preview_mask=preview_mask,
            preview_greenscreen=preview_greenscreen
        )

        if processed_video_path is None:
            return None, process_message

        # Preview outputs are returned as produced; only full renders go
        # through the audio step.
        if not (preview_mask or preview_greenscreen):
            final_video_path = self.audio_processor.add_audio_to_video(
                original_video=video_path,
                processed_video=processed_video_path
            )
        else:
            final_video_path = processed_video_path

        success_msg = (
            f"{process_message}\n"
            f"Background: {background_choice}\n"
            f"Mode: Single-stage\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )

        return final_video_path, success_msg

    def _process_two_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        chroma_preset: str
    ) -> Tuple[Optional[str], str]:
        """Process video using two-stage pipeline.

        Reads the frame size from the input video, prepares a background of
        that size and runs the two-stage processor's full pipeline into a
        uniquely named file in the system temporary directory.

        Returns:
            ``(output_path, message)``; ``output_path`` is ``None`` when the
            video cannot be read, the background cannot be prepared or the
            pipeline reports failure.
        """
        # Local imports, as in the rest of this method's dependencies: they
        # are only needed on the two-stage path.
        import tempfile
        import time
        import uuid

        import cv2

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return None, f"Could not open video: {video_path}"
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            # Release the capture even if reading the properties fails.
            cap.release()

        if frame_width <= 0 or frame_height <= 0:
            return None, "Could not determine video dimensions"

        background = self.core_processor.prepare_background(
            background_choice, custom_background_path, frame_width, frame_height
        )
        if background is None:
            return None, "Failed to prepare background"

        # The random suffix keeps two runs started within the same second
        # from writing to the same file.
        timestamp = int(time.time())
        final_output = os.path.join(
            tempfile.gettempdir(),
            f"twostage_final_{timestamp}_{uuid.uuid4().hex[:8]}.mp4"
        )

        # Unknown preset names fall back to the 'standard' preset.
        chroma_settings = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS['standard'])

        result, message = self.two_stage_processor.process_full_pipeline(
            video_path,
            background,
            final_output,
            chroma_settings=chroma_settings,
            progress_callback=progress_callback
        )

        if result is None:
            return None, message

        success_msg = (
            f"Two-stage success!\n"
            f"Background: {background_choice}\n"
            f"Preset: {chroma_preset}\n"
            f"Quality: Cinema-grade\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )

        return result, success_msg

    def get_status(self) -> Dict[str, Any]:
        """Get comprehensive status of all components.

        Returns:
            A dict with the orchestrator's own state, merged with the status
            dicts of the model loader and (once created) the core processor.
            Later updates overwrite earlier keys of the same name.
        """
        base_status = {
            'models_loaded': self.models_loaded,
            'two_stage_available': TWO_STAGE_AVAILABLE and self.two_stage_processor is not None,
            'device': str(self.device_manager.get_optimal_device()),
            'memory_usage': self.memory_manager.get_memory_usage(),
            'config': self.config.to_dict()
        }

        if self.model_loader:
            base_status.update(self.model_loader.get_status())

        if self.core_processor:
            base_status.update(self.core_processor.get_status())

        return base_status

    def cancel_processing(self):
        """Cancel any ongoing processing by setting ``cancel_event``."""
        self.cancel_event.set()
        logger.info("Processing cancellation requested")

    def cleanup_resources(self):
        """Clean up all resources held by the memory manager and model loader."""
        self.memory_manager.cleanup_aggressive()
        if self.model_loader:
            self.model_loader.cleanup()
        logger.info("Resources cleaned up")
|
|
|
|
|
|
|
|
# Module-level singleton, created at import time. The wrapper functions in
# this module and main() all operate on this one instance.
processor = VideoProcessor()
|
|
|
|
|
|
|
|
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    """Backward-compatibility wrapper: load models on the shared processor.

    Forwards ``progress_callback`` to ``processor.load_models`` and returns
    its status string unchanged.
    """
    load_summary = processor.load_models(progress_callback=progress_callback)
    return load_summary
|
|
|
|
|
def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    preview_mask: bool = False,
    preview_greenscreen: bool = False
) -> Tuple[Optional[str], str]:
    """Backward-compatibility wrapper: process a video on the shared processor.

    Every argument is forwarded unchanged to ``processor.process_video``;
    its ``(output_path, message)`` result is returned as is.
    """
    return processor.process_video(
        video_path=video_path,
        background_choice=background_choice,
        custom_background_path=custom_background_path,
        progress_callback=progress_callback,
        use_two_stage=use_two_stage,
        chroma_preset=chroma_preset,
        preview_mask=preview_mask,
        preview_greenscreen=preview_greenscreen,
    )
|
|
|
|
|
def get_model_status() -> Dict[str, Any]:
    """Backward-compatibility wrapper: return the shared processor's status dict."""
    status_snapshot = processor.get_status()
    return status_snapshot
|
|
|
|
|
def get_cache_status() -> Dict[str, Any]:
    """Backward-compatibility wrapper for the cache-status entry point.

    Returns the same dict as ``processor.get_status()``; there is no
    separate cache report.
    """
    status_snapshot = processor.get_status()
    return status_snapshot
|
|
|
|
|
|
|
|
# Alias of the singleton's cancellation Event (the same object, not a copy),
# exposed under a module-level name.
# NOTE(review): presumably kept for callers written against the older
# module-level API - verify against callers.
PROCESS_CANCELLED = processor.cancel_event
|
|
|
|
|
def main():
    """Main application entry point.

    Logs the startup state, builds the UI via
    ``ui_components.create_interface`` and launches it with request queuing
    enabled. Startup failures are logged and re-raised; the shared
    processor's resources are released on every exit path.
    """
    try:
        logger.info("Starting Video Background Replacement application")
        logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
        logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}")
        logger.info("Modular architecture loaded successfully")

        # Imported here rather than at module level, as in the original.
        from ui_components import create_interface

        interface = create_interface()

        # NOTE(review): binds to all interfaces and requests a public share
        # link - confirm this is intended for every deployment.
        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": True,
            "show_error": True,
            "debug": False,
        }
        queued_interface = interface.queue()
        queued_interface.launch(**launch_options)

    except Exception as e:
        logger.error(f"Application startup failed: {e}")
        raise
    finally:
        processor.cleanup_resources()
|
|
|
|
|
# Run the application only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()