# VideoBackgroundReplacer / api/video_processor.py
# Author: MogensR
# Commit: Rename video_processor.py to api/video_processor.py
# Revision: 00a34b8
"""
Video processing API module for BackgroundFX Pro.
Wraps CoreVideoProcessor with additional API features for streaming, batching, and real-time processing.
"""
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from queue import Empty, Full, Queue
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import cv2
import numpy as np
import torch

from ..core.temporal import TemporalCoherence
from ..utils import MemoryMonitor, TimeEstimator
from ..utils.device import DeviceManager
from ..utils.logger import setup_logger
from .pipeline import PipelineConfig, PipelineResult, ProcessingMode, ProcessingPipeline

# Import your existing CoreVideoProcessor
from core_video import CoreVideoProcessor
logger = setup_logger(__name__)
class VideoStreamMode(Enum):
    """Video streaming modes.

    Selects how ``StreamConfig.source`` is opened by the stream input handler.
    """

    FILE = "file"        # source is passed to cv2.VideoCapture as-is; end of file ends the stream
    WEBCAM = "webcam"    # source is converted with int() and used as a camera index
    RTSP = "rtsp"        # source is passed to cv2.VideoCapture as-is
    HTTP = "http"        # source is passed to cv2.VideoCapture as-is
    VIRTUAL = "virtual"  # NOTE(review): no input handler branch exists for this mode
    SCREEN = "screen"    # opened through the API's screen-capture helper
class OutputFormat(Enum):
    """Output format options.

    ``HLS`` and ``DASH`` are routed to the adaptive-streaming path, ``FRAMES``
    writes one image per frame, and every other member is written as a single
    video file.
    """

    MP4 = "mp4"
    AVI = "avi"
    MOV = "mov"
    WEBM = "webm"
    HLS = "hls"
    DASH = "dash"
    FRAMES = "frames"
@dataclass
class StreamConfig:
    """Configuration for video streaming.

    Groups the input source, the output target, and tuning knobs for one
    streaming session. All fields have defaults, so ``StreamConfig()`` is valid.
    """

    # Input configuration
    source: Union[str, int] = 0  # File path, camera index, or URL
    stream_mode: VideoStreamMode = VideoStreamMode.FILE

    # Output configuration
    output_path: Optional[str] = None  # File path, or a directory for FRAMES/HLS/DASH output
    output_format: OutputFormat = OutputFormat.MP4
    output_codec: str = "h264"
    output_bitrate: str = "5M"
    output_fps: Optional[float] = None  # None lets the writer pick its fallback rate

    # Streaming settings
    buffer_size: int = 30
    chunk_duration: float = 2.0  # For HLS/DASH
    enable_adaptive_bitrate: bool = False

    # Real-time settings
    enable_preview: bool = False
    preview_scale: float = 0.5
    low_latency: bool = False

    # Performance
    hardware_acceleration: bool = True
    num_threads: int = 4  # Number of frame-processing threads started per stream
@dataclass
class VideoStats:
    """Enhanced video processing statistics.

    A mutable record that is filled in while a video or stream is processed.
    """

    # Timing
    start_time: float = 0.0      # time.time() value captured when processing began
    total_duration: float = 0.0  # seconds between start_time and the end of processing
    processing_fps: float = 0.0  # frames_processed divided by elapsed seconds

    # Frame stats
    frames_total: int = 0
    frames_processed: int = 0
    frames_dropped: int = 0
    frames_cached: int = 0

    # Quality metrics
    avg_quality_score: float = 0.0
    min_quality_score: float = 1.0  # starts high so the first sample can lower it
    max_quality_score: float = 0.0  # starts low so the first sample can raise it

    # Performance
    cpu_usage: float = 0.0
    gpu_usage: float = 0.0
    memory_usage_mb: float = 0.0

    # Errors
    error_count: int = 0
    warnings: List[str] = field(default_factory=list)  # per-instance list, never shared
class VideoProcessorAPI:
    """
    API wrapper for video processing with streaming and real-time capabilities.

    Whole-file processing is delegated to an optional ``CoreVideoProcessor`` when
    one is supplied, otherwise to the ``ProcessingPipeline`` created in
    ``__init__``. Streaming uses one capture thread, ``StreamConfig.num_threads``
    processing threads and one output thread, connected by bounded queues.

    NOTE(review): with more than one processing thread, frames reach the output
    queue in completion order, so output frame order is not guaranteed — confirm
    whether callers rely on ordering.
    """

    def __init__(self, core_processor: Optional["CoreVideoProcessor"] = None):
        """
        Initialize Video Processor API.

        Args:
            core_processor: Optional existing CoreVideoProcessor instance
        """
        self.logger = setup_logger(f"{__name__}.VideoProcessorAPI")

        # The pipeline is always created; the core processor, when given,
        # takes precedence for whole-file processing.
        self.core_processor = core_processor
        self.pipeline = ProcessingPipeline(PipelineConfig(mode=ProcessingMode.VIDEO))

        # State management
        self.is_processing = False
        self.is_streaming = False
        self.should_stop = False

        # Statistics
        self.stats = VideoStats()

        # Streaming components
        self.input_queue = Queue(maxsize=100)
        self.output_queue = Queue(maxsize=100)
        self.preview_queue = Queue(maxsize=10)

        # Thread pool
        self.executor = ThreadPoolExecutor(max_workers=8)
        self.stream_thread = None
        self.process_threads = []
        # Frame-processing threads only; output writers consult this list so
        # they do not exit while a frame is still in flight.
        self._worker_threads = []

        # FFmpeg process for advanced streaming
        self.ffmpeg_process = None

        # WebRTC support
        self.webrtc_peers = {}

        self.logger.info("VideoProcessorAPI initialized")

    @staticmethod
    def _rate(count: int, start_time: float) -> float:
        """Return ``count`` per second elapsed since ``start_time``, or 0.0 if no time has elapsed."""
        elapsed = time.time() - start_time
        return count / elapsed if elapsed > 0 else 0.0

    async def process_video_async(self,
                                  input_path: str,
                                  output_path: str,
                                  background: Optional[Union[str, np.ndarray]] = None,
                                  progress_callback: Optional[Callable] = None) -> "VideoStats":
        """
        Asynchronously process a video file.

        Runs ``process_video`` in the event loop's default executor.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function (called from the worker thread)

        Returns:
            Processing statistics
        """
        # get_running_loop() is the supported way to obtain the loop inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self.process_video,
            input_path,
            output_path,
            background,
            progress_callback
        )

    def process_video(self,
                      input_path: str,
                      output_path: str,
                      background: Optional[Union[str, np.ndarray]] = None,
                      progress_callback: Optional[Callable] = None) -> "VideoStats":
        """
        Process a video file using either CoreVideoProcessor or Pipeline.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function

        Returns:
            Processing statistics

        Raises:
            ValueError: If the pipeline path cannot open the input or output video.
        """
        # Keep a per-call stats object: process_video_batch runs this method on
        # several threads at once, and each call must return its own statistics.
        stats = VideoStats(start_time=time.time())
        self.stats = stats
        self.is_processing = True
        try:
            if self.core_processor:
                return self._process_with_core(
                    input_path, output_path, background, progress_callback, stats
                )
            return self._process_with_pipeline(
                input_path, output_path, background, progress_callback, stats
            )
        finally:
            self.is_processing = False
            stats.total_duration = time.time() - stats.start_time

    def _process_with_pipeline(self,
                               input_path: str,
                               output_path: str,
                               background: Optional[Union[str, np.ndarray]],
                               progress_callback: Optional[Callable],
                               stats: Optional["VideoStats"] = None) -> "VideoStats":
        """
        Process video using the Pipeline system.

        Frames the pipeline fails on are written unmodified and counted as dropped.

        Args:
            input_path: Path to input video
            output_path: Path to output video (written with the ``mp4v`` codec)
            background: Background image or path, forwarded to the pipeline
            progress_callback: Called as ``callback(progress, info)`` after each frame
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object

        Raises:
            ValueError: If the input cannot be opened or the output cannot be created.
        """
        if stats is None:
            stats = self.stats

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Cannot open video: {input_path}")

        out = None
        frame_idx = 0
        try:
            # Get video properties. Containers without usable metadata report 0,
            # so fall back to 30 fps and treat the frame count as unknown.
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
            stats.frames_total = total_frames

            # Setup output writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise ValueError(f"Cannot open output video for writing: {output_path}")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Process frame through pipeline
                result = self.pipeline.process_image(frame, background)
                if result.success and result.output_image is not None:
                    out.write(result.output_image)
                    stats.frames_processed += 1
                    self._update_quality_stats(result.quality_score, stats)
                else:
                    # Write original frame on failure
                    out.write(frame)
                    stats.frames_dropped += 1
                frame_idx += 1

                # Progress callback
                if progress_callback:
                    # Unknown frame count reports 0.0 instead of dividing by zero;
                    # an under-reported count is clamped to 1.0.
                    if total_frames > 0:
                        progress = min(frame_idx / total_frames, 1.0)
                    else:
                        progress = 0.0
                    progress_callback(progress, {
                        'current_frame': frame_idx,
                        'total_frames': total_frames,
                        'fps': self._rate(stats.frames_processed, stats.start_time)
                    })

                # Check if should stop
                if self.should_stop:
                    break
        finally:
            cap.release()
            if out is not None:
                out.release()

        stats.processing_fps = self._rate(stats.frames_processed, stats.start_time)
        return stats

    def _process_with_core(self,
                           input_path: str,
                           output_path: str,
                           background: Optional[Union[str, np.ndarray]],
                           progress_callback: Optional[Callable],
                           stats: Optional["VideoStats"] = None) -> "VideoStats":
        """
        Process video using CoreVideoProcessor.

        The background is translated to the ``(choice, custom_path)`` pair passed
        to ``core_processor.process_video``: an existing path or an array becomes
        ``"custom"``, any other string is passed through as the choice, and
        ``None`` selects ``"blur"``.

        Args:
            input_path: Path to input video
            output_path: Destination the core processor's output is moved to
            background: Background image, path, or choice name
            progress_callback: Forwarded to the core processor
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object
        """
        if stats is None:
            stats = self.stats

        temp_bg_path = None
        try:
            # Determine background choice
            if isinstance(background, str):
                if os.path.exists(background):
                    bg_choice, custom_bg = "custom", background
                else:
                    bg_choice, custom_bg = background, None
            elif isinstance(background, np.ndarray):
                # Save background to a temp file. The handle is closed before
                # OpenCV writes to the path: an open handle blocks the write on
                # Windows.
                handle = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
                temp_bg_path = handle.name
                handle.close()
                cv2.imwrite(temp_bg_path, background)
                bg_choice, custom_bg = "custom", temp_bg_path
            else:
                bg_choice, custom_bg = "blur", None

            # Process with CoreVideoProcessor
            output, message = self.core_processor.process_video(
                input_path,
                bg_choice,
                custom_bg,
                progress_callback
            )
        finally:
            # NOTE(review): assumes the core processor no longer needs the
            # background file once process_video has returned — confirm.
            if temp_bg_path is not None:
                try:
                    os.remove(temp_bg_path)
                except OSError as e:
                    self.logger.warning(
                        f"Could not remove temporary background {temp_bg_path}: {e}"
                    )

        if output:
            # Move output to desired location
            shutil.move(output, output_path)
        else:
            # A falsy output means the core processor produced nothing; surface
            # that instead of returning statistics that look successful.
            stats.error_count += 1
            stats.warnings.append(str(message))
            self.logger.error(f"CoreVideoProcessor produced no output: {message}")

        # Extract stats from core processor
        core_stats = self.core_processor.stats
        stats.frames_processed = core_stats.get('successful_frames', 0)
        stats.frames_dropped = core_stats.get('failed_frames', 0)
        stats.processing_fps = core_stats.get('average_fps', 0)
        return stats

    def start_stream_processing(self,
                                config: "StreamConfig",
                                background: Optional[Union[str, np.ndarray]] = None) -> bool:
        """
        Start real-time stream processing.

        Args:
            config: Stream configuration
            background: Background for replacement

        Returns:
            True if stream started successfully
        """
        if self.is_streaming:
            self.logger.warning("Stream already active")
            return False

        self.is_streaming = True
        self.should_stop = False
        # Fresh statistics per session so fps is measured from this start.
        self.stats = VideoStats(start_time=time.time())
        # Forget threads from earlier sessions that have already finished.
        self.process_threads = [t for t in self.process_threads if t.is_alive()]
        self._worker_threads = []

        # Start input stream thread
        self.stream_thread = threading.Thread(
            target=self._stream_input_handler,
            args=(config,)
        )
        self.stream_thread.start()

        # Start processing threads
        for _ in range(config.num_threads):
            worker = threading.Thread(
                target=self._stream_processor,
                args=(background, config.preview_scale)
            )
            worker.start()
            self.process_threads.append(worker)
            self._worker_threads.append(worker)

        # Start output handler
        if config.output_format in (OutputFormat.HLS, OutputFormat.DASH):
            if not self._start_adaptive_streaming(config):
                # Without a consumer the output queue would fill and stall the
                # workers, so shut the session down again.
                self.stop_stream_processing()
                return False
        else:
            self._start_output_handler(config)

        self.logger.info(f"Stream processing started: {config.stream_mode.value}")
        return True

    def _stream_input_handler(self, config: "StreamConfig"):
        """
        Handle input stream capture.

        Reads frames from the configured source and feeds ``input_queue`` until
        the stream is stopped or, for files, the end is reached. Always clears
        ``is_streaming`` on exit so the other threads can drain and finish.
        """
        cap = None
        try:
            # Open input stream
            mode = config.stream_mode
            if mode == VideoStreamMode.WEBCAM:
                cap = cv2.VideoCapture(int(config.source))
            elif mode in (VideoStreamMode.FILE, VideoStreamMode.RTSP, VideoStreamMode.HTTP):
                cap = cv2.VideoCapture(config.source)
            elif mode == VideoStreamMode.SCREEN:
                # Screen capture (platform-specific)
                cap = self._setup_screen_capture()
            else:
                raise ValueError(f"Unsupported stream mode: {config.stream_mode}")

            if not cap.isOpened():
                raise ValueError("Failed to open stream")

            is_file = mode == VideoStreamMode.FILE
            while self.is_streaming and not self.should_stop:
                ret, frame = cap.read()
                if not ret:
                    if is_file:
                        # End of file
                        break
                    # Retry for live streams
                    time.sleep(0.1)
                    continue

                if is_file:
                    # A file is read much faster than frames are processed, so
                    # wait for queue space instead of discarding frames.
                    while not self.should_stop:
                        try:
                            self.input_queue.put(frame, timeout=0.1)
                            break
                        except Full:
                            continue
                else:
                    try:
                        self.input_queue.put(frame, timeout=0.1)
                    except Full:
                        # Live source: drop the frame rather than fall behind.
                        self.stats.frames_dropped += 1
                    # Control frame rate for live streams
                    time.sleep(1.0 / 30)  # 30 FPS limit
        except Exception as e:
            self.logger.error(f"Stream input handler error: {e}")
        finally:
            if cap is not None:
                cap.release()
            self.is_streaming = False

    def _stream_processor(self,
                          background: Optional[Union[str, np.ndarray]],
                          preview_scale: float = 0.5):
        """
        Process frames from input queue.

        Runs until streaming has stopped and the input queue is empty.

        Args:
            background: Background for replacement, forwarded to the pipeline
            preview_scale: Resize factor applied to frames placed on the preview queue
        """
        while self.is_streaming or not self.input_queue.empty():
            try:
                frame = self.input_queue.get(timeout=0.5)
            except Empty:
                continue

            try:
                # Process frame
                result = self.pipeline.process_image(frame, background)
                if not (result.success and result.output_image is not None):
                    self.stats.frames_dropped += 1
                    continue

                # Add to output queue
                if not self._enqueue_output(result.output_image):
                    continue

                # Update stats
                self.stats.frames_processed += 1
                self._update_quality_stats(result.quality_score)

                # Previews are best-effort: skip when the queue is full.
                if not self.preview_queue.full():
                    preview = cv2.resize(
                        result.output_image, None, fx=preview_scale, fy=preview_scale
                    )
                    try:
                        self.preview_queue.put_nowait(preview)
                    except Full:
                        # Another worker filled the queue between the check and the put.
                        pass
            except Exception as e:
                self.logger.error(f"Stream processor error: {e}")
                self.stats.error_count += 1

    def _enqueue_output(self, image) -> bool:
        """
        Put a processed frame on the output queue.

        Waits for space while the stream is running. Once a stop has been
        requested, a frame that still does not fit is counted as dropped so the
        worker can exit instead of blocking forever.

        Returns:
            True if the frame was queued, False if it was dropped.
        """
        while True:
            try:
                self.output_queue.put(image, timeout=0.5)
                return True
            except Full:
                if self.should_stop:
                    self.stats.frames_dropped += 1
                    return False

    def _has_pending_output(self) -> bool:
        """Return True while frames may still arrive on, or remain in, the output queue."""
        return (
            self.is_streaming
            or any(worker.is_alive() for worker in self._worker_threads)
            or not self.output_queue.empty()
        )

    def _start_output_handler(self, config: "StreamConfig"):
        """Start output stream handler on its own thread."""
        output_thread = threading.Thread(
            target=self._output_handler,
            args=(config,)
        )
        output_thread.start()
        self.process_threads.append(output_thread)

    def _output_handler(self, config: "StreamConfig"):
        """Handle output stream writing; errors are logged, not raised."""
        try:
            if config.output_format == OutputFormat.FRAMES:
                # Save individual frames
                self._save_frames_output(config)
            else:
                # Video file output
                self._save_video_output(config)
        except Exception as e:
            self.logger.error(f"Output handler error: {e}")

    def _save_video_output(self, config: "StreamConfig"):
        """
        Save processed frames to video file.

        The writer is created from the first frame's size. AVI output uses the
        ``XVID`` codec and every other format uses ``mp4v``.
        """
        writer = None
        frame_count = 0
        try:
            while self._has_pending_output():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                # Initialize writer on first frame
                if writer is None:
                    height, width = frame.shape[:2]
                    fps = config.output_fps or 30.0
                    codec = 'XVID' if config.output_format == OutputFormat.AVI else 'mp4v'
                    writer = cv2.VideoWriter(
                        config.output_path,
                        cv2.VideoWriter_fourcc(*codec),
                        fps,
                        (width, height)
                    )
                    if not writer.isOpened():
                        self.logger.error(
                            f"Cannot open output video for writing: {config.output_path}"
                        )

                # Keep draining the queue even if the writer failed, so the
                # workers are not stalled; only count frames actually written.
                if writer.isOpened():
                    writer.write(frame)
                    frame_count += 1
        finally:
            if writer is not None:
                writer.release()
            self.logger.info(f"Saved {frame_count} frames to {config.output_path}")

    def _save_frames_output(self, config: "StreamConfig"):
        """Save processed frames as individual PNG images in ``config.output_path``."""
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        frame_count = 0
        while self._has_pending_output():
            try:
                frame = self.output_queue.get(timeout=0.5)
            except Empty:
                continue

            # Save frame
            frame_path = output_dir / f"frame_{frame_count:06d}.png"
            cv2.imwrite(str(frame_path), frame)
            frame_count += 1

    def _start_adaptive_streaming(self, config: "StreamConfig") -> bool:
        """
        Start HLS or DASH adaptive streaming.

        Returns:
            True if the streaming output was set up, False if setup failed
            (the error is logged).
        """
        try:
            if config.output_format == OutputFormat.HLS:
                self._start_hls_streaming(config)
            elif config.output_format == OutputFormat.DASH:
                self._start_dash_streaming(config)
            return True
        except Exception as e:
            self.logger.error(f"Adaptive streaming setup failed: {e}")
            return False

    def _start_hls_streaming(self, config: "StreamConfig"):
        """Start HLS streaming with FFmpeg, writing ``stream.m3u8`` into ``config.output_path``."""
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        playlist = output_dir / 'stream.m3u8'

        # Encoder and muxer arguments; the raw-video input arguments are added
        # when the first frame arrives and its size is known.
        output_args = [
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-f', 'hls',
            '-hls_time', str(config.chunk_duration),
            '-hls_list_size', '10',
            '-hls_flags', 'delete_segments',
            str(playlist)
        ]
        self._start_ffmpeg_pipe_thread(output_args, config.output_fps or 30.0)
        self.logger.info(f"HLS streaming started: {playlist}")

    def _start_dash_streaming(self, config: "StreamConfig"):
        """
        Start DASH streaming with FFmpeg, writing ``stream.mpd`` into ``config.output_path``.

        NOTE(review): encoder settings mirror the HLS path; confirm the segment
        window suits the intended players.
        """
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        manifest = output_dir / 'stream.mpd'

        output_args = [
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-f', 'dash',
            '-seg_duration', str(config.chunk_duration),
            '-window_size', '10',
            str(manifest)
        ]
        self._start_ffmpeg_pipe_thread(output_args, config.output_fps or 30.0)
        self.logger.info(f"DASH streaming started: {manifest}")

    def _start_ffmpeg_pipe_thread(self, output_args: List[str], fps: float):
        """Start the thread that launches FFmpeg and feeds it processed frames."""
        ffmpeg_thread = threading.Thread(
            target=self._pipe_to_ffmpeg,
            args=(output_args, fps)
        )
        ffmpeg_thread.start()
        self.process_threads.append(ffmpeg_thread)

    def _spawn_ffmpeg(self, frame: np.ndarray, output_args: List[str], fps: float) -> subprocess.Popen:
        """
        Launch FFmpeg reading raw frames from stdin, sized to match ``frame``.

        NOTE(review): assumes frames are 3-channel 8-bit BGR, matching the
        ``bgr24`` pixel format — confirm against the pipeline's output.
        """
        height, width = frame.shape[:2]
        cmd = [
            'ffmpeg',
            '-y',  # stdin carries video data, so an overwrite prompt could never be answered
            '-f', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',  # Input from pipe
            *output_args
        ]
        # FFmpeg's own output is discarded: an unread PIPE fills up and blocks
        # the encoder.
        return subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

    def _pipe_to_ffmpeg(self, output_args: Optional[List[str]] = None, fps: float = 30.0):
        """
        Pipe processed frames to FFmpeg.

        Args:
            output_args: FFmpeg encoder/muxer arguments. When given and no FFmpeg
                process exists yet, one is launched on the first frame so the
                input size matches the frames. When omitted, frames are written
                to the existing ``self.ffmpeg_process``, if any.
            fps: Input frame rate passed to FFmpeg when it is launched here
        """
        try:
            while self._has_pending_output():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                if self.ffmpeg_process is None and output_args is not None:
                    self.ffmpeg_process = self._spawn_ffmpeg(frame, output_args, fps)

                proc = self.ffmpeg_process
                if proc and proc.stdin:
                    proc.stdin.write(frame.tobytes())
        except Exception as e:
            self.logger.error(f"FFmpeg pipe error: {e}")
        finally:
            # Closing stdin signals end of input so FFmpeg can finalize its output.
            proc = self.ffmpeg_process
            if proc is not None and proc.stdin is not None:
                try:
                    proc.stdin.close()
                except OSError:
                    # The pipe is already broken; there is nothing left to flush.
                    pass

    def _setup_screen_capture(self) -> "cv2.VideoCapture":
        """
        Setup screen capture (platform-specific).

        Placeholder: no platform-specific capture is implemented, so this opens
        device index 0 instead.
        """
        return cv2.VideoCapture(0)

    def _update_quality_stats(self, quality_score: float, stats: Optional["VideoStats"] = None):
        """
        Update quality statistics with one new sample.

        Must be called after ``frames_processed`` has been incremented for the
        frame being recorded, so that count includes the new sample.

        Args:
            quality_score: Quality score of the frame just processed
            stats: Statistics object to update; defaults to ``self.stats``
        """
        if stats is None:
            stats = self.stats

        # Incremental mean over n samples, where n already includes this one.
        n = max(stats.frames_processed, 1)
        stats.avg_quality_score += (quality_score - stats.avg_quality_score) / n
        stats.min_quality_score = min(stats.min_quality_score, quality_score)
        stats.max_quality_score = max(stats.max_quality_score, quality_score)

    def stop_stream_processing(self):
        """Stop stream processing and wait briefly for threads and FFmpeg to finish."""
        self.should_stop = True
        self.is_streaming = False

        # Wait for threads to finish
        if self.stream_thread:
            self.stream_thread.join(timeout=5)
        for thread in self.process_threads:
            thread.join(timeout=5)
        # Keep only threads that outlived the timeout so a later call can retry.
        self.process_threads = [t for t in self.process_threads if t.is_alive()]

        # Stop FFmpeg if running
        proc = self.ffmpeg_process
        if proc:
            if proc.stdin is not None and not proc.stdin.closed:
                try:
                    proc.stdin.close()
                except OSError:
                    # Pipe already broken; FFmpeg is gone or going.
                    pass
            try:
                # Give FFmpeg a chance to finalize its output before forcing it.
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
            self.ffmpeg_process = None

        self.logger.info("Stream processing stopped")

    def get_preview_frame(self) -> Optional[np.ndarray]:
        """Get a preview frame from the preview queue, or None if none is waiting."""
        try:
            return self.preview_queue.get_nowait()
        except Empty:
            return None

    def get_stats(self) -> "VideoStats":
        """Get current processing statistics, refreshing the fps while work is in progress."""
        if self.is_processing or self.is_streaming:
            self.stats.processing_fps = self._rate(
                self.stats.frames_processed, self.stats.start_time
            )
        return self.stats

    def process_video_batch(self,
                            input_paths: List[str],
                            output_dir: str,
                            background: Optional[Union[str, np.ndarray]] = None,
                            parallel: bool = True) -> List["VideoStats"]:
        """
        Process multiple videos in batch.

        Each output is written to ``<output_dir>/<input stem>_processed.mp4``. A
        video that fails is logged and represented by ``VideoStats(error_count=1)``.

        Args:
            input_paths: List of input video paths
            output_dir: Output directory
            background: Background for all videos
            parallel: Process in parallel

        Returns:
            List of processing statistics, in the same order as ``input_paths``
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        jobs = [
            (input_path, str(output_dir / f"{Path(input_path).stem}_processed.mp4"))
            for input_path in input_paths
        ]
        results = []

        if parallel:
            # Process in parallel
            futures = [
                self.executor.submit(self.process_video, src, dst, background)
                for src, dst in jobs
            ]
            # Collect in submission order so results line up with input_paths.
            for future in futures:
                try:
                    results.append(future.result(timeout=3600))  # 1 hour timeout
                except Exception as e:
                    self.logger.error(f"Batch processing error: {e}")
                    results.append(VideoStats(error_count=1))
        else:
            # Process sequentially
            for src, dst in jobs:
                try:
                    results.append(self.process_video(src, dst, background))
                except Exception as e:
                    self.logger.error(f"Batch processing error: {e}")
                    results.append(VideoStats(error_count=1))

        return results

    def export_to_format(self,
                         input_path: str,
                         output_path: str,
                         format: "OutputFormat",
                         **kwargs) -> bool:
        """
        Export processed video to specific format.

        WEBM and HLS have dedicated encoder settings; every other format uses
        the default H.264 MP4 conversion.

        Args:
            input_path: Input video path
            output_path: Output path
            format: Target format
            **kwargs: Format-specific options (currently unused)

        Returns:
            True if successful
        """
        try:
            if format == OutputFormat.WEBM:
                codec_args = [
                    '-c:v', 'libvpx-vp9',
                    '-crf', '30',
                    '-b:v', '0',
                ]
            elif format == OutputFormat.HLS:
                codec_args = [
                    '-c:v', 'libx264',
                    '-hls_time', '10',
                    '-hls_list_size', '0',
                    '-f', 'hls',
                ]
            else:
                # Default MP4 conversion
                codec_args = [
                    '-c:v', 'libx264',
                    '-preset', 'medium',
                    '-crf', '23',
                ]
            cmd = ['ffmpeg', '-i', input_path, *codec_args, output_path]

            # stdin is detached so FFmpeg cannot block on an overwrite prompt;
            # undecodable bytes in its output must not turn success into failure.
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                errors="replace",
                stdin=subprocess.DEVNULL
            )
            if result.returncode != 0:
                self.logger.error(
                    f"Export failed (ffmpeg exit code {result.returncode}): "
                    f"{result.stderr[-500:]}"
                )
            return result.returncode == 0
        except Exception as e:
            self.logger.error(f"Export failed: {e}")
            return False

    def cleanup(self):
        """Cleanup resources: stop streaming, shut down the executor, clean up the core processor."""
        self.stop_stream_processing()
        self.executor.shutdown(wait=True)
        if self.core_processor:
            self.core_processor.cleanup()
        self.logger.info("VideoProcessorAPI cleanup complete")