""" Video processing API module for BackgroundFX Pro. Wraps CoreVideoProcessor with additional API features for streaming, batching, and real-time processing. """ import cv2 import numpy as np import torch from typing import Dict, List, Optional, Tuple, Union, Callable, Generator, Any from dataclasses import dataclass, field from enum import Enum from pathlib import Path import time import threading from queue import Queue, Empty import tempfile import shutil from concurrent.futures import ThreadPoolExecutor, as_completed import subprocess import json import os import asyncio from datetime import datetime from ..utils.logger import setup_logger from ..utils.device import DeviceManager from ..utils import TimeEstimator, MemoryMonitor from ..core.temporal import TemporalCoherence from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode # Import your existing CoreVideoProcessor from core_video import CoreVideoProcessor logger = setup_logger(__name__) class VideoStreamMode(Enum): """Video streaming modes.""" FILE = "file" WEBCAM = "webcam" RTSP = "rtsp" HTTP = "http" VIRTUAL = "virtual" SCREEN = "screen" class OutputFormat(Enum): """Output format options.""" MP4 = "mp4" AVI = "avi" MOV = "mov" WEBM = "webm" HLS = "hls" DASH = "dash" FRAMES = "frames" @dataclass class StreamConfig: """Configuration for video streaming.""" # Input configuration source: Union[str, int] = 0 # File path, camera index, or URL stream_mode: VideoStreamMode = VideoStreamMode.FILE # Output configuration output_path: Optional[str] = None output_format: OutputFormat = OutputFormat.MP4 output_codec: str = "h264" output_bitrate: str = "5M" output_fps: Optional[float] = None # Streaming settings buffer_size: int = 30 chunk_duration: float = 2.0 # For HLS/DASH enable_adaptive_bitrate: bool = False # Real-time settings enable_preview: bool = False preview_scale: float = 0.5 low_latency: bool = False # Performance hardware_acceleration: bool = True num_threads: int = 4 
@dataclass
class VideoStats:
    """Enhanced video processing statistics.

    VideoProcessorAPI creates a fresh instance for every ``process_video`` call
    and for every stream it starts, and keeps the most recent one in ``stats``.
    """
    # Timing: start_time is a time.time() timestamp, total_duration is in
    # seconds, processing_fps is processed frames per wall-clock second.
    start_time: float = 0.0
    total_duration: float = 0.0
    processing_fps: float = 0.0

    # Frame stats
    frames_total: int = 0
    frames_processed: int = 0
    frames_dropped: int = 0
    frames_cached: int = 0  # NOTE(review): not populated by VideoProcessorAPI

    # Quality metrics: running mean / min / max of the pipeline's per-frame
    # quality_score over successfully processed frames.
    avg_quality_score: float = 0.0
    min_quality_score: float = 1.0
    max_quality_score: float = 0.0

    # Performance (NOTE(review): not populated by VideoProcessorAPI)
    cpu_usage: float = 0.0
    gpu_usage: float = 0.0
    memory_usage_mb: float = 0.0

    # Errors
    error_count: int = 0
    warnings: List[str] = field(default_factory=list)


class VideoProcessorAPI:
    """
    API wrapper for video processing with streaming and real-time capabilities.

    Wraps an optional CoreVideoProcessor (used for file processing when one is
    supplied) and a ProcessingPipeline (used for file processing otherwise and
    for all stream processing). Adds async and batch file processing,
    threaded streaming (file / camera / URL input to video files, frame
    folders, HLS or DASH), preview frames, statistics and ffmpeg export.
    """

    def __init__(self, core_processor: Optional[CoreVideoProcessor] = None):
        """
        Initialize Video Processor API.

        Args:
            core_processor: Optional existing CoreVideoProcessor instance
        """
        self.logger = setup_logger(f"{__name__}.VideoProcessorAPI")

        # Use provided core processor or create pipeline-based one
        self.core_processor = core_processor
        self.pipeline = ProcessingPipeline(PipelineConfig(mode=ProcessingMode.VIDEO))

        # State management
        self.is_processing = False
        self.is_streaming = False
        self.should_stop = False

        # Statistics
        self.stats = VideoStats()
        # Stream workers update stats counters concurrently; guard them.
        self._stats_lock = threading.Lock()
        # process_video() calls in flight (parallel batches run several).
        self._active_jobs = 0
        self._jobs_lock = threading.Lock()

        # Streaming components
        self.input_queue = Queue(maxsize=100)
        self.output_queue = Queue(maxsize=100)
        self.preview_queue = Queue(maxsize=10)

        # Thread pool
        self.executor = ThreadPoolExecutor(max_workers=8)
        self.stream_thread = None
        self.process_threads = []
        # Frame-processing workers only (a subset of process_threads); output
        # consumers keep draining while any of them is still alive.
        self._worker_threads = []

        # FFmpeg process for advanced streaming
        self.ffmpeg_process = None

        # WebRTC support
        self.webrtc_peers = {}

        self.logger.info("VideoProcessorAPI initialized")

    # ------------------------------------------------------------------
    # File processing
    # ------------------------------------------------------------------

    async def process_video_async(self,
                                  input_path: str,
                                  output_path: str,
                                  background: Optional[Union[str, np.ndarray]] = None,
                                  progress_callback: Optional[Callable] = None) -> VideoStats:
        """
        Asynchronously process a video file.

        Runs ``process_video`` in the running loop's default executor.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function

        Returns:
            Processing statistics
        """
        # Inside a coroutine the running loop is the correct one to use;
        # get_event_loop() is deprecated for this purpose.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.process_video, input_path, output_path, background, progress_callback
        )

    def process_video(self,
                      input_path: str,
                      output_path: str,
                      background: Optional[Union[str, np.ndarray]] = None,
                      progress_callback: Optional[Callable] = None) -> VideoStats:
        """
        Process a video file using either CoreVideoProcessor or Pipeline.

        Each call gets its own VideoStats object, so concurrent calls (e.g. a
        parallel batch) do not overwrite each other's statistics. The object is
        also published as ``self.stats``.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function

        Returns:
            Processing statistics

        Raises:
            ValueError: If the input (or, for pipeline processing, the output
                writer) cannot be opened.
        """
        stats = VideoStats(start_time=time.time())
        self.stats = stats
        with self._jobs_lock:
            if self._active_jobs == 0 and not self.is_streaming:
                # Clear a stop request left over from a finished or stopped
                # stream; otherwise this file would end after its first frame.
                self.should_stop = False
            self._active_jobs += 1
            self.is_processing = True

        try:
            if self.core_processor:
                self._process_with_core(
                    input_path, output_path, background, progress_callback, stats=stats
                )
            else:
                self._process_with_pipeline(
                    input_path, output_path, background, progress_callback, stats=stats
                )
            return stats
        finally:
            stats.total_duration = time.time() - stats.start_time
            with self._jobs_lock:
                self._active_jobs -= 1
                self.is_processing = self._active_jobs > 0

    def _process_with_pipeline(self,
                               input_path: str,
                               output_path: str,
                               background: Optional[Union[str, np.ndarray]],
                               progress_callback: Optional[Callable],
                               stats: Optional[VideoStats] = None) -> VideoStats:
        """Process video using the Pipeline system.

        Frames the pipeline fails on are written through unchanged and counted
        in ``frames_dropped``.

        Args:
            input_path: Path to input video
            output_path: Path to output video (written with the mp4v codec)
            background: Background image or path passed to the pipeline
            progress_callback: Called as ``callback(progress, info_dict)`` per frame
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object.

        Raises:
            ValueError: If the input cannot be opened or the writer cannot be created.
        """
        stats = self.stats if stats is None else stats

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {input_path}")

        out = None
        try:
            # Get video properties. Some inputs report 0 (or NaN) FPS, which
            # would make the writer unusable; fall back to 30.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                fps = 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            stats.frames_total = total_frames

            # Setup output writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise ValueError(f"Cannot open video writer: {output_path}")

            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Process frame through pipeline
                result = self.pipeline.process_image(frame, background)
                if result.success and result.output_image is not None:
                    out.write(result.output_image)
                    stats.frames_processed += 1
                    self._update_quality_stats(result.quality_score, stats=stats)
                else:
                    # Write original frame on failure
                    out.write(frame)
                    stats.frames_dropped += 1

                frame_idx += 1

                if progress_callback:
                    # The frame count may be 0 (unknown) or only an estimate:
                    # avoid dividing by zero and keep progress within [0, 1].
                    if total_frames > 0:
                        progress = min(frame_idx / total_frames, 1.0)
                    else:
                        progress = 0.0
                    progress_callback(progress, {
                        'current_frame': frame_idx,
                        'total_frames': total_frames,
                        'fps': self._rate(stats.frames_processed, stats.start_time),
                    })

                if self.should_stop:
                    break
        finally:
            cap.release()
            if out is not None:
                out.release()

        stats.processing_fps = self._rate(stats.frames_processed, stats.start_time)
        return stats

    def _process_with_core(self,
                           input_path: str,
                           output_path: str,
                           background: Optional[Union[str, np.ndarray]],
                           progress_callback: Optional[Callable],
                           stats: Optional[VideoStats] = None) -> VideoStats:
        """Process video using CoreVideoProcessor.

        Background mapping: an existing file path -> "custom" with that file;
        any other string -> passed through as the background choice; an image
        array -> written to a temporary PNG used as "custom"; anything else ->
        "blur".

        Args:
            input_path: Path to input video
            output_path: Where the core processor's output file is moved
            background: Background image, path or choice name
            progress_callback: Forwarded to CoreVideoProcessor.process_video
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object.
        """
        stats = self.stats if stats is None else stats

        bg_choice, custom_bg, temp_bg_path = "blur", None, None
        if isinstance(background, str):
            if os.path.exists(background):
                bg_choice, custom_bg = "custom", background
            else:
                bg_choice = background
        elif isinstance(background, np.ndarray):
            # Close our handle before cv2 writes the file (an open handle can
            # block the write on Windows).
            fd, temp_bg_path = tempfile.mkstemp(suffix='.png')
            os.close(fd)
            cv2.imwrite(temp_bg_path, background)
            bg_choice, custom_bg = "custom", temp_bg_path

        try:
            output, message = self.core_processor.process_video(
                input_path, bg_choice, custom_bg, progress_callback
            )
        finally:
            # The temp background was previously leaked. Remove it once the
            # core call has returned (presumably it no longer needs the file).
            if temp_bg_path is not None:
                try:
                    os.remove(temp_bg_path)
                except OSError:
                    pass  # best-effort cleanup of our own temp file

        if output:
            # Move output to desired location
            shutil.move(output, output_path)
        else:
            stats.error_count += 1
            self.logger.error(f"CoreVideoProcessor produced no output for {input_path}: {message}")

        # Extract stats from core processor
        core_stats = self.core_processor.stats
        stats.frames_processed = core_stats.get('successful_frames', 0)
        stats.frames_dropped = core_stats.get('failed_frames', 0)
        stats.processing_fps = core_stats.get('average_fps', 0)

        return stats

    # ------------------------------------------------------------------
    # Stream processing
    # ------------------------------------------------------------------

    def start_stream_processing(self,
                                config: StreamConfig,
                                background: Optional[Union[str, np.ndarray]] = None) -> bool:
        """
        Start real-time stream processing.

        Starts one input thread, ``config.num_threads`` processing workers and
        one output consumer (video file, frame folder, or an FFmpeg HLS/DASH
        pipe). Statistics are reset for the new stream.

        Args:
            config: Stream configuration
            background: Background for replacement

        Returns:
            True if stream started successfully, False if one is already active
        """
        if self.is_streaming:
            self.logger.warning("Stream already active")
            return False

        self.is_streaming = True
        self.should_stop = False
        # Fresh statistics so get_stats() measures FPS from this stream's start.
        self.stats = VideoStats(start_time=time.time())
        self._worker_threads = []

        # Start input stream thread
        self.stream_thread = threading.Thread(
            target=self._stream_input_handler,
            args=(config,)
        )
        self.stream_thread.start()

        # Start processing threads
        for _ in range(config.num_threads):
            worker = threading.Thread(
                target=self._stream_processor,
                args=(background,),
                kwargs={
                    'preview_scale': config.preview_scale,
                    'enable_preview': config.enable_preview,
                },
            )
            worker.start()
            self._worker_threads.append(worker)
            self.process_threads.append(worker)

        # Start output handler
        if config.output_format in (OutputFormat.HLS, OutputFormat.DASH):
            self._start_adaptive_streaming(config)
        else:
            self._start_output_handler(config)

        self.logger.info(f"Stream processing started: {config.stream_mode.value}")
        return True

    def _open_stream_capture(self, config: StreamConfig) -> cv2.VideoCapture:
        """Open a capture for ``config.source`` according to ``config.stream_mode``.

        Raises:
            ValueError: For stream modes without a capture backend here.
        """
        mode = config.stream_mode
        if mode in (VideoStreamMode.FILE, VideoStreamMode.RTSP, VideoStreamMode.HTTP):
            return cv2.VideoCapture(config.source)
        if mode == VideoStreamMode.WEBCAM:
            return cv2.VideoCapture(int(config.source))
        if mode == VideoStreamMode.SCREEN:
            # Screen capture (platform-specific)
            return self._setup_screen_capture()
        raise ValueError(f"Unsupported stream mode: {config.stream_mode}")

    def _stream_input_handler(self, config: StreamConfig):
        """Handle input stream capture.

        Reads frames into ``input_queue`` until the source ends (files), a stop
        is requested, or an error occurs. Always releases the capture and
        clears ``is_streaming`` on exit so workers can drain and finish.
        """
        from queue import Full  # only Queue/Empty are imported at module level

        is_file = config.stream_mode == VideoStreamMode.FILE
        cap = None
        try:
            cap = self._open_stream_capture(config)
            if not cap.isOpened():
                raise ValueError("Failed to open stream")

            while self.is_streaming and not self.should_stop:
                ret, frame = cap.read()
                if not ret:
                    if is_file:
                        break  # End of file
                    time.sleep(0.1)  # Retry for live streams
                    continue

                if is_file:
                    # A file is not real-time: wait for queue space instead
                    # of dropping frames, re-checking for stop requests.
                    while self.is_streaming and not self.should_stop:
                        try:
                            self.input_queue.put(frame, timeout=0.1)
                            break
                        except Full:
                            continue
                else:
                    try:
                        self.input_queue.put(frame, timeout=0.1)
                    except Full:
                        # Queue full: drop the frame to stay close to real time.
                        with self._stats_lock:
                            self.stats.frames_dropped += 1
                    # Control frame rate for live streams
                    time.sleep(1.0 / 30)  # 30 FPS limit
        except Exception as e:
            self.logger.error(f"Stream input handler error: {e}")
        finally:
            if cap is not None:
                cap.release()
            self.is_streaming = False

    def _stream_processor(self,
                          background: Optional[Union[str, np.ndarray]],
                          preview_scale: float = 0.5,
                          enable_preview: bool = True):
        """Worker loop: process frames from the input queue into the output queue.

        Runs until streaming has ended and the input queue is drained.

        Args:
            background: Background passed to the pipeline for every frame
            preview_scale: Resize factor for preview frames
            enable_preview: Whether to publish downscaled frames to ``preview_queue``
        """
        from queue import Full  # only Queue/Empty are imported at module level

        while self.is_streaming or not self.input_queue.empty():
            try:
                frame = self.input_queue.get(timeout=0.5)
            except Empty:
                continue

            try:
                result = self.pipeline.process_image(frame, background)
                if not (result.success and result.output_image is not None):
                    # Same accounting as file processing: a failed frame is dropped.
                    with self._stats_lock:
                        self.stats.frames_dropped += 1
                    continue

                self._put_output(result.output_image)

                with self._stats_lock:
                    self.stats.frames_processed += 1
                    self._update_quality_stats(result.quality_score)

                if enable_preview and not self.preview_queue.full():
                    preview = cv2.resize(result.output_image, None,
                                         fx=preview_scale, fy=preview_scale)
                    try:
                        self.preview_queue.put_nowait(preview)
                    except Full:
                        pass  # another worker took the last slot; previews are best-effort
            except Exception as e:
                self.logger.error(f"Stream processor error: {e}")
                with self._stats_lock:
                    self.stats.error_count += 1

    def _put_output(self, frame: np.ndarray) -> bool:
        """Queue a processed frame for the output consumer.

        Blocks while the output queue is full. The frame is abandoned (returns
        False) only once a stop has been requested, so workers cannot hang
        forever behind a consumer that has gone away.
        """
        from queue import Full  # only Queue/Empty are imported at module level

        while True:
            try:
                self.output_queue.put(frame, timeout=0.5)
                return True
            except Full:
                if self.should_stop:
                    return False

    def _producers_active(self) -> bool:
        """True while frames may still arrive on the output queue."""
        return self.is_streaming or any(t.is_alive() for t in self._worker_threads)

    def _abort_stream(self, message: str):
        """Log a fatal streaming error and ask every stream thread to wind down."""
        self.logger.error(message)
        self.should_stop = True

    def _start_output_handler(self, config: StreamConfig):
        """Start output stream handler."""
        output_thread = threading.Thread(
            target=self._output_handler,
            args=(config,)
        )
        output_thread.start()
        self.process_threads.append(output_thread)

    def _output_handler(self, config: StreamConfig):
        """Handle output stream writing (frame folder or video file)."""
        try:
            if config.output_format == OutputFormat.FRAMES:
                # Save individual frames
                self._save_frames_output(config)
            else:
                # Video file output
                self._save_video_output(config)
        except Exception as e:
            # Without a consumer the workers would stall on a full output
            # queue, so stop the whole stream.
            self._abort_stream(f"Output handler error: {e}")

    def _save_video_output(self, config: StreamConfig):
        """Save processed frames to a video file.

        The writer is created on the first frame, using that frame's size and
        ``config.output_fps`` (default 30).

        Raises:
            ValueError: If no output path is set or the writer cannot be opened.
        """
        if not config.output_path:
            raise ValueError("StreamConfig.output_path is required for video output")

        fourcc_codes = {
            OutputFormat.MP4: 'mp4v',
            OutputFormat.AVI: 'XVID',
            OutputFormat.WEBM: 'VP80',  # WebM containers take VP8/VP9, not MPEG-4
        }
        out = None
        frame_count = 0
        try:
            # Keep draining until every worker has finished, not just until
            # the input ended; otherwise the last in-flight frames are lost.
            while self._producers_active() or not self.output_queue.empty():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                # Initialize writer on first frame
                if out is None:
                    h, w = frame.shape[:2]
                    fps = config.output_fps or 30.0
                    fourcc = cv2.VideoWriter_fourcc(*fourcc_codes.get(config.output_format, 'mp4v'))
                    out = cv2.VideoWriter(config.output_path, fourcc, fps, (w, h))
                    if not out.isOpened():
                        raise ValueError(f"Cannot open video writer: {config.output_path}")

                out.write(frame)
                frame_count += 1
        finally:
            if out is not None:
                out.release()
            self.logger.info(f"Saved {frame_count} frames to {config.output_path}")

    def _save_frames_output(self, config: StreamConfig):
        """Save processed frames as individual PNG images in ``config.output_path``.

        Raises:
            ValueError: If no output path is set.
        """
        if not config.output_path:
            raise ValueError("StreamConfig.output_path is required for frame output")

        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        frame_count = 0
        while self._producers_active() or not self.output_queue.empty():
            try:
                frame = self.output_queue.get(timeout=0.5)
            except Empty:
                continue

            frame_path = output_dir / f"frame_{frame_count:06d}.png"
            cv2.imwrite(str(frame_path), frame)
            frame_count += 1

    def _start_adaptive_streaming(self, config: StreamConfig):
        """Start HLS or DASH adaptive streaming."""
        try:
            if config.output_format == OutputFormat.HLS:
                self._start_hls_streaming(config)
            elif config.output_format == OutputFormat.DASH:
                self._start_dash_streaming(config)
        except Exception as e:
            self._abort_stream(f"Adaptive streaming setup failed: {e}")

    def _ffmpeg_rawvideo_command(self, config: StreamConfig, width: int, height: int,
                                 muxer_args: List[str]) -> List[str]:
        """Build an FFmpeg command that encodes raw BGR frames read from stdin.

        Args:
            config: Stream configuration (``output_fps`` sets the input rate, default 30)
            width: Frame width in pixels
            height: Frame height in pixels
            muxer_args: Output-format options followed by the output path
        """
        fps = config.output_fps or 30
        return [
            'ffmpeg', '-y',  # overwrite a playlist/manifest left by a previous run
            '-f', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',  # Input from pipe
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            # 4:2:0 output; from bgr24 FFmpeg would otherwise pick a 4:4:4
            # profile that many players cannot decode.
            '-pix_fmt', 'yuv420p',
            *muxer_args,
        ]

    def _start_hls_streaming(self, config: StreamConfig):
        """Start HLS streaming with FFmpeg into ``config.output_path/stream.m3u8``.

        The encoder is launched on the first processed frame so that it uses
        the real frame size instead of a hard-coded resolution.
        """
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        playlist = output_dir / 'stream.m3u8'

        muxer_args = [
            '-f', 'hls',
            '-hls_time', str(config.chunk_duration),
            '-hls_list_size', '10',
            '-hls_flags', 'delete_segments',
            str(playlist),
        ]
        self._launch_ffmpeg_pipe(
            lambda width, height: self._ffmpeg_rawvideo_command(config, width, height, muxer_args)
        )
        self.logger.info(f"HLS streaming started: {output_dir / 'stream.m3u8'}")

    def _start_dash_streaming(self, config: StreamConfig):
        """Start MPEG-DASH streaming with FFmpeg into ``config.output_path/stream.mpd``.

        Like HLS, the encoder is launched on the first processed frame.
        """
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        manifest = output_dir / 'stream.mpd'

        muxer_args = [
            '-f', 'dash',
            '-seg_duration', str(config.chunk_duration),
            '-window_size', '10',
            str(manifest),
        ]
        self._launch_ffmpeg_pipe(
            lambda width, height: self._ffmpeg_rawvideo_command(config, width, height, muxer_args)
        )
        self.logger.info(f"DASH streaming started: {manifest}")

    def _launch_ffmpeg_pipe(self, command_builder: Callable[[int, int], List[str]]):
        """Start the thread that feeds processed frames to a lazily started FFmpeg."""
        # Reap an encoder left over from a previous stream so the new pipe
        # thread starts a fresh one instead of writing to a dead process.
        self._shutdown_ffmpeg()
        ffmpeg_thread = threading.Thread(
            target=self._pipe_to_ffmpeg,
            args=(command_builder,)
        )
        ffmpeg_thread.start()
        self.process_threads.append(ffmpeg_thread)

    def _pipe_to_ffmpeg(self, command_builder: Optional[Callable[[int, int], List[str]]] = None):
        """Pipe processed frames to FFmpeg.

        Args:
            command_builder: Called with the first frame's (width, height) to
                build the FFmpeg command. If it is None, frames are written to
                an already running ``self.ffmpeg_process``.
        """
        try:
            while self._producers_active() or not self.output_queue.empty():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                if self.ffmpeg_process is None and command_builder is not None:
                    height, width = frame.shape[:2]
                    # stdout/stderr go to DEVNULL: nothing reads them, and a
                    # full stderr pipe would block FFmpeg (and then this writer).
                    self.ffmpeg_process = subprocess.Popen(
                        command_builder(width, height),
                        stdin=subprocess.PIPE,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )

                if self.ffmpeg_process and self.ffmpeg_process.stdin:
                    self.ffmpeg_process.stdin.write(frame.tobytes())
        except Exception as e:
            self._abort_stream(f"FFmpeg pipe error: {e}")
        finally:
            # EOF on stdin lets FFmpeg finish writing its output and exit.
            self._close_ffmpeg_stdin()

    def _close_ffmpeg_stdin(self):
        """Close FFmpeg's stdin if it is open; tolerates an already exited process."""
        proc = self.ffmpeg_process
        if proc is not None and proc.stdin and not proc.stdin.closed:
            try:
                proc.stdin.close()
            except (BrokenPipeError, OSError, ValueError):
                pass  # FFmpeg already exited; nothing left to flush

    def _shutdown_ffmpeg(self, timeout: float = 5.0):
        """Let FFmpeg finish (EOF on stdin), killing it if it does not exit in time."""
        proc = self.ffmpeg_process
        if proc is None:
            return
        self._close_ffmpeg_stdin()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        self.ffmpeg_process = None

    def _setup_screen_capture(self) -> cv2.VideoCapture:
        """Setup screen capture (platform-specific).

        NOTE: placeholder. There is no screen-grab backend yet, so this opens
        camera index 0.
        """
        return cv2.VideoCapture(0)

    def _update_quality_stats(self, quality_score: float, stats: Optional[VideoStats] = None):
        """Fold one frame's quality score into the running quality metrics.

        Args:
            quality_score: The frame's quality score from the pipeline
            stats: Statistics object to update; defaults to ``self.stats``
        """
        stats = self.stats if stats is None else stats
        # Callers increment frames_processed before calling, so the count
        # already includes this frame; treat 0 as the first frame.
        count = max(stats.frames_processed, 1)
        stats.avg_quality_score += (quality_score - stats.avg_quality_score) / count
        stats.min_quality_score = min(stats.min_quality_score, quality_score)
        stats.max_quality_score = max(stats.max_quality_score, quality_score)

    def stop_stream_processing(self):
        """Stop stream processing.

        Signals all stream threads, waits up to 5 s for each, then lets FFmpeg
        (if running) finish.
        """
        self.should_stop = True
        self.is_streaming = False

        # Wait for threads to finish
        if self.stream_thread:
            self.stream_thread.join(timeout=5)
        for thread in self.process_threads:
            thread.join(timeout=5)
        self.stream_thread = None
        self.process_threads = []
        self._worker_threads = []

        # Stop FFmpeg if running
        self._shutdown_ffmpeg()

        self.logger.info("Stream processing stopped")

    def get_preview_frame(self) -> Optional[np.ndarray]:
        """Get a preview frame from the preview queue (None if none is waiting)."""
        try:
            return self.preview_queue.get_nowait()
        except Empty:
            return None

    @staticmethod
    def _rate(count: int, start_time: float) -> float:
        """Return ``count`` per second elapsed since ``start_time`` (0.0 if none elapsed)."""
        elapsed = time.time() - start_time
        return count / elapsed if elapsed > 0 else 0.0

    def get_stats(self) -> VideoStats:
        """Get current processing statistics, refreshing FPS while work is active."""
        if (self.is_processing or self.is_streaming) and self.stats.start_time > 0:
            self.stats.processing_fps = self._rate(
                self.stats.frames_processed, self.stats.start_time
            )
        return self.stats

    # ------------------------------------------------------------------
    # Batch and export
    # ------------------------------------------------------------------

    def process_video_batch(self,
                            input_paths: List[str],
                            output_dir: str,
                            background: Optional[Union[str, np.ndarray]] = None,
                            parallel: bool = True) -> List[VideoStats]:
        """
        Process multiple videos in batch.

        Each input is written to ``output_dir/<input stem>_processed.mp4``.
        NOTE: inputs that share a file stem map to the same output path.

        Args:
            input_paths: List of input video paths
            output_dir: Output directory
            background: Background for all videos
            parallel: Process in parallel on the API's thread pool

        Returns:
            Processing statistics in the same order as ``input_paths``. In
            parallel mode a failed video yields ``VideoStats(error_count=1)``;
            in sequential mode the exception propagates.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        def target_for(input_path: str) -> str:
            return str(output_dir / f"{Path(input_path).stem}_processed.mp4")

        if not parallel:
            return [
                self.process_video(input_path, target_for(input_path), background)
                for input_path in input_paths
            ]

        future_index = {
            self.executor.submit(self.process_video, input_path, target_for(input_path), background): i
            for i, input_path in enumerate(input_paths)
        }
        results: List[Optional[VideoStats]] = [None] * len(input_paths)
        for future in as_completed(future_index):
            try:
                stats = future.result()
            except Exception as e:
                self.logger.error(f"Batch processing error: {e}")
                stats = VideoStats(error_count=1)
            # Store by submission index so results line up with input_paths.
            results[future_index[future]] = stats
        return results

    def export_to_format(self,
                         input_path: str,
                         output_path: str,
                         format: OutputFormat,
                         **kwargs) -> bool:
        """
        Export processed video to specific format using the ffmpeg CLI.

        Args:
            input_path: Input video path
            output_path: Output path
            format: Target format (WEBM, HLS; anything else -> H.264 re-encode)
            **kwargs: Optional encoder overrides: ``crf`` (WEBM default 30,
                MP4 default 23), ``preset`` (MP4 default "medium") and
                ``hls_time`` (HLS segment seconds, default 10). Other keys are
                ignored.

        Returns:
            True if successful
        """
        try:
            if format == OutputFormat.WEBM:
                cmd = [
                    'ffmpeg', '-i', input_path,
                    '-c:v', 'libvpx-vp9',
                    '-crf', str(kwargs.get('crf', 30)),
                    '-b:v', '0',
                    output_path
                ]
            elif format == OutputFormat.HLS:
                cmd = [
                    'ffmpeg', '-i', input_path,
                    '-c:v', 'libx264',
                    '-hls_time', str(kwargs.get('hls_time', 10)),
                    '-hls_list_size', '0',
                    '-f', 'hls',
                    output_path
                ]
            else:
                # Default MP4 conversion
                cmd = [
                    'ffmpeg', '-i', input_path,
                    '-c:v', 'libx264',
                    '-preset', str(kwargs.get('preset', 'medium')),
                    '-crf', str(kwargs.get('crf', 23)),
                    output_path
                ]

            # stdin=DEVNULL so ffmpeg can never block waiting for terminal
            # input (e.g. an overwrite prompt for an existing output).
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    stdin=subprocess.DEVNULL)
            if result.returncode != 0:
                self.logger.error(
                    f"Export failed (ffmpeg exit {result.returncode}): {result.stderr[-500:]}"
                )
                return False
            return True

        except Exception as e:
            self.logger.error(f"Export failed: {e}")
            return False

    def cleanup(self):
        """Stop any stream, shut down the batch executor and clean up the core processor."""
        self.stop_stream_processing()
        self.executor.shutdown(wait=True)

        if self.core_processor:
            self.core_processor.cleanup()

        self.logger.info("VideoProcessorAPI cleanup complete")