"""
Core Video Processing Module - Enhanced with Temporal Consistency

VERSION: 2.0-temporal-enhanced

ROLLBACK: Set USE_TEMPORAL_ENHANCEMENT = False to revert to the original behavior.
"""

import os
import cv2
import numpy as np
import time
import logging
import threading
from typing import Optional, Tuple, Dict, Any, Callable, List
from pathlib import Path

import app_config
import memory_manager
import progress_tracker
import exceptions

from utilities import (
    segment_person_hq,
    refine_mask_hq,
    replace_background_hq,
    create_professional_background,
    PROFESSIONAL_BACKGROUNDS,
    validate_video_file
)

USE_TEMPORAL_ENHANCEMENT = True
USE_HAIR_DETECTION = True
USE_OPTICAL_FLOW_TRACKING = True
USE_ADAPTIVE_REFINEMENT = True
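
# Flag semantics (inferred from the call sites below): USE_TEMPORAL_ENHANCEMENT
# gates the enhanced per-frame path as a whole; USE_HAIR_DETECTION and
# USE_ADAPTIVE_REFINEMENT only take effect inside that path, while
# USE_OPTICAL_FLOW_TRACKING additionally controls whether the original
# segmentation path caches the previous grayscale frame.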

logger = logging.getLogger(__name__)


class CoreVideoProcessor:
    """
    ENHANCED: Core video processing pipeline with temporal consistency and fine-detail handling
    """

    def __init__(self, sam2_predictor: Any, matanyone_model: Any,
                 config: app_config.ProcessingConfig, memory_mgr: memory_manager.MemoryManager):
        self.sam2_predictor = sam2_predictor
        self.matanyone_model = matanyone_model
        self.config = config
        self.memory_manager = memory_mgr

        self.processing_active = False
        self.last_refined_mask = None
        self.frame_cache = {}

        # Temporal-enhancement state
        self.mask_history = []
        self.optical_flow_data = None
        self.hair_regions_cache = {}
        self.quality_scores_history = []

        self.stats = {
            'videos_processed': 0,
            'total_frames_processed': 0,
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'failed_frames': 0,
            'successful_frames': 0,
            'cache_hits': 0,
            'segmentation_errors': 0,
            'refinement_errors': 0,
            'temporal_corrections': 0,
            'hair_detections': 0,
            'flow_tracking_failures': 0
        }

        self.quality_settings = config.get_quality_settings()

        logger.info("CoreVideoProcessor initialized")
        logger.info(f"Quality preset: {config.quality_preset}")
        logger.info(f"Quality settings: {self.quality_settings}")

        if USE_TEMPORAL_ENHANCEMENT:
            logger.info("ENHANCED: Temporal consistency enabled")
        if USE_HAIR_DETECTION:
            logger.info("ENHANCED: Hair detection enabled")

    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        cancel_event: Optional[threading.Event] = None,
        preview_mask: bool = False,
        preview_greenscreen: bool = False
    ) -> Tuple[Optional[str], str]:
        """
        ENHANCED: Process video with temporal consistency and fine-detail handling
        """
        if self.processing_active:
            return None, "Processing already in progress"

        self.processing_active = True
        start_time = time.time()

        # Start each run from a clean temporal state.
        self._reset_temporal_state()

        try:
            is_valid, validation_msg = validate_video_file(video_path)
            if not is_valid:
                return None, f"Invalid video file: {validation_msg}"

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return None, "Could not open video file"

            video_info = self._get_video_info(cap)
            logger.info(f"Processing video: {video_info}")

            memory_check = self.memory_manager.can_process_video(
                video_info['width'], video_info['height']
            )

            if not memory_check['can_process']:
                cap.release()
                return None, f"Insufficient memory: {memory_check['recommendations']}"

            background = self.prepare_background(
                background_choice, custom_background_path,
                video_info['width'], video_info['height']
            )

            if background is None:
                cap.release()
                return None, "Failed to prepare background"

            output_path = self._setup_output_video(video_info, preview_mask, preview_greenscreen)
            out = self._create_video_writer(output_path, video_info)

            if out is None:
                cap.release()
                return None, "Could not create output video writer"

            result = self._process_video_frames_enhanced(
                cap, out, background, video_info,
                progress_callback, cancel_event,
                preview_mask, preview_greenscreen
            )

            cap.release()
            out.release()

            if result['success']:
                processing_time = time.time() - start_time
                self._update_processing_stats(video_info, processing_time, result)

                success_msg = (
                    f"Processing completed successfully!\n"
                    f"Processed: {result['successful_frames']}/{result['total_frames']} frames\n"
                    f"Time: {processing_time:.1f}s\n"
                    f"Average FPS: {result['total_frames'] / processing_time:.1f}\n"
                    f"Temporal corrections: {self.stats['temporal_corrections']}\n"
                    f"Hair detections: {self.stats['hair_detections']}\n"
                    f"Background: {background_choice}"
                )

                return output_path, success_msg
            else:
                # Remove the partial output file; ignore errors if it was never created.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
                return None, result['error_message']

        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            return None, f"Processing failed: {str(e)}"

        finally:
            self.processing_active = False

    def _reset_temporal_state(self):
        """ENHANCED: Reset temporal consistency state"""
        self.mask_history.clear()
        self.optical_flow_data = None
        self.hair_regions_cache.clear()
        self.quality_scores_history.clear()
        self.last_refined_mask = None
        self.stats['temporal_corrections'] = 0
        self.stats['hair_detections'] = 0
        self.stats['flow_tracking_failures'] = 0

    def _get_video_info(self, cap: cv2.VideoCapture) -> Dict[str, Any]:
        """Extract comprehensive video information"""
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        return {
            'fps': fps,
            'total_frames': total_frames,
            'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            'duration': total_frames / fps if fps > 0 else 0.0,
            'codec': int(cap.get(cv2.CAP_PROP_FOURCC))
        }
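
    # The 'codec' field above is the raw FOURCC integer. Decoding it back to
    # the usual four-character tag, should it be needed for logging (sketch):
    #
    #   fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
    #   tag = "".join(chr((fourcc >> (8 * i)) & 0xFF) for i in range(4))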

    def _setup_output_video(self, video_info: Dict[str, Any],
                            preview_mask: bool, preview_greenscreen: bool) -> str:
        """Setup output video path"""
        timestamp = int(time.time())

        if preview_mask:
            filename = f"mask_preview_{timestamp}.mp4"
        elif preview_greenscreen:
            filename = f"greenscreen_preview_{timestamp}.mp4"
        else:
            filename = f"processed_video_{timestamp}.mp4"

        return os.path.join(self.config.temp_dir, filename)

    def _create_video_writer(self, output_path: str,
                             video_info: Dict[str, Any]) -> Optional[cv2.VideoWriter]:
        """Create video writer with optimal settings"""
        try:
            # Note: the output path always uses a .mp4 extension; mp4v is the
            # conventional codec for that container, while XVID is typically
            # paired with .avi and may not open on every OpenCV build.
            if self.config.output_quality == 'high':
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            else:
                fourcc = cv2.VideoWriter_fourcc(*'XVID')

            writer = cv2.VideoWriter(
                output_path,
                fourcc,
                video_info['fps'],
                (video_info['width'], video_info['height'])
            )

            if not writer.isOpened():
                logger.error("Failed to open video writer")
                return None

            return writer

        except Exception as e:
            logger.error(f"Error creating video writer: {e}")
            return None

    def _process_video_frames_enhanced(
        self,
        cap: cv2.VideoCapture,
        out: cv2.VideoWriter,
        background: np.ndarray,
        video_info: Dict[str, Any],
        progress_callback: Optional[Callable],
        cancel_event: Optional[threading.Event],
        preview_mask: bool,
        preview_greenscreen: bool
    ) -> Dict[str, Any]:
        """ENHANCED: Process all video frames with temporal consistency"""

        prog_tracker = progress_tracker.ProgressTracker(
            total_frames=video_info['total_frames'],
            callback=progress_callback,
            track_performance=True
        )

        frame_count = 0
        successful_frames = 0
        failed_frames = 0

        self._reset_temporal_state()

        try:
            prog_tracker.set_stage("Processing frames with temporal enhancement")

            while True:
                if cancel_event and cancel_event.is_set():
                    return {
                        'success': False,
                        'error_message': 'Processing cancelled by user',
                        'total_frames': frame_count,
                        'successful_frames': successful_frames,
                        'failed_frames': failed_frames
                    }

                ret, frame = cap.read()
                if not ret:
                    break

                try:
                    prog_tracker.update(frame_count, "Processing frame with temporal consistency")

                    if USE_TEMPORAL_ENHANCEMENT:
                        processed_frame = self._process_single_frame_enhanced(
                            frame, background, frame_count,
                            preview_mask, preview_greenscreen
                        )
                    else:
                        processed_frame = self._process_single_frame_original(
                            frame, background, frame_count,
                            preview_mask, preview_greenscreen
                        )

                    out.write(processed_frame)
                    successful_frames += 1

                    if frame_count % self.config.memory_cleanup_interval == 0:
                        self.memory_manager.auto_cleanup_if_needed()

                except Exception as frame_error:
                    logger.warning(f"Frame {frame_count} processing failed: {frame_error}")
                    # Fall back to the unprocessed frame so output timing stays intact.
                    out.write(frame)
                    failed_frames += 1
                    self.stats['failed_frames'] += 1

                frame_count += 1

                # Honor frame skipping by discarding the configured number of frames.
                if self.config.frame_skip > 1:
                    for _ in range(self.config.frame_skip - 1):
                        ret, _ = cap.read()
                        if not ret:
                            break
                        frame_count += 1

            final_stats = prog_tracker.finalize()

            return {
                'success': successful_frames > 0,
                'error_message': 'No frames processed successfully' if successful_frames == 0 else '',
                'total_frames': frame_count,
                'successful_frames': successful_frames,
                'failed_frames': failed_frames,
                'processing_stats': final_stats
            }

        except Exception as e:
            logger.error(f"Frame processing loop failed: {e}")
            return {
                'success': False,
                'error_message': f'Frame processing failed: {str(e)}',
                'total_frames': frame_count,
                'successful_frames': successful_frames,
                'failed_frames': failed_frames
            }

    def _process_single_frame_enhanced(
        self,
        frame: np.ndarray,
        background: np.ndarray,
        frame_number: int,
        preview_mask: bool,
        preview_greenscreen: bool
    ) -> np.ndarray:
        """ENHANCED: Process a single video frame with temporal consistency"""

        try:
            # Step 1: segment the person (also caches the grayscale frame for flow).
            mask = self._segment_person_enhanced(frame, frame_number)

            # Step 2: locate hair/fine-detail regions near the mask boundary.
            if USE_HAIR_DETECTION:
                hair_regions = self._detect_hair_regions(frame, mask, frame_number)
            else:
                hair_regions = None

            # Step 3: stabilize the mask against the recent history.
            if USE_TEMPORAL_ENHANCEMENT and len(self.mask_history) > 0:
                mask = self._apply_temporal_consistency_enhanced(frame, mask, frame_number)

            # Step 4: refine, adaptively or via the original path.
            if USE_ADAPTIVE_REFINEMENT:
                refined_mask = self._adaptive_mask_refinement(frame, mask, frame_number, hair_regions)
            else:
                refined_mask = self._refine_mask_original(frame, mask, frame_number)

            self._update_mask_history(refined_mask)

            # Step 5: composite the requested output.
            if preview_mask:
                return self._create_mask_preview_enhanced(frame, refined_mask, hair_regions)
            elif preview_greenscreen:
                return self._create_greenscreen_preview(frame, refined_mask)
            else:
                return self._replace_background_enhanced(frame, refined_mask, background, hair_regions)

        except Exception as e:
            logger.warning(f"Enhanced single frame processing failed: {e}")
            # Fall back to the original, pre-enhancement frame path.
            return self._process_single_frame_original(frame, background, frame_number, preview_mask, preview_greenscreen)

    def _detect_hair_regions(self, frame: np.ndarray, mask: np.ndarray, frame_number: int) -> Optional[np.ndarray]:
        """ENHANCED: Detect hair and fine detail regions automatically"""
        try:
            if frame_number in self.hair_regions_cache:
                self.stats['cache_hits'] += 1
                return self.hair_regions_cache[frame_number]

            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # High-frequency texture response: hair is strongly textured.
            laplacian = cv2.Laplacian(gray, cv2.CV_64F)
            texture_strength = np.abs(laplacian)

            # Color prior: typical hair hues (reds/browns wrap around the hue
            # circle) combined with low brightness.
            hair_hue_mask = ((hsv[:, :, 0] >= 0) & (hsv[:, :, 0] <= 30)) | \
                            ((hsv[:, :, 0] >= 150) & (hsv[:, :, 0] <= 180))
            hair_value_mask = hsv[:, :, 2] < 100

            hair_probability = np.zeros_like(gray, dtype=np.float32)

            # Texture contributes 60% of the score...
            texture_norm = (texture_strength - texture_strength.min()) / (texture_strength.max() - texture_strength.min() + 1e-8)
            hair_probability += texture_norm * 0.6

            # ...and color the remaining 40%.
            color_prob = (hair_hue_mask.astype(np.float32) * hair_value_mask.astype(np.float32))
            hair_probability += color_prob * 0.4

            # Restrict scoring to a band around the mask boundary, where hair lives.
            mask_boundary = self._get_mask_boundary_region(mask, boundary_width=20)
            hair_probability *= mask_boundary

            # Threshold at the 75th percentile of the non-zero scores
            # (guarding against an empty score set, which np.percentile rejects).
            positive_scores = hair_probability[hair_probability > 0]
            if positive_scores.size == 0:
                return None
            hair_threshold = np.percentile(positive_scores, 75)
            hair_regions = (hair_probability > hair_threshold).astype(np.uint8)

            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
            hair_regions = cv2.morphologyEx(hair_regions, cv2.MORPH_CLOSE, kernel)

            self.hair_regions_cache[frame_number] = hair_regions

            if np.any(hair_regions):
                self.stats['hair_detections'] += 1
                logger.debug(f"Hair regions detected in frame {frame_number}")

            return hair_regions

        except Exception as e:
            logger.warning(f"Hair detection failed for frame {frame_number}: {e}")
            return None

    def _get_mask_boundary_region(self, mask: np.ndarray, boundary_width: int = 20) -> np.ndarray:
        """Get region around mask boundary where hair/fine details are likely"""
        try:
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (boundary_width, boundary_width))
            dilated = cv2.dilate(mask, kernel, iterations=1)
            eroded = cv2.erode(mask, kernel, iterations=1)

            boundary_region = ((dilated > 0) & (eroded == 0)).astype(np.float32)

            return boundary_region

        except Exception as e:
            logger.warning(f"Boundary region detection failed: {e}")
            return np.ones_like(mask, dtype=np.float32)

    def _apply_temporal_consistency_enhanced(self, frame: np.ndarray, current_mask: np.ndarray, frame_number: int) -> np.ndarray:
        """ENHANCED: Apply temporal consistency using optical flow and history"""
        try:
            if len(self.mask_history) == 0:
                return current_mask

            previous_mask = self.mask_history[-1]

            if USE_OPTICAL_FLOW_TRACKING and self.optical_flow_data is not None:
                try:
                    flow_corrected_mask = self._apply_optical_flow_consistency(
                        frame, current_mask, previous_mask
                    )

                    # Blend the fresh mask with the flow-warped previous mask.
                    alpha = 0.7
                    beta = 0.3

                    blended_mask = cv2.addWeighted(
                        current_mask.astype(np.float32), alpha,
                        flow_corrected_mask.astype(np.float32), beta, 0
                    ).astype(np.uint8)

                    self.stats['temporal_corrections'] += 1

                except Exception as e:
                    logger.debug(f"Optical flow consistency failed: {e}")
                    self.stats['flow_tracking_failures'] += 1
                    blended_mask = current_mask
            else:
                blended_mask = current_mask

            if len(self.mask_history) >= 3:
                # Weighted average over the current result and the two most
                # recent history entries, ordered newest first so the weights
                # decay with age.
                weights = [0.5, 0.3, 0.2]
                masks_to_blend = [blended_mask, self.mask_history[-1], self.mask_history[-2]]

                temporal_mask = np.zeros_like(blended_mask, dtype=np.float32)
                for mask, weight in zip(masks_to_blend, weights):
                    temporal_mask += mask.astype(np.float32) * weight

                blended_mask = np.clip(temporal_mask, 0, 255).astype(np.uint8)

            # Keep the blend from washing out genuine image edges.
            blended_mask = self._temporal_edge_filtering(frame, blended_mask, current_mask)

            return blended_mask

        except Exception as e:
            logger.warning(f"Temporal consistency failed: {e}")
            return current_mask

    def _apply_optical_flow_consistency(self, current_frame: np.ndarray,
                                        current_mask: np.ndarray, previous_mask: np.ndarray) -> np.ndarray:
        """Apply optical flow to warp previous mask to current frame"""
        try:
            current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
            previous_gray = self.optical_flow_data

            # Dense backward flow (current -> previous): for every pixel in the
            # current frame, where it came from in the previous frame. This is
            # the direction cv2.remap needs to pull the previous mask forward.
            # (The original call used cv2.calcOpticalFlowPyrLK without feature
            # points, which cannot work; Farneback computes a dense field.)
            flow = cv2.calcOpticalFlowFarneback(
                current_gray, previous_gray, None,
                pyr_scale=0.5, levels=3, winsize=15,
                iterations=3, poly_n=5, poly_sigma=1.2, flags=0
            )

            h, w = previous_mask.shape
            y_coords, x_coords = np.mgrid[0:h, 0:w].astype(np.float32)

            flow_map = np.zeros((h, w, 2), dtype=np.float32)
            flow_map[:, :, 0] = x_coords + flow[:, :, 0]
            flow_map[:, :, 1] = y_coords + flow[:, :, 1]

            warped_mask = cv2.remap(previous_mask, flow_map, None, cv2.INTER_LINEAR)

            return warped_mask

        except Exception as e:
            logger.debug(f"Optical flow warping failed: {e}")
            return previous_mask
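
    # remap() semantics, for reference: dst(y, x) = src(map_y(y, x), map_x(y, x)),
    # i.e. the map is read at destination coordinates and points back into the
    # source. That is why the flow above is computed current -> previous even
    # though the mask is being carried previous -> current.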

    def _temporal_edge_filtering(self, frame: np.ndarray, blended_mask: np.ndarray, current_mask: np.ndarray) -> np.ndarray:
        """Apply edge-aware temporal filtering"""
        try:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)

            # Near strong image edges, trust the current mask; elsewhere,
            # trust the temporally blended one.
            edge_weight = cv2.GaussianBlur(edges.astype(np.float32), (5, 5), 1.0) / 255.0

            filtered_mask = (current_mask.astype(np.float32) * edge_weight +
                             blended_mask.astype(np.float32) * (1 - edge_weight))

            return np.clip(filtered_mask, 0, 255).astype(np.uint8)

        except Exception as e:
            logger.warning(f"Temporal edge filtering failed: {e}")
            return blended_mask

    def _adaptive_mask_refinement(self, frame: np.ndarray, mask: np.ndarray,
                                  frame_number: int, hair_regions: Optional[np.ndarray]) -> np.ndarray:
        """ENHANCED: Adaptive mask refinement based on content analysis"""
        try:
            refinement_needed = self._assess_refinement_needs(frame, mask, hair_regions)

            if refinement_needed['hair_refinement'] and hair_regions is not None:
                mask = self._refine_hair_regions(frame, mask, hair_regions)

            if refinement_needed['edge_refinement']:
                mask = self._enhanced_edge_refinement(frame, mask)

            if refinement_needed['temporal_refinement']:
                mask = self._temporal_aware_refinement(frame, mask, frame_number)

            # Keyframe refinement through MatAnyone (or the enhanced fallback).
            if self._should_refine_mask(frame_number):
                if self.matanyone_model is not None and self.quality_settings.get('edge_refinement', True):
                    mask = refine_mask_hq(frame, mask, self.matanyone_model)
                else:
                    mask = self._fallback_mask_refinement_enhanced(mask)

            return mask

        except Exception as e:
            logger.warning(f"Adaptive mask refinement failed: {e}")
            return self._refine_mask_original(frame, mask, frame_number)

    def _assess_refinement_needs(self, frame: np.ndarray, mask: np.ndarray,
                                 hair_regions: Optional[np.ndarray]) -> Dict[str, bool]:
        """Assess what type of refinement is needed for this frame"""
        try:
            needs = {
                'hair_refinement': False,
                'edge_refinement': False,
                'temporal_refinement': False
            }

            # Hair refinement whenever hair-like regions were detected.
            if hair_regions is not None and np.any(hair_regions):
                needs['hair_refinement'] = True

            # Edge refinement when the mask boundary is long relative to the
            # frame area (a noisy or ragged outline).
            edges = cv2.Canny(mask, 50, 150)
            edge_density = np.sum(edges > 0) / (mask.shape[0] * mask.shape[1])
            if edge_density > 0.1:
                needs['edge_refinement'] = True

            # Temporal refinement when the mask changed a lot since the last frame.
            if len(self.mask_history) > 0:
                prev_mask = self.mask_history[-1]
                diff = cv2.absdiff(mask, prev_mask)
                change_ratio = np.sum(diff > 50) / (mask.shape[0] * mask.shape[1])
                if change_ratio > 0.15:
                    needs['temporal_refinement'] = True

            return needs

        except Exception as e:
            logger.warning(f"Refinement assessment failed: {e}")
            return {'hair_refinement': False, 'edge_refinement': True, 'temporal_refinement': False}

    def _refine_hair_regions(self, frame: np.ndarray, mask: np.ndarray, hair_regions: np.ndarray) -> np.ndarray:
        """Special refinement for hair and fine detail regions"""
        try:
            hair_mask = hair_regions > 0

            refined_mask = mask.copy()

            # Re-threshold inside hair regions with a locally derived cutoff,
            # so wispy strands survive binarization.
            hair_area_values = mask[hair_mask]
            if len(hair_area_values) > 0:
                hair_threshold = max(100, np.percentile(hair_area_values, 25))
                refined_mask[hair_mask] = np.where(mask[hair_mask] > hair_threshold, 255, 0)

            # Gentle closing to reconnect strands without fattening them.
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
            refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_CLOSE, kernel)

            return refined_mask

        except Exception as e:
            logger.warning(f"Hair region refinement failed: {e}")
            return mask

    def _enhanced_edge_refinement(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """Enhanced edge refinement using image gradients"""
        try:
            # Edge-preserving smoothing of the mask itself.
            refined = cv2.bilateralFilter(mask, 9, 75, 75)

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)

            # Keep the raw mask near image edges, the smoothed one elsewhere.
            edge_weight = cv2.GaussianBlur(edges.astype(np.float32), (3, 3), 1.0) / 255.0
            edge_weight = np.clip(edge_weight * 2, 0, 1)

            final_mask = (mask.astype(np.float32) * edge_weight +
                          refined.astype(np.float32) * (1 - edge_weight))

            return np.clip(final_mask, 0, 255).astype(np.uint8)

        except Exception as e:
            logger.warning(f"Enhanced edge refinement failed: {e}")
            return mask

    def _temporal_aware_refinement(self, frame: np.ndarray, mask: np.ndarray, frame_number: int) -> np.ndarray:
        """Temporal-aware refinement considering motion and stability"""
        try:
            if len(self.mask_history) == 0:
                return mask

            # Estimate motion by differencing against the cached previous
            # grayscale frame; where the scene is static, lean on the previous
            # mask, and where it moves, trust the current one.
            if self.optical_flow_data is not None:
                current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                motion_magnitude = cv2.absdiff(current_gray, self.optical_flow_data)
                motion_mask = motion_magnitude > 10

                prev_mask = self.mask_history[-1]

                motion_weight = cv2.GaussianBlur(motion_mask.astype(np.float32), (5, 5), 1.0)
                motion_weight = np.clip(motion_weight, 0.3, 1.0)

                temporal_mask = (mask.astype(np.float32) * motion_weight +
                                 prev_mask.astype(np.float32) * (1 - motion_weight))

                return np.clip(temporal_mask, 0, 255).astype(np.uint8)

            return mask

        except Exception as e:
            logger.warning(f"Temporal-aware refinement failed: {e}")
            return mask

    def _update_mask_history(self, mask: np.ndarray):
        """Update mask history for temporal consistency"""
        self.mask_history.append(mask.copy())

        max_history = 5
        if len(self.mask_history) > max_history:
            self.mask_history.pop(0)
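
    # A bounded deque would give the same behavior without the O(n) pop(0);
    # an equivalent sketch, should the list ever be replaced (negative
    # indexing, as used above, still works on a deque):
    #
    #   from collections import deque
    #   self.mask_history = deque(maxlen=5)    # in __init__
    #   self.mask_history.append(mask.copy())  # oldest entry drops automatically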

    def _create_mask_preview_enhanced(self, frame: np.ndarray, mask: np.ndarray,
                                      hair_regions: Optional[np.ndarray]) -> np.ndarray:
        """ENHANCED: Create mask visualization with hair regions highlighted"""
        try:
            # Person mask rendered in green.
            mask_colored = np.zeros_like(frame)
            mask_colored[:, :, 1] = mask

            # Hair regions overlaid in red (BGR channel 2).
            if hair_regions is not None:
                mask_colored[:, :, 2] = np.maximum(mask_colored[:, :, 2], hair_regions * 255)

            alpha = 0.6
            preview = cv2.addWeighted(frame, 1 - alpha, mask_colored, alpha, 0)

            return preview

        except Exception as e:
            logger.warning(f"Enhanced mask preview creation failed: {e}")
            return self._create_mask_preview_original(frame, mask)

    def _replace_background_enhanced(self, frame: np.ndarray, mask: np.ndarray,
                                     background: np.ndarray, hair_regions: Optional[np.ndarray]) -> np.ndarray:
        """ENHANCED: Replace background with special handling for hair regions"""
        try:
            result = replace_background_hq(frame, mask, background)

            # Re-composite hair regions with a soft alpha for cleaner strands.
            if hair_regions is not None and np.any(hair_regions):
                result = self._enhance_hair_compositing(frame, mask, background, hair_regions, result)

            return result

        except Exception as e:
            logger.warning(f"Enhanced background replacement failed: {e}")
            return replace_background_hq(frame, mask, background)

    def _enhance_hair_compositing(self, frame: np.ndarray, mask: np.ndarray,
                                  background: np.ndarray, hair_regions: np.ndarray,
                                  initial_result: np.ndarray) -> np.ndarray:
        """Enhanced compositing specifically for hair regions"""
        try:
            hair_mask = hair_regions > 0

            if np.any(hair_mask):
                # Soft alpha derived from the mask, feathered inside hair regions.
                hair_alpha = cv2.GaussianBlur((hair_regions * mask / 255.0).astype(np.float32), (3, 3), 1.0)
                hair_alpha = np.clip(hair_alpha, 0, 1)

                # Alpha-blend each channel, then write back only at hair pixels.
                for c in range(3):
                    channel_blend = (frame[:, :, c].astype(np.float32) * hair_alpha +
                                     background[:, :, c].astype(np.float32) * (1 - hair_alpha))

                    initial_result[:, :, c] = np.where(
                        hair_mask,
                        np.clip(channel_blend, 0, 255).astype(np.uint8),
                        initial_result[:, :, c]
                    )

            return initial_result

        except Exception as e:
            logger.warning(f"Hair compositing enhancement failed: {e}")
            return initial_result

    # --- ORIGINAL processing path (preserved for rollback) ---

    def _process_single_frame_original(
        self,
        frame: np.ndarray,
        background: np.ndarray,
        frame_number: int,
        preview_mask: bool,
        preview_greenscreen: bool
    ) -> np.ndarray:
        """ORIGINAL: Process a single video frame (preserved for rollback)"""

        try:
            mask = self._segment_person(frame, frame_number)

            # Refine on keyframes; reuse the last refined mask in between.
            if self._should_refine_mask(frame_number):
                refined_mask = self._refine_mask_original(frame, mask, frame_number)
                self.last_refined_mask = refined_mask.copy()
            else:
                refined_mask = self._apply_temporal_consistency_original(mask, frame_number)

            if preview_mask:
                return self._create_mask_preview_original(frame, refined_mask)
            elif preview_greenscreen:
                return self._create_greenscreen_preview(frame, refined_mask)
            else:
                return self._replace_background(frame, refined_mask, background)

        except Exception as e:
            logger.warning(f"Single frame processing failed: {e}")
            raise

    def _segment_person(self, frame: np.ndarray, frame_number: int) -> np.ndarray:
        """Perform person segmentation"""
        try:
            mask = segment_person_hq(frame, self.sam2_predictor)

            if mask is None or mask.size == 0:
                raise exceptions.SegmentationError(frame_number, "Segmentation returned empty mask")

            # Cache the grayscale frame for motion estimation on the next frame.
            if USE_OPTICAL_FLOW_TRACKING:
                current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                self.optical_flow_data = current_gray

            return mask

        except Exception as e:
            self.stats['segmentation_errors'] += 1
            raise exceptions.SegmentationError(frame_number, f"Segmentation failed: {str(e)}")

    def _segment_person_enhanced(self, frame: np.ndarray, frame_number: int) -> np.ndarray:
        """ENHANCED: Perform person segmentation with improvements"""
        try:
            mask = segment_person_hq(frame, self.sam2_predictor)

            if mask is None or mask.size == 0:
                raise exceptions.SegmentationError(frame_number, "Segmentation returned empty mask")

            # The enhanced path always caches the grayscale frame for flow.
            current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            self.optical_flow_data = current_gray

            return mask

        except Exception as e:
            self.stats['segmentation_errors'] += 1
            raise exceptions.SegmentationError(frame_number, f"Enhanced segmentation failed: {str(e)}")

    def _should_refine_mask(self, frame_number: int) -> bool:
        """Determine if mask should be refined for this frame"""
        # Refine on keyframes, on the first frame, or whenever temporal
        # consistency is disabled.
        return (
            frame_number % self.quality_settings['keyframe_interval'] == 0 or
            self.last_refined_mask is None or
            not self.quality_settings.get('temporal_consistency', True)
        )

    def _refine_mask_original(self, frame: np.ndarray, mask: np.ndarray, frame_number: int) -> np.ndarray:
        """ORIGINAL: Refine mask using MatAnyone or fallback methods"""
        try:
            if self.matanyone_model is not None and self.quality_settings.get('edge_refinement', True):
                refined_mask = refine_mask_hq(frame, mask, self.matanyone_model)
            else:
                # No matting model available: basic OpenCV cleanup instead.
                refined_mask = self._fallback_mask_refinement(mask)

            return refined_mask

        except Exception as e:
            self.stats['refinement_errors'] += 1
            logger.warning(f"Mask refinement failed for frame {frame_number}: {e}")
            return mask

    def _fallback_mask_refinement(self, mask: np.ndarray) -> np.ndarray:
        """ORIGINAL: Fallback mask refinement using basic OpenCV operations"""
        try:
            # Close small holes, then remove speckle noise.
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
            refined = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
            refined = cv2.morphologyEx(refined, cv2.MORPH_OPEN, kernel)

            # Light blur to soften the binarized edge.
            refined = cv2.GaussianBlur(refined, (3, 3), 1.0)

            return refined

        except Exception as e:
            logger.warning(f"Fallback mask refinement failed: {e}")
            return mask

    def _fallback_mask_refinement_enhanced(self, mask: np.ndarray) -> np.ndarray:
        """ENHANCED: Improved fallback mask refinement"""
        try:
            kernel_small = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
            kernel_large = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

            # Remove speckle noise first...
            refined = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_small)
            # ...then close larger gaps.
            refined = cv2.morphologyEx(refined, cv2.MORPH_CLOSE, kernel_large)

            # Edge-preserving smoothing instead of a plain Gaussian blur.
            refined = cv2.bilateralFilter(refined, 9, 75, 75)

            return refined

        except Exception as e:
            logger.warning(f"Enhanced fallback mask refinement failed: {e}")
            return mask

    def _apply_temporal_consistency_original(self, current_mask: np.ndarray, frame_number: int) -> np.ndarray:
        """ORIGINAL: Apply temporal consistency using previous refined mask"""
        if self.last_refined_mask is None or not self.quality_settings.get('temporal_consistency', True):
            return current_mask

        try:
            # Blend weights: favor the current mask.
            alpha = 0.7
            beta = 0.3

            # Resize the stored mask if the frame size changed.
            if current_mask.shape != self.last_refined_mask.shape:
                last_mask = cv2.resize(self.last_refined_mask,
                                       (current_mask.shape[1], current_mask.shape[0]))
            else:
                last_mask = self.last_refined_mask

            blended_mask = cv2.addWeighted(current_mask, alpha, last_mask, beta, 0)

            # Light blur to hide blend seams.
            blended_mask = cv2.GaussianBlur(blended_mask, (3, 3), 0.5)

            return blended_mask

        except Exception as e:
            logger.warning(f"Temporal consistency application failed: {e}")
            return current_mask

    def _create_mask_preview_original(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """ORIGINAL: Create mask visualization preview"""
        try:
            # Person mask rendered in green.
            mask_colored = np.zeros_like(frame)
            mask_colored[:, :, 1] = mask

            alpha = 0.6
            preview = cv2.addWeighted(frame, 1 - alpha, mask_colored, alpha, 0)

            return preview

        except Exception as e:
            logger.warning(f"Mask preview creation failed: {e}")
            return frame

    def _create_greenscreen_preview(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """Create green screen preview"""
        try:
            green_bg = np.zeros_like(frame)
            green_bg[:, :] = [0, 255, 0]

            # Normalize the mask to a 3-channel alpha in [0, 1].
            mask_3ch = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) if len(mask.shape) == 2 else mask
            mask_norm = mask_3ch.astype(np.float32) / 255.0

            result = (frame * mask_norm + green_bg * (1 - mask_norm)).astype(np.uint8)

            return result

        except Exception as e:
            logger.warning(f"Greenscreen preview creation failed: {e}")
            return frame

    def _replace_background(self, frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray:
        """Replace background using the refined mask"""
        try:
            result = replace_background_hq(frame, mask, background)
            return result

        except Exception as e:
            logger.warning(f"Background replacement failed: {e}")
            return frame

    def prepare_background(
        self,
        background_choice: str,
        custom_background_path: Optional[str],
        width: int,
        height: int
    ) -> Optional[np.ndarray]:
        """Prepare background image for processing (unchanged)"""
        try:
            if background_choice == "custom" and custom_background_path:
                if not os.path.exists(custom_background_path):
                    raise exceptions.BackgroundProcessingError("custom", f"File not found: {custom_background_path}")

                background = cv2.imread(custom_background_path)
                if background is None:
                    raise exceptions.BackgroundProcessingError("custom", "Could not read custom background image")

                logger.info(f"Loaded custom background: {custom_background_path}")

            else:
                if background_choice not in PROFESSIONAL_BACKGROUNDS:
                    raise exceptions.BackgroundProcessingError(background_choice, "Unknown professional background")

                bg_config = PROFESSIONAL_BACKGROUNDS[background_choice]
                background = create_professional_background(bg_config, width, height)

                logger.info(f"Generated professional background: {background_choice}")

            # Match the video resolution exactly.
            if background.shape[:2] != (height, width):
                background = cv2.resize(background, (width, height), interpolation=cv2.INTER_LANCZOS4)

            if background is None or background.size == 0:
                raise exceptions.BackgroundProcessingError(background_choice, "Background image is empty")

            return background

        except exceptions.BackgroundProcessingError as e:
            logger.error(str(e))
            return None
        except Exception as e:
            logger.error(f"Unexpected error preparing background: {e}")
            return None

    def _update_processing_stats(self, video_info: Dict[str, Any],
                                 processing_time: float, result: Dict[str, Any]):
        """Update processing statistics"""
        self.stats['videos_processed'] += 1
        self.stats['total_frames_processed'] += result['successful_frames']
        self.stats['total_processing_time'] += processing_time
        self.stats['successful_frames'] += result['successful_frames']
        self.stats['failed_frames'] += result['failed_frames']

        if self.stats['total_processing_time'] > 0:
            self.stats['average_fps'] = self.stats['total_frames_processed'] / self.stats['total_processing_time']

    def get_processing_capabilities(self) -> Dict[str, Any]:
        """Get current processing capabilities"""
        capabilities = {
            'sam2_available': self.sam2_predictor is not None,
            'matanyone_available': self.matanyone_model is not None,
            'quality_preset': self.config.quality_preset,
            'supports_temporal_consistency': self.quality_settings.get('temporal_consistency', False),
            'supports_edge_refinement': self.quality_settings.get('edge_refinement', False),
            'keyframe_interval': self.quality_settings['keyframe_interval'],
            'max_resolution': self.config.get_resolution_limits(),
            'supported_formats': ['.mp4', '.avi', '.mov', '.mkv'],
            'memory_limit_gb': self.memory_manager.memory_limit_gb
        }

        if USE_TEMPORAL_ENHANCEMENT:
            capabilities.update({
                'temporal_enhancement': True,
                'hair_detection': USE_HAIR_DETECTION,
                'optical_flow_tracking': USE_OPTICAL_FLOW_TRACKING,
                'adaptive_refinement': USE_ADAPTIVE_REFINEMENT
            })

        return capabilities

    def get_status(self) -> Dict[str, Any]:
        """Get current processor status"""
        status = {
            'processing_active': self.processing_active,
            'models_available': {
                'sam2': self.sam2_predictor is not None,
                'matanyone': self.matanyone_model is not None
            },
            'quality_settings': self.quality_settings,
            'statistics': self.stats.copy(),
            'cache_size': len(self.frame_cache),
            'memory_usage': self.memory_manager.get_memory_usage(),
            'capabilities': self.get_processing_capabilities()
        }

        if USE_TEMPORAL_ENHANCEMENT:
            status.update({
                'mask_history_length': len(self.mask_history),
                'hair_cache_size': len(self.hair_regions_cache),
                'optical_flow_active': self.optical_flow_data is not None
            })

        return status

    def optimize_for_video(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
        """Optimize settings for specific video characteristics"""
        optimizations = {
            'original_settings': self.quality_settings.copy(),
            'optimizations_applied': []
        }

        try:
            # High-resolution video: refine less often.
            if video_info['width'] * video_info['height'] > 1920 * 1080:
                if self.quality_settings['keyframe_interval'] < 10:
                    self.quality_settings['keyframe_interval'] = 10
                    optimizations['optimizations_applied'].append('increased_keyframe_interval_for_high_res')

            # Long video: clean up memory more often.
            if video_info['duration'] > 300:
                if self.config.memory_cleanup_interval > 20:
                    self.config.memory_cleanup_interval = 20
                    optimizations['optimizations_applied'].append('increased_memory_cleanup_frequency')

            # Low frame rate: temporal blending does more harm than good.
            if video_info['fps'] < 15:
                self.quality_settings['temporal_consistency'] = False
                optimizations['optimizations_applied'].append('disabled_temporal_consistency_for_low_fps')

            # Under memory pressure, trade quality for headroom.
            memory_pressure = self.memory_manager.check_memory_pressure()

            if memory_pressure['under_pressure']:
                self.quality_settings['edge_refinement'] = False
                self.quality_settings['keyframe_interval'] = max(self.quality_settings['keyframe_interval'], 15)
                optimizations['optimizations_applied'].extend([
                    'disabled_edge_refinement_for_memory',
                    'increased_keyframe_interval_for_memory'
                ])

            optimizations['final_settings'] = self.quality_settings.copy()

            if optimizations['optimizations_applied']:
                logger.info(f"Applied video optimizations: {optimizations['optimizations_applied']}")

            return optimizations

        except Exception as e:
            logger.warning(f"Video optimization failed: {e}")
            return optimizations

    def reset_cache(self):
        """Reset frame cache and temporal state"""
        self.frame_cache.clear()
        self.last_refined_mask = None
        self.stats['cache_hits'] = 0
        self._reset_temporal_state()
        logger.debug("Frame cache and temporal state reset")

    def cleanup(self):
        """Clean up processor resources"""
        try:
            self.reset_cache()
            self.processing_active = False
            logger.info("CoreVideoProcessor cleanup completed")
        except Exception as e:
            logger.warning(f"Error during cleanup: {e}")