File size: 7,239 Bytes
cd21456 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
"""
Utility functions for the CLI.
"""
import sys
import os
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Any
import json
import psutil
import torch
from rich.console import Console
from rich.table import Table
console = Console()
def check_system_requirements() -> Dict[str, Any]:
"""Check if system meets requirements."""
requirements = {
'python_version': sys.version,
'python_ok': sys.version_info >= (3, 8),
'memory_gb': psutil.virtual_memory().total / (1024**3),
'memory_ok': psutil.virtual_memory().total >= 8 * (1024**3),
'cuda_available': torch.cuda.is_available(),
'gpu_name': None,
'gpu_memory_gb': 0,
'ffmpeg_available': check_ffmpeg(),
'disk_space_gb': get_free_disk_space(),
'disk_ok': get_free_disk_space() >= 10
}
if torch.cuda.is_available():
requirements['gpu_name'] = torch.cuda.get_device_name(0)
requirements['gpu_memory_gb'] = torch.cuda.get_device_properties(0).total_memory / (1024**3)
return requirements
def check_ffmpeg() -> bool:
"""Check if FFmpeg is available."""
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def get_free_disk_space() -> float:
"""Get free disk space in GB."""
stat = psutil.disk_usage('/')
return stat.free / (1024**3)
def install_ffmpeg_instructions() -> str:
"""Get FFmpeg installation instructions for current platform."""
system = sys.platform
if system == "darwin": # macOS
return "Install FFmpeg using: brew install ffmpeg"
elif system == "win32": # Windows
return "Download FFmpeg from https://ffmpeg.org/download.html and add to PATH"
else: # Linux
return "Install FFmpeg using: sudo apt-get install ffmpeg (Ubuntu/Debian) or equivalent"
def print_system_info():
"""Print system information table."""
reqs = check_system_requirements()
table = Table(title="System Information")
table.add_column("Component", style="cyan")
table.add_column("Status", style="green")
table.add_column("Value", style="white")
# Python
python_status = "β" if reqs['python_ok'] else "β"
table.add_row("Python", python_status, reqs['python_version'].split()[0])
# Memory
memory_status = "β" if reqs['memory_ok'] else "β"
table.add_row("Memory", memory_status, f"{reqs['memory_gb']:.1f} GB")
# GPU
if reqs['cuda_available']:
table.add_row("GPU", "β", f"{reqs['gpu_name']} ({reqs['gpu_memory_gb']:.1f} GB)")
else:
table.add_row("GPU", "β", "Not available")
# FFmpeg
ffmpeg_status = "β" if reqs['ffmpeg_available'] else "β"
table.add_row("FFmpeg", ffmpeg_status, "Installed" if reqs['ffmpeg_available'] else "Not found")
# Disk space
disk_status = "β" if reqs['disk_ok'] else "β"
table.add_row("Disk Space", disk_status, f"{reqs['disk_space_gb']:.1f} GB free")
console.print(table)
# Print warnings
if not reqs['memory_ok']:
console.print("[yellow]Warning: Less than 8GB RAM available. Processing may be slow.[/yellow]")
if not reqs['ffmpeg_available']:
console.print(f"[yellow]Warning: FFmpeg not found. {install_ffmpeg_instructions()}[/yellow]")
if not reqs['disk_ok']:
console.print("[yellow]Warning: Less than 10GB disk space available.[/yellow]")
def format_time(seconds: float) -> str:
"""Format seconds to human-readable time."""
if seconds < 60:
return f"{seconds:.1f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
else:
hours = seconds / 3600
return f"{hours:.1f}h"
def estimate_processing_time(video_path: str, use_two_stage: bool = False) -> float:
"""Estimate processing time for a video."""
import cv2
try:
cap = cv2.VideoCapture(video_path)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
# Rough estimates based on hardware
if torch.cuda.is_available():
seconds_per_frame = 0.1 if not use_two_stage else 0.2
else:
seconds_per_frame = 0.5 if not use_two_stage else 1.0
return frame_count * seconds_per_frame
except:
return 0
def validate_paths(input_path: str, output_path: str) -> bool:
"""Validate input and output paths."""
input_file = Path(input_path)
output_file = Path(output_path)
if not input_file.exists():
console.print(f"[red]Error: Input file does not exist: {input_path}[/red]")
return False
if not input_file.is_file():
console.print(f"[red]Error: Input path is not a file: {input_path}[/red]")
return False
# Check if input is a video file
valid_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv'}
if input_file.suffix.lower() not in valid_extensions:
console.print(f"[red]Error: Input file is not a supported video format: {input_file.suffix}[/red]")
console.print(f"Supported formats: {', '.join(valid_extensions)}")
return False
# Check output directory exists or can be created
output_dir = output_file.parent
if not output_dir.exists():
try:
output_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
console.print(f"[red]Error: Cannot create output directory: {e}[/red]")
return False
# Warn if output file exists
if output_file.exists():
response = console.input(f"[yellow]Output file exists. Overwrite? (y/n):[/yellow] ")
if response.lower() != 'y':
return False
return True
def create_config_file(config_path: Optional[str] = None) -> str:
"""Create a default configuration file."""
if config_path is None:
config_path = Path.home() / ".backgroundfx" / "config.json"
else:
config_path = Path(config_path)
config_path.parent.mkdir(parents=True, exist_ok=True)
default_config = {
"default_background": "blur",
"quality_preset": "high",
"use_two_stage": False,
"chroma_preset": "standard",
"output_format": "mp4",
"models_dir": str(Path.home() / ".backgroundfx" / "models"),
"temp_dir": "/tmp",
"max_memory_gb": 8,
"batch_size": 1,
"device": "auto"
}
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=2)
return str(config_path)
def load_config(config_path: str) -> Dict[str, Any]:
"""Load configuration from file."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
console.print(f"[yellow]Config file not found: {config_path}[/yellow]")
return {}
except json.JSONDecodeError as e:
console.print(f"[red]Error parsing config file: {e}[/red]")
return {} |