|
|
""" |
|
|
Core Utilities Module for BackgroundFX Pro |
|
|
Contains FileManager, VideoUtils, ImageUtils, and ValidationUtils |
|
|
""" |
|
|
|
|
|
|
|
|
import os |
|
|
if 'OMP_NUM_THREADS' not in os.environ: |
|
|
os.environ['OMP_NUM_THREADS'] = '4' |
|
|
os.environ['MKL_NUM_THREADS'] = '4' |
|
|
|
|
|
import shutil |
|
|
import tempfile |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Optional, List, Union, Tuple, Dict, Any |
|
|
from datetime import datetime |
|
|
import subprocess |
|
|
import re |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import torch |
|
|
from PIL import Image, ImageEnhance, ImageFilter, ImageDraw |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ValidationUtils: |
|
|
"""Validation utilities for BackgroundFX Pro application.""" |
|
|
|
|
|
|
|
|
SUPPORTED_VIDEO_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v'} |
|
|
SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} |
|
|
|
|
|
|
|
|
MAX_VIDEO_SIZE = 500 * 1024 * 1024 |
|
|
MAX_IMAGE_SIZE = 50 * 1024 * 1024 |
|
|
MIN_VIDEO_SIZE = 1024 |
|
|
|
|
|
|
|
|
MAX_VIDEO_DURATION = 300 |
|
|
MIN_VIDEO_DURATION = 1 |
|
|
MAX_RESOLUTION = (3840, 2160) |
|
|
MIN_RESOLUTION = (320, 240) |
|
|
MAX_FPS = 120 |
|
|
MIN_FPS = 10 |
|
|
|
|
|
@staticmethod |
|
|
def validate_video_file(file_path, check_content=False): |
|
|
""" |
|
|
Validate video file for processing. |
|
|
|
|
|
Args: |
|
|
file_path: Path to the video file |
|
|
check_content: Whether to perform deep content validation |
|
|
|
|
|
Returns: |
|
|
tuple: (is_valid, error_message) |
|
|
""" |
|
|
from pathlib import Path |
|
|
|
|
|
if not file_path: |
|
|
return False, "No file path provided" |
|
|
|
|
|
path = Path(file_path) |
|
|
|
|
|
|
|
|
if not path.exists(): |
|
|
return False, f"File not found: {file_path}" |
|
|
|
|
|
|
|
|
if path.suffix.lower() not in ValidationUtils.SUPPORTED_VIDEO_FORMATS: |
|
|
return False, f"Unsupported video format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_VIDEO_FORMATS)}" |
|
|
|
|
|
|
|
|
file_size = path.stat().st_size |
|
|
if file_size > ValidationUtils.MAX_VIDEO_SIZE: |
|
|
size_mb = file_size / (1024 * 1024) |
|
|
return False, f"Video file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_VIDEO_SIZE / (1024 * 1024):.0f}MB)" |
|
|
|
|
|
if file_size < ValidationUtils.MIN_VIDEO_SIZE: |
|
|
return False, "Video file appears to be empty or corrupted" |
|
|
|
|
|
|
|
|
if check_content: |
|
|
try: |
|
|
cap = cv2.VideoCapture(str(file_path)) |
|
|
|
|
|
if not cap.isOpened(): |
|
|
return False, "Unable to open video file - may be corrupted" |
|
|
|
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
|
|
|
duration = frame_count / fps if fps > 0 else 0 |
|
|
|
|
|
cap.release() |
|
|
|
|
|
|
|
|
if duration > ValidationUtils.MAX_VIDEO_DURATION: |
|
|
return False, f"Video too long: {duration:.1f}s (max: {ValidationUtils.MAX_VIDEO_DURATION}s)" |
|
|
|
|
|
if duration < ValidationUtils.MIN_VIDEO_DURATION: |
|
|
return False, f"Video too short: {duration:.1f}s (min: {ValidationUtils.MIN_VIDEO_DURATION}s)" |
|
|
|
|
|
if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: |
|
|
return False, f"Video resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" |
|
|
|
|
|
if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: |
|
|
return False, f"Video resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" |
|
|
|
|
|
if fps > ValidationUtils.MAX_FPS: |
|
|
return False, f"Frame rate too high: {fps:.1f} fps (max: {ValidationUtils.MAX_FPS} fps)" |
|
|
|
|
|
if fps < ValidationUtils.MIN_FPS: |
|
|
return False, f"Frame rate too low: {fps:.1f} fps (min: {ValidationUtils.MIN_FPS} fps)" |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Error validating video content: {str(e)}" |
|
|
|
|
|
return True, "Video file is valid" |
|
|
|
|
|
@staticmethod |
|
|
def validate_image_file(file_path, check_content=False): |
|
|
""" |
|
|
Validate image file for background replacement. |
|
|
|
|
|
Args: |
|
|
file_path: Path to the image file |
|
|
check_content: Whether to perform deep content validation |
|
|
|
|
|
Returns: |
|
|
tuple: (is_valid, error_message) |
|
|
""" |
|
|
from pathlib import Path |
|
|
|
|
|
if not file_path: |
|
|
return False, "No file path provided" |
|
|
|
|
|
path = Path(file_path) |
|
|
|
|
|
|
|
|
if not path.exists(): |
|
|
return False, f"File not found: {file_path}" |
|
|
|
|
|
|
|
|
if path.suffix.lower() not in ValidationUtils.SUPPORTED_IMAGE_FORMATS: |
|
|
return False, f"Unsupported image format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_IMAGE_FORMATS)}" |
|
|
|
|
|
|
|
|
file_size = path.stat().st_size |
|
|
if file_size > ValidationUtils.MAX_IMAGE_SIZE: |
|
|
size_mb = file_size / (1024 * 1024) |
|
|
return False, f"Image file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB)" |
|
|
|
|
|
|
|
|
if check_content: |
|
|
try: |
|
|
img = cv2.imread(str(file_path)) |
|
|
if img is None: |
|
|
return False, "Unable to read image file - may be corrupted" |
|
|
|
|
|
height, width = img.shape[:2] |
|
|
|
|
|
|
|
|
if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: |
|
|
return False, f"Image resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" |
|
|
|
|
|
if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: |
|
|
return False, f"Image resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Error validating image content: {str(e)}" |
|
|
|
|
|
return True, "Image file is valid" |
|
|
|
|
|
@staticmethod |
|
|
def validate_processing_params(params): |
|
|
""" |
|
|
Validate processing parameters. |
|
|
|
|
|
Args: |
|
|
params: Dictionary of processing parameters |
|
|
|
|
|
Returns: |
|
|
tuple: (is_valid, error_message) |
|
|
""" |
|
|
if not params: |
|
|
return False, "No parameters provided" |
|
|
|
|
|
|
|
|
if 'confidence_threshold' in params: |
|
|
conf = params['confidence_threshold'] |
|
|
if not isinstance(conf, (int, float)): |
|
|
return False, "Confidence threshold must be a number" |
|
|
if conf < 0 or conf > 1: |
|
|
return False, "Confidence threshold must be between 0 and 1" |
|
|
|
|
|
|
|
|
if 'mask_dilation' in params: |
|
|
dilation = params['mask_dilation'] |
|
|
if not isinstance(dilation, int): |
|
|
return False, "Mask dilation must be an integer" |
|
|
if dilation < 0 or dilation > 50: |
|
|
return False, "Mask dilation must be between 0 and 50" |
|
|
|
|
|
|
|
|
if 'edge_smoothing' in params: |
|
|
smooth = params['edge_smoothing'] |
|
|
if not isinstance(smooth, int): |
|
|
return False, "Edge smoothing must be an integer" |
|
|
if smooth < 0 or smooth > 100: |
|
|
return False, "Edge smoothing must be between 0 and 100" |
|
|
|
|
|
|
|
|
if 'color_adjustment' in params: |
|
|
color_adj = params['color_adjustment'] |
|
|
if not isinstance(color_adj, bool): |
|
|
return False, "Color adjustment must be a boolean" |
|
|
|
|
|
|
|
|
if 'output_quality' in params: |
|
|
quality = params['output_quality'] |
|
|
if not isinstance(quality, int): |
|
|
return False, "Output quality must be an integer" |
|
|
if quality < 1 or quality > 100: |
|
|
return False, "Output quality must be between 1 and 100" |
|
|
|
|
|
|
|
|
if 'processing_method' in params: |
|
|
method = params['processing_method'] |
|
|
valid_methods = {'sam2', 'matanyone', 'cv_fallback', 'auto'} |
|
|
if method not in valid_methods: |
|
|
return False, f"Invalid processing method. Must be one of: {', '.join(valid_methods)}" |
|
|
|
|
|
return True, "Parameters are valid" |
|
|
|
|
|
@staticmethod |
|
|
def validate_output_path(output_path, create_dirs=False): |
|
|
""" |
|
|
Validate output path for saving results. |
|
|
|
|
|
Args: |
|
|
output_path: Path where output will be saved |
|
|
create_dirs: Whether to create directories if they don't exist |
|
|
|
|
|
Returns: |
|
|
tuple: (is_valid, error_message) |
|
|
""" |
|
|
from pathlib import Path |
|
|
|
|
|
if not output_path: |
|
|
return False, "No output path provided" |
|
|
|
|
|
path = Path(output_path) |
|
|
parent_dir = path.parent |
|
|
|
|
|
|
|
|
if not parent_dir.exists(): |
|
|
if create_dirs: |
|
|
try: |
|
|
parent_dir.mkdir(parents=True, exist_ok=True) |
|
|
except Exception as e: |
|
|
return False, f"Failed to create output directory: {str(e)}" |
|
|
else: |
|
|
return False, f"Output directory does not exist: {parent_dir}" |
|
|
|
|
|
|
|
|
if not os.access(parent_dir, os.W_OK): |
|
|
return False, f"No write permission for directory: {parent_dir}" |
|
|
|
|
|
|
|
|
if path.exists(): |
|
|
if not os.access(path, os.W_OK): |
|
|
return False, f"Cannot overwrite existing file: {output_path}" |
|
|
|
|
|
return True, "Output path is valid" |
|
|
|
|
|
@staticmethod |
|
|
def sanitize_filename(filename): |
|
|
""" |
|
|
Sanitize filename to be safe for filesystem. |
|
|
|
|
|
Args: |
|
|
filename: Original filename |
|
|
|
|
|
Returns: |
|
|
str: Sanitized filename |
|
|
""" |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
path = Path(filename) |
|
|
stem = path.stem |
|
|
suffix = path.suffix |
|
|
|
|
|
|
|
|
|
|
|
stem = re.sub(r'[^\w\-_.]', '_', stem) |
|
|
|
|
|
|
|
|
stem = re.sub(r'_+', '_', stem) |
|
|
|
|
|
|
|
|
stem = stem.strip('_') |
|
|
|
|
|
|
|
|
if not stem: |
|
|
stem = 'output' |
|
|
|
|
|
|
|
|
max_length = 200 |
|
|
if len(stem) > max_length: |
|
|
stem = stem[:max_length] |
|
|
|
|
|
return f"{stem}{suffix}" |
|
|
|
|
|
@staticmethod |
|
|
def validate_memory_available(required_mb=1000): |
|
|
""" |
|
|
Check if sufficient memory is available. |
|
|
|
|
|
Args: |
|
|
required_mb: Required memory in megabytes |
|
|
|
|
|
Returns: |
|
|
tuple: (is_sufficient, available_mb, error_message) |
|
|
""" |
|
|
try: |
|
|
import psutil |
|
|
|
|
|
mem = psutil.virtual_memory() |
|
|
available_mb = mem.available / (1024 * 1024) |
|
|
|
|
|
if available_mb < required_mb: |
|
|
return False, available_mb, f"Insufficient memory: {available_mb:.0f}MB available, {required_mb:.0f}MB required" |
|
|
|
|
|
return True, available_mb, f"Sufficient memory available: {available_mb:.0f}MB" |
|
|
|
|
|
except ImportError: |
|
|
|
|
|
return True, -1, "Memory check skipped (psutil not available)" |
|
|
except Exception as e: |
|
|
return True, -1, f"Memory check failed: {str(e)}" |
|
|
|
|
|
@staticmethod |
|
|
def validate_gpu_available(): |
|
|
""" |
|
|
Check if GPU is available for processing. |
|
|
|
|
|
Returns: |
|
|
tuple: (is_available, device_info) |
|
|
""" |
|
|
try: |
|
|
if torch.cuda.is_available(): |
|
|
device_name = torch.cuda.get_device_name(0) |
|
|
memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3) |
|
|
return True, f"GPU available: {device_name} ({memory_gb:.1f}GB)" |
|
|
else: |
|
|
return False, "No GPU available - will use CPU" |
|
|
|
|
|
except ImportError: |
|
|
return False, "PyTorch not available for GPU check" |
|
|
except Exception as e: |
|
|
return False, f"GPU check failed: {str(e)}" |
|
|
|
|
|
@staticmethod |
|
|
def validate_url(url): |
|
|
""" |
|
|
Validate URL format. |
|
|
|
|
|
Args: |
|
|
url: URL string to validate |
|
|
|
|
|
Returns: |
|
|
tuple: (is_valid, error_message) |
|
|
""" |
|
|
if not url: |
|
|
return False, "No URL provided" |
|
|
|
|
|
|
|
|
url_pattern = re.compile( |
|
|
r'^https?://' |
|
|
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' |
|
|
r'localhost|' |
|
|
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' |
|
|
r'(?::\d+)?' |
|
|
r'(?:/?|[/?]\S+)$', re.IGNORECASE) |
|
|
|
|
|
if url_pattern.match(url): |
|
|
return True, "Valid URL" |
|
|
else: |
|
|
return False, "Invalid URL format" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FileManager: |
|
|
"""Manages file operations for BackgroundFX Pro""" |
|
|
|
|
|
def __init__(self, base_dir: Optional[str] = None): |
|
|
"""Initialize FileManager""" |
|
|
if base_dir: |
|
|
self.base_dir = Path(base_dir) |
|
|
else: |
|
|
self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro" |
|
|
|
|
|
self.base_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
self.uploads_dir = self.base_dir / "uploads" |
|
|
self.outputs_dir = self.base_dir / "outputs" |
|
|
self.temp_dir = self.base_dir / "temp" |
|
|
self.cache_dir = self.base_dir / "cache" |
|
|
|
|
|
for dir_path in [self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir]: |
|
|
dir_path.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
logger.info(f"FileManager initialized with base directory: {self.base_dir}") |
|
|
|
|
|
def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path: |
|
|
"""Save an uploaded file to the uploads directory""" |
|
|
file_path = Path(file_path) |
|
|
|
|
|
if filename: |
|
|
dest_path = self.uploads_dir / filename |
|
|
else: |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}" |
|
|
|
|
|
shutil.copy2(file_path, dest_path) |
|
|
logger.info(f"Saved upload: {dest_path}") |
|
|
return dest_path |
|
|
|
|
|
def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path: |
|
|
"""Create a path for an output file""" |
|
|
if subfolder: |
|
|
output_dir = self.outputs_dir / subfolder |
|
|
output_dir.mkdir(parents=True, exist_ok=True) |
|
|
else: |
|
|
output_dir = self.outputs_dir |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
name_parts = filename.rsplit('.', 1) |
|
|
if len(name_parts) == 2: |
|
|
output_path = output_dir / f"{name_parts[0]}_{timestamp}.{name_parts[1]}" |
|
|
else: |
|
|
output_path = output_dir / f"{filename}_{timestamp}" |
|
|
|
|
|
return output_path |
|
|
|
|
|
def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path: |
|
|
"""Get a temporary file path""" |
|
|
if filename: |
|
|
temp_path = self.temp_dir / filename |
|
|
else: |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") |
|
|
temp_path = self.temp_dir / f"temp_{timestamp}{extension}" |
|
|
|
|
|
return temp_path |
|
|
|
|
|
def cleanup_temp(self, max_age_hours: int = 24): |
|
|
"""Clean up old temporary files""" |
|
|
try: |
|
|
current_time = datetime.now().timestamp() |
|
|
max_age_seconds = max_age_hours * 3600 |
|
|
|
|
|
for temp_file in self.temp_dir.iterdir(): |
|
|
if temp_file.is_file(): |
|
|
file_age = current_time - temp_file.stat().st_mtime |
|
|
if file_age > max_age_seconds: |
|
|
temp_file.unlink() |
|
|
logger.debug(f"Deleted old temp file: {temp_file}") |
|
|
|
|
|
logger.info("Temp directory cleanup completed") |
|
|
except Exception as e: |
|
|
logger.warning(f"Error during temp cleanup: {e}") |
|
|
|
|
|
def get_cache_path(self, key: str, extension: str = ".cache") -> Path: |
|
|
"""Get a cache file path based on a key""" |
|
|
safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key) |
|
|
return self.cache_dir / f"{safe_key}{extension}" |
|
|
|
|
|
def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]: |
|
|
"""List output files""" |
|
|
if subfolder: |
|
|
search_dir = self.outputs_dir / subfolder |
|
|
else: |
|
|
search_dir = self.outputs_dir |
|
|
|
|
|
if not search_dir.exists(): |
|
|
return [] |
|
|
|
|
|
if extension: |
|
|
pattern = f"*{extension}" |
|
|
else: |
|
|
pattern = "*" |
|
|
|
|
|
return sorted(search_dir.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True) |
|
|
|
|
|
def delete_file(self, file_path: Union[str, Path]) -> bool: |
|
|
"""Safely delete a file""" |
|
|
try: |
|
|
file_path = Path(file_path) |
|
|
if file_path.exists() and file_path.is_file(): |
|
|
file_path.unlink() |
|
|
logger.info(f"Deleted file: {file_path}") |
|
|
return True |
|
|
return False |
|
|
except Exception as e: |
|
|
logger.error(f"Error deleting file {file_path}: {e}") |
|
|
return False |
|
|
|
|
|
def get_file_info(self, file_path: Union[str, Path]) -> dict: |
|
|
"""Get information about a file""" |
|
|
file_path = Path(file_path) |
|
|
|
|
|
if not file_path.exists(): |
|
|
return {"exists": False} |
|
|
|
|
|
stat = file_path.stat() |
|
|
return { |
|
|
"exists": True, |
|
|
"name": file_path.name, |
|
|
"size": stat.st_size, |
|
|
"size_mb": stat.st_size / (1024 * 1024), |
|
|
"created": datetime.fromtimestamp(stat.st_ctime), |
|
|
"modified": datetime.fromtimestamp(stat.st_mtime), |
|
|
"extension": file_path.suffix, |
|
|
"path": str(file_path.absolute()) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoUtils: |
|
|
"""Utilities for video processing""" |
|
|
|
|
|
@staticmethod |
|
|
def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]: |
|
|
"""Get detailed video information""" |
|
|
video_path = str(video_path) |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
|
|
|
if not cap.isOpened(): |
|
|
logger.error(f"Failed to open video: {video_path}") |
|
|
return {"error": "Failed to open video"} |
|
|
|
|
|
try: |
|
|
info = { |
|
|
"fps": cap.get(cv2.CAP_PROP_FPS), |
|
|
"frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), |
|
|
"width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), |
|
|
"height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), |
|
|
"codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))), |
|
|
"duration": cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 0 |
|
|
} |
|
|
|
|
|
path = Path(video_path) |
|
|
if path.exists(): |
|
|
info["file_size_mb"] = path.stat().st_size / (1024 * 1024) |
|
|
|
|
|
return info |
|
|
|
|
|
finally: |
|
|
cap.release() |
|
|
|
|
|
@staticmethod |
|
|
def _fourcc_to_string(fourcc: int) -> str: |
|
|
"""Convert fourcc code to string""" |
|
|
return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) |
|
|
|
|
|
@staticmethod |
|
|
def extract_frames(video_path: Union[str, Path], |
|
|
output_dir: Union[str, Path], |
|
|
frame_interval: int = 1, |
|
|
max_frames: Optional[int] = None) -> List[Path]: |
|
|
"""Extract frames from video""" |
|
|
video_path = str(video_path) |
|
|
output_dir = Path(output_dir) |
|
|
output_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
logger.error(f"Failed to open video: {video_path}") |
|
|
return [] |
|
|
|
|
|
frame_paths = [] |
|
|
frame_count = 0 |
|
|
extracted_count = 0 |
|
|
|
|
|
try: |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
if frame_count % frame_interval == 0: |
|
|
frame_path = output_dir / f"frame_{frame_count:06d}.png" |
|
|
cv2.imwrite(str(frame_path), frame) |
|
|
frame_paths.append(frame_path) |
|
|
extracted_count += 1 |
|
|
|
|
|
if max_frames and extracted_count >= max_frames: |
|
|
break |
|
|
|
|
|
frame_count += 1 |
|
|
|
|
|
logger.info(f"Extracted {len(frame_paths)} frames from video") |
|
|
return frame_paths |
|
|
|
|
|
finally: |
|
|
cap.release() |
|
|
|
|
|
@staticmethod |
|
|
def create_video_from_frames(frame_paths: List[Union[str, Path]], |
|
|
output_path: Union[str, Path], |
|
|
fps: float = 30.0, |
|
|
codec: str = 'mp4v') -> bool: |
|
|
"""Create video from frame images""" |
|
|
if not frame_paths: |
|
|
logger.error("No frames provided") |
|
|
return False |
|
|
|
|
|
first_frame = cv2.imread(str(frame_paths[0])) |
|
|
if first_frame is None: |
|
|
logger.error(f"Failed to read first frame: {frame_paths[0]}") |
|
|
return False |
|
|
|
|
|
height, width, layers = first_frame.shape |
|
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*codec) |
|
|
out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) |
|
|
|
|
|
try: |
|
|
for frame_path in frame_paths: |
|
|
frame = cv2.imread(str(frame_path)) |
|
|
if frame is not None: |
|
|
out.write(frame) |
|
|
else: |
|
|
logger.warning(f"Failed to read frame: {frame_path}") |
|
|
|
|
|
logger.info(f"Created video: {output_path}") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error creating video: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
out.release() |
|
|
|
|
|
@staticmethod |
|
|
def resize_video(input_path: Union[str, Path], |
|
|
output_path: Union[str, Path], |
|
|
target_width: Optional[int] = None, |
|
|
target_height: Optional[int] = None, |
|
|
maintain_aspect: bool = True) -> bool: |
|
|
"""Resize video to target dimensions""" |
|
|
cap = cv2.VideoCapture(str(input_path)) |
|
|
if not cap.isOpened(): |
|
|
logger.error(f"Failed to open video: {input_path}") |
|
|
return False |
|
|
|
|
|
orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) |
|
|
|
|
|
if maintain_aspect: |
|
|
if target_width and not target_height: |
|
|
aspect = orig_width / orig_height |
|
|
target_height = int(target_width / aspect) |
|
|
elif target_height and not target_width: |
|
|
aspect = orig_width / orig_height |
|
|
target_width = int(target_height * aspect) |
|
|
|
|
|
if not target_width: |
|
|
target_width = orig_width |
|
|
if not target_height: |
|
|
target_height = orig_height |
|
|
|
|
|
out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height)) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
resized = cv2.resize(frame, (target_width, target_height)) |
|
|
out.write(resized) |
|
|
|
|
|
logger.info(f"Resized video saved to: {output_path}") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error resizing video: {e}") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
cap.release() |
|
|
out.release() |
|
|
|
|
|
@staticmethod |
|
|
def extract_audio(video_path: Union[str, Path], |
|
|
audio_path: Union[str, Path]) -> bool: |
|
|
"""Extract audio from video using ffmpeg""" |
|
|
try: |
|
|
cmd = [ |
|
|
'ffmpeg', '-i', str(video_path), |
|
|
'-vn', '-acodec', 'copy', |
|
|
str(audio_path), '-y' |
|
|
] |
|
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
|
|
|
|
if result.returncode == 0: |
|
|
logger.info(f"Audio extracted to: {audio_path}") |
|
|
return True |
|
|
else: |
|
|
logger.error(f"Failed to extract audio: {result.stderr}") |
|
|
return False |
|
|
|
|
|
except FileNotFoundError: |
|
|
logger.error("ffmpeg not found. Please install ffmpeg.") |
|
|
return False |
|
|
except Exception as e: |
|
|
logger.error(f"Error extracting audio: {e}") |
|
|
return False |
|
|
|
|
|
@staticmethod |
|
|
def add_audio_to_video(video_path: Union[str, Path], |
|
|
audio_path: Union[str, Path], |
|
|
output_path: Union[str, Path]) -> bool: |
|
|
"""Add audio track to video using ffmpeg""" |
|
|
try: |
|
|
cmd = [ |
|
|
'ffmpeg', '-i', str(video_path), |
|
|
'-i', str(audio_path), |
|
|
'-c:v', 'copy', '-c:a', 'aac', |
|
|
'-map', '0:v:0', '-map', '1:a:0', |
|
|
str(output_path), '-y' |
|
|
] |
|
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
|
|
|
|
if result.returncode == 0: |
|
|
logger.info(f"Video with audio saved to: {output_path}") |
|
|
return True |
|
|
else: |
|
|
logger.error(f"Failed to add audio: {result.stderr}") |
|
|
return False |
|
|
|
|
|
except FileNotFoundError: |
|
|
logger.error("ffmpeg not found. Please install ffmpeg.") |
|
|
return False |
|
|
except Exception as e: |
|
|
logger.error(f"Error adding audio: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImageUtils: |
|
|
"""Utilities for image processing and manipulation""" |
|
|
|
|
|
@staticmethod |
|
|
def load_image(image_path: Union[str, Path]) -> Optional[Image.Image]: |
|
|
"""Load an image using PIL""" |
|
|
try: |
|
|
return Image.open(str(image_path)) |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to load image {image_path}: {e}") |
|
|
return None |
|
|
|
|
|
@staticmethod |
|
|
def resize_image(image: Image.Image, |
|
|
max_width: Optional[int] = None, |
|
|
max_height: Optional[int] = None, |
|
|
maintain_aspect: bool = True) -> Image.Image: |
|
|
"""Resize image to fit within max dimensions""" |
|
|
if not max_width and not max_height: |
|
|
return image |
|
|
|
|
|
width, height = image.size |
|
|
|
|
|
if maintain_aspect: |
|
|
scale = 1.0 |
|
|
if max_width: |
|
|
scale = min(scale, max_width / width) |
|
|
if max_height: |
|
|
scale = min(scale, max_height / height) |
|
|
|
|
|
new_width = int(width * scale) |
|
|
new_height = int(height * scale) |
|
|
else: |
|
|
new_width = max_width or width |
|
|
new_height = max_height or height |
|
|
|
|
|
return image.resize((new_width, new_height), Image.Resampling.LANCZOS) |
|
|
|
|
|
@staticmethod |
|
|
def convert_to_cv2(pil_image: Image.Image) -> np.ndarray: |
|
|
"""Convert PIL Image to OpenCV format""" |
|
|
if pil_image.mode != 'RGB': |
|
|
pil_image = pil_image.convert('RGB') |
|
|
|
|
|
np_image = np.array(pil_image) |
|
|
return cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
@staticmethod |
|
|
def convert_from_cv2(cv2_image: np.ndarray) -> Image.Image: |
|
|
"""Convert OpenCV image to PIL format""" |
|
|
rgb_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB) |
|
|
return Image.fromarray(rgb_image) |
|
|
|
|
|
@staticmethod |
|
|
def apply_blur(image: Image.Image, radius: float = 5.0) -> Image.Image: |
|
|
"""Apply Gaussian blur to image""" |
|
|
return image.filter(ImageFilter.GaussianBlur(radius=radius)) |
|
|
|
|
|
@staticmethod |
|
|
def adjust_brightness(image: Image.Image, factor: float = 1.0) -> Image.Image: |
|
|
"""Adjust image brightness""" |
|
|
enhancer = ImageEnhance.Brightness(image) |
|
|
return enhancer.enhance(factor) |
|
|
|
|
|
@staticmethod |
|
|
def adjust_contrast(image: Image.Image, factor: float = 1.0) -> Image.Image: |
|
|
"""Adjust image contrast""" |
|
|
enhancer = ImageEnhance.Contrast(image) |
|
|
return enhancer.enhance(factor) |
|
|
|
|
|
@staticmethod |
|
|
def adjust_saturation(image: Image.Image, factor: float = 1.0) -> Image.Image: |
|
|
"""Adjust image saturation""" |
|
|
enhancer = ImageEnhance.Color(image) |
|
|
return enhancer.enhance(factor) |
|
|
|
|
|
@staticmethod |
|
|
def crop_center(image: Image.Image, crop_width: int, crop_height: int) -> Image.Image: |
|
|
"""Crop image from center""" |
|
|
width, height = image.size |
|
|
|
|
|
left = (width - crop_width) // 2 |
|
|
top = (height - crop_height) // 2 |
|
|
right = left + crop_width |
|
|
bottom = top + crop_height |
|
|
|
|
|
return image.crop((left, top, right, bottom)) |
|
|
|
|
|
@staticmethod |
|
|
def create_thumbnail(image: Image.Image, size: Tuple[int, int] = (128, 128)) -> Image.Image: |
|
|
"""Create thumbnail preserving aspect ratio""" |
|
|
img_copy = image.copy() |
|
|
img_copy.thumbnail(size, Image.Resampling.LANCZOS) |
|
|
return img_copy |
|
|
|
|
|
@staticmethod |
|
|
def apply_mask(image: Image.Image, mask: Image.Image, alpha: float = 1.0) -> Image.Image: |
|
|
"""Apply mask to image""" |
|
|
if image.mode != 'RGBA': |
|
|
image = image.convert('RGBA') |
|
|
|
|
|
if mask.mode != 'L': |
|
|
mask = mask.convert('L') |
|
|
|
|
|
if mask.size != image.size: |
|
|
mask = mask.resize(image.size, Image.Resampling.LANCZOS) |
|
|
|
|
|
if alpha < 1.0: |
|
|
mask = ImageEnhance.Brightness(mask).enhance(alpha) |
|
|
|
|
|
image.putalpha(mask) |
|
|
return image |
|
|
|
|
|
@staticmethod |
|
|
def composite_images(foreground: Image.Image, |
|
|
background: Image.Image, |
|
|
position: Tuple[int, int] = (0, 0), |
|
|
alpha: float = 1.0) -> Image.Image: |
|
|
"""Composite foreground image over background""" |
|
|
if foreground.mode != 'RGBA': |
|
|
foreground = foreground.convert('RGBA') |
|
|
if background.mode != 'RGBA': |
|
|
background = background.convert('RGBA') |
|
|
|
|
|
if alpha < 1.0: |
|
|
foreground = foreground.copy() |
|
|
foreground.putalpha( |
|
|
ImageEnhance.Brightness(foreground.split()[3]).enhance(alpha) |
|
|
) |
|
|
|
|
|
output = background.copy() |
|
|
output.paste(foreground, position, foreground) |
|
|
|
|
|
return output |
|
|
|
|
|
@staticmethod |
|
|
def get_image_info(image_path: Union[str, Path]) -> Dict[str, Any]: |
|
|
"""Get image file information""" |
|
|
try: |
|
|
image_path = Path(image_path) |
|
|
|
|
|
if not image_path.exists(): |
|
|
return {"exists": False} |
|
|
|
|
|
with Image.open(str(image_path)) as img: |
|
|
info = { |
|
|
"exists": True, |
|
|
"filename": image_path.name, |
|
|
"format": img.format, |
|
|
"mode": img.mode, |
|
|
"size": img.size, |
|
|
"width": img.width, |
|
|
"height": img.height, |
|
|
"file_size_mb": image_path.stat().st_size / (1024 * 1024) |
|
|
} |
|
|
|
|
|
if hasattr(img, '_getexif') and img._getexif(): |
|
|
info["has_exif"] = True |
|
|
else: |
|
|
info["has_exif"] = False |
|
|
|
|
|
return info |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting image info for {image_path}: {e}") |
|
|
return {"exists": False, "error": str(e)} |
|
|
|
|
|
@staticmethod |
|
|
def save_image(image: Image.Image, |
|
|
output_path: Union[str, Path], |
|
|
quality: int = 95, |
|
|
optimize: bool = True) -> bool: |
|
|
"""Save image with specified quality""" |
|
|
try: |
|
|
output_path = Path(output_path) |
|
|
output_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
save_kwargs = {} |
|
|
ext = output_path.suffix.lower() |
|
|
|
|
|
if ext in ['.jpg', '.jpeg']: |
|
|
save_kwargs['quality'] = quality |
|
|
save_kwargs['optimize'] = optimize |
|
|
elif ext == '.png': |
|
|
save_kwargs['optimize'] = optimize |
|
|
|
|
|
image.save(str(output_path), **save_kwargs) |
|
|
logger.info(f"Saved image to: {output_path}") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to save image to {output_path}: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_video_file(file_path: str) -> tuple: |
|
|
"""Validate if file is a valid video file.""" |
|
|
import os |
|
|
import cv2 |
|
|
|
|
|
if not os.path.exists(file_path): |
|
|
return False, f"File not found: {file_path}" |
|
|
|
|
|
try: |
|
|
cap = cv2.VideoCapture(file_path) |
|
|
ret = cap.isOpened() |
|
|
cap.release() |
|
|
if ret: |
|
|
return True, "Video file is valid" |
|
|
else: |
|
|
return False, "Unable to open video file - may be corrupted" |
|
|
except Exception as e: |
|
|
return False, f"Error validating video: {str(e)}" |
|
|
|