""" Utility functions for the CLI. """ import sys import os import subprocess from pathlib import Path from typing import Optional, List, Dict, Any import json import psutil import torch from rich.console import Console from rich.table import Table console = Console() def check_system_requirements() -> Dict[str, Any]: """Check if system meets requirements.""" requirements = { 'python_version': sys.version, 'python_ok': sys.version_info >= (3, 8), 'memory_gb': psutil.virtual_memory().total / (1024**3), 'memory_ok': psutil.virtual_memory().total >= 8 * (1024**3), 'cuda_available': torch.cuda.is_available(), 'gpu_name': None, 'gpu_memory_gb': 0, 'ffmpeg_available': check_ffmpeg(), 'disk_space_gb': get_free_disk_space(), 'disk_ok': get_free_disk_space() >= 10 } if torch.cuda.is_available(): requirements['gpu_name'] = torch.cuda.get_device_name(0) requirements['gpu_memory_gb'] = torch.cuda.get_device_properties(0).total_memory / (1024**3) return requirements def check_ffmpeg() -> bool: """Check if FFmpeg is available.""" try: subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) return True except (subprocess.CalledProcessError, FileNotFoundError): return False def get_free_disk_space() -> float: """Get free disk space in GB.""" stat = psutil.disk_usage('/') return stat.free / (1024**3) def install_ffmpeg_instructions() -> str: """Get FFmpeg installation instructions for current platform.""" system = sys.platform if system == "darwin": # macOS return "Install FFmpeg using: brew install ffmpeg" elif system == "win32": # Windows return "Download FFmpeg from https://ffmpeg.org/download.html and add to PATH" else: # Linux return "Install FFmpeg using: sudo apt-get install ffmpeg (Ubuntu/Debian) or equivalent" def print_system_info(): """Print system information table.""" reqs = check_system_requirements() table = Table(title="System Information") table.add_column("Component", style="cyan") table.add_column("Status", style="green") table.add_column("Value", style="white") # Python python_status = "✓" if reqs['python_ok'] else "✗" table.add_row("Python", python_status, reqs['python_version'].split()[0]) # Memory memory_status = "✓" if reqs['memory_ok'] else "✗" table.add_row("Memory", memory_status, f"{reqs['memory_gb']:.1f} GB") # GPU if reqs['cuda_available']: table.add_row("GPU", "✓", f"{reqs['gpu_name']} ({reqs['gpu_memory_gb']:.1f} GB)") else: table.add_row("GPU", "✗", "Not available") # FFmpeg ffmpeg_status = "✓" if reqs['ffmpeg_available'] else "✗" table.add_row("FFmpeg", ffmpeg_status, "Installed" if reqs['ffmpeg_available'] else "Not found") # Disk space disk_status = "✓" if reqs['disk_ok'] else "✗" table.add_row("Disk Space", disk_status, f"{reqs['disk_space_gb']:.1f} GB free") console.print(table) # Print warnings if not reqs['memory_ok']: console.print("[yellow]Warning: Less than 8GB RAM available. Processing may be slow.[/yellow]") if not reqs['ffmpeg_available']: console.print(f"[yellow]Warning: FFmpeg not found. {install_ffmpeg_instructions()}[/yellow]") if not reqs['disk_ok']: console.print("[yellow]Warning: Less than 10GB disk space available.[/yellow]") def format_time(seconds: float) -> str: """Format seconds to human-readable time.""" if seconds < 60: return f"{seconds:.1f}s" elif seconds < 3600: minutes = seconds / 60 return f"{minutes:.1f}m" else: hours = seconds / 3600 return f"{hours:.1f}h" def estimate_processing_time(video_path: str, use_two_stage: bool = False) -> float: """Estimate processing time for a video.""" import cv2 try: cap = cv2.VideoCapture(video_path) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() # Rough estimates based on hardware if torch.cuda.is_available(): seconds_per_frame = 0.1 if not use_two_stage else 0.2 else: seconds_per_frame = 0.5 if not use_two_stage else 1.0 return frame_count * seconds_per_frame except: return 0 def validate_paths(input_path: str, output_path: str) -> bool: """Validate input and output paths.""" input_file = Path(input_path) output_file = Path(output_path) if not input_file.exists(): console.print(f"[red]Error: Input file does not exist: {input_path}[/red]") return False if not input_file.is_file(): console.print(f"[red]Error: Input path is not a file: {input_path}[/red]") return False # Check if input is a video file valid_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv'} if input_file.suffix.lower() not in valid_extensions: console.print(f"[red]Error: Input file is not a supported video format: {input_file.suffix}[/red]") console.print(f"Supported formats: {', '.join(valid_extensions)}") return False # Check output directory exists or can be created output_dir = output_file.parent if not output_dir.exists(): try: output_dir.mkdir(parents=True, exist_ok=True) except Exception as e: console.print(f"[red]Error: Cannot create output directory: {e}[/red]") return False # Warn if output file exists if output_file.exists(): response = console.input(f"[yellow]Output file exists. Overwrite? (y/n):[/yellow] ") if response.lower() != 'y': return False return True def create_config_file(config_path: Optional[str] = None) -> str: """Create a default configuration file.""" if config_path is None: config_path = Path.home() / ".backgroundfx" / "config.json" else: config_path = Path(config_path) config_path.parent.mkdir(parents=True, exist_ok=True) default_config = { "default_background": "blur", "quality_preset": "high", "use_two_stage": False, "chroma_preset": "standard", "output_format": "mp4", "models_dir": str(Path.home() / ".backgroundfx" / "models"), "temp_dir": "/tmp", "max_memory_gb": 8, "batch_size": 1, "device": "auto" } with open(config_path, 'w') as f: json.dump(default_config, f, indent=2) return str(config_path) def load_config(config_path: str) -> Dict[str, Any]: """Load configuration from file.""" try: with open(config_path, 'r') as f: return json.load(f) except FileNotFoundError: console.print(f"[yellow]Config file not found: {config_path}[/yellow]") return {} except json.JSONDecodeError as e: console.print(f"[red]Error parsing config file: {e}[/red]") return {}