|
|
""" |
|
|
Video processing API module for BackgroundFX Pro. |
|
|
Wraps CoreVideoProcessor with additional API features for streaming, batching, and real-time processing. |
|
|
""" |
|
|
|
|
|
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from queue import Empty, Full, Queue
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import cv2
import numpy as np
import torch

from core_video import CoreVideoProcessor

from ..core.temporal import TemporalCoherence
from ..utils import MemoryMonitor, TimeEstimator
from ..utils.device import DeviceManager
from ..utils.logger import setup_logger
from .pipeline import PipelineConfig, PipelineResult, ProcessingMode, ProcessingPipeline
|
|
|
|
|
# Module-level logger named after this module. VideoProcessorAPI also creates
# its own logger in __init__ and logs through that one.
logger = setup_logger(__name__)
|
|
|
|
|
|
|
|
class VideoStreamMode(Enum):
    """Kinds of input a stream can be captured from.

    The member value is the lowercase string form of the mode, so a mode can
    be looked up from text with ``VideoStreamMode("rtsp")``.
    """

    # Sources opened directly from ``StreamConfig.source``.
    FILE = "file"
    WEBCAM = "webcam"

    # Network sources.
    RTSP = "rtsp"
    HTTP = "http"

    # Synthetic / desktop sources.
    VIRTUAL = "virtual"
    SCREEN = "screen"
|
|
|
|
|
|
|
|
class OutputFormat(Enum):
    """Container / delivery formats the API can write.

    The member value is the lowercase string form of the format, so a format
    can be looked up from text with ``OutputFormat("webm")``.
    """

    # Single-file video containers.
    MP4 = "mp4"
    AVI = "avi"
    MOV = "mov"
    WEBM = "webm"

    # Segmented adaptive-streaming outputs.
    HLS = "hls"
    DASH = "dash"

    # One image file per frame.
    FRAMES = "frames"
|
|
|
|
|
|
|
|
@dataclass
class StreamConfig:
    """Settings for one streaming session.

    Field order is part of the positional constructor signature and must not
    be changed.
    """

    # --- Input -------------------------------------------------------------
    # Path / URL (str) or capture device index (int).
    source: Union[str, int] = 0
    stream_mode: VideoStreamMode = VideoStreamMode.FILE

    # --- Output ------------------------------------------------------------
    # File path for single-file formats; directory for FRAMES / HLS output.
    output_path: Optional[str] = None
    output_format: OutputFormat = OutputFormat.MP4
    output_codec: str = "h264"
    output_bitrate: str = "5M"
    # None means "let the writer pick a default frame rate".
    output_fps: Optional[float] = None

    # --- Buffering / segmenting --------------------------------------------
    buffer_size: int = 30
    # Segment length in seconds, passed to FFmpeg for HLS output.
    chunk_duration: float = 2.0
    enable_adaptive_bitrate: bool = False

    # --- Preview / latency -------------------------------------------------
    enable_preview: bool = False
    preview_scale: float = 0.5
    low_latency: bool = False

    # --- Performance -------------------------------------------------------
    hardware_acceleration: bool = True
    # Number of frame-processing worker threads started for a stream.
    num_threads: int = 4
|
|
|
|
|
|
|
|
@dataclass
class VideoStats:
    """Counters and measurements collected while processing video.

    Field order is part of the positional constructor signature and must not
    be changed.
    """

    # --- Timing ------------------------------------------------------------
    # time.time() value at which processing started; 0.0 means "not started".
    start_time: float = 0.0
    total_duration: float = 0.0
    processing_fps: float = 0.0

    # --- Frame counters ----------------------------------------------------
    frames_total: int = 0
    frames_processed: int = 0
    frames_dropped: int = 0
    frames_cached: int = 0

    # --- Quality -----------------------------------------------------------
    avg_quality_score: float = 0.0
    # min starts high and max starts low so the first sample replaces both.
    min_quality_score: float = 1.0
    max_quality_score: float = 0.0

    # --- Resource usage ----------------------------------------------------
    cpu_usage: float = 0.0
    gpu_usage: float = 0.0
    memory_usage_mb: float = 0.0

    # --- Problems ----------------------------------------------------------
    error_count: int = 0
    # default_factory gives every instance its own list.
    warnings: List[str] = field(default_factory=list)
|
|
|
|
|
|
|
|
class VideoProcessorAPI:
    """
    API wrapper for video processing with streaming and real-time capabilities.

    Extends CoreVideoProcessor with additional features:

    * file processing through either an injected CoreVideoProcessor or the
      internal ProcessingPipeline (``process_video`` / ``process_video_async``);
    * batch processing on a thread pool (``process_video_batch``);
    * stream processing built from an input thread, worker threads and an
      output thread connected by bounded queues (``start_stream_processing``);
    * format conversion through the ``ffmpeg`` executable (``export_to_format``).
    """

    # Frame rate used when neither the source nor the configuration provides one.
    _DEFAULT_FPS = 30.0
    # Seconds to wait for a worker thread or the FFmpeg process to finish.
    _JOIN_TIMEOUT = 5

    def __init__(self, core_processor: Optional["CoreVideoProcessor"] = None):
        """
        Initialize Video Processor API.

        Args:
            core_processor: Optional existing CoreVideoProcessor instance. When
                given, ``process_video`` delegates to it; otherwise the internal
                ProcessingPipeline is used frame by frame.
        """
        self.logger = setup_logger(f"{__name__}.VideoProcessorAPI")

        self.core_processor = core_processor
        self.pipeline = ProcessingPipeline(PipelineConfig(mode=ProcessingMode.VIDEO))

        # Lifecycle flags read by the worker threads.
        self.is_processing = False
        self.is_streaming = False
        self.should_stop = False

        # Statistics of the most recently started job / stream.
        self.stats = VideoStats()
        # Guards counters that several threads update, and the job counter below.
        self._stats_lock = threading.Lock()
        # Number of process_video() calls currently running (batch jobs overlap).
        self._active_jobs = 0

        # Bounded queues connecting capture -> processing -> output.
        self.input_queue = Queue(maxsize=100)
        self.output_queue = Queue(maxsize=100)
        self.preview_queue = Queue(maxsize=10)

        self.executor = ThreadPoolExecutor(max_workers=8)
        self.stream_thread = None
        self.process_threads = []

        self.ffmpeg_process = None

        self.webrtc_peers = {}

        self.logger.info("VideoProcessorAPI initialized")

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _frames_per_second(frame_count: int, start_time: float) -> float:
        """Return frame_count divided by the seconds since start_time (0.0 if no time has passed)."""
        elapsed = time.time() - start_time
        return frame_count / elapsed if elapsed > 0 else 0.0

    @staticmethod
    def _drain_queue(pending: Queue) -> None:
        """Discard every item currently waiting in ``pending``."""
        while True:
            try:
                pending.get_nowait()
            except Empty:
                return

    # ------------------------------------------------------------------
    # File processing
    # ------------------------------------------------------------------

    async def process_video_async(self,
                                  input_path: str,
                                  output_path: str,
                                  background: Optional[Union[str, np.ndarray]] = None,
                                  progress_callback: Optional[Callable] = None) -> "VideoStats":
        """
        Asynchronously process a video file.

        Runs ``process_video`` in the event loop's default executor so the
        calling coroutine is not blocked.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function (invoked from the
                executor thread, not from the event loop)

        Returns:
            Processing statistics
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self.process_video,
            input_path,
            output_path,
            background,
            progress_callback,
        )

    def process_video(self,
                      input_path: str,
                      output_path: str,
                      background: Optional[Union[str, np.ndarray]] = None,
                      progress_callback: Optional[Callable] = None) -> "VideoStats":
        """
        Process a video file using either CoreVideoProcessor or Pipeline.

        Each call works on its own VideoStats object, so overlapping calls
        (see ``process_video_batch``) do not corrupt each other's numbers.
        ``self.stats`` points at the statistics of the most recently started call.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            background: Background image or path
            progress_callback: Progress callback function

        Returns:
            Processing statistics

        Raises:
            ValueError: If the input cannot be opened or the output cannot be
                written (pipeline path).
        """
        stats = VideoStats(start_time=time.time())
        with self._stats_lock:
            if self._active_jobs == 0 and not self.is_streaming:
                # A stop request left over from an earlier stream would
                # otherwise end this job after its first frame.
                self.should_stop = False
            self._active_jobs += 1
            self.is_processing = True
            self.stats = stats

        try:
            if self.core_processor:
                return self._process_with_core(
                    input_path, output_path, background, progress_callback, stats
                )
            return self._process_with_pipeline(
                input_path, output_path, background, progress_callback, stats
            )
        finally:
            stats.total_duration = time.time() - stats.start_time
            with self._stats_lock:
                self._active_jobs -= 1
                # Stay "processing" while other batch jobs are still running.
                self.is_processing = self._active_jobs > 0

    def _process_with_pipeline(self,
                               input_path: str,
                               output_path: str,
                               background: Optional[Union[str, np.ndarray]],
                               progress_callback: Optional[Callable],
                               stats: Optional["VideoStats"] = None) -> "VideoStats":
        """
        Process video using the Pipeline system, one frame at a time.

        Frames the pipeline fails on are written through unmodified (and counted
        as dropped) so the output keeps the timing of the input.

        Args:
            input_path: Path to input video
            output_path: Path to output video (written with the ``mp4v`` codec)
            background: Background image or path, passed to the pipeline
            progress_callback: Called after every frame as
                ``callback(progress, info)``; ``progress`` is in 0.0-1.0, or 0.0
                when the source does not report a frame count
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object

        Raises:
            ValueError: If the input cannot be opened or the output writer
                cannot be created.
        """
        stats = self.stats if stats is None else stats
        cap = cv2.VideoCapture(input_path)
        out = None
        frame_idx = 0

        try:
            if not cap.isOpened():
                raise ValueError(f"Cannot open video: {input_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps or fps <= 0:
                # Some sources report no frame rate; a writer opened with 0 fps is unusable.
                fps = self._DEFAULT_FPS
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # The frame count is metadata and may be missing (0 or negative).
            total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
            stats.frames_total = total_frames

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise ValueError(f"Cannot open output video for writing: {output_path}")

            while not self.should_stop:
                ret, frame = cap.read()
                if not ret:
                    break

                result = self.pipeline.process_image(frame, background)

                if result.success and result.output_image is not None:
                    out.write(result.output_image)
                    stats.frames_processed += 1
                    self._update_quality_stats(result.quality_score, stats)
                else:
                    out.write(frame)
                    stats.frames_dropped += 1

                frame_idx += 1

                if progress_callback:
                    if total_frames > 0:
                        # Clamp: the reported count can be lower than the real one.
                        progress = min(frame_idx / total_frames, 1.0)
                    else:
                        progress = 0.0
                    progress_callback(progress, {
                        'current_frame': frame_idx,
                        'total_frames': total_frames,
                        'fps': self._frames_per_second(stats.frames_processed, stats.start_time),
                    })
        finally:
            cap.release()
            if out is not None:
                out.release()

        stats.processing_fps = self._frames_per_second(stats.frames_processed, stats.start_time)
        return stats

    def _process_with_core(self,
                           input_path: str,
                           output_path: str,
                           background: Optional[Union[str, np.ndarray]],
                           progress_callback: Optional[Callable],
                           stats: Optional["VideoStats"] = None) -> "VideoStats":
        """
        Process video using CoreVideoProcessor.

        ``background`` is translated into the (choice, custom path) pair the
        core processor is called with:

        * existing file path -> ``("custom", path)``
        * any other string   -> ``(string, None)``
        * numpy image        -> written to a temporary PNG, ``("custom", png)``
        * ``None``           -> ``("blur", None)``

        Args:
            input_path: Path to input video
            output_path: Where the core processor's output file is moved to
            background: Background image, path or preset name
            progress_callback: Forwarded to the core processor
            stats: Statistics object to update; defaults to ``self.stats``

        Returns:
            The updated statistics object. ``error_count`` is incremented when
            the core processor returns no output.

        Raises:
            ValueError: If a numpy background cannot be written to disk.
        """
        stats = self.stats if stats is None else stats
        temp_bg_path = None

        if isinstance(background, str):
            if os.path.exists(background):
                bg_choice, custom_bg = "custom", background
            else:
                bg_choice, custom_bg = background, None
        elif isinstance(background, np.ndarray):
            # mkstemp + close: OpenCV writes by path, and a still-open handle
            # prevents that on some platforms.
            fd, temp_bg_path = tempfile.mkstemp(suffix='.png')
            os.close(fd)
            bg_choice, custom_bg = "custom", temp_bg_path
        else:
            bg_choice, custom_bg = "blur", None

        try:
            if temp_bg_path is not None and not cv2.imwrite(temp_bg_path, background):
                raise ValueError("Failed to write background image to a temporary file")

            output, message = self.core_processor.process_video(
                input_path,
                bg_choice,
                custom_bg,
                progress_callback,
            )
        finally:
            # NOTE(review): assumes the core processor no longer needs the
            # background file once process_video() has returned - confirm.
            if temp_bg_path is not None:
                try:
                    os.remove(temp_bg_path)
                except OSError as e:
                    self.logger.warning(f"Could not remove temporary background {temp_bg_path}: {e}")

        if output:
            shutil.move(output, output_path)
        else:
            stats.error_count += 1
            self.logger.error(f"Core processor returned no output: {message}")

        core_stats = self.core_processor.stats
        stats.frames_processed = core_stats.get('successful_frames', 0)
        stats.frames_dropped = core_stats.get('failed_frames', 0)
        stats.processing_fps = core_stats.get('average_fps', 0)

        return stats

    # ------------------------------------------------------------------
    # Stream processing
    # ------------------------------------------------------------------

    def start_stream_processing(self,
                                config: "StreamConfig",
                                background: Optional[Union[str, np.ndarray]] = None) -> bool:
        """
        Start real-time stream processing.

        Starts the output side first, then one capture thread and
        ``config.num_threads`` worker threads (at least one).

        Args:
            config: Stream configuration
            background: Background for replacement

        Returns:
            True if stream started successfully; False if a stream is already
            active, ``config.output_path`` is missing, or the HLS/DASH output
            could not be set up.
        """
        if self.is_streaming or any(t.is_alive() for t in self.process_threads):
            self.logger.warning("Stream already active")
            return False

        if not config.output_path:
            # Every output handler writes to config.output_path.
            self.logger.error("Stream processing requires config.output_path")
            return False

        # Forget everything that belongs to a previous run.
        if self.ffmpeg_process is not None:
            self._shutdown_ffmpeg(self.ffmpeg_process)
            self.ffmpeg_process = None
        self.process_threads = []
        for pending in (self.input_queue, self.output_queue, self.preview_queue):
            self._drain_queue(pending)
        # Fresh stats so get_stats() measures fps from the start of this stream.
        self.stats = VideoStats(start_time=time.time())

        self.should_stop = False
        self.is_streaming = True

        # Output side first: if it cannot be set up there is no point capturing.
        if config.output_format in (OutputFormat.HLS, OutputFormat.DASH):
            output_ready = self._start_adaptive_streaming(config)
        else:
            self._start_output_handler(config)
            output_ready = True

        if not output_ready:
            self.is_streaming = False
            return False

        self.stream_thread = threading.Thread(
            target=self._stream_input_handler,
            args=(config,),
        )
        self.stream_thread.start()

        for _ in range(max(1, config.num_threads)):
            worker = threading.Thread(
                target=self._stream_processor,
                args=(background, config),
            )
            worker.start()
            self.process_threads.append(worker)

        self.logger.info(f"Stream processing started: {config.stream_mode.value}")
        return True

    def _stream_input_handler(self, config: "StreamConfig"):
        """
        Handle input stream capture.

        Reads frames from the configured source into ``input_queue`` until the
        stream is stopped or a FILE source is exhausted. Always clears
        ``is_streaming`` on exit so downstream threads drain and finish.
        """
        cap = None
        try:
            mode = config.stream_mode
            if mode == VideoStreamMode.WEBCAM:
                cap = cv2.VideoCapture(int(config.source))
            elif mode in (VideoStreamMode.FILE, VideoStreamMode.RTSP, VideoStreamMode.HTTP):
                cap = cv2.VideoCapture(config.source)
            elif mode == VideoStreamMode.SCREEN:
                cap = self._setup_screen_capture()
            else:
                raise ValueError(f"Unsupported stream mode: {config.stream_mode}")

            if not cap.isOpened():
                raise ValueError("Failed to open stream")

            is_file = mode == VideoStreamMode.FILE

            while self.is_streaming and not self.should_stop:
                ret, frame = cap.read()
                if not ret:
                    if is_file:
                        break  # end of file
                    # Live source hiccup: wait briefly and try again.
                    time.sleep(0.1)
                    continue

                if is_file:
                    # A file is not real-time: wait for queue space rather than
                    # losing frames, but keep reacting to a stop request.
                    while not self.should_stop:
                        try:
                            self.input_queue.put(frame, timeout=0.1)
                            break
                        except Full:
                            continue
                else:
                    try:
                        self.input_queue.put(frame, timeout=0.1)
                    except Full:
                        # Workers are behind a live source: drop the frame.
                        with self._stats_lock:
                            self.stats.frames_dropped += 1
                    time.sleep(1.0 / 30)

        except Exception as e:
            self.logger.error(f"Stream input handler error: {e}")
        finally:
            if cap is not None:
                cap.release()
            self.is_streaming = False

    def _stream_processor(self,
                          background: Optional[Union[str, np.ndarray]],
                          config: Optional["StreamConfig"] = None):
        """
        Process frames from input queue.

        Runs until the stream has ended and ``input_queue`` is empty. Successful
        frames go to ``output_queue`` and, when there is room, a downscaled copy
        goes to ``preview_queue``.

        NOTE(review): with more than one worker, frames can reach
        ``output_queue`` out of capture order - confirm whether ordering matters.
        NOTE(review): ``config.enable_preview`` is not consulted; previews are
        always produced, as before.

        Args:
            background: Background for replacement, passed to the pipeline
            config: Stream configuration; only ``preview_scale`` is used
                (0.5 when no configuration is given)
        """
        preview_scale = 0.5 if config is None else config.preview_scale

        while self.is_streaming or not self.input_queue.empty():
            try:
                frame = self.input_queue.get(timeout=0.5)
            except Empty:
                continue

            try:
                result = self.pipeline.process_image(frame, background)

                produced = result.success and result.output_image is not None
                if not produced or not self._enqueue_output(result.output_image):
                    with self._stats_lock:
                        self.stats.frames_dropped += 1
                    continue

                with self._stats_lock:
                    self.stats.frames_processed += 1
                    self._update_quality_stats(result.quality_score)

                if not self.preview_queue.full():
                    preview = cv2.resize(result.output_image, None, fx=preview_scale, fy=preview_scale)
                    try:
                        self.preview_queue.put_nowait(preview)
                    except Full:
                        # Another worker filled the queue first; previews are best-effort.
                        pass

            except Exception as e:
                self.logger.error(f"Stream processor error: {e}")
                with self._stats_lock:
                    self.stats.error_count += 1

    def _enqueue_output(self, frame: np.ndarray) -> bool:
        """
        Put a processed frame on ``output_queue``.

        Waits for space while the stream is running, so a slow writer applies
        back-pressure. Once a stop has been requested a full queue means the
        frame is dropped, so a dead consumer cannot block shutdown.

        Returns:
            True if the frame was queued, False if it was dropped.
        """
        while True:
            try:
                self.output_queue.put(frame, timeout=0.5)
                return True
            except Full:
                if self.should_stop:
                    return False

    def _start_output_handler(self, config: "StreamConfig"):
        """Start output stream handler on its own thread."""
        output_thread = threading.Thread(
            target=self._output_handler,
            args=(config,),
        )
        output_thread.start()
        self.process_threads.append(output_thread)

    def _output_handler(self, config: "StreamConfig"):
        """
        Handle output stream writing.

        Writes individual images for ``OutputFormat.FRAMES`` and a video file
        otherwise. If writing fails the whole stream is stopped, because
        nothing would consume ``output_queue`` any more.
        """
        try:
            if config.output_format == OutputFormat.FRAMES:
                self._save_frames_output(config)
            else:
                self._save_video_output(config)
        except Exception as e:
            self.logger.error(f"Output handler error: {e}")
            self.should_stop = True
            self.is_streaming = False

    def _open_stream_writer(self, config: "StreamConfig", frame: np.ndarray):
        """
        Create the video writer for a stream, sized after ``frame``.

        AVI output uses the ``XVID`` codec; every other format uses ``mp4v``.

        Raises:
            ValueError: If the writer cannot be opened.
        """
        height, width = frame.shape[:2]
        fps = config.output_fps or self._DEFAULT_FPS
        codec = 'XVID' if config.output_format == OutputFormat.AVI else 'mp4v'
        writer = cv2.VideoWriter(
            config.output_path,
            cv2.VideoWriter_fourcc(*codec),
            fps,
            (width, height),
        )
        if not writer.isOpened():
            writer.release()
            raise ValueError(f"Cannot open output video for writing: {config.output_path}")
        return writer

    def _save_video_output(self, config: "StreamConfig"):
        """
        Save processed frames to video file.

        The writer is created lazily from the first frame so its size matches
        the frames actually produced.
        """
        writer = None
        frame_count = 0

        try:
            while self.is_streaming or not self.output_queue.empty():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                if writer is None:
                    writer = self._open_stream_writer(config, frame)

                writer.write(frame)
                frame_count += 1
        finally:
            if writer is not None:
                writer.release()
                self.logger.info(f"Saved {frame_count} frames to {config.output_path}")

    def _save_frames_output(self, config: "StreamConfig"):
        """
        Save processed frames as individual images.

        Frames are written to ``config.output_path`` (created if needed) as
        ``frame_000000.png``, ``frame_000001.png``, ...
        """
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        frame_count = 0

        while self.is_streaming or not self.output_queue.empty():
            try:
                frame = self.output_queue.get(timeout=0.5)
            except Empty:
                continue

            frame_path = output_dir / f"frame_{frame_count:06d}.png"
            if cv2.imwrite(str(frame_path), frame):
                frame_count += 1
            else:
                self.logger.error(f"Failed to write frame image: {frame_path}")
                with self._stats_lock:
                    self.stats.error_count += 1

    def _start_adaptive_streaming(self, config: "StreamConfig") -> bool:
        """
        Start HLS or DASH adaptive streaming.

        Returns:
            True if the streaming output was set up, False otherwise (the
            reason is logged).
        """
        try:
            if config.output_format == OutputFormat.HLS:
                self._start_hls_streaming(config)
            elif config.output_format == OutputFormat.DASH:
                self._start_dash_streaming(config)
            else:
                raise ValueError(f"Not an adaptive streaming format: {config.output_format}")
            return True
        except Exception as e:
            self.logger.error(f"Adaptive streaming setup failed: {e}")
            return False

    def _start_hls_streaming(self, config: "StreamConfig"):
        """
        Start HLS streaming with FFmpeg.

        Writes a rolling 10-segment playlist ``stream.m3u8`` into
        ``config.output_path``; old segments are deleted.
        """
        manifest = self._start_ffmpeg_stream(config, 'stream.m3u8', [
            '-f', 'hls',
            '-hls_time', str(config.chunk_duration),
            '-hls_list_size', '10',
            '-hls_flags', 'delete_segments',
        ])
        self.logger.info(f"HLS streaming started: {manifest}")

    def _start_dash_streaming(self, config: "StreamConfig"):
        """
        Start DASH streaming with FFmpeg.

        Writes a rolling 10-segment manifest ``stream.mpd`` into
        ``config.output_path``.
        """
        manifest = self._start_ffmpeg_stream(config, 'stream.mpd', [
            '-f', 'dash',
            '-seg_duration', str(config.chunk_duration),
            '-window_size', '10',
        ])
        self.logger.info(f"DASH streaming started: {manifest}")

    def _start_ffmpeg_stream(self,
                             config: "StreamConfig",
                             manifest_name: str,
                             muxer_args: List[str]) -> Path:
        """
        Prepare the output directory and start the thread that feeds FFmpeg.

        FFmpeg itself is launched by that thread when the first frame arrives,
        because the raw-video input must be declared with the real frame size.

        Args:
            config: Stream configuration (``output_path`` is the directory)
            manifest_name: File name of the playlist / manifest
            muxer_args: FFmpeg output options selecting and tuning the muxer

        Returns:
            Path of the manifest file.

        Raises:
            ValueError: If ``config.output_path`` is missing.
            FileNotFoundError: If no ``ffmpeg`` executable is on the PATH.
        """
        if not config.output_path:
            raise ValueError("config.output_path is required for adaptive streaming")
        if shutil.which('ffmpeg') is None:
            raise FileNotFoundError("ffmpeg executable not found on PATH")

        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        manifest = output_dir / manifest_name

        ffmpeg_thread = threading.Thread(
            target=self._pipe_to_ffmpeg,
            args=(list(muxer_args) + [str(manifest)], config.output_fps or self._DEFAULT_FPS),
        )
        ffmpeg_thread.start()
        self.process_threads.append(ffmpeg_thread)
        return manifest

    def _launch_ffmpeg(self, width: int, height: int, fps: float, output_args: List[str]):
        """
        Start FFmpeg reading raw BGR frames of the given size from stdin.

        stdout is discarded and stderr is inherited. Neither is a pipe, because
        an unread pipe fills up and blocks the child process.
        """
        cmd = [
            'ffmpeg',
            '-loglevel', 'error',
            '-f', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', f'{fps:g}',
            '-i', '-',
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            *output_args,
        ]
        return subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
        )

    def _pipe_to_ffmpeg(self,
                        output_args: Optional[List[str]] = None,
                        fps: Optional[float] = None):
        """
        Pipe processed frames to FFmpeg.

        Args:
            output_args: FFmpeg output options plus output path. When given and
                no FFmpeg process exists yet, one is launched from the size of
                the first frame. When omitted, frames are written to an already
                running ``self.ffmpeg_process`` (or discarded if there is none).
            fps: Input frame rate declared to FFmpeg; defaults to 30.

        On exit FFmpeg's stdin is closed so it can flush and finalize its
        output. Any write error stops the stream.
        """
        process = self.ffmpeg_process
        frame_size = None  # (width, height) FFmpeg was launched with

        try:
            while self.is_streaming or not self.output_queue.empty():
                try:
                    frame = self.output_queue.get(timeout=0.5)
                except Empty:
                    continue

                if process is None:
                    if output_args is None:
                        continue
                    height, width = frame.shape[:2]
                    frame_size = (width, height)
                    process = self._launch_ffmpeg(
                        width, height, fps or self._DEFAULT_FPS, output_args
                    )
                    self.ffmpeg_process = process
                elif frame_size is not None and (frame.shape[1], frame.shape[0]) != frame_size:
                    # Raw video has a fixed geometry; a differently sized frame
                    # would shift every following frame in the byte stream.
                    frame = cv2.resize(frame, frame_size)

                if process.stdin:
                    # NOTE(review): assumes 3-channel 8-bit BGR frames, matching
                    # the declared bgr24 input - confirm.
                    process.stdin.write(frame.tobytes())

        except Exception as e:
            self.logger.error(f"FFmpeg pipe error: {e}")
            self.should_stop = True
            self.is_streaming = False
        finally:
            if process is not None and process.stdin and not process.stdin.closed:
                try:
                    process.stdin.close()
                except OSError:
                    # FFmpeg already exited and closed its end of the pipe.
                    pass

    def _shutdown_ffmpeg(self, process) -> None:
        """
        Stop an FFmpeg process: close stdin, wait, then terminate, then kill.

        Closing stdin first lets FFmpeg end normally and finalize its output.
        """
        if process.stdin and not process.stdin.closed:
            try:
                process.stdin.close()
            except OSError:
                # The process is already gone.
                pass
        try:
            process.wait(timeout=self._JOIN_TIMEOUT)
            return
        except subprocess.TimeoutExpired:
            process.terminate()
        try:
            process.wait(timeout=self._JOIN_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def _setup_screen_capture(self) -> "cv2.VideoCapture":
        """
        Setup screen capture (platform-specific).

        Placeholder: no screen grabbing is implemented; this opens capture
        device 0, which is normally a camera.
        """
        self.logger.warning("Screen capture is not implemented; using capture device 0")
        return cv2.VideoCapture(0)

    def _update_quality_stats(self, quality_score: float, stats: Optional["VideoStats"] = None):
        """
        Update quality statistics with the score of one processed frame.

        Must be called after ``frames_processed`` has been incremented for that
        frame; the running average is computed over ``frames_processed`` samples.

        Args:
            quality_score: Quality score of the frame. ``None`` is ignored.
            stats: Statistics object to update; defaults to ``self.stats``
        """
        stats = self.stats if stats is None else stats
        if quality_score is None:
            # NOTE(review): unclear whether the pipeline can return no score;
            # skipping it avoids a TypeError in the arithmetic below.
            return

        sample_count = stats.frames_processed
        if sample_count <= 1:
            stats.avg_quality_score = quality_score
        else:
            # Incremental mean: the previous average covers sample_count - 1 frames.
            stats.avg_quality_score += (
                (quality_score - stats.avg_quality_score) / sample_count
            )

        stats.min_quality_score = min(stats.min_quality_score, quality_score)
        stats.max_quality_score = max(stats.max_quality_score, quality_score)

    def stop_stream_processing(self):
        """
        Stop stream processing.

        Signals all stream threads to finish, waits up to ``_JOIN_TIMEOUT``
        seconds for each, then shuts down FFmpeg if it was started.
        """
        self.should_stop = True
        self.is_streaming = False

        if self.stream_thread is not None:
            self.stream_thread.join(timeout=self._JOIN_TIMEOUT)
            if not self.stream_thread.is_alive():
                self.stream_thread = None

        for worker in self.process_threads:
            worker.join(timeout=self._JOIN_TIMEOUT)
        # Keep only threads that did not finish in time, so a later start can
        # still see them.
        self.process_threads = [t for t in self.process_threads if t.is_alive()]

        process = self.ffmpeg_process
        self.ffmpeg_process = None
        if process is not None:
            self._shutdown_ffmpeg(process)

        self.logger.info("Stream processing stopped")

    def get_preview_frame(self) -> Optional[np.ndarray]:
        """Get a preview frame from the preview queue, or None if none is waiting."""
        try:
            return self.preview_queue.get_nowait()
        except Empty:
            return None

    def get_stats(self) -> "VideoStats":
        """
        Get current processing statistics.

        While a job or stream is running, ``processing_fps`` is refreshed from
        the frames processed so far.
        """
        stats = self.stats
        # start_time == 0.0 means the stats object was never started; dividing
        # by "seconds since the epoch" would be meaningless.
        if (self.is_processing or self.is_streaming) and stats.start_time > 0:
            stats.processing_fps = self._frames_per_second(
                stats.frames_processed, stats.start_time
            )
        return stats

    # ------------------------------------------------------------------
    # Batch processing and export
    # ------------------------------------------------------------------

    def process_video_batch(self,
                            input_paths: List[str],
                            output_dir: str,
                            background: Optional[Union[str, np.ndarray]] = None,
                            parallel: bool = True) -> List["VideoStats"]:
        """
        Process multiple videos in batch.

        Each input ``<stem>.<ext>`` is written to
        ``<output_dir>/<stem>_processed.mp4``. Inputs sharing a stem get a
        numeric suffix (``<stem>_1_processed.mp4``, ...) so they do not
        overwrite each other.

        Args:
            input_paths: List of input video paths
            output_dir: Output directory (created if needed)
            background: Background for all videos
            parallel: Process in parallel on the internal thread pool

        Returns:
            List of processing statistics, in the same order as
            ``input_paths``. A video that failed is represented by a
            ``VideoStats`` with ``error_count=1``.
        """
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        # Decide every output path up front so names are unique and stable.
        jobs = []
        used_names = set()
        for input_path in input_paths:
            stem = Path(input_path).stem
            name = f"{stem}_processed.mp4"
            suffix = 1
            while name in used_names:
                name = f"{stem}_{suffix}_processed.mp4"
                suffix += 1
            used_names.add(name)
            jobs.append((input_path, str(target_dir / name)))

        results = []

        if parallel:
            futures = [
                self.executor.submit(self.process_video, input_path, output_path, background)
                for input_path, output_path in jobs
            ]
            # Collect in submission order so results line up with input_paths.
            for future in futures:
                try:
                    results.append(future.result(timeout=3600))
                except Exception as e:
                    self.logger.error(f"Batch processing error: {e}")
                    results.append(VideoStats(error_count=1))
        else:
            for input_path, output_path in jobs:
                try:
                    results.append(self.process_video(input_path, output_path, background))
                except Exception as e:
                    # Same policy as the parallel path: one bad file must not
                    # abort the rest of the batch.
                    self.logger.error(f"Batch processing error: {e}")
                    results.append(VideoStats(error_count=1))

        return results

    def export_to_format(self,
                         input_path: str,
                         output_path: str,
                         format: "OutputFormat",
                         **kwargs) -> bool:
        """
        Export processed video to specific format.

        Runs the ``ffmpeg`` executable: VP9 for WEBM, H.264 HLS for HLS, plain
        H.264 for every other format.

        Args:
            input_path: Input video path
            output_path: Output path
            format: Target format
            **kwargs: Format-specific options. Supported:
                ``overwrite`` (bool, default False) - replace an existing
                output file instead of failing.

        Returns:
            True if successful
        """
        # -n / -y make FFmpeg decide immediately rather than asking on stdin,
        # which would block forever when run from a terminal.
        overwrite_flag = '-y' if kwargs.get('overwrite', False) else '-n'
        cmd = ['ffmpeg', '-nostdin', overwrite_flag, '-i', input_path]

        if format == OutputFormat.WEBM:
            cmd += ['-c:v', 'libvpx-vp9', '-crf', '30', '-b:v', '0']
        elif format == OutputFormat.HLS:
            cmd += ['-c:v', 'libx264', '-hls_time', '10', '-hls_list_size', '0', '-f', 'hls']
        else:
            cmd += ['-c:v', 'libx264', '-preset', 'medium', '-crf', '23']
        cmd.append(output_path)

        try:
            # errors='replace': FFmpeg may echo metadata that is not valid UTF-8.
            result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')
        except Exception as e:
            self.logger.error(f"Export failed: {e}")
            return False

        if result.returncode != 0:
            self.logger.error(
                f"Export failed: ffmpeg exited with code {result.returncode}: "
                f"{result.stderr.strip()[-500:]}"
            )
            return False
        return True

    def cleanup(self):
        """Cleanup resources: stop any stream, shut the thread pool down, clean up the core processor."""
        self.stop_stream_processing()
        self.executor.shutdown(wait=True)

        if self.core_processor:
            self.core_processor.cleanup()

        self.logger.info("VideoProcessorAPI cleanup complete")