# Source: utils/utils.py, revision 55a34dc by MogensR ("Update utils/utils.py")
"""
Core Utilities Module for BackgroundFX Pro
Contains FileManager, VideoUtils, ImageUtils, and ValidationUtils
"""
# Thread-count defaults are applied before any other import in this module
# (the original note says this is to prevent libgomp errors).
# A value already present for OMP_NUM_THREADS is left untouched.
import os

if os.environ.get('OMP_NUM_THREADS') is None:
    os.environ.update(OMP_NUM_THREADS='4', MKL_NUM_THREADS='4')
import shutil
import tempfile
import logging
from pathlib import Path
from typing import Optional, List, Union, Tuple, Dict, Any
from datetime import datetime
import subprocess
import re
import cv2
import numpy as np
import torch
from PIL import Image, ImageEnhance, ImageFilter, ImageDraw
# Module-level logger, named after this module, used by the utility classes
# below for status, warning and error messages.
logger = logging.getLogger(__name__)
# ============================================================================
# VALIDATION UTILS CLASS
# ============================================================================
class ValidationUtils:
    """Validation utilities for BackgroundFX Pro application.

    Every method is static and reports its verdict through a tuple whose
    first element is a boolean and whose last element is a human-readable
    message.
    """

    # Supported formats (lower-case suffixes including the leading dot)
    SUPPORTED_VIDEO_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v'}
    SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}

    # Size limits (in bytes)
    MAX_VIDEO_SIZE = 500 * 1024 * 1024  # 500MB
    MAX_IMAGE_SIZE = 50 * 1024 * 1024  # 50MB
    MIN_VIDEO_SIZE = 1024  # 1KB (to avoid empty files)

    # Video constraints
    MAX_VIDEO_DURATION = 300  # 5 minutes in seconds
    MIN_VIDEO_DURATION = 1  # 1 second minimum
    MAX_RESOLUTION = (3840, 2160)  # 4K
    MIN_RESOLUTION = (320, 240)  # Minimum reasonable resolution
    MAX_FPS = 120
    MIN_FPS = 10

    # Compiled once instead of on every validate_url() call.
    # The TLD may be up to 63 letters (e.g. ".photography"), and \Z is used
    # instead of $ so a trailing newline is not silently accepted.
    _URL_PATTERN = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)\Z', re.IGNORECASE)

    @staticmethod
    def validate_video_file(file_path, check_content=False):
        """
        Validate video file for processing.

        Checks existence, extension and file size; with ``check_content``
        the file is also opened with OpenCV and its duration, resolution
        and frame rate are compared against the class limits.

        Args:
            file_path: Path to the video file
            check_content: Whether to perform deep content validation

        Returns:
            tuple: (is_valid, error_message)
        """
        if not file_path:
            return False, "No file path provided"

        path = Path(file_path)

        # Check if file exists
        if not path.exists():
            return False, f"File not found: {file_path}"

        # Check file extension (formats are sorted so the message is deterministic)
        if path.suffix.lower() not in ValidationUtils.SUPPORTED_VIDEO_FORMATS:
            supported = ', '.join(sorted(ValidationUtils.SUPPORTED_VIDEO_FORMATS))
            return False, f"Unsupported video format: {path.suffix}. Supported formats: {supported}"

        # Check file size
        file_size = path.stat().st_size
        if file_size > ValidationUtils.MAX_VIDEO_SIZE:
            size_mb = file_size / (1024 * 1024)
            return False, f"Video file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_VIDEO_SIZE / (1024 * 1024):.0f}MB)"
        if file_size < ValidationUtils.MIN_VIDEO_SIZE:
            return False, "Video file appears to be empty or corrupted"

        # Deep content validation if requested
        if check_content:
            try:
                cap = cv2.VideoCapture(str(file_path))
                try:
                    if not cap.isOpened():
                        return False, "Unable to open video file - may be corrupted"

                    # Get video properties
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                finally:
                    # Release on every path, including the early return above
                    cap.release()

                # A non-positive fps yields duration 0, which fails the minimum below
                duration = frame_count / fps if fps > 0 else 0

                # Validate properties
                if duration > ValidationUtils.MAX_VIDEO_DURATION:
                    return False, f"Video too long: {duration:.1f}s (max: {ValidationUtils.MAX_VIDEO_DURATION}s)"
                if duration < ValidationUtils.MIN_VIDEO_DURATION:
                    return False, f"Video too short: {duration:.1f}s (min: {ValidationUtils.MIN_VIDEO_DURATION}s)"
                if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]:
                    return False, f"Video resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})"
                if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]:
                    return False, f"Video resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})"
                if fps > ValidationUtils.MAX_FPS:
                    return False, f"Frame rate too high: {fps:.1f} fps (max: {ValidationUtils.MAX_FPS} fps)"
                if fps < ValidationUtils.MIN_FPS:
                    return False, f"Frame rate too low: {fps:.1f} fps (min: {ValidationUtils.MIN_FPS} fps)"
            except Exception as e:
                return False, f"Error validating video content: {str(e)}"

        return True, "Video file is valid"

    @staticmethod
    def validate_image_file(file_path, check_content=False):
        """
        Validate image file for background replacement.

        Checks existence, extension and file size; with ``check_content``
        the image is decoded with OpenCV and its resolution is compared
        against the class limits.

        Args:
            file_path: Path to the image file
            check_content: Whether to perform deep content validation

        Returns:
            tuple: (is_valid, error_message)
        """
        if not file_path:
            return False, "No file path provided"

        path = Path(file_path)

        # Check if file exists
        if not path.exists():
            return False, f"File not found: {file_path}"

        # Check file extension (formats are sorted so the message is deterministic)
        if path.suffix.lower() not in ValidationUtils.SUPPORTED_IMAGE_FORMATS:
            supported = ', '.join(sorted(ValidationUtils.SUPPORTED_IMAGE_FORMATS))
            return False, f"Unsupported image format: {path.suffix}. Supported formats: {supported}"

        # Check file size
        file_size = path.stat().st_size
        if file_size > ValidationUtils.MAX_IMAGE_SIZE:
            size_mb = file_size / (1024 * 1024)
            return False, f"Image file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB)"

        # Deep content validation if requested
        if check_content:
            try:
                img = cv2.imread(str(file_path))
                if img is None:
                    return False, "Unable to read image file - may be corrupted"

                height, width = img.shape[:2]

                # Check dimensions
                if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]:
                    return False, f"Image resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})"
                if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]:
                    return False, f"Image resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})"
            except Exception as e:
                return False, f"Error validating image content: {str(e)}"

        return True, "Image file is valid"

    @staticmethod
    def _int_param_error(params, key, label, low, high):
        """Return an error message for integer parameter *key*, or None when it is absent or valid."""
        if key not in params:
            return None
        value = params[key]
        # bool is a subclass of int; True/False are not meaningful here
        if isinstance(value, bool) or not isinstance(value, int):
            return f"{label} must be an integer"
        if value < low or value > high:
            return f"{label} must be between {low} and {high}"
        return None

    @staticmethod
    def validate_processing_params(params):
        """
        Validate processing parameters.

        Only the keys that are present are checked; the first failing
        check determines the returned message. Booleans are rejected where
        a number is expected, and NaN is rejected as a confidence threshold.

        Args:
            params: Dictionary of processing parameters

        Returns:
            tuple: (is_valid, error_message)
        """
        if not params:
            return False, "No parameters provided"

        # Validate confidence threshold
        if 'confidence_threshold' in params:
            conf = params['confidence_threshold']
            if isinstance(conf, bool) or not isinstance(conf, (int, float)):
                return False, "Confidence threshold must be a number"
            # Chained comparison is False for NaN, so NaN is rejected as well
            if not 0 <= conf <= 1:
                return False, "Confidence threshold must be between 0 and 1"

        # Validate mask dilation
        error = ValidationUtils._int_param_error(params, 'mask_dilation', "Mask dilation", 0, 50)
        if error:
            return False, error

        # Validate edge smoothing
        error = ValidationUtils._int_param_error(params, 'edge_smoothing', "Edge smoothing", 0, 100)
        if error:
            return False, error

        # Validate color adjustment
        if 'color_adjustment' in params:
            if not isinstance(params['color_adjustment'], bool):
                return False, "Color adjustment must be a boolean"

        # Validate output quality
        error = ValidationUtils._int_param_error(params, 'output_quality', "Output quality", 1, 100)
        if error:
            return False, error

        # Validate processing method
        if 'processing_method' in params:
            method = params['processing_method']
            valid_methods = {'sam2', 'matanyone', 'cv_fallback', 'auto'}
            # The isinstance guard keeps unhashable values from raising in the set lookup
            if not isinstance(method, str) or method not in valid_methods:
                return False, f"Invalid processing method. Must be one of: {', '.join(sorted(valid_methods))}"

        return True, "Parameters are valid"

    @staticmethod
    def validate_output_path(output_path, create_dirs=False):
        """
        Validate output path for saving results.

        Args:
            output_path: Path where output will be saved
            create_dirs: Whether to create directories if they don't exist

        Returns:
            tuple: (is_valid, error_message)
        """
        if not output_path:
            return False, "No output path provided"

        path = Path(output_path)
        parent_dir = path.parent

        # Check if parent directory exists
        if not parent_dir.exists():
            if not create_dirs:
                return False, f"Output directory does not exist: {parent_dir}"
            try:
                parent_dir.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                return False, f"Failed to create output directory: {str(e)}"

        # Check write permissions
        if not os.access(parent_dir, os.W_OK):
            return False, f"No write permission for directory: {parent_dir}"

        # An existing target must itself be writable to be overwritten
        if path.exists() and not os.access(path, os.W_OK):
            return False, f"Cannot overwrite existing file: {output_path}"

        return True, "Output path is valid"

    @staticmethod
    def sanitize_filename(filename):
        """
        Sanitize filename to be safe for filesystem.

        Only the final path component is used. Characters other than word
        characters, dash and dot are replaced by underscores in both the
        stem and the suffix; runs of underscores in the stem are collapsed
        and trimmed, and the stem is capped at 200 characters.

        Args:
            filename: Original filename

        Returns:
            str: Sanitized filename
        """
        path = Path(filename)

        # Keep only word characters, dash and dot in the stem
        stem = re.sub(r'[^\w\-_.]', '_', path.stem)
        # Collapse multiple underscores, then drop leading/trailing ones
        stem = re.sub(r'_+', '_', stem).strip('_')

        # Ensure filename is not empty
        if not stem:
            stem = 'output'

        # Limit length (keep it reasonable for most filesystems)
        max_length = 200
        stem = stem[:max_length]

        # The suffix can carry unsafe characters too (e.g. ".m p4"), so clean it as well
        suffix = re.sub(r'[^\w.]', '_', path.suffix)

        return f"{stem}{suffix}"

    @staticmethod
    def validate_memory_available(required_mb=1000):
        """
        Check if sufficient memory is available.

        When psutil is missing or the check itself fails, memory is assumed
        to be sufficient and -1 is reported as the available amount.

        Args:
            required_mb: Required memory in megabytes

        Returns:
            tuple: (is_sufficient, available_mb, error_message)
        """
        try:
            import psutil
            mem = psutil.virtual_memory()
            available_mb = mem.available / (1024 * 1024)
            if available_mb < required_mb:
                return False, available_mb, f"Insufficient memory: {available_mb:.0f}MB available, {required_mb:.0f}MB required"
            return True, available_mb, f"Sufficient memory available: {available_mb:.0f}MB"
        except ImportError:
            # If psutil not available, assume sufficient memory
            return True, -1, "Memory check skipped (psutil not available)"
        except Exception as e:
            return True, -1, f"Memory check failed: {str(e)}"

    @staticmethod
    def validate_gpu_available():
        """
        Check if GPU is available for processing.

        Returns:
            tuple: (is_available, device_info)
        """
        try:
            if torch.cuda.is_available():
                device_name = torch.cuda.get_device_name(0)
                memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3)
                return True, f"GPU available: {device_name} ({memory_gb:.1f}GB)"
            return False, "No GPU available - will use CPU"
        except ImportError:
            return False, "PyTorch not available for GPU check"
        except Exception as e:
            return False, f"GPU check failed: {str(e)}"

    @staticmethod
    def validate_url(url):
        """
        Validate URL format.

        Accepts http/https URLs whose host is a domain name, ``localhost``
        or a dotted IPv4-style address, with an optional port and path.

        Args:
            url: URL string to validate

        Returns:
            tuple: (is_valid, error_message)
        """
        if not url:
            return False, "No URL provided"

        # Non-string input can never match; report it instead of raising TypeError
        if isinstance(url, str) and ValidationUtils._URL_PATTERN.match(url):
            return True, "Valid URL"
        return False, "Invalid URL format"
# ============================================================================
# FILE MANAGER CLASS
# ============================================================================
class FileManager:
    """Manages file operations for BackgroundFX Pro.

    Owns a base directory containing ``uploads``, ``outputs``, ``temp`` and
    ``cache`` subdirectories, all of which are created on construction.
    """

    def __init__(self, base_dir: Optional[str] = None):
        """Initialize FileManager.

        Args:
            base_dir: Root directory to use. When omitted, a
                ``backgroundfx_pro`` folder inside the system temp
                directory is used.
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro"
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.uploads_dir = self.base_dir / "uploads"
        self.outputs_dir = self.base_dir / "outputs"
        self.temp_dir = self.base_dir / "temp"
        self.cache_dir = self.base_dir / "cache"
        for dir_path in [self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)

        logger.info(f"FileManager initialized with base directory: {self.base_dir}")

    def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path:
        """Save an uploaded file to the uploads directory.

        Args:
            file_path: Existing file to copy.
            filename: Name to store the copy under. Only its final path
                component is used so the copy cannot land outside the
                uploads directory. When omitted (or when it has no usable
                final component) a timestamp-prefixed copy of the source
                name is used.

        Returns:
            Path of the stored copy.
        """
        file_path = Path(file_path)

        # Strip directory parts ("../x", "/abs/x") from a caller-supplied name
        safe_name = Path(filename).name if filename else ""
        if safe_name and safe_name != "..":
            dest_path = self.uploads_dir / safe_name
        else:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}"

        shutil.copy2(file_path, dest_path)
        logger.info(f"Saved upload: {dest_path}")
        return dest_path

    def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path:
        """Create a path for an output file.

        A ``_<timestamp>`` marker is inserted between the stem and the
        suffix of the final path component. The file itself is not created;
        only the subfolder is.

        Args:
            filename: Desired output name, e.g. ``"result.mp4"``.
            subfolder: Optional directory below the outputs directory.

        Returns:
            The timestamped output path.
        """
        if subfolder:
            output_dir = self.outputs_dir / subfolder
            output_dir.mkdir(parents=True, exist_ok=True)
        else:
            output_dir = self.outputs_dir

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        name = Path(filename)
        if not name.name:
            # Nothing to split (e.g. empty string); with_name() would raise
            return output_dir / f"{filename}_{timestamp}"

        # Split on the final component's suffix, not on the last dot of the
        # whole string, so dotfiles and dotted directory names stay intact.
        stamped = name.with_name(f"{name.stem}_{timestamp}{name.suffix}")
        return output_dir / stamped

    def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path:
        """Get a temporary file path.

        Args:
            filename: Explicit name inside the temp directory. When omitted
                a ``temp_<timestamp>`` name with microsecond resolution is
                generated.
            extension: Suffix for generated names; ignored when
                ``filename`` is given.

        Returns:
            Path inside the temp directory (the file is not created).
        """
        if filename:
            return self.temp_dir / filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        return self.temp_dir / f"temp_{timestamp}{extension}"

    def cleanup_temp(self, max_age_hours: int = 24):
        """Clean up old temporary files.

        Deletes regular files directly inside the temp directory whose
        modification time is older than ``max_age_hours``. A failure on one
        file is logged and does not stop the remaining files from being
        processed.
        """
        current_time = datetime.now().timestamp()
        max_age_seconds = max_age_hours * 3600

        try:
            entries = list(self.temp_dir.iterdir())
        except OSError as e:
            logger.warning(f"Error during temp cleanup: {e}")
            return

        for temp_file in entries:
            try:
                if not temp_file.is_file():
                    continue
                file_age = current_time - temp_file.stat().st_mtime
                if file_age > max_age_seconds:
                    temp_file.unlink()
                    logger.debug(f"Deleted old temp file: {temp_file}")
            except OSError as e:
                # e.g. the file vanished or is locked; keep sweeping the rest
                logger.warning(f"Error during temp cleanup: {e}")

        logger.info("Temp directory cleanup completed")

    def get_cache_path(self, key: str, extension: str = ".cache") -> Path:
        """Get a cache file path based on a key.

        Every character of ``key`` that is not alphanumeric, ``-`` or ``_``
        is replaced by ``_`` to form the file name.
        """
        safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key)
        return self.cache_dir / f"{safe_key}{extension}"

    def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]:
        """List output files.

        Args:
            subfolder: Optional directory below the outputs directory.
            extension: Optional name ending to filter on (used as the glob
                pattern ``*<extension>``).

        Returns:
            Regular files in the searched directory, newest first by
            modification time. Empty when the directory does not exist.
        """
        search_dir = self.outputs_dir / subfolder if subfolder else self.outputs_dir
        if not search_dir.exists():
            return []

        pattern = f"*{extension}" if extension else "*"
        # Directories match the glob as well; only files are outputs
        files = [p for p in search_dir.glob(pattern) if p.is_file()]
        return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)

    def delete_file(self, file_path: Union[str, Path]) -> bool:
        """Safely delete a file.

        Returns:
            True when a regular file was removed; False when the path is
            missing, is not a file, or removal failed (the error is logged).
        """
        try:
            file_path = Path(file_path)
            if file_path.exists() and file_path.is_file():
                file_path.unlink()
                logger.info(f"Deleted file: {file_path}")
                return True
            return False
        except Exception as e:
            logger.error(f"Error deleting file {file_path}: {e}")
            return False

    def get_file_info(self, file_path: Union[str, Path]) -> dict:
        """Get information about a file.

        Returns:
            ``{"exists": False}`` for a missing path, otherwise a dict with
            name, size (bytes and MB), timestamps, extension and absolute
            path.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            return {"exists": False}

        stat = file_path.stat()
        return {
            "exists": True,
            "name": file_path.name,
            "size": stat.st_size,
            "size_mb": stat.st_size / (1024 * 1024),
            # NOTE: st_ctime is the creation time on Windows but the last
            # metadata change on Unix, so "created" is platform dependent.
            "created": datetime.fromtimestamp(stat.st_ctime),
            "modified": datetime.fromtimestamp(stat.st_mtime),
            "extension": file_path.suffix,
            "path": str(file_path.absolute())
        }
# ============================================================================
# VIDEO UTILS CLASS
# ============================================================================
class VideoUtils:
    """Utilities for video processing.

    Frame-level operations use OpenCV; the audio helpers call the
    ``ffmpeg`` executable through ``subprocess``.
    """

    @staticmethod
    def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]:
        """Get detailed video information.

        Returns:
            A dict with ``fps``, ``frame_count``, ``width``, ``height``,
            ``codec`` and ``duration`` (seconds, 0 when fps is not
            positive), plus ``file_size_mb`` when the path exists on disk.
            ``{"error": "Failed to open video"}`` when OpenCV cannot open
            the file.
        """
        video_path = str(video_path)
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {video_path}")
                return {"error": "Failed to open video"}

            # Read each property once instead of re-querying the capture
            fps = cap.get(cv2.CAP_PROP_FPS)
            frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            info = {
                "fps": fps,
                "frame_count": int(frames),
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))),
                "duration": frames / fps if fps > 0 else 0
            }

            path = Path(video_path)
            if path.exists():
                info["file_size_mb"] = path.stat().st_size / (1024 * 1024)
            return info
        finally:
            # Released on every path, including the failed-open return
            cap.release()

    @staticmethod
    def _fourcc_to_string(fourcc: int) -> str:
        """Convert fourcc code to string (lowest byte is the first character)."""
        return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])

    @staticmethod
    def extract_frames(video_path: Union[str, Path],
                       output_dir: Union[str, Path],
                       frame_interval: int = 1,
                       max_frames: Optional[int] = None) -> List[Path]:
        """Extract frames from video.

        Every ``frame_interval``-th frame is written as
        ``frame_<index>.png`` (index is the zero-padded frame number in the
        source video).

        Args:
            video_path: Video to read.
            output_dir: Directory for the PNG files; created if needed.
            frame_interval: Keep one frame out of this many; must be >= 1.
            max_frames: Stop after this many frames were written. ``None``
                or ``0`` means no limit.

        Returns:
            Paths of the frames that were written; empty when the video
            cannot be opened.

        Raises:
            ValueError: If ``frame_interval`` is smaller than 1.
        """
        # Validate up front: an interval of 0 would otherwise surface as a
        # ZeroDivisionError in the middle of the read loop.
        if frame_interval < 1:
            raise ValueError(f"frame_interval must be >= 1, got {frame_interval}")

        video_path = str(video_path)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {video_path}")
                return []

            frame_paths = []
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_path = output_dir / f"frame_{frame_count:06d}.png"
                    # imwrite reports failure through its return value
                    if cv2.imwrite(str(frame_path), frame):
                        frame_paths.append(frame_path)
                    else:
                        logger.warning(f"Failed to write frame: {frame_path}")
                    if max_frames and len(frame_paths) >= max_frames:
                        break
                frame_count += 1

            logger.info(f"Extracted {len(frame_paths)} frames from video")
            return frame_paths
        finally:
            cap.release()

    @staticmethod
    def create_video_from_frames(frame_paths: List[Union[str, Path]],
                                 output_path: Union[str, Path],
                                 fps: float = 30.0,
                                 codec: str = 'mp4v') -> bool:
        """Create video from frame images.

        The output size is taken from the first frame. Unreadable frames
        are skipped with a warning; frames of a different size are resized
        to the output size so they are not lost.

        Args:
            frame_paths: Image files, in playback order.
            output_path: Video file to write.
            fps: Output frame rate.
            codec: Four-character codec code.

        Returns:
            True when the writer was opened and all frames were processed,
            False otherwise (the reason is logged).
        """
        if not frame_paths:
            logger.error("No frames provided")
            return False

        first_frame = cv2.imread(str(frame_paths[0]))
        if first_frame is None:
            logger.error(f"Failed to read first frame: {frame_paths[0]}")
            return False

        height, width = first_frame.shape[:2]
        out = None
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
            # A writer that failed to open accepts write() calls silently,
            # so success must be checked explicitly.
            if not out.isOpened():
                logger.error(f"Failed to open video writer for: {output_path}")
                return False

            for frame_path in frame_paths:
                frame = cv2.imread(str(frame_path))
                if frame is None:
                    logger.warning(f"Failed to read frame: {frame_path}")
                    continue
                if frame.shape[:2] != (height, width):
                    logger.warning(f"Resizing frame with mismatched size: {frame_path}")
                    frame = cv2.resize(frame, (width, height))
                out.write(frame)

            logger.info(f"Created video: {output_path}")
            return True
        except Exception as e:
            logger.error(f"Error creating video: {e}")
            return False
        finally:
            if out is not None:
                out.release()

    @staticmethod
    def resize_video(input_path: Union[str, Path],
                     output_path: Union[str, Path],
                     target_width: Optional[int] = None,
                     target_height: Optional[int] = None,
                     maintain_aspect: bool = True) -> bool:
        """Resize video to target dimensions.

        Args:
            input_path: Video to read.
            output_path: Video file to write (same fourcc and fps as the
                source).
            target_width: Desired width, or None to derive/keep it.
            target_height: Desired height, or None to derive/keep it.
            maintain_aspect: When exactly one target dimension is given,
                derive the other from the source aspect ratio. Has no
                effect when both or neither are given.

        Returns:
            True on success, False when the input or output cannot be
            opened or an error occurs (the reason is logged).
        """
        cap = cv2.VideoCapture(str(input_path))
        out = None
        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {input_path}")
                return False

            orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))

            if orig_width <= 0 or orig_height <= 0:
                # Guards the aspect-ratio division below
                logger.error(f"Error resizing video: invalid source size {orig_width}x{orig_height}")
                return False

            if maintain_aspect:
                aspect = orig_width / orig_height
                if target_width and not target_height:
                    target_height = max(1, int(target_width / aspect))
                elif target_height and not target_width:
                    target_width = max(1, int(target_height * aspect))

            # Any dimension still unset keeps the source value
            if not target_width:
                target_width = orig_width
            if not target_height:
                target_height = orig_height

            out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height))
            if not out.isOpened():
                logger.error(f"Failed to open video writer for: {output_path}")
                return False

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(cv2.resize(frame, (target_width, target_height)))

            logger.info(f"Resized video saved to: {output_path}")
            return True
        except Exception as e:
            logger.error(f"Error resizing video: {e}")
            return False
        finally:
            cap.release()
            if out is not None:
                out.release()

    @staticmethod
    def extract_audio(video_path: Union[str, Path],
                      audio_path: Union[str, Path]) -> bool:
        """Extract audio from video using ffmpeg.

        The audio stream is copied without re-encoding (``-acodec copy``)
        and an existing output file is overwritten (``-y``).

        Returns:
            True when ffmpeg exits with code 0, False otherwise or when
            ffmpeg is not installed (the reason is logged).
        """
        try:
            cmd = [
                'ffmpeg', '-i', str(video_path),
                '-vn', '-acodec', 'copy',
                str(audio_path), '-y'
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                logger.info(f"Audio extracted to: {audio_path}")
                return True
            logger.error(f"Failed to extract audio: {result.stderr}")
            return False
        except FileNotFoundError:
            logger.error("ffmpeg not found. Please install ffmpeg.")
            return False
        except Exception as e:
            logger.error(f"Error extracting audio: {e}")
            return False

    @staticmethod
    def add_audio_to_video(video_path: Union[str, Path],
                           audio_path: Union[str, Path],
                           output_path: Union[str, Path]) -> bool:
        """Add audio track to video using ffmpeg.

        The first video stream of ``video_path`` is copied as-is and the
        first audio stream of ``audio_path`` is encoded as AAC; an existing
        output file is overwritten (``-y``).

        Returns:
            True when ffmpeg exits with code 0, False otherwise or when
            ffmpeg is not installed (the reason is logged).
        """
        try:
            cmd = [
                'ffmpeg', '-i', str(video_path),
                '-i', str(audio_path),
                '-c:v', 'copy', '-c:a', 'aac',
                '-map', '0:v:0', '-map', '1:a:0',
                str(output_path), '-y'
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                logger.info(f"Video with audio saved to: {output_path}")
                return True
            logger.error(f"Failed to add audio: {result.stderr}")
            return False
        except FileNotFoundError:
            logger.error("ffmpeg not found. Please install ffmpeg.")
            return False
        except Exception as e:
            logger.error(f"Error adding audio: {e}")
            return False
# ============================================================================
# IMAGE UTILS CLASS
# ============================================================================
class ImageUtils:
    """Utilities for image processing and manipulation.

    Methods operate on PIL images; two helpers convert to and from the
    BGR arrays used by OpenCV. Methods returning an image return a new
    object and leave their inputs unmodified, except ``resize_image``,
    which returns the input itself when no limit is given.
    """

    @staticmethod
    def load_image(image_path: Union[str, Path]) -> Optional[Image.Image]:
        """Load an image using PIL.

        The pixel data is read immediately so that a truncated or corrupt
        file is reported here and the underlying file handle is not left
        open by a lazily loaded image.

        Returns:
            The loaded image, or None when it cannot be opened or decoded
            (the error is logged).
        """
        img = None
        try:
            img = Image.open(str(image_path))
            img.load()
            return img
        except Exception as e:
            if img is not None:
                img.close()
            logger.error(f"Failed to load image {image_path}: {e}")
            return None

    @staticmethod
    def resize_image(image: Image.Image,
                     max_width: Optional[int] = None,
                     max_height: Optional[int] = None,
                     maintain_aspect: bool = True) -> Image.Image:
        """Resize image to fit within max dimensions.

        Args:
            image: Source image.
            max_width: Width limit, or None for no limit.
            max_height: Height limit, or None for no limit.
            maintain_aspect: When True the image is scaled down uniformly
                to fit the limits and never enlarged. When False the given
                limits are used directly as the new size.

        Returns:
            The resized image, or the input itself when no limit is given.
        """
        if not max_width and not max_height:
            return image

        width, height = image.size
        if maintain_aspect:
            scale = 1.0
            if max_width:
                scale = min(scale, max_width / width)
            if max_height:
                scale = min(scale, max_height / height)
            # Clamp to 1px: a very elongated image could otherwise round to 0
            new_width = max(1, int(width * scale))
            new_height = max(1, int(height * scale))
        else:
            new_width = max_width or width
            new_height = max_height or height

        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)

    @staticmethod
    def convert_to_cv2(pil_image: Image.Image) -> np.ndarray:
        """Convert PIL Image to OpenCV format (converted to RGB first, then reordered to BGR)."""
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        np_image = np.array(pil_image)
        return cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR)

    @staticmethod
    def convert_from_cv2(cv2_image: np.ndarray) -> Image.Image:
        """Convert OpenCV image to PIL format (channels reordered from BGR to RGB)."""
        rgb_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB)
        return Image.fromarray(rgb_image)

    @staticmethod
    def apply_blur(image: Image.Image, radius: float = 5.0) -> Image.Image:
        """Apply Gaussian blur to image."""
        return image.filter(ImageFilter.GaussianBlur(radius=radius))

    @staticmethod
    def adjust_brightness(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image brightness (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Brightness(image).enhance(factor)

    @staticmethod
    def adjust_contrast(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image contrast (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Contrast(image).enhance(factor)

    @staticmethod
    def adjust_saturation(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image saturation (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Color(image).enhance(factor)

    @staticmethod
    def crop_center(image: Image.Image, crop_width: int, crop_height: int) -> Image.Image:
        """Crop a ``crop_width`` x ``crop_height`` box centred on the image."""
        width, height = image.size
        left = (width - crop_width) // 2
        top = (height - crop_height) // 2
        return image.crop((left, top, left + crop_width, top + crop_height))

    @staticmethod
    def create_thumbnail(image: Image.Image, size: Tuple[int, int] = (128, 128)) -> Image.Image:
        """Create thumbnail preserving aspect ratio; the input image is not modified."""
        img_copy = image.copy()
        img_copy.thumbnail(size, Image.Resampling.LANCZOS)
        return img_copy

    @staticmethod
    def apply_mask(image: Image.Image, mask: Image.Image, alpha: float = 1.0) -> Image.Image:
        """Apply mask to image.

        The mask becomes the alpha channel of an RGBA copy of the image.

        Args:
            image: Source image; it is never modified.
            mask: Mask image; converted to mode ``L`` and resized to the
                image size when necessary.
            alpha: Values below 1.0 scale the mask down before it is
                applied; values of 1.0 or more leave it unchanged.

        Returns:
            A new RGBA image.
        """
        # Always work on a copy: putalpha() mutates in place, and an image
        # that is already RGBA would otherwise be modified for the caller.
        result = image.copy() if image.mode == 'RGBA' else image.convert('RGBA')

        if mask.mode != 'L':
            mask = mask.convert('L')
        if mask.size != result.size:
            mask = mask.resize(result.size, Image.Resampling.LANCZOS)
        if alpha < 1.0:
            mask = ImageEnhance.Brightness(mask).enhance(alpha)

        result.putalpha(mask)
        return result

    @staticmethod
    def composite_images(foreground: Image.Image,
                         background: Image.Image,
                         position: Tuple[int, int] = (0, 0),
                         alpha: float = 1.0) -> Image.Image:
        """Composite foreground image over background.

        Args:
            foreground: Image pasted on top, using its own alpha channel
                as the paste mask.
            background: Image pasted onto; it is copied, not modified.
            position: Top-left corner of the foreground on the background.
            alpha: Values below 1.0 scale the foreground's alpha channel.

        Returns:
            A new RGBA image.
        """
        if foreground.mode != 'RGBA':
            foreground = foreground.convert('RGBA')
        if background.mode != 'RGBA':
            background = background.convert('RGBA')

        if alpha < 1.0:
            # Copy first so the caller's foreground keeps its alpha channel
            foreground = foreground.copy()
            foreground.putalpha(
                ImageEnhance.Brightness(foreground.split()[3]).enhance(alpha)
            )

        output = background.copy()
        output.paste(foreground, position, foreground)
        return output

    @staticmethod
    def get_image_info(image_path: Union[str, Path]) -> Dict[str, Any]:
        """Get image file information.

        Returns:
            ``{"exists": False}`` for a missing path; otherwise a dict with
            filename, format, mode, size, width, height, file size in MB
            and ``has_exif``. On any error ``{"exists": False, "error": ...}``
            is returned and the error is logged.
        """
        try:
            image_path = Path(image_path)
            if not image_path.exists():
                return {"exists": False}

            with Image.open(str(image_path)) as img:
                info = {
                    "exists": True,
                    "filename": image_path.name,
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height,
                    "file_size_mb": image_path.stat().st_size / (1024 * 1024)
                }
                # Public API instead of the private _getexif(); an empty
                # Exif mapping is falsy.
                info["has_exif"] = bool(img.getexif())
            return info
        except Exception as e:
            logger.error(f"Error getting image info for {image_path}: {e}")
            return {"exists": False, "error": str(e)}

    @staticmethod
    def save_image(image: Image.Image,
                   output_path: Union[str, Path],
                   quality: int = 95,
                   optimize: bool = True) -> bool:
        """Save image with specified quality.

        Parent directories are created as needed. ``quality`` applies to
        JPEG output only; ``optimize`` applies to JPEG and PNG output.
        Images with an alpha channel or palette are converted to RGB
        before being written as JPEG (the alpha channel is discarded).

        Returns:
            True on success, False on failure (the error is logged).
        """
        try:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            save_kwargs = {}
            ext = output_path.suffix.lower()
            if ext in ['.jpg', '.jpeg']:
                save_kwargs['quality'] = quality
                save_kwargs['optimize'] = optimize
                # JPEG cannot store alpha or palette data; saving such an
                # image unchanged raises OSError.
                if image.mode in ('RGBA', 'LA', 'P', 'PA'):
                    image = image.convert('RGB')
            elif ext == '.png':
                save_kwargs['optimize'] = optimize

            image.save(str(output_path), **save_kwargs)
            logger.info(f"Saved image to: {output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to save image to {output_path}: {e}")
            return False
# ============================================================================
# DEFAULT INSTANCES
# ============================================================================
def validate_video_file(file_path: str) -> tuple:
    """Validate if file is a valid video file.

    Lightweight module-level check: the path must exist and OpenCV must be
    able to open it. No extension, size or content limits are applied here.

    Args:
        file_path: Path to the video file (``str`` or path-like).

    Returns:
        tuple: (is_valid, message)
    """
    # None / empty string would otherwise raise or produce a confusing message
    if not file_path:
        return False, "No file path provided"

    # Convert to str so path-like objects are handed to OpenCV as a string
    video_path = str(file_path)
    if not os.path.exists(video_path):
        return False, f"File not found: {file_path}"

    try:
        cap = cv2.VideoCapture(video_path)
        try:
            opened = cap.isOpened()
        finally:
            # Always free the capture, even if isOpened() raises
            cap.release()
    except Exception as e:
        return False, f"Error validating video: {str(e)}"

    if opened:
        return True, "Video file is valid"
    return False, "Unable to open video file - may be corrupted"