|
|
""" |
|
|
Core Video Processing Module |
|
|
Handles the main video processing pipeline, frame processing, and background replacement |
|
|
""" |
|
|
|
|
|
import os |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import time |
|
|
import logging |
|
|
import threading |
|
|
from typing import Optional, Tuple, Dict, Any, Callable |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
import app_config |
|
|
import memory_manager |
|
|
import progress_tracker |
|
|
import exceptions |
|
|
|
|
|
|
|
|
from utilities import ( |
|
|
segment_person_hq, |
|
|
refine_mask_hq, |
|
|
replace_background_hq, |
|
|
create_professional_background, |
|
|
PROFESSIONAL_BACKGROUNDS, |
|
|
validate_video_file |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class CoreVideoProcessor: |
|
|
""" |
|
|
Core video processing pipeline for background replacement |
|
|
""" |
|
|
|
|
|
def __init__(self, sam2_predictor: Any, matanyone_model: Any, |
|
|
config: app_config.ProcessingConfig, memory_mgr: memory_manager.MemoryManager): |
|
|
self.sam2_predictor = sam2_predictor |
|
|
self.matanyone_model = matanyone_model |
|
|
self.config = config |
|
|
self.memory_manager = memory_mgr |
|
|
|
|
|
|
|
|
self.processing_active = False |
|
|
self.last_refined_mask = None |
|
|
self.frame_cache = {} |
|
|
|
|
|
|
|
|
self.stats = { |
|
|
'videos_processed': 0, |
|
|
'total_frames_processed': 0, |
|
|
'total_processing_time': 0.0, |
|
|
'average_fps': 0.0, |
|
|
'failed_frames': 0, |
|
|
'successful_frames': 0, |
|
|
'cache_hits': 0, |
|
|
'segmentation_errors': 0, |
|
|
'refinement_errors': 0 |
|
|
} |
|
|
|
|
|
|
|
|
self.quality_settings = config.get_quality_settings() |
|
|
|
|
|
logger.info("CoreVideoProcessor initialized") |
|
|
logger.info(f"Quality preset: {config.quality_preset}") |
|
|
logger.info(f"Quality settings: {self.quality_settings}") |
|
|
|
|
|
def process_video( |
|
|
self, |
|
|
video_path: str, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str] = None, |
|
|
progress_callback: Optional[Callable] = None, |
|
|
cancel_event: Optional[threading.Event] = None, |
|
|
preview_mask: bool = False, |
|
|
preview_greenscreen: bool = False |
|
|
) -> Tuple[Optional[str], str]: |
|
|
""" |
|
|
Process video with background replacement |
|
|
|
|
|
Args: |
|
|
video_path: Input video path |
|
|
background_choice: Background type or name |
|
|
custom_background_path: Path to custom background (if applicable) |
|
|
progress_callback: Progress update callback |
|
|
cancel_event: Event to cancel processing |
|
|
preview_mask: Generate mask preview instead of final output |
|
|
preview_greenscreen: Generate greenscreen preview |
|
|
|
|
|
Returns: |
|
|
Tuple of (output_path, status_message) |
|
|
""" |
|
|
if self.processing_active: |
|
|
return None, "Processing already in progress" |
|
|
|
|
|
self.processing_active = True |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
|
|
|
is_valid, validation_msg = validate_video_file(video_path) |
|
|
if not is_valid: |
|
|
return None, f"Invalid video file: {validation_msg}" |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
return None, "Could not open video file" |
|
|
|
|
|
|
|
|
video_info = self._get_video_info(cap) |
|
|
logger.info(f"Processing video: {video_info}") |
|
|
|
|
|
|
|
|
memory_check = self.memory_manager.can_process_video( |
|
|
video_info['width'], video_info['height'] |
|
|
) |
|
|
|
|
|
if not memory_check['can_process']: |
|
|
cap.release() |
|
|
return None, f"Insufficient memory: {memory_check['recommendations']}" |
|
|
|
|
|
|
|
|
background = self.prepare_background( |
|
|
background_choice, custom_background_path, |
|
|
video_info['width'], video_info['height'] |
|
|
) |
|
|
|
|
|
if background is None: |
|
|
cap.release() |
|
|
return None, "Failed to prepare background" |
|
|
|
|
|
|
|
|
output_path = self._setup_output_video(video_info, preview_mask, preview_greenscreen) |
|
|
out = self._create_video_writer(output_path, video_info) |
|
|
|
|
|
if out is None: |
|
|
cap.release() |
|
|
return None, "Could not create output video writer" |
|
|
|
|
|
|
|
|
result = self._process_video_frames( |
|
|
cap, out, background, video_info, |
|
|
progress_callback, cancel_event, |
|
|
preview_mask, preview_greenscreen |
|
|
) |
|
|
|
|
|
|
|
|
cap.release() |
|
|
out.release() |
|
|
|
|
|
if result['success']: |
|
|
|
|
|
processing_time = time.time() - start_time |
|
|
self._update_processing_stats(video_info, processing_time, result) |
|
|
|
|
|
success_msg = ( |
|
|
f"Processing completed successfully!\n" |
|
|
f"Processed: {result['successful_frames']}/{result['total_frames']} frames\n" |
|
|
f"Time: {processing_time:.1f}s\n" |
|
|
f"Average FPS: {result['total_frames'] / processing_time:.1f}\n" |
|
|
f"Background: {background_choice}" |
|
|
) |
|
|
|
|
|
return output_path, success_msg |
|
|
else: |
|
|
|
|
|
try: |
|
|
os.remove(output_path) |
|
|
except: |
|
|
pass |
|
|
return None, result['error_message'] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Video processing failed: {e}") |
|
|
return None, f"Processing failed: {str(e)}" |
|
|
|
|
|
finally: |
|
|
self.processing_active = False |
|
|
|
|
|
def _get_video_info(self, cap: cv2.VideoCapture) -> Dict[str, Any]: |
|
|
"""Extract comprehensive video information""" |
|
|
return { |
|
|
'fps': cap.get(cv2.CAP_PROP_FPS), |
|
|
'total_frames': int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), |
|
|
'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), |
|
|
'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), |
|
|
'duration': cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS), |
|
|
'codec': int(cap.get(cv2.CAP_PROP_FOURCC)) |
|
|
} |
|
|
|
|
|
def _setup_output_video(self, video_info: Dict[str, Any], |
|
|
preview_mask: bool, preview_greenscreen: bool) -> str: |
|
|
"""Setup output video path""" |
|
|
timestamp = int(time.time()) |
|
|
|
|
|
if preview_mask: |
|
|
filename = f"mask_preview_{timestamp}.mp4" |
|
|
elif preview_greenscreen: |
|
|
filename = f"greenscreen_preview_{timestamp}.mp4" |
|
|
else: |
|
|
filename = f"processed_video_{timestamp}.mp4" |
|
|
|
|
|
return os.path.join(self.config.temp_dir, filename) |
|
|
|
|
|
def _create_video_writer(self, output_path: str, |
|
|
video_info: Dict[str, Any]) -> Optional[cv2.VideoWriter]: |
|
|
"""Create video writer with optimal settings""" |
|
|
try: |
|
|
|
|
|
if self.config.output_quality == 'high': |
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
else: |
|
|
fourcc = cv2.VideoWriter_fourcc(*'XVID') |
|
|
|
|
|
writer = cv2.VideoWriter( |
|
|
output_path, |
|
|
fourcc, |
|
|
video_info['fps'], |
|
|
(video_info['width'], video_info['height']) |
|
|
) |
|
|
|
|
|
if not writer.isOpened(): |
|
|
logger.error("Failed to open video writer") |
|
|
return None |
|
|
|
|
|
return writer |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error creating video writer: {e}") |
|
|
return None |
|
|
|
|
|
def _process_video_frames( |
|
|
self, |
|
|
cap: cv2.VideoCapture, |
|
|
out: cv2.VideoWriter, |
|
|
background: np.ndarray, |
|
|
video_info: Dict[str, Any], |
|
|
progress_callback: Optional[Callable], |
|
|
cancel_event: Optional[threading.Event], |
|
|
preview_mask: bool, |
|
|
preview_greenscreen: bool |
|
|
) -> Dict[str, Any]: |
|
|
"""Process all video frames""" |
|
|
|
|
|
|
|
|
prog_tracker = progress_tracker.ProgressTracker( |
|
|
total_frames=video_info['total_frames'], |
|
|
callback=progress_callback, |
|
|
track_performance=True |
|
|
) |
|
|
|
|
|
frame_count = 0 |
|
|
successful_frames = 0 |
|
|
failed_frames = 0 |
|
|
|
|
|
|
|
|
self.last_refined_mask = None |
|
|
self.frame_cache.clear() |
|
|
|
|
|
try: |
|
|
prog_tracker.set_stage("Processing frames") |
|
|
|
|
|
while True: |
|
|
|
|
|
if cancel_event and cancel_event.is_set(): |
|
|
return { |
|
|
'success': False, |
|
|
'error_message': 'Processing cancelled by user', |
|
|
'total_frames': frame_count, |
|
|
'successful_frames': successful_frames, |
|
|
'failed_frames': failed_frames |
|
|
} |
|
|
|
|
|
|
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
try: |
|
|
|
|
|
prog_tracker.update(frame_count, "Processing frame") |
|
|
|
|
|
|
|
|
processed_frame = self._process_single_frame( |
|
|
frame, background, frame_count, |
|
|
preview_mask, preview_greenscreen |
|
|
) |
|
|
|
|
|
|
|
|
out.write(processed_frame) |
|
|
successful_frames += 1 |
|
|
|
|
|
|
|
|
if frame_count % self.config.memory_cleanup_interval == 0: |
|
|
self.memory_manager.auto_cleanup_if_needed() |
|
|
|
|
|
except Exception as frame_error: |
|
|
logger.warning(f"Frame {frame_count} processing failed: {frame_error}") |
|
|
|
|
|
|
|
|
out.write(frame) |
|
|
failed_frames += 1 |
|
|
self.stats['failed_frames'] += 1 |
|
|
|
|
|
frame_count += 1 |
|
|
|
|
|
|
|
|
if self.config.frame_skip > 1: |
|
|
for _ in range(self.config.frame_skip - 1): |
|
|
ret, _ = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
frame_count += 1 |
|
|
|
|
|
|
|
|
final_stats = prog_tracker.finalize() |
|
|
|
|
|
return { |
|
|
'success': successful_frames > 0, |
|
|
'error_message': f'No frames processed successfully' if successful_frames == 0 else '', |
|
|
'total_frames': frame_count, |
|
|
'successful_frames': successful_frames, |
|
|
'failed_frames': failed_frames, |
|
|
'processing_stats': final_stats |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Frame processing loop failed: {e}") |
|
|
return { |
|
|
'success': False, |
|
|
'error_message': f'Frame processing failed: {str(e)}', |
|
|
'total_frames': frame_count, |
|
|
'successful_frames': successful_frames, |
|
|
'failed_frames': failed_frames |
|
|
} |
|
|
|
|
|
def _process_single_frame( |
|
|
self, |
|
|
frame: np.ndarray, |
|
|
background: np.ndarray, |
|
|
frame_number: int, |
|
|
preview_mask: bool, |
|
|
preview_greenscreen: bool |
|
|
) -> np.ndarray: |
|
|
"""Process a single video frame""" |
|
|
|
|
|
try: |
|
|
|
|
|
mask = self._segment_person(frame, frame_number) |
|
|
|
|
|
|
|
|
if self._should_refine_mask(frame_number): |
|
|
refined_mask = self._refine_mask(frame, mask, frame_number) |
|
|
self.last_refined_mask = refined_mask.copy() |
|
|
else: |
|
|
|
|
|
refined_mask = self._apply_temporal_consistency(mask, frame_number) |
|
|
|
|
|
|
|
|
if preview_mask: |
|
|
return self._create_mask_preview(frame, refined_mask) |
|
|
elif preview_greenscreen: |
|
|
return self._create_greenscreen_preview(frame, refined_mask) |
|
|
else: |
|
|
return self._replace_background(frame, refined_mask, background) |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Single frame processing failed: {e}") |
|
|
raise |
|
|
|
|
|
def _segment_person(self, frame: np.ndarray, frame_number: int) -> np.ndarray: |
|
|
"""Perform person segmentation""" |
|
|
try: |
|
|
mask = segment_person_hq(frame, self.sam2_predictor) |
|
|
|
|
|
if mask is None or mask.size == 0: |
|
|
raise exceptions.SegmentationError(frame_number, "Segmentation returned empty mask") |
|
|
|
|
|
return mask |
|
|
|
|
|
except Exception as e: |
|
|
self.stats['segmentation_errors'] += 1 |
|
|
raise exceptions.SegmentationError(frame_number, f"Segmentation failed: {str(e)}") |
|
|
|
|
|
def _should_refine_mask(self, frame_number: int) -> bool: |
|
|
"""Determine if mask should be refined for this frame""" |
|
|
|
|
|
return ( |
|
|
frame_number % self.quality_settings['keyframe_interval'] == 0 or |
|
|
self.last_refined_mask is None or |
|
|
not self.quality_settings.get('temporal_consistency', True) |
|
|
) |
|
|
|
|
|
def _refine_mask(self, frame: np.ndarray, mask: np.ndarray, frame_number: int) -> np.ndarray: |
|
|
"""Refine mask using MatAnyone or fallback methods""" |
|
|
try: |
|
|
if self.matanyone_model is not None and self.quality_settings.get('edge_refinement', True): |
|
|
refined_mask = refine_mask_hq(frame, mask, self.matanyone_model) |
|
|
else: |
|
|
|
|
|
refined_mask = self._fallback_mask_refinement(mask) |
|
|
|
|
|
return refined_mask |
|
|
|
|
|
except Exception as e: |
|
|
self.stats['refinement_errors'] += 1 |
|
|
logger.warning(f"Mask refinement failed for frame {frame_number}: {e}") |
|
|
|
|
|
return mask |
|
|
|
|
|
def _fallback_mask_refinement(self, mask: np.ndarray) -> np.ndarray: |
|
|
"""Fallback mask refinement using basic OpenCV operations""" |
|
|
try: |
|
|
|
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) |
|
|
refined = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) |
|
|
refined = cv2.morphologyEx(refined, cv2.MORPH_OPEN, kernel) |
|
|
|
|
|
|
|
|
refined = cv2.GaussianBlur(refined, (3, 3), 1.0) |
|
|
|
|
|
return refined |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Fallback mask refinement failed: {e}") |
|
|
return mask |
|
|
|
|
|
def _apply_temporal_consistency(self, current_mask: np.ndarray, frame_number: int) -> np.ndarray: |
|
|
"""Apply temporal consistency using previous refined mask""" |
|
|
if self.last_refined_mask is None or not self.quality_settings.get('temporal_consistency', True): |
|
|
return current_mask |
|
|
|
|
|
try: |
|
|
|
|
|
alpha = 0.7 |
|
|
beta = 0.3 |
|
|
|
|
|
|
|
|
if current_mask.shape != self.last_refined_mask.shape: |
|
|
last_mask = cv2.resize(self.last_refined_mask, |
|
|
(current_mask.shape[1], current_mask.shape[0])) |
|
|
else: |
|
|
last_mask = self.last_refined_mask |
|
|
|
|
|
|
|
|
blended_mask = cv2.addWeighted(current_mask, alpha, last_mask, beta, 0) |
|
|
|
|
|
|
|
|
blended_mask = cv2.GaussianBlur(blended_mask, (3, 3), 0.5) |
|
|
|
|
|
return blended_mask |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Temporal consistency application failed: {e}") |
|
|
return current_mask |
|
|
|
|
|
def _create_mask_preview(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray: |
|
|
"""Create mask visualization preview""" |
|
|
try: |
|
|
|
|
|
mask_colored = np.zeros_like(frame) |
|
|
mask_colored[:, :, 1] = mask |
|
|
|
|
|
|
|
|
alpha = 0.6 |
|
|
preview = cv2.addWeighted(frame, 1-alpha, mask_colored, alpha, 0) |
|
|
|
|
|
return preview |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Mask preview creation failed: {e}") |
|
|
return frame |
|
|
|
|
|
def _create_greenscreen_preview(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray: |
|
|
"""Create green screen preview""" |
|
|
try: |
|
|
|
|
|
green_bg = np.zeros_like(frame) |
|
|
green_bg[:, :] = [0, 255, 0] |
|
|
|
|
|
|
|
|
mask_3ch = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) if len(mask.shape) == 2 else mask |
|
|
mask_norm = mask_3ch.astype(np.float32) / 255.0 |
|
|
|
|
|
result = (frame * mask_norm + green_bg * (1 - mask_norm)).astype(np.uint8) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Greenscreen preview creation failed: {e}") |
|
|
return frame |
|
|
|
|
|
def _replace_background(self, frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray: |
|
|
"""Replace background using the refined mask""" |
|
|
try: |
|
|
result = replace_background_hq(frame, mask, background) |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Background replacement failed: {e}") |
|
|
return frame |
|
|
|
|
|
def prepare_background( |
|
|
self, |
|
|
background_choice: str, |
|
|
custom_background_path: Optional[str], |
|
|
width: int, |
|
|
height: int |
|
|
) -> Optional[np.ndarray]: |
|
|
""" |
|
|
Prepare background image for processing |
|
|
|
|
|
Args: |
|
|
background_choice: Background type or name |
|
|
custom_background_path: Path to custom background |
|
|
width: Target width |
|
|
height: Target height |
|
|
|
|
|
Returns: |
|
|
Prepared background image or None if failed |
|
|
""" |
|
|
try: |
|
|
if background_choice == "custom" and custom_background_path: |
|
|
if not os.path.exists(custom_background_path): |
|
|
raise exceptions.BackgroundProcessingError("custom", f"File not found: {custom_background_path}") |
|
|
|
|
|
background = cv2.imread(custom_background_path) |
|
|
if background is None: |
|
|
raise exceptions.BackgroundProcessingError("custom", "Could not read custom background image") |
|
|
|
|
|
logger.info(f"Loaded custom background: {custom_background_path}") |
|
|
|
|
|
else: |
|
|
|
|
|
if background_choice not in PROFESSIONAL_BACKGROUNDS: |
|
|
raise exceptions.BackgroundProcessingError(background_choice, "Unknown professional background") |
|
|
|
|
|
bg_config = PROFESSIONAL_BACKGROUNDS[background_choice] |
|
|
background = create_professional_background(bg_config, width, height) |
|
|
|
|
|
logger.info(f"Generated professional background: {background_choice}") |
|
|
|
|
|
|
|
|
if background.shape[:2] != (height, width): |
|
|
background = cv2.resize(background, (width, height), interpolation=cv2.INTER_LANCZOS4) |
|
|
|
|
|
|
|
|
if background is None or background.size == 0: |
|
|
raise exceptions.BackgroundProcessingError(background_choice, "Background image is empty") |
|
|
|
|
|
return background |
|
|
|
|
|
except Exception as e: |
|
|
if isinstance(e, exceptions.BackgroundProcessingError): |
|
|
logger.error(str(e)) |
|
|
return None |
|
|
else: |
|
|
logger.error(f"Unexpected error preparing background: {e}") |
|
|
return None |
|
|
|
|
|
def _update_processing_stats(self, video_info: Dict[str, Any], |
|
|
processing_time: float, result: Dict[str, Any]): |
|
|
"""Update processing statistics""" |
|
|
self.stats['videos_processed'] += 1 |
|
|
self.stats['total_frames_processed'] += result['successful_frames'] |
|
|
self.stats['total_processing_time'] += processing_time |
|
|
self.stats['successful_frames'] += result['successful_frames'] |
|
|
self.stats['failed_frames'] += result['failed_frames'] |
|
|
|
|
|
|
|
|
if self.stats['total_processing_time'] > 0: |
|
|
self.stats['average_fps'] = self.stats['total_frames_processed'] / self.stats['total_processing_time'] |
|
|
|
|
|
def get_processing_capabilities(self) -> Dict[str, Any]: |
|
|
"""Get current processing capabilities""" |
|
|
return { |
|
|
'sam2_available': self.sam2_predictor is not None, |
|
|
'matanyone_available': self.matanyone_model is not None, |
|
|
'quality_preset': self.config.quality_preset, |
|
|
'supports_temporal_consistency': self.quality_settings.get('temporal_consistency', False), |
|
|
'supports_edge_refinement': self.quality_settings.get('edge_refinement', False), |
|
|
'keyframe_interval': self.quality_settings['keyframe_interval'], |
|
|
'max_resolution': self.config.get_resolution_limits(), |
|
|
'supported_formats': ['.mp4', '.avi', '.mov', '.mkv'], |
|
|
'memory_limit_gb': self.memory_manager.memory_limit_gb |
|
|
} |
|
|
|
|
|
def get_status(self) -> Dict[str, Any]: |
|
|
"""Get current processor status""" |
|
|
return { |
|
|
'processing_active': self.processing_active, |
|
|
'models_available': { |
|
|
'sam2': self.sam2_predictor is not None, |
|
|
'matanyone': self.matanyone_model is not None |
|
|
}, |
|
|
'quality_settings': self.quality_settings, |
|
|
'statistics': self.stats.copy(), |
|
|
'cache_size': len(self.frame_cache), |
|
|
'memory_usage': self.memory_manager.get_memory_usage(), |
|
|
'capabilities': self.get_processing_capabilities() |
|
|
} |
|
|
|
|
|
def optimize_for_video(self, video_info: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Optimize settings for specific video characteristics""" |
|
|
optimizations = { |
|
|
'original_settings': self.quality_settings.copy(), |
|
|
'optimizations_applied': [] |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
if video_info['width'] * video_info['height'] > 1920 * 1080: |
|
|
if self.quality_settings['keyframe_interval'] < 10: |
|
|
self.quality_settings['keyframe_interval'] = 10 |
|
|
optimizations['optimizations_applied'].append('increased_keyframe_interval_for_high_res') |
|
|
|
|
|
|
|
|
if video_info['duration'] > 300: |
|
|
if self.config.memory_cleanup_interval > 20: |
|
|
self.config.memory_cleanup_interval = 20 |
|
|
optimizations['optimizations_applied'].append('increased_memory_cleanup_frequency') |
|
|
|
|
|
|
|
|
if video_info['fps'] < 15: |
|
|
self.quality_settings['temporal_consistency'] = False |
|
|
optimizations['optimizations_applied'].append('disabled_temporal_consistency_for_low_fps') |
|
|
|
|
|
|
|
|
memory_usage = self.memory_manager.get_memory_usage() |
|
|
memory_pressure = self.memory_manager.check_memory_pressure() |
|
|
|
|
|
if memory_pressure['under_pressure']: |
|
|
self.quality_settings['edge_refinement'] = False |
|
|
self.quality_settings['keyframe_interval'] = max(self.quality_settings['keyframe_interval'], 15) |
|
|
optimizations['optimizations_applied'].extend([ |
|
|
'disabled_edge_refinement_for_memory', |
|
|
'increased_keyframe_interval_for_memory' |
|
|
]) |
|
|
|
|
|
optimizations['final_settings'] = self.quality_settings.copy() |
|
|
|
|
|
if optimizations['optimizations_applied']: |
|
|
logger.info(f"Applied video optimizations: {optimizations['optimizations_applied']}") |
|
|
|
|
|
return optimizations |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Video optimization failed: {e}") |
|
|
return optimizations |
|
|
|
|
|
def reset_cache(self): |
|
|
"""Reset frame cache and temporal state""" |
|
|
self.frame_cache.clear() |
|
|
self.last_refined_mask = None |
|
|
self.stats['cache_hits'] = 0 |
|
|
logger.debug("Frame cache and temporal state reset") |
|
|
|
|
|
def cleanup(self): |
|
|
"""Clean up processor resources""" |
|
|
try: |
|
|
self.reset_cache() |
|
|
self.processing_active = False |
|
|
logger.info("CoreVideoProcessor cleanup completed") |
|
|
except Exception as e: |
|
|
logger.warning(f"Error during cleanup: {e}") |