|
|
""" |
|
|
Command-line interface for BackgroundFX Pro. |
|
|
Integrates with existing app.py infrastructure. |
|
|
""" |
|
|
|
|
|
import click |
|
|
import sys |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple |
|
|
import logging |
|
|
from rich.console import Console |
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn |
|
|
from rich.table import Table |
|
|
|
|
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
|
|
|
|
|
|
from app import ( |
|
|
VideoProcessor, |
|
|
processor as app_processor, |
|
|
PROFESSIONAL_BACKGROUNDS, |
|
|
TWO_STAGE_AVAILABLE, |
|
|
CHROMA_PRESETS |
|
|
) |
|
|
|
|
|
# Shared rich console through which every command prints its user-facing output.
console = Console()

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
@click.group()
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
@click.option('--debug', is_flag=True, help='Debug mode')
def cli(verbose: bool, debug: bool):
    """
    BackgroundFX Pro CLI - Professional video background replacement.

    Uses the same processing engine as the Gradio UI.
    """
    # NOTE: click shows the docstring above as the group's --help text, so it
    # is user-facing output and is kept word for word.
    #
    # verbose: raise log output to INFO.
    # debug:   raise log output to DEBUG; takes precedence over --verbose.
    # With neither flag, only WARNING and above are emitted.
    if debug:
        threshold = logging.DEBUG
    elif verbose:
        threshold = logging.INFO
    else:
        threshold = logging.WARNING

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=threshold,
    )
|
|
|
|
|
|
|
|
@cli.command()
@click.option('--force', is_flag=True, help='Force reload models')
def load_models(force: bool):
    """Load AI models for processing."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # force: reload even when the shared processor reports its models as
    #        already loaded.
    console.print("[bold blue]Loading models...[/bold blue]")

    # Nothing to do when the models are present and no reload was requested.
    # `not force` is tested first so `models_loaded` is only read when needed.
    if not force and app_processor.models_loaded:
        console.print("[yellow]Models already loaded[/yellow]")
        return

    def report(fraction: float, text: str):
        # The processor reports progress as a fraction; show it as a percentage.
        percent = int(fraction*100)
        console.print(f" {percent}% - {text}")

    outcome = app_processor.load_models(report)
    console.print(f"[green]β[/green] {outcome}")
|
|
|
|
|
|
|
|
@cli.command()
@click.argument('input_video', type=click.Path(exists=True))
@click.argument('output_video', type=click.Path())
@click.option('--background', '-b',
              type=click.Choice(list(PROFESSIONAL_BACKGROUNDS.keys()) + ['custom']),
              default='blur',
              help='Background type')
@click.option('--background-image', '-i', type=click.Path(exists=True),
              help='Custom background image (when using custom background)')
@click.option('--two-stage', is_flag=True,
              help='Use two-stage processing (cinema quality)')
@click.option('--chroma-preset',
              type=click.Choice(list(CHROMA_PRESETS.keys()) if TWO_STAGE_AVAILABLE else ['standard']),
              default='standard',
              help='Chroma keying preset for two-stage')
@click.option('--preview-mask', is_flag=True,
              help='Generate mask preview video')
@click.option('--preview-greenscreen', is_flag=True,
              help='Generate greenscreen preview video')
def process(input_video: str, output_video: str, background: str,
            background_image: Optional[str], two_stage: bool,
            chroma_preset: str, preview_mask: bool, preview_greenscreen: bool):
    """Process a video file."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # input_video:         existing video file to process.
    # output_video:        destination path for the processed video.
    # background:          a PROFESSIONAL_BACKGROUNDS key, or 'custom'.
    # background_image:    image path; required when background is 'custom'
    #                      and not forwarded otherwise.
    # two_stage:           forwarded as use_two_stage.
    # chroma_preset:       forwarded unchanged to the processor.
    # preview_mask /
    # preview_greenscreen: forwarded unchanged to the processor.
    #
    # Exits with status 1 on an invalid option combination or when the
    # processor returns no result path.
    import shutil

    # Validate the option combination BEFORE loading models: loading is slow,
    # and a bad invocation should fail immediately.
    if background == 'custom' and not background_image:
        console.print("[red]Error: Custom background requires --background-image[/red]")
        sys.exit(1)

    if not app_processor.models_loaded:
        console.print("[yellow]Loading models first...[/yellow]")

        def report_loading(fraction: float, text: str):
            console.print(f" {int(fraction*100)}% - {text}")

        load_result = app_processor.load_models(report_loading)
        console.print(f"[green]β[/green] {load_result}")

    console.print(f"[bold blue]Processing video:[/bold blue] {input_video}")
    console.print(f" Background: {background}")
    console.print(f" Two-stage: {'Yes' if two_stage else 'No'}")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Processing...", total=100)

        def report_processing(value: float, text: str):
            # The processor reports a fraction; the bar counts to 100.
            progress.update(task, completed=int(value * 100), description=text)

        result_path, message = app_processor.process_video(
            video_path=input_video,
            background_choice=background,
            custom_background_path=background_image if background == 'custom' else None,
            progress_callback=report_processing,
            use_two_stage=two_stage,
            chroma_preset=chroma_preset,
            preview_mask=preview_mask,
            preview_greenscreen=preview_greenscreen
        )

    if not result_path:
        console.print(f"[red]β Failed:[/red] {message}")
        sys.exit(1)

    # Create the destination directory if needed, so a finished result is not
    # lost to a missing folder (the batch command does the same for its
    # output directory).
    Path(output_video).parent.mkdir(parents=True, exist_ok=True)
    shutil.move(result_path, output_video)

    console.print("[green]β Success![/green]")
    console.print(f" Output: {output_video}")
    console.print(f" {message}")
|
|
|
|
|
|
|
|
@cli.command()
def status():
    """Show system and model status."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Renders the mapping returned by app_processor.get_status() as a table.
    # 'models_loaded', 'device' and 'two_stage_available' are read
    # unconditionally; 'memory_usage' and 'models' are shown only if present.

    def mark(flag) -> str:
        # Both branches previously produced the same glyph, so a loaded and an
        # unloaded component looked identical. Use two distinct marks.
        return "✓" if flag else "[red]✗[/red]"

    status_info = app_processor.get_status()

    table = Table(title="BackgroundFX Pro Status")
    table.add_column("Component", style="cyan")
    table.add_column("Status", style="green")

    table.add_row("Models Loaded", mark(status_info['models_loaded']))
    table.add_row("Device", str(status_info['device']))
    table.add_row("Two-Stage Available", mark(status_info['two_stage_available']))

    if 'memory_usage' in status_info:
        mem = status_info['memory_usage']
        table.add_row(
            "Memory Usage",
            f"{mem['percent']:.1f}% ({mem['used_gb']:.1f}/{mem['total_gb']:.1f} GB)",
        )

    if 'models' in status_info:
        models = status_info['models']
        table.add_row("SAM2 Predictor", mark(models.get('sam2_loaded')))
        table.add_row("MatAnyone", mark(models.get('matanyone_loaded')))

    console.print(table)
|
|
|
|
|
|
|
|
@cli.command()
def list_backgrounds():
    """List available background options."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Prints one row per PROFESSIONAL_BACKGROUNDS entry, followed by a fixed
    # row for the 'custom' option.
    listing = Table(title="Available Backgrounds")
    for heading, colour in (("ID", "cyan"), ("Description", "white"), ("Type", "yellow")):
        listing.add_column(heading, style=colour)

    # Entries missing a description or type fall back to generic labels.
    rows = [
        (
            name,
            spec.get('description', 'Professional background'),
            spec.get('type', 'gradient'),
        )
        for name, spec in PROFESSIONAL_BACKGROUNDS.items()
    ]
    rows.append(("custom", "Use your own image", "image"))

    for row in rows:
        listing.add_row(*row)

    console.print(listing)
|
|
|
|
|
|
|
|
@cli.command()
def cleanup():
    """Clean up resources and cache."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Asks the shared processor to release its resources, then deletes files
    # in the system temp directory whose names match the patterns below.
    # Deletion is best-effort: a file that cannot be removed is logged and
    # skipped, never fatal.
    import tempfile

    console.print("[bold blue]Cleaning up resources...[/bold blue]")

    app_processor.cleanup_resources()

    temp_dir = Path(tempfile.gettempdir())
    patterns = ('processed_video_*.mp4', 'mask_preview_*.mp4', 'greenscreen_preview_*.mp4')
    removed = 0

    for pattern in patterns:
        for leftover in temp_dir.glob(pattern):
            try:
                leftover.unlink()
            except FileNotFoundError:
                # Already gone (e.g. removed by another process); nothing to do.
                continue
            except OSError as exc:
                # Catch only filesystem errors (permissions, file in use, ...),
                # so KeyboardInterrupt and real bugs are no longer swallowed.
                logger.warning("Could not remove temporary file %s: %s", leftover, exc)
            else:
                removed += 1

    console.print(f"[green]β[/green] Cleaned up {removed} temporary files")
    console.print("[green]β[/green] Memory resources freed")
|
|
|
|
|
|
|
|
@cli.command()
@click.argument('input_dir', type=click.Path(exists=True))
@click.argument('output_dir', type=click.Path())
@click.option('--background', '-b', default='blur', help='Background type')
@click.option('--pattern', '-p', default='*.mp4', help='File pattern to match')
@click.option('--two-stage', is_flag=True, help='Use two-stage processing')
def batch(input_dir: str, output_dir: str, background: str, pattern: str, two_stage: bool):
    """Process multiple videos in batch."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # input_dir:  directory searched (non-recursively) with `pattern`.
    # output_dir: created if missing; each result is stored there as
    #             "processed_<original file name>".
    # background: forwarded to the processor as background_choice.
    # pattern:    glob pattern applied inside input_dir.
    # two_stage:  forwarded as use_two_stage.
    #
    # A failed video is reported and the batch carries on with the next one.
    import shutil  # imported once, not on every loop iteration

    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Keep regular files only (a loose pattern such as '*' also matches
    # directories) and sort for a reproducible processing order.
    videos = sorted(entry for entry in input_path.glob(pattern) if entry.is_file())

    if not videos:
        console.print(f"[yellow]No files matching '{pattern}' found in {input_dir}[/yellow]")
        return

    total = len(videos)
    console.print(f"[bold blue]Found {total} videos to process[/bold blue]")

    if not app_processor.models_loaded:
        console.print("[yellow]Loading models...[/yellow]")
        app_processor.load_models()

    def progress_callback(value: float, message: str):
        # Captures no per-video state, so it is defined once for the whole
        # batch. '\r' keeps the progress on a single console line.
        console.print(f" {int(value*100)}% - {message}", end='\r')

    success_count = 0

    for i, video_file in enumerate(videos, 1):
        console.print(f"\n[bold]Processing {i}/{total}:[/bold] {video_file.name}")

        output_file = output_path / f"processed_{video_file.name}"

        result_path, message = app_processor.process_video(
            video_path=str(video_file),
            background_choice=background,
            custom_background_path=None,
            progress_callback=progress_callback,
            use_two_stage=two_stage,
            chroma_preset='standard'
        )

        if result_path:
            shutil.move(result_path, str(output_file))
            console.print(f" [green]β[/green] Saved to {output_file.name}")
            success_count += 1
        else:
            console.print(f" [red]β[/red] Failed: {message}")

    console.print(f"\n[bold]Batch complete:[/bold] {success_count}/{total} successful")
|
|
|
|
|
|
|
|
def main():
    """Entry point: hand control to the click command group."""
    cli()


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()