#!/usr/bin/env python3
"""
Benchmark script for BackgroundFX Pro.
Tests performance across different configurations and hardware.
"""

import argparse
import json
import platform
import statistics
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import psutil
import torch
import cv2
import numpy as np

# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))

from api import ProcessingPipeline, PipelineConfig
from models import ModelRegistry, ModelLoader


def _rss_mb() -> float:
    """Return the resident set size of the current process in MiB."""
    return psutil.Process().memory_info().rss / (1024**2)


class Benchmarker:
    """Performance benchmarking tool.

    Each ``benchmark_*`` method runs one family of measurements, appends its
    result dict to ``self.results['benchmarks']`` and also returns it.
    Failures of individual configurations are recorded as ``{'error': ...}``
    entries instead of aborting the run.
    """

    def __init__(self, output_file: Optional[str] = None):
        """Initialize benchmarker.

        Args:
            output_file: Path of the JSON report. When omitted, a
                timestamped ``benchmark_YYYYmmdd_HHMMSS.json`` name is used.
        """
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'system_info': self._get_system_info(),
            'benchmarks': []
        }
        self.output_file = output_file or f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    def _get_system_info(self) -> Dict[str, Any]:
        """Collect CPU, memory, GPU and runtime version information."""
        # Sample once: cpu_freq() may return None on platforms that do not
        # expose the frequency.
        cpu_freq = psutil.cpu_freq()
        memory = psutil.virtual_memory()
        return {
            'cpu': {
                'count': psutil.cpu_count(),
                'frequency': cpu_freq.current if cpu_freq else 0,
                'model': self._get_cpu_model()
            },
            'memory': {
                'total_gb': memory.total / (1024**3),
                'available_gb': memory.available / (1024**3)
            },
            'gpu': self._get_gpu_info(),
            'python_version': sys.version,
            'torch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available()
        }

    def _get_cpu_model(self) -> str:
        """Get CPU model name, or "Unknown" when it cannot be determined."""
        try:
            # platform.processor() returns an empty string when the name
            # is not available.
            return platform.processor() or "Unknown"
        except Exception:
            return "Unknown"

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Get information about CUDA device 0, or ``{'available': False}``."""
        if not torch.cuda.is_available():
            return {'available': False}
        return {
            'available': True,
            'name': torch.cuda.get_device_name(0),
            'memory_gb': torch.cuda.get_device_properties(0).total_memory / (1024**3),
            'compute_capability': torch.cuda.get_device_capability(0)
        }

    def benchmark_image_processing(self,
                                   sizes: Optional[List[Tuple[int, int]]] = None,
                                   qualities: Optional[List[str]] = None,
                                   num_iterations: int = 5) -> Dict[str, Any]:
        """Benchmark image processing performance.

        Args:
            sizes: ``(width, height)`` pairs to test. Defaults to
                512x512, 1024x1024 and 1920x1080.
            qualities: Quality preset names passed to ``PipelineConfig``.
                Defaults to low, medium and high.
            num_iterations: Timed runs per configuration (after one warmup).

        Returns:
            The result dict that was appended to ``self.results['benchmarks']``.
        """
        print("\n=== Image Processing Benchmark ===")
        sizes = sizes or [(512, 512), (1024, 1024), (1920, 1080)]
        qualities = qualities or ['low', 'medium', 'high']
        results = {
            'test': 'image_processing',
            'iterations': num_iterations,
            'results': []
        }

        for size in sizes:
            for quality in qualities:
                print(f"Testing {size[0]}x{size[1]} @ {quality} quality...")

                # Create test image. `size` is (width, height) but numpy
                # arrays are (rows, cols, channels), hence the reversal.
                # randint's upper bound is exclusive, so 256 covers 0..255.
                image = np.random.randint(0, 256, (*size[::-1], 3), dtype=np.uint8)

                # Configure pipeline
                config = PipelineConfig(
                    quality_preset=quality,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    # Warmup
                    pipeline.process_image(image, None)

                    # Benchmark
                    times = []
                    memory_usage = []
                    for _ in range(num_iterations):
                        start_mem = _rss_mb()
                        start_time = time.perf_counter()
                        pipeline.process_image(image, None)
                        elapsed = time.perf_counter() - start_time
                        end_mem = _rss_mb()
                        times.append(elapsed)
                        memory_usage.append(end_mem - start_mem)

                    # Calculate statistics
                    avg_time = statistics.mean(times)
                    result_data = {
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'avg_time': avg_time,
                        'std_time': statistics.stdev(times) if len(times) > 1 else 0,
                        'min_time': min(times),
                        'max_time': max(times),
                        'fps': 1.0 / avg_time if avg_time > 0 else 0.0,
                        'avg_memory_mb': statistics.mean(memory_usage)
                    }
                    results['results'].append(result_data)
                    print(f" Average: {result_data['avg_time']:.3f}s ({result_data['fps']:.1f} FPS)")
                except Exception as e:
                    # Best effort: record the failure and continue with the
                    # next configuration.
                    print(f" Failed: {e}")
                    results['results'].append({
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'error': str(e)
                    })

        self.results['benchmarks'].append(results)
        return results

    def benchmark_model_loading(self) -> Dict[str, Any]:
        """Benchmark model loading times.

        Returns:
            The result dict that was appended to ``self.results['benchmarks']``.
        """
        print("\n=== Model Loading Benchmark ===")
        results = {
            'test': 'model_loading',
            'results': []
        }

        registry = ModelRegistry()
        loader = ModelLoader(registry, device='cuda' if torch.cuda.is_available() else 'cpu')

        # Test loading different models
        models_to_test = ['rmbg-1.4', 'u2netp', 'modnet']
        for model_id in models_to_test:
            print(f"Loading {model_id}...")

            # Clear cache
            loader.unload_all()

            # Sample memory before starting the clock so the sampling cost
            # is not counted as load time. RSS is host process memory only.
            start_mem = _rss_mb()
            start_time = time.perf_counter()
            try:
                loaded = loader.load_model(model_id)
                elapsed = time.perf_counter() - start_time
                end_mem = _rss_mb()

                if loaded:
                    result_data = {
                        'model': model_id,
                        'load_time': elapsed,
                        'memory_usage_mb': end_mem - start_mem,
                        # Stored as text so the report stays JSON-serializable
                        # whatever type the loader uses for its device.
                        'device': str(loaded.device)
                    }
                    print(f" Loaded in {elapsed:.2f}s, Memory: {end_mem - start_mem:.1f}MB")
                else:
                    result_data = {
                        'model': model_id,
                        'error': 'Failed to load'
                    }
                    print(" Failed to load")
            except Exception as e:
                result_data = {
                    'model': model_id,
                    'error': str(e)
                }
                print(f" Error: {e}")

            results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_video_processing(self,
                                   duration: int = 5,
                                   fps: int = 30,
                                   size: Tuple[int, int] = (1280, 720)) -> Dict[str, Any]:
        """Benchmark video processing performance.

        A synthetic noise video with a moving rectangle is written to a
        temporary directory, processed once per quality label, and the
        directory (input and outputs) is removed afterwards.

        Args:
            duration: Length of the test video in seconds.
            fps: Frame rate of the test video.
            size: ``(width, height)`` of the test video.

        Returns:
            The result dict that was appended to ``self.results['benchmarks']``.
        """
        # Imported here, as in the original, so a missing video API only
        # affects this benchmark and not the whole script.
        from api import VideoProcessorAPI

        print("\n=== Video Processing Benchmark ===")
        total_frames = duration * fps
        results = {
            'test': 'video_processing',
            'video_specs': {
                'duration': duration,
                'fps': fps,
                'size': f"{size[0]}x{size[1]}",
                'total_frames': total_frames
            },
            'results': []
        }

        # The context manager removes the test video and every output file,
        # even when a run fails.
        with tempfile.TemporaryDirectory() as work_dir:
            # Create test video
            video_path = Path(work_dir) / "test_video.mp4"
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(video_path), fourcc, fps, size)
            try:
                if not out.isOpened():
                    message = "Could not open video writer for test video"
                    print(f" Failed: {message}")
                    results['error'] = message
                    self.results['benchmarks'].append(results)
                    return results

                print(f"Creating test video: {duration}s @ {fps}fps, {size[0]}x{size[1]}")
                for i in range(total_frames):
                    # Frames are (height, width, channels); upper bound of
                    # randint is exclusive.
                    frame = np.random.randint(0, 256, (*size[::-1], 3), dtype=np.uint8)
                    # Add moving rectangle for motion
                    x = int((i / total_frames) * size[0])
                    cv2.rectangle(frame, (x, 100), (x + 100, 200), (0, 255, 0), -1)
                    out.write(frame)
            finally:
                out.release()

            # Test different quality settings
            # NOTE(review): `quality` only labels the run; it is not passed to
            # VideoProcessorAPI or process_video, so all three runs appear to
            # use the same settings - confirm against the API and wire it in.
            for quality in ['low', 'medium', 'high']:
                print(f"Processing at {quality} quality...")
                processor = VideoProcessorAPI()

                start_mem = _rss_mb()
                start_time = time.perf_counter()
                try:
                    output_path = video_path.parent / f"output_{quality}.mp4"
                    stats = processor.process_video(
                        str(video_path),
                        str(output_path),
                        background=None
                    )
                    elapsed = time.perf_counter() - start_time
                    end_mem = _rss_mb()

                    result_data = {
                        'quality': quality,
                        'total_time': elapsed,
                        'frames_processed': stats.frames_processed,
                        'processing_fps': stats.processing_fps,
                        'time_per_frame': elapsed / stats.frames_processed if stats.frames_processed > 0 else 0,
                        'memory_usage_mb': end_mem - start_mem
                    }
                    print(f" Processed in {elapsed:.2f}s @ {stats.processing_fps:.1f} FPS")
                except Exception as e:
                    result_data = {
                        'quality': quality,
                        'error': str(e)
                    }
                    print(f" Failed: {e}")

                results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_batch_processing(self,
                                   batch_sizes: Optional[List[int]] = None,
                                   num_workers_list: Optional[List[int]] = None) -> Dict[str, Any]:
        """Benchmark batch processing performance.

        Args:
            batch_sizes: Batch sizes to test. Defaults to 1, 5, 10 and 20.
            num_workers_list: Worker counts to test. Defaults to 1, 2, 4, 8.

        Returns:
            The result dict that was appended to ``self.results['benchmarks']``.
        """
        print("\n=== Batch Processing Benchmark ===")
        batch_sizes = batch_sizes or [1, 5, 10, 20]
        num_workers_list = num_workers_list or [1, 2, 4, 8]
        results = {
            'test': 'batch_processing',
            'results': []
        }

        # Create test images: one shared pool, sliced per batch size.
        test_images = [
            np.random.randint(0, 256, (512, 512, 3), dtype=np.uint8)
            for _ in range(max(batch_sizes))
        ]

        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                print(f"Testing batch_size={batch_size}, workers={num_workers}...")

                config = PipelineConfig(
                    batch_size=batch_size,
                    num_workers=num_workers,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    start_time = time.perf_counter()
                    results_batch = pipeline.process_batch(test_images[:batch_size])
                    elapsed = time.perf_counter() - start_time

                    successful = sum(1 for r in results_batch if r.success)
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'total_time': elapsed,
                        'time_per_image': elapsed / batch_size,
                        'throughput': batch_size / elapsed if elapsed > 0 else 0.0,
                        'successful': successful
                    }
                    print(f" {elapsed:.2f}s total, {result_data['throughput']:.1f} images/sec")
                except Exception as e:
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'error': str(e)
                    }
                    print(f" Failed: {e}")

                results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def save_results(self):
        """Save benchmark results to ``self.output_file`` as JSON."""
        output_path = Path(self.output_file)
        # Allow report paths inside directories that do not exist yet.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to: {self.output_file}")

    def print_summary(self):
        """Print one line per successful result, grouped by benchmark.

        Entries that carry an ``'error'`` key are skipped here; they were
        already reported when the benchmark ran.
        """
        # One formatter per benchmark type, keyed by its 'test' name.
        formatters = {
            'image_processing':
                lambda r: f" {r['size']} @ {r['quality']}: {r['fps']:.1f} FPS",
            'model_loading':
                lambda r: f" {r['model']}: {r['load_time']:.2f}s",
            'video_processing':
                lambda r: f" {r['quality']}: {r['processing_fps']:.1f} FPS",
            'batch_processing':
                lambda r: f" Batch {r['batch_size']} x {r['num_workers']} workers: {r['throughput']:.1f} img/s",
        }

        print("\n" + "="*50)
        print("BENCHMARK SUMMARY")
        print("="*50)

        for benchmark in self.results['benchmarks']:
            print(f"\n{benchmark['test'].upper()}:")
            formatter = formatters.get(benchmark['test'])
            if formatter is None:
                continue
            for result in benchmark.get('results', []):
                if 'error' not in result:
                    print(formatter(result))


def main():
    """Main benchmark function: parse arguments, run benchmarks, report."""
    parser = argparse.ArgumentParser(description='BackgroundFX Pro Performance Benchmark')
    parser.add_argument('--tests', nargs='+',
                        choices=['image', 'model', 'video', 'batch', 'all'],
                        default=['all'],
                        help='Tests to run')
    parser.add_argument('--output', '-o', help='Output file for results')
    parser.add_argument('--iterations', '-i', type=int, default=5,
                        help='Number of iterations for each test')
    args = parser.parse_args()

    # With zero timed runs there is nothing to average, so reject early.
    if args.iterations < 1:
        parser.error('--iterations must be at least 1')

    benchmarker = Benchmarker(args.output)

    tests_to_run = args.tests
    if 'all' in tests_to_run:
        tests_to_run = ['image', 'model', 'video', 'batch']

    system_info = benchmarker.results['system_info']
    print("BackgroundFX Pro Performance Benchmark")
    print("="*50)
    print("System Information:")
    print(f" CPU: {system_info['cpu']['model']}")
    print(f" Memory: {system_info['memory']['total_gb']:.1f}GB")
    if system_info['cuda_available']:
        print(f" GPU: {system_info['gpu']['name']}")
    else:
        print(" GPU: Not available")

    # Run selected benchmarks
    if 'image' in tests_to_run:
        benchmarker.benchmark_image_processing(num_iterations=args.iterations)
    if 'model' in tests_to_run:
        benchmarker.benchmark_model_loading()
    if 'video' in tests_to_run:
        benchmarker.benchmark_video_processing()
    if 'batch' in tests_to_run:
        benchmarker.benchmark_batch_processing()

    # Save and display results
    benchmarker.save_results()
    benchmarker.print_summary()


if __name__ == "__main__":
    main()