MogensR commited on
Commit
b06a17f
·
1 Parent(s): d7530fc

Create video_processor.py

Browse files
Files changed (1) hide show
  1. video_processor.py +667 -0
video_processor.py ADDED
@@ -0,0 +1,667 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Core Video Processing Module
3
+ Handles the main video processing pipeline, frame processing, and background replacement
4
+ """
5
+
6
+ import os
7
+ import cv2
8
+ import numpy as np
9
+ import time
10
+ import logging
11
+ import threading
12
+ from typing import Optional, Tuple, Dict, Any, Callable
13
+ from pathlib import Path
14
+
15
+ from app_config import ProcessingConfig
16
+ from memory_manager import MemoryManager
17
+ from progress_tracker import ProgressTracker
18
+ from exceptions import VideoProcessingError, VideoFileError, BackgroundProcessingError, SegmentationError
19
+
20
+ # Import utilities
21
+ from utilities import (
22
+ segment_person_hq,
23
+ refine_mask_hq,
24
+ replace_background_hq,
25
+ create_professional_background,
26
+ PROFESSIONAL_BACKGROUNDS,
27
+ validate_video_file
28
+ )
29
+
30
+ logger = logging.getLogger(__name__)
31
+
32
+ class CoreVideoProcessor:
33
+ """
34
+ Core video processing pipeline for background replacement
35
+ """
36
+
37
+ def __init__(self, sam2_predictor: Any, matanyone_model: Any,
38
+ config: ProcessingConfig, memory_manager: MemoryManager):
39
+ self.sam2_predictor = sam2_predictor
40
+ self.matanyone_model = matanyone_model
41
+ self.config = config
42
+ self.memory_manager = memory_manager
43
+
44
+ # Processing state
45
+ self.processing_active = False
46
+ self.last_refined_mask = None
47
+ self.frame_cache = {}
48
+
49
+ # Statistics
50
+ self.stats = {
51
+ 'videos_processed': 0,
52
+ 'total_frames_processed': 0,
53
+ 'total_processing_time': 0.0,
54
+ 'average_fps': 0.0,
55
+ 'failed_frames': 0,
56
+ 'successful_frames': 0,
57
+ 'cache_hits': 0,
58
+ 'segmentation_errors': 0,
59
+ 'refinement_errors': 0
60
+ }
61
+
62
+ # Quality settings based on config
63
+ self.quality_settings = config.get_quality_settings()
64
+
65
+ logger.info("CoreVideoProcessor initialized")
66
+ logger.info(f"Quality preset: {config.quality_preset}")
67
+ logger.info(f"Quality settings: {self.quality_settings}")
68
+
69
+ def process_video(
70
+ self,
71
+ video_path: str,
72
+ background_choice: str,
73
+ custom_background_path: Optional[str] = None,
74
+ progress_callback: Optional[Callable] = None,
75
+ cancel_event: Optional[threading.Event] = None,
76
+ preview_mask: bool = False,
77
+ preview_greenscreen: bool = False
78
+ ) -> Tuple[Optional[str], str]:
79
+ """
80
+ Process video with background replacement
81
+
82
+ Args:
83
+ video_path: Input video path
84
+ background_choice: Background type or name
85
+ custom_background_path: Path to custom background (if applicable)
86
+ progress_callback: Progress update callback
87
+ cancel_event: Event to cancel processing
88
+ preview_mask: Generate mask preview instead of final output
89
+ preview_greenscreen: Generate greenscreen preview
90
+
91
+ Returns:
92
+ Tuple of (output_path, status_message)
93
+ """
94
+ if self.processing_active:
95
+ return None, "Processing already in progress"
96
+
97
+ self.processing_active = True
98
+ start_time = time.time()
99
+
100
+ try:
101
+ # Validate input video
102
+ is_valid, validation_msg = validate_video_file(video_path)
103
+ if not is_valid:
104
+ return None, f"Invalid video file: {validation_msg}"
105
+
106
+ # Open video file
107
+ cap = cv2.VideoCapture(video_path)
108
+ if not cap.isOpened():
109
+ return None, "Could not open video file"
110
+
111
+ # Get video properties
112
+ video_info = self._get_video_info(cap)
113
+ logger.info(f"Processing video: {video_info}")
114
+
115
+ # Check memory requirements
116
+ memory_check = self.memory_manager.can_process_video(
117
+ video_info['width'], video_info['height']
118
+ )
119
+
120
+ if not memory_check['can_process']:
121
+ cap.release()
122
+ return None, f"Insufficient memory: {memory_check['recommendations']}"
123
+
124
+ # Prepare background
125
+ background = self.prepare_background(
126
+ background_choice, custom_background_path,
127
+ video_info['width'], video_info['height']
128
+ )
129
+
130
+ if background is None:
131
+ cap.release()
132
+ return None, "Failed to prepare background"
133
+
134
+ # Setup output video
135
+ output_path = self._setup_output_video(video_info, preview_mask, preview_greenscreen)
136
+ out = self._create_video_writer(output_path, video_info)
137
+
138
+ if out is None:
139
+ cap.release()
140
+ return None, "Could not create output video writer"
141
+
142
+ # Process video frames
143
+ result = self._process_video_frames(
144
+ cap, out, background, video_info,
145
+ progress_callback, cancel_event,
146
+ preview_mask, preview_greenscreen
147
+ )
148
+
149
+ # Cleanup
150
+ cap.release()
151
+ out.release()
152
+
153
+ if result['success']:
154
+ # Update statistics
155
+ processing_time = time.time() - start_time
156
+ self._update_processing_stats(video_info, processing_time, result)
157
+
158
+ success_msg = (
159
+ f"Processing completed successfully!\n"
160
+ f"Processed: {result['successful_frames']}/{result['total_frames']} frames\n"
161
+ f"Time: {processing_time:.1f}s\n"
162
+ f"Average FPS: {result['total_frames'] / processing_time:.1f}\n"
163
+ f"Background: {background_choice}"
164
+ )
165
+
166
+ return output_path, success_msg
167
+ else:
168
+ # Clean up failed output
169
+ try:
170
+ os.remove(output_path)
171
+ except:
172
+ pass
173
+ return None, result['error_message']
174
+
175
+ except Exception as e:
176
+ logger.error(f"Video processing failed: {e}")
177
+ return None, f"Processing failed: {str(e)}"
178
+
179
+ finally:
180
+ self.processing_active = False
181
+
182
+ def _get_video_info(self, cap: cv2.VideoCapture) -> Dict[str, Any]:
183
+ """Extract comprehensive video information"""
184
+ return {
185
+ 'fps': cap.get(cv2.CAP_PROP_FPS),
186
+ 'total_frames': int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
187
+ 'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
188
+ 'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
189
+ 'duration': cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS),
190
+ 'codec': int(cap.get(cv2.CAP_PROP_FOURCC))
191
+ }
192
+
193
+ def _setup_output_video(self, video_info: Dict[str, Any],
194
+ preview_mask: bool, preview_greenscreen: bool) -> str:
195
+ """Setup output video path"""
196
+ timestamp = int(time.time())
197
+
198
+ if preview_mask:
199
+ filename = f"mask_preview_{timestamp}.mp4"
200
+ elif preview_greenscreen:
201
+ filename = f"greenscreen_preview_{timestamp}.mp4"
202
+ else:
203
+ filename = f"processed_video_{timestamp}.mp4"
204
+
205
+ return os.path.join(self.config.temp_dir, filename)
206
+
207
+ def _create_video_writer(self, output_path: str,
208
+ video_info: Dict[str, Any]) -> Optional[cv2.VideoWriter]:
209
+ """Create video writer with optimal settings"""
210
+ try:
211
+ # Choose codec based on quality settings
212
+ if self.config.output_quality == 'high':
213
+ fourcc = cv2.VideoWriter_fourcc(*'mp4v')
214
+ else:
215
+ fourcc = cv2.VideoWriter_fourcc(*'XVID')
216
+
217
+ writer = cv2.VideoWriter(
218
+ output_path,
219
+ fourcc,
220
+ video_info['fps'],
221
+ (video_info['width'], video_info['height'])
222
+ )
223
+
224
+ if not writer.isOpened():
225
+ logger.error("Failed to open video writer")
226
+ return None
227
+
228
+ return writer
229
+
230
+ except Exception as e:
231
+ logger.error(f"Error creating video writer: {e}")
232
+ return None
233
+
234
+ def _process_video_frames(
235
+ self,
236
+ cap: cv2.VideoCapture,
237
+ out: cv2.VideoWriter,
238
+ background: np.ndarray,
239
+ video_info: Dict[str, Any],
240
+ progress_callback: Optional[Callable],
241
+ cancel_event: Optional[threading.Event],
242
+ preview_mask: bool,
243
+ preview_greenscreen: bool
244
+ ) -> Dict[str, Any]:
245
+ """Process all video frames"""
246
+
247
+ # Initialize progress tracking
248
+ progress_tracker = ProgressTracker(
249
+ total_frames=video_info['total_frames'],
250
+ callback=progress_callback,
251
+ track_performance=True
252
+ )
253
+
254
+ frame_count = 0
255
+ successful_frames = 0
256
+ failed_frames = 0
257
+
258
+ # Reset mask cache
259
+ self.last_refined_mask = None
260
+ self.frame_cache.clear()
261
+
262
+ try:
263
+ progress_tracker.set_stage("Processing frames")
264
+
265
+ while True:
266
+ # Check for cancellation
267
+ if cancel_event and cancel_event.is_set():
268
+ return {
269
+ 'success': False,
270
+ 'error_message': 'Processing cancelled by user',
271
+ 'total_frames': frame_count,
272
+ 'successful_frames': successful_frames,
273
+ 'failed_frames': failed_frames
274
+ }
275
+
276
+ # Read frame
277
+ ret, frame = cap.read()
278
+ if not ret:
279
+ break
280
+
281
+ try:
282
+ # Update progress
283
+ progress_tracker.update(frame_count, "Processing frame")
284
+
285
+ # Process frame
286
+ processed_frame = self._process_single_frame(
287
+ frame, background, frame_count,
288
+ preview_mask, preview_greenscreen
289
+ )
290
+
291
+ # Write processed frame
292
+ out.write(processed_frame)
293
+ successful_frames += 1
294
+
295
+ # Memory management
296
+ if frame_count % self.config.memory_cleanup_interval == 0:
297
+ self.memory_manager.auto_cleanup_if_needed()
298
+
299
+ except Exception as frame_error:
300
+ logger.warning(f"Frame {frame_count} processing failed: {frame_error}")
301
+
302
+ # Write original frame as fallback
303
+ out.write(frame)
304
+ failed_frames += 1
305
+ self.stats['failed_frames'] += 1
306
+
307
+ frame_count += 1
308
+
309
+ # Skip frames if configured (for performance)
310
+ if self.config.frame_skip > 1:
311
+ for _ in range(self.config.frame_skip - 1):
312
+ ret, _ = cap.read()
313
+ if not ret:
314
+ break
315
+ frame_count += 1
316
+
317
+ # Finalize progress tracking
318
+ final_stats = progress_tracker.finalize()
319
+
320
+ return {
321
+ 'success': successful_frames > 0,
322
+ 'error_message': f'No frames processed successfully' if successful_frames == 0 else '',
323
+ 'total_frames': frame_count,
324
+ 'successful_frames': successful_frames,
325
+ 'failed_frames': failed_frames,
326
+ 'processing_stats': final_stats
327
+ }
328
+
329
+ except Exception as e:
330
+ logger.error(f"Frame processing loop failed: {e}")
331
+ return {
332
+ 'success': False,
333
+ 'error_message': f'Frame processing failed: {str(e)}',
334
+ 'total_frames': frame_count,
335
+ 'successful_frames': successful_frames,
336
+ 'failed_frames': failed_frames
337
+ }
338
+
339
+ def _process_single_frame(
340
+ self,
341
+ frame: np.ndarray,
342
+ background: np.ndarray,
343
+ frame_number: int,
344
+ preview_mask: bool,
345
+ preview_greenscreen: bool
346
+ ) -> np.ndarray:
347
+ """Process a single video frame"""
348
+
349
+ try:
350
+ # Person segmentation
351
+ mask = self._segment_person(frame, frame_number)
352
+
353
+ # Mask refinement (keyframe-based for performance)
354
+ if self._should_refine_mask(frame_number):
355
+ refined_mask = self._refine_mask(frame, mask, frame_number)
356
+ self.last_refined_mask = refined_mask.copy()
357
+ else:
358
+ # Use temporal consistency with previous refined mask
359
+ refined_mask = self._apply_temporal_consistency(mask, frame_number)
360
+
361
+ # Generate output based on mode
362
+ if preview_mask:
363
+ return self._create_mask_preview(frame, refined_mask)
364
+ elif preview_greenscreen:
365
+ return self._create_greenscreen_preview(frame, refined_mask)
366
+ else:
367
+ return self._replace_background(frame, refined_mask, background)
368
+
369
+ except Exception as e:
370
+ logger.warning(f"Single frame processing failed: {e}")
371
+ raise
372
+
373
+ def _segment_person(self, frame: np.ndarray, frame_number: int) -> np.ndarray:
374
+ """Perform person segmentation"""
375
+ try:
376
+ mask = segment_person_hq(frame, self.sam2_predictor)
377
+
378
+ if mask is None or mask.size == 0:
379
+ raise SegmentationError(frame_number, "Segmentation returned empty mask")
380
+
381
+ return mask
382
+
383
+ except Exception as e:
384
+ self.stats['segmentation_errors'] += 1
385
+ raise SegmentationError(frame_number, f"Segmentation failed: {str(e)}")
386
+
387
+ def _should_refine_mask(self, frame_number: int) -> bool:
388
+ """Determine if mask should be refined for this frame"""
389
+ # Refine on keyframes or if no previous refined mask exists
390
+ return (
391
+ frame_number % self.quality_settings['keyframe_interval'] == 0 or
392
+ self.last_refined_mask is None or
393
+ not self.quality_settings.get('temporal_consistency', True)
394
+ )
395
+
396
+ def _refine_mask(self, frame: np.ndarray, mask: np.ndarray, frame_number: int) -> np.ndarray:
397
+ """Refine mask using MatAnyone or fallback methods"""
398
+ try:
399
+ if self.matanyone_model is not None and self.quality_settings.get('edge_refinement', True):
400
+ refined_mask = refine_mask_hq(frame, mask, self.matanyone_model)
401
+ else:
402
+ # Fallback refinement using OpenCV operations
403
+ refined_mask = self._fallback_mask_refinement(mask)
404
+
405
+ return refined_mask
406
+
407
+ except Exception as e:
408
+ self.stats['refinement_errors'] += 1
409
+ logger.warning(f"Mask refinement failed for frame {frame_number}: {e}")
410
+ # Return original mask as fallback
411
+ return mask
412
+
413
+ def _fallback_mask_refinement(self, mask: np.ndarray) -> np.ndarray:
414
+ """Fallback mask refinement using basic OpenCV operations"""
415
+ try:
416
+ # Morphological operations to clean up mask
417
+ kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
418
+ refined = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
419
+ refined = cv2.morphologyEx(refined, cv2.MORPH_OPEN, kernel)
420
+
421
+ # Smooth edges
422
+ refined = cv2.GaussianBlur(refined, (3, 3), 1.0)
423
+
424
+ return refined
425
+
426
+ except Exception as e:
427
+ logger.warning(f"Fallback mask refinement failed: {e}")
428
+ return mask
429
+
430
+ def _apply_temporal_consistency(self, current_mask: np.ndarray, frame_number: int) -> np.ndarray:
431
+ """Apply temporal consistency using previous refined mask"""
432
+ if self.last_refined_mask is None or not self.quality_settings.get('temporal_consistency', True):
433
+ return current_mask
434
+
435
+ try:
436
+ # Blend current mask with previous refined mask
437
+ alpha = 0.7 # Weight for current mask
438
+ beta = 0.3 # Weight for previous mask
439
+
440
+ # Ensure masks have same shape
441
+ if current_mask.shape != self.last_refined_mask.shape:
442
+ last_mask = cv2.resize(self.last_refined_mask,
443
+ (current_mask.shape[1], current_mask.shape[0]))
444
+ else:
445
+ last_mask = self.last_refined_mask
446
+
447
+ # Weighted blend
448
+ blended_mask = cv2.addWeighted(current_mask, alpha, last_mask, beta, 0)
449
+
450
+ # Apply slight smoothing for temporal stability
451
+ blended_mask = cv2.GaussianBlur(blended_mask, (3, 3), 0.5)
452
+
453
+ return blended_mask
454
+
455
+ except Exception as e:
456
+ logger.warning(f"Temporal consistency application failed: {e}")
457
+ return current_mask
458
+
459
+ def _create_mask_preview(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray:
460
+ """Create mask visualization preview"""
461
+ try:
462
+ # Create colored mask overlay
463
+ mask_colored = np.zeros_like(frame)
464
+ mask_colored[:, :, 1] = mask # Green channel for person
465
+
466
+ # Blend with original frame
467
+ alpha = 0.6
468
+ preview = cv2.addWeighted(frame, 1-alpha, mask_colored, alpha, 0)
469
+
470
+ return preview
471
+
472
+ except Exception as e:
473
+ logger.warning(f"Mask preview creation failed: {e}")
474
+ return frame
475
+
476
+ def _create_greenscreen_preview(self, frame: np.ndarray, mask: np.ndarray) -> np.ndarray:
477
+ """Create green screen preview"""
478
+ try:
479
+ # Create pure green background
480
+ green_bg = np.zeros_like(frame)
481
+ green_bg[:, :] = [0, 255, 0] # Pure green in BGR
482
+
483
+ # Apply mask
484
+ mask_3ch = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) if len(mask.shape) == 2 else mask
485
+ mask_norm = mask_3ch.astype(np.float32) / 255.0
486
+
487
+ result = (frame * mask_norm + green_bg * (1 - mask_norm)).astype(np.uint8)
488
+
489
+ return result
490
+
491
+ except Exception as e:
492
+ logger.warning(f"Greenscreen preview creation failed: {e}")
493
+ return frame
494
+
495
+ def _replace_background(self, frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray:
496
+ """Replace background using the refined mask"""
497
+ try:
498
+ result = replace_background_hq(frame, mask, background)
499
+ return result
500
+
501
+ except Exception as e:
502
+ logger.warning(f"Background replacement failed: {e}")
503
+ return frame
504
+
505
+ def prepare_background(
506
+ self,
507
+ background_choice: str,
508
+ custom_background_path: Optional[str],
509
+ width: int,
510
+ height: int
511
+ ) -> Optional[np.ndarray]:
512
+ """
513
+ Prepare background image for processing
514
+
515
+ Args:
516
+ background_choice: Background type or name
517
+ custom_background_path: Path to custom background
518
+ width: Target width
519
+ height: Target height
520
+
521
+ Returns:
522
+ Prepared background image or None if failed
523
+ """
524
+ try:
525
+ if background_choice == "custom" and custom_background_path:
526
+ if not os.path.exists(custom_background_path):
527
+ raise BackgroundProcessingError("custom", f"File not found: {custom_background_path}")
528
+
529
+ background = cv2.imread(custom_background_path)
530
+ if background is None:
531
+ raise BackgroundProcessingError("custom", "Could not read custom background image")
532
+
533
+ logger.info(f"Loaded custom background: {custom_background_path}")
534
+
535
+ else:
536
+ # Use professional background
537
+ if background_choice not in PROFESSIONAL_BACKGROUNDS:
538
+ raise BackgroundProcessingError(background_choice, "Unknown professional background")
539
+
540
+ bg_config = PROFESSIONAL_BACKGROUNDS[background_choice]
541
+ background = create_professional_background(bg_config, width, height)
542
+
543
+ logger.info(f"Generated professional background: {background_choice}")
544
+
545
+ # Resize to match video dimensions
546
+ if background.shape[:2] != (height, width):
547
+ background = cv2.resize(background, (width, height), interpolation=cv2.INTER_LANCZOS4)
548
+
549
+ # Validate background
550
+ if background is None or background.size == 0:
551
+ raise BackgroundProcessingError(background_choice, "Background image is empty")
552
+
553
+ return background
554
+
555
+ except Exception as e:
556
+ if isinstance(e, BackgroundProcessingError):
557
+ logger.error(str(e))
558
+ return None
559
+ else:
560
+ logger.error(f"Unexpected error preparing background: {e}")
561
+ return None
562
+
563
+ def _update_processing_stats(self, video_info: Dict[str, Any],
564
+ processing_time: float, result: Dict[str, Any]):
565
+ """Update processing statistics"""
566
+ self.stats['videos_processed'] += 1
567
+ self.stats['total_frames_processed'] += result['successful_frames']
568
+ self.stats['total_processing_time'] += processing_time
569
+ self.stats['successful_frames'] += result['successful_frames']
570
+ self.stats['failed_frames'] += result['failed_frames']
571
+
572
+ # Calculate average FPS across all processing
573
+ if self.stats['total_processing_time'] > 0:
574
+ self.stats['average_fps'] = self.stats['total_frames_processed'] / self.stats['total_processing_time']
575
+
576
+ def get_processing_capabilities(self) -> Dict[str, Any]:
577
+ """Get current processing capabilities"""
578
+ return {
579
+ 'sam2_available': self.sam2_predictor is not None,
580
+ 'matanyone_available': self.matanyone_model is not None,
581
+ 'quality_preset': self.config.quality_preset,
582
+ 'supports_temporal_consistency': self.quality_settings.get('temporal_consistency', False),
583
+ 'supports_edge_refinement': self.quality_settings.get('edge_refinement', False),
584
+ 'keyframe_interval': self.quality_settings['keyframe_interval'],
585
+ 'max_resolution': self.config.get_resolution_limits(),
586
+ 'supported_formats': ['.mp4', '.avi', '.mov', '.mkv'],
587
+ 'memory_limit_gb': self.memory_manager.memory_limit_gb
588
+ }
589
+
590
+ def get_status(self) -> Dict[str, Any]:
591
+ """Get current processor status"""
592
+ return {
593
+ 'processing_active': self.processing_active,
594
+ 'models_available': {
595
+ 'sam2': self.sam2_predictor is not None,
596
+ 'matanyone': self.matanyone_model is not None
597
+ },
598
+ 'quality_settings': self.quality_settings,
599
+ 'statistics': self.stats.copy(),
600
+ 'cache_size': len(self.frame_cache),
601
+ 'memory_usage': self.memory_manager.get_memory_usage(),
602
+ 'capabilities': self.get_processing_capabilities()
603
+ }
604
+
605
+ def optimize_for_video(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
606
+ """Optimize settings for specific video characteristics"""
607
+ optimizations = {
608
+ 'original_settings': self.quality_settings.copy(),
609
+ 'optimizations_applied': []
610
+ }
611
+
612
+ try:
613
+ # High resolution video optimizations
614
+ if video_info['width'] * video_info['height'] > 1920 * 1080:
615
+ if self.quality_settings['keyframe_interval'] < 10:
616
+ self.quality_settings['keyframe_interval'] = 10
617
+ optimizations['optimizations_applied'].append('increased_keyframe_interval_for_high_res')
618
+
619
+ # Long video optimizations
620
+ if video_info['duration'] > 300: # 5 minutes
621
+ if self.config.memory_cleanup_interval > 20:
622
+ self.config.memory_cleanup_interval = 20
623
+ optimizations['optimizations_applied'].append('increased_memory_cleanup_frequency')
624
+
625
+ # Low FPS video optimizations
626
+ if video_info['fps'] < 15:
627
+ self.quality_settings['temporal_consistency'] = False
628
+ optimizations['optimizations_applied'].append('disabled_temporal_consistency_for_low_fps')
629
+
630
+ # Memory-constrained optimizations
631
+ memory_usage = self.memory_manager.get_memory_usage()
632
+ memory_pressure = self.memory_manager.check_memory_pressure()
633
+
634
+ if memory_pressure['under_pressure']:
635
+ self.quality_settings['edge_refinement'] = False
636
+ self.quality_settings['keyframe_interval'] = max(self.quality_settings['keyframe_interval'], 15)
637
+ optimizations['optimizations_applied'].extend([
638
+ 'disabled_edge_refinement_for_memory',
639
+ 'increased_keyframe_interval_for_memory'
640
+ ])
641
+
642
+ optimizations['final_settings'] = self.quality_settings.copy()
643
+
644
+ if optimizations['optimizations_applied']:
645
+ logger.info(f"Applied video optimizations: {optimizations['optimizations_applied']}")
646
+
647
+ return optimizations
648
+
649
+ except Exception as e:
650
+ logger.warning(f"Video optimization failed: {e}")
651
+ return optimizations
652
+
653
+ def reset_cache(self):
654
+ """Reset frame cache and temporal state"""
655
+ self.frame_cache.clear()
656
+ self.last_refined_mask = None
657
+ self.stats['cache_hits'] = 0
658
+ logger.debug("Frame cache and temporal state reset")
659
+
660
+ def cleanup(self):
661
+ """Clean up processor resources"""
662
+ try:
663
+ self.reset_cache()
664
+ self.processing_active = False
665
+ logger.info("CoreVideoProcessor cleanup completed")
666
+ except Exception as e:
667
+ logger.warning(f"Error during cleanup: {e}")