#!/usr/bin/env python3
"""
BackgroundFX Pro - Main Application Entry Point
Refactored modular architecture - orchestrates specialized components
"""
import early_env # <<< centralizes the OMP/torch thread fix; must be first
import os
import logging
import threading
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, Callable
# Configure logging first
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Apply Gradio schema patch early (before Gradio/UI modules are imported)
try:
import gradio_client.utils as gc_utils
original_get_type = gc_utils.get_type
def patched_get_type(schema):
if not isinstance(schema, dict):
if isinstance(schema, bool):
return "boolean"
if isinstance(schema, str):
return "string"
if isinstance(schema, (int, float)):
return "number"
return "string"
return original_get_type(schema)
gc_utils.get_type = patched_get_type
logger.info("Gradio schema patch applied successfully")
except Exception as e:
logger.error(f"Gradio patch failed: {e}")
# Import configuration from new location
from processing.video.video_processor import ProcessorConfig
from config.app_config import get_config
# Import core components from new locations
from core.exceptions import ModelLoadingError, VideoProcessingError
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor
from processing.audio.audio_processor import AudioProcessor
from utils.monitoring.progress_tracker import ProgressTracker
# Import existing utilities (temporary during migration)
from utilities import (
segment_person_hq,
refine_mask_hq,
replace_background_hq,
create_professional_background,
PROFESSIONAL_BACKGROUNDS,
validate_video_file
)
# Import two-stage processor if available
try:
from processing.two_stage.two_stage_processor import TwoStageProcessor, CHROMA_PRESETS
TWO_STAGE_AVAILABLE = True
except ImportError:
TWO_STAGE_AVAILABLE = False
CHROMA_PRESETS = {'standard': {}}
class VideoProcessor:
"""
Main video processing orchestrator - coordinates all specialized components
"""
def __init__(self):
"""Initialize the video processor with all required components"""
self.config = get_config() # Use singleton config
self.device_manager = DeviceManager()
self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
# ModelLoader is constructed with the DeviceManager and MemoryManager it depends on
self.model_loader = ModelLoader(self.device_manager, self.memory_manager)
self.audio_processor = AudioProcessor()
self.progress_tracker = None
# Initialize core processor (will be set up after models load)
self.core_processor = None
self.two_stage_processor = None
# State management
self.models_loaded = False
self.loading_lock = threading.Lock()
self.cancel_event = threading.Event()
logger.info(f"VideoProcessor initialized on device: {self.device_manager.get_optimal_device()}")
def _initialize_progress_tracker(self, video_path: str, progress_callback: Optional[Callable] = None):
"""Initialize progress tracker with video frame count"""
try:
import cv2
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if total_frames <= 0:
total_frames = 100 # Fallback estimate
self.progress_tracker = ProgressTracker(total_frames, progress_callback)
logger.info(f"Progress tracker initialized for {total_frames} frames")
except Exception as e:
logger.warning(f"Could not initialize progress tracker: {e}")
# Fallback to basic tracker
self.progress_tracker = ProgressTracker(100, progress_callback)
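# Editorial note: progress callbacks used throughout this class follow the
# (fraction, message) convention seen in load_models() below, e.g.
# progress_callback(0.0, "Starting model loading ..."). A minimal, hypothetical
# callback compatible with that convention:
#
#   def log_progress(fraction: float, message: str) -> None:
#       logger.info("[%3.0f%%] %s", fraction * 100, message)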
def load_models(self, progress_callback: Optional[Callable] = None) -> str:
"""Load and validate all AI models"""
with self.loading_lock:
if self.models_loaded:
return "Models already loaded and validated"
try:
self.cancel_event.clear()
if progress_callback:
progress_callback(0.0, f"Starting model loading on {self.device_manager.get_optimal_device()}")
# Wrap load_all_models so an IndexError is logged with full traceback context
try:
# Load models via load_all_models, which returns a tuple of (LoadedModel, LoadedModel)
sam2_result, matanyone_result = self.model_loader.load_all_models(
progress_callback=progress_callback,
cancel_event=self.cancel_event
)
except IndexError as e:
import traceback
logger.error(f"IndexError in load_all_models: {e}")
logger.error(f"Full traceback:\n{traceback.format_exc()}")
# Walk the traceback frames to pinpoint exactly where the error occurred
tb = traceback.extract_tb(e.__traceback__)
for frame in tb:
logger.error(f" File: {frame.filename}, Line: {frame.lineno}, Function: {frame.name}")
logger.error(f" Code: {frame.line}")
# Re-raise with more context
raise ModelLoadingError(f"Model loading failed with IndexError at line {tb[-1].lineno}: {e}")
except Exception as e:
import traceback
logger.error(f"Unexpected error in load_all_models: {e}")
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Full traceback:\n{traceback.format_exc()}")
raise
if self.cancel_event.is_set():
return "Model loading cancelled"
# Extract actual models from LoadedModel wrappers for two-stage processor
sam2_predictor = sam2_result.model if sam2_result else None
matanyone_model = matanyone_result.model if matanyone_result else None
# Check if at least one model loaded successfully
success = sam2_predictor is not None or matanyone_model is not None
if not success:
return "Model loading failed - check logs for details"
# Initialize core processor with the model loader (it expects a models object)
self.core_processor = CoreVideoProcessor(
config=self.config,
models=self.model_loader # Pass the whole model_loader object
)
# Initialize two-stage processor if available and models loaded
if TWO_STAGE_AVAILABLE:
if sam2_predictor is not None or matanyone_model is not None:
try:
# Two-stage processor needs the actual models
self.two_stage_processor = TwoStageProcessor(
sam2_predictor=sam2_predictor,
matanyone_model=matanyone_model
)
logger.info("✅ Two-stage processor initialized with AI models")
except Exception as e:
logger.warning(f"Two-stage processor init failed: {e}")
self.two_stage_processor = None
else:
logger.warning("Two-stage processor not initialized - models not available")
if sam2_predictor is None:
logger.warning(" - SAM2 predictor is None")
if matanyone_model is None:
logger.warning(" - MatAnyone model is None")
self.models_loaded = True
message = self.model_loader.get_load_summary()
# Add two-stage status to message
if self.two_stage_processor is not None:
message += "\n✅ Two-stage processor ready with AI models"
else:
message += "\n⚠️ Two-stage processor not available"
logger.info(message)
return message
except AttributeError as e:
self.models_loaded = False
error_msg = f"Model loading failed - method not found: {str(e)}"
logger.error(error_msg)
return error_msg
except ModelLoadingError as e:
self.models_loaded = False
error_msg = f"Model loading failed: {str(e)}"
logger.error(error_msg)
return error_msg
except Exception as e:
self.models_loaded = False
error_msg = f"Unexpected error during model loading: {str(e)}"
logger.error(error_msg)
return error_msg
def process_video(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str] = None,
progress_callback: Optional[Callable] = None,
use_two_stage: bool = False,
chroma_preset: str = "standard",
preview_mask: bool = False,
preview_greenscreen: bool = False
) -> Tuple[Optional[str], str]:
"""Process video with the specified parameters"""
if not self.models_loaded or not self.core_processor:
return None, "Models not loaded. Please load models first."
if self.cancel_event.is_set():
return None, "Processing cancelled"
# Initialize progress tracker with video frame count
self._initialize_progress_tracker(video_path, progress_callback)
# Validate input file
is_valid, validation_msg = validate_video_file(video_path)
if not is_valid:
return None, f"Invalid video: {validation_msg}"
try:
# Route to appropriate processing method
if use_two_stage:
if not TWO_STAGE_AVAILABLE:
return None, "Two-stage processing not available - module not found"
if self.two_stage_processor is None:
return None, "Two-stage processor not initialized - models may not be loaded properly"
logger.info("Using two-stage processing pipeline with AI models")
return self._process_two_stage(
video_path, background_choice, custom_background_path,
progress_callback, chroma_preset
)
else:
logger.info("Using single-stage processing pipeline")
return self._process_single_stage(
video_path, background_choice, custom_background_path,
progress_callback, preview_mask, preview_greenscreen
)
except VideoProcessingError as e:
logger.error(f"Video processing failed: {e}")
return None, f"Processing failed: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error during video processing: {e}")
return None, f"Unexpected error: {str(e)}"
def _process_single_stage(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable],
preview_mask: bool,
preview_greenscreen: bool
) -> Tuple[Optional[str], str]:
"""Process video using single-stage pipeline"""
# Generate output path
import time
timestamp = int(time.time())
output_dir = Path(self.config.output_dir) / "single_stage"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = str(output_dir / f"processed_{timestamp}.mp4")
# Process video using core processor
result = self.core_processor.process_video(
input_path=video_path,
output_path=output_path,
bg_config={'background_choice': background_choice, 'custom_path': custom_background_path}
)
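# Assumption based on the usage below: process_video() returns a truthy,
# dict-like result on success; only the 'frames' key is read here
# (illustrative shape: {'frames': 1234, ...}).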
if not result:
return None, "Video processing failed"
# Add audio if not in preview mode
if not (preview_mask or preview_greenscreen):
final_video_path = self.audio_processor.add_audio_to_video(
original_video=video_path,
processed_video=output_path
)
else:
final_video_path = output_path
success_msg = (
f"Processing completed successfully!\n"
f"Frames processed: {result.get('frames', 'unknown')}\n"
f"Background: {background_choice}\n"
f"Mode: Single-stage\n"
f"Device: {self.device_manager.get_optimal_device()}"
)
return final_video_path, success_msg
def _process_two_stage(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable],
chroma_preset: str
) -> Tuple[Optional[str], str]:
"""Process video using two-stage pipeline"""
if self.two_stage_processor is None:
return None, "Two-stage processor not available"
# Get video dimensions for background preparation
import cv2
cap = cv2.VideoCapture(video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Prepare background using core processor
background = self.core_processor.prepare_background(
background_choice, custom_background_path, frame_width, frame_height
)
if background is None:
return None, "Failed to prepare background"
# Process with two-stage pipeline
import time
timestamp = int(time.time())
output_dir = Path(self.config.output_dir) / "two_stage"
output_dir.mkdir(parents=True, exist_ok=True)
final_output = str(output_dir / f"final_{timestamp}.mp4")
chroma_settings = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS['standard'])
logger.info(f"Starting two-stage processing with chroma preset: {chroma_preset}")
result, message = self.two_stage_processor.process_full_pipeline(
video_path,
background,
final_output,
chroma_settings=chroma_settings,
progress_callback=progress_callback
)
if result is None:
return None, message
success_msg = (
f"Two-stage processing completed!\n"
f"Background: {background_choice}\n"
f"Chroma Preset: {chroma_preset}\n"
f"Quality: Cinema-grade with AI models\n"
f"Device: {self.device_manager.get_optimal_device()}"
)
return result, success_msg
def get_status(self) -> Dict[str, Any]:
"""Get comprehensive status of all components"""
base_status = {
'models_loaded': self.models_loaded,
'two_stage_available': TWO_STAGE_AVAILABLE and self.two_stage_processor is not None,
'device': str(self.device_manager.get_optimal_device()),
'memory_usage': self.memory_manager.get_memory_usage(),
'config': self.config.to_dict()
}
# Add model-specific status if available
if self.model_loader:
base_status['model_loader_available'] = True
try:
base_status['sam2_loaded'] = self.model_loader.get_sam2() is not None
base_status['matanyone_loaded'] = self.model_loader.get_matanyone() is not None
except AttributeError:
base_status['sam2_loaded'] = False
base_status['matanyone_loaded'] = False
# Add processing status if available
if self.core_processor:
base_status['core_processor_loaded'] = True
# Add two-stage processor status
if self.two_stage_processor:
base_status['two_stage_processor_ready'] = True
else:
base_status['two_stage_processor_ready'] = False
# Add progress tracking if available
if self.progress_tracker:
base_status['progress'] = self.progress_tracker.get_all_progress()
return base_status
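# Illustrative shape of the returned status dict (values are examples only and
# depend on the runtime device and which models actually loaded):
#
#   {
#       'models_loaded': True,
#       'two_stage_available': True,
#       'device': 'cuda',
#       'memory_usage': {...},            # as reported by MemoryManager
#       'config': {...},                  # config.to_dict()
#       'model_loader_available': True,
#       'sam2_loaded': True,
#       'matanyone_loaded': True,
#       'core_processor_loaded': True,
#       'two_stage_processor_ready': True,
#       'progress': {...},                # only when a tracker exists
#   }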
def cancel_processing(self):
"""Cancel any ongoing processing"""
self.cancel_event.set()
logger.info("Processing cancellation requested")
def cleanup_resources(self):
"""Clean up all resources"""
self.memory_manager.cleanup_aggressive()
if self.model_loader:
self.model_loader.cleanup()
logger.info("Resources cleaned up")
# Global processor instance for application
processor = VideoProcessor()
# Backward compatibility functions for existing UI
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
"""Load models with validation - backward compatibility wrapper"""
return processor.load_models(progress_callback)
def process_video_fixed(
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable] = None,
use_two_stage: bool = False,
chroma_preset: str = "standard",
preview_mask: bool = False,
preview_greenscreen: bool = False
) -> Tuple[Optional[str], str]:
"""Process video - backward compatibility wrapper"""
return processor.process_video(
video_path, background_choice, custom_background_path,
progress_callback, use_two_stage, chroma_preset,
preview_mask, preview_greenscreen
)
def get_model_status() -> Dict[str, Any]:
"""Get model status - backward compatibility wrapper"""
return processor.get_status()
def get_cache_status() -> Dict[str, Any]:
"""Get cache status - backward compatibility wrapper"""
return processor.get_status()
# For backward compatibility
PROCESS_CANCELLED = processor.cancel_event
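# Illustrative usage of the wrappers above (file path and background name are
# hypothetical placeholders, not values defined in this module):
#
#   status_msg = load_models_with_validation()
#   output_path, message = process_video_fixed(
#       "input.mp4",   # source clip
#       "office",      # hypothetical key; see PROFESSIONAL_BACKGROUNDS
#       None,          # no custom background image
#       use_two_stage=False,
#   )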
def main():
"""Main application entry point"""
try:
logger.info("Starting Video Background Replacement application")
logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
logger.info(f"Two-stage module available: {TWO_STAGE_AVAILABLE}")
logger.info("Modular architecture loaded successfully")
# Import and create UI
from ui_components import create_interface
demo = create_interface()
# Launch application (no share=True on Spaces)
demo.queue().launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
debug=False
)
except Exception as e:
logger.error(f"Application startup failed: {e}")
raise
finally:
# Cleanup on exit
processor.cleanup_resources()
if __name__ == "__main__":
main()