# MogensR's picture
# Create scripts/benchmark.py
# e74e423
# raw
# history blame
# 16.4 kB
#!/usr/bin/env python3
"""
Benchmark script for BackgroundFX Pro.
Tests performance across different configurations and hardware.
"""
import argparse
import json
import statistics
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import psutil
import torch

# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))

from api import ProcessingPipeline, PipelineConfig
from models import ModelRegistry, ModelLoader
class Benchmarker:
    """Performance benchmarking tool for BackgroundFX Pro.

    Every ``benchmark_*`` method runs one family of measurements, prints
    progress to stdout, appends its result dictionary to
    ``self.results['benchmarks']`` and returns that same dictionary.  A
    configuration that raises while being measured is recorded as an entry
    with an ``'error'`` key instead of aborting the whole run.
    """

    def __init__(self, output_file: Optional[str] = None):
        """Initialize the benchmarker and snapshot the host system.

        Args:
            output_file: Path of the JSON report written by
                :meth:`save_results`.  When omitted, a timestamped
                ``benchmark_YYYYmmdd_HHMMSS.json`` name is used.
        """
        started = datetime.now()
        self.results = {
            'timestamp': started.isoformat(),
            'system_info': self._get_system_info(),
            'benchmarks': [],
        }
        self.output_file = output_file or f"benchmark_{started.strftime('%Y%m%d_%H%M%S')}.json"

    # ------------------------------------------------------------------
    # Measurement helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _memory_mb() -> float:
        """Return the resident set size of this process in MiB (host memory only)."""
        return psutil.Process().memory_info().rss / (1024**2)

    @staticmethod
    def _timestamp() -> float:
        """Return a monotonic timestamp suitable for measuring intervals.

        CUDA kernels are queued asynchronously, so pending GPU work is
        drained first; otherwise the interval would only cover the launch.
        The ``is_initialized`` check keeps this helper from creating a CUDA
        context as a side effect when the GPU has not been used yet.
        """
        if torch.cuda.is_available() and torch.cuda.is_initialized():
            torch.cuda.synchronize()
        return time.perf_counter()

    @staticmethod
    def _summarize_times(times: List[float]) -> Dict[str, float]:
        """Reduce a non-empty list of durations (seconds) to summary statistics.

        Returns:
            Dict with ``avg_time``, ``std_time`` (0 for a single sample),
            ``min_time``, ``max_time`` and ``fps`` (0.0 if the average is 0).
        """
        avg = statistics.mean(times)
        return {
            'avg_time': avg,
            'std_time': statistics.stdev(times) if len(times) > 1 else 0,
            'min_time': min(times),
            'max_time': max(times),
            # Guard the division so a zero-length interval cannot raise.
            'fps': 1.0 / avg if avg > 0 else 0.0,
        }

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Fallback encoder for values ``json`` cannot serialize natively.

        Objects exposing a scalar ``item()`` accessor are unwrapped through
        it; anything else is stored as ``str(value)`` so that one odd value
        cannot make the whole report unsaveable.
        """
        item = getattr(value, 'item', None)
        if callable(item):
            try:
                return item()
            except (TypeError, ValueError, RuntimeError):
                pass
        return str(value)

    # ------------------------------------------------------------------
    # System information
    # ------------------------------------------------------------------
    def _get_system_info(self) -> Dict[str, Any]:
        """Collect CPU, memory, GPU and interpreter information."""
        try:
            # cpu_freq() is not provided on every platform and may return None.
            cpu_freq = psutil.cpu_freq()
        except (AttributeError, NotImplementedError, OSError):
            cpu_freq = None
        memory = psutil.virtual_memory()
        return {
            'cpu': {
                'count': psutil.cpu_count(),
                'frequency': cpu_freq.current if cpu_freq else 0,
                'model': self._get_cpu_model(),
            },
            'memory': {
                'total_gb': memory.total / (1024**3),
                'available_gb': memory.available / (1024**3),
            },
            'gpu': self._get_gpu_info(),
            'python_version': sys.version,
            'torch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available(),
        }

    def _get_cpu_model(self) -> str:
        """Return the CPU model name, or ``"Unknown"`` when it cannot be determined."""
        try:
            import platform
            model = platform.processor()
        except Exception:
            return "Unknown"
        # platform.processor() returns an empty string when it has no answer.
        return model or "Unknown"

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Describe CUDA device 0, or return ``{'available': False}`` without CUDA."""
        if not torch.cuda.is_available():
            return {'available': False}
        return {
            'available': True,
            'name': torch.cuda.get_device_name(0),
            'memory_gb': torch.cuda.get_device_properties(0).total_memory / (1024**3),
            'compute_capability': torch.cuda.get_device_capability(0),
        }

    # ------------------------------------------------------------------
    # Benchmarks
    # ------------------------------------------------------------------
    def benchmark_image_processing(self,
                                   sizes: Optional[List[tuple]] = None,
                                   qualities: Optional[List[str]] = None,
                                   num_iterations: int = 5) -> Dict[str, Any]:
        """Benchmark single-image processing across sizes and quality presets.

        Args:
            sizes: ``(width, height)`` pairs to test.  Defaults to 512x512,
                1024x1024 and 1920x1080.
            qualities: Values passed as ``quality_preset``.  Defaults to
                ``['low', 'medium', 'high']``.
            num_iterations: Timed runs per configuration, after one untimed
                warmup run.

        Returns:
            The result dictionary, which is also appended to
            ``self.results['benchmarks']``.

        Raises:
            ValueError: If ``num_iterations`` is smaller than 1.
        """
        if num_iterations < 1:
            raise ValueError("num_iterations must be at least 1")
        print("\n=== Image Processing Benchmark ===")
        sizes = sizes or [(512, 512), (1024, 1024), (1920, 1080)]
        qualities = qualities or ['low', 'medium', 'high']
        results = {
            'test': 'image_processing',
            'iterations': num_iterations,
            'results': [],
        }
        for size in sizes:
            width, height = size[0], size[1]
            label = f"{width}x{height}"
            for quality in qualities:
                print(f"Testing {label} @ {quality} quality...")
                # Image arrays are (rows, cols, channels) = (height, width, 3);
                # randint's upper bound is exclusive, hence 256.
                image = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
                config = PipelineConfig(
                    quality_preset=quality,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False,
                )
                try:
                    pipeline = ProcessingPipeline(config)
                    # Warmup run, excluded from the statistics.
                    pipeline.process_image(image, None)
                    times = []
                    memory_usage = []
                    for _ in range(num_iterations):
                        start_mem = self._memory_mb()
                        start_time = self._timestamp()
                        pipeline.process_image(image, None)
                        elapsed = self._timestamp() - start_time
                        end_mem = self._memory_mb()
                        times.append(elapsed)
                        memory_usage.append(end_mem - start_mem)
                    result_data = {'size': label, 'quality': quality}
                    result_data.update(self._summarize_times(times))
                    result_data['avg_memory_mb'] = statistics.mean(memory_usage)
                    results['results'].append(result_data)
                    print(f" Average: {result_data['avg_time']:.3f}s ({result_data['fps']:.1f} FPS)")
                except Exception as e:
                    print(f" Failed: {str(e)}")
                    results['results'].append({
                        'size': label,
                        'quality': quality,
                        'error': str(e),
                    })
        self.results['benchmarks'].append(results)
        return results

    def benchmark_model_loading(self,
                                model_ids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Benchmark how long each model takes to load from a cleared loader.

        Args:
            model_ids: Model identifiers to load.  Defaults to
                ``['rmbg-1.4', 'u2netp', 'modnet']``.

        Returns:
            The result dictionary, which is also appended to
            ``self.results['benchmarks']``.
        """
        print("\n=== Model Loading Benchmark ===")
        results = {
            'test': 'model_loading',
            'results': [],
        }
        registry = ModelRegistry()
        loader = ModelLoader(registry, device='cuda' if torch.cuda.is_available() else 'cpu')
        models_to_test = model_ids or ['rmbg-1.4', 'u2netp', 'modnet']
        for model_id in models_to_test:
            print(f"Loading {model_id}...")
            # Drop anything already loaded so each model is measured on its own.
            loader.unload_all()
            start_mem = self._memory_mb()
            start_time = self._timestamp()
            try:
                loaded = loader.load_model(model_id)
                elapsed = self._timestamp() - start_time
                end_mem = self._memory_mb()
                if loaded:
                    result_data = {
                        'model': model_id,
                        'load_time': elapsed,
                        'memory_usage_mb': end_mem - start_mem,
                        # Stored as text so the report stays JSON-serializable
                        # whatever type the loader uses for its device.
                        'device': str(loaded.device),
                    }
                    print(f" Loaded in {elapsed:.2f}s, Memory: {end_mem - start_mem:.1f}MB")
                else:
                    result_data = {
                        'model': model_id,
                        'error': 'Failed to load',
                    }
                    print(" Failed to load")
            except Exception as e:
                result_data = {
                    'model': model_id,
                    'error': str(e),
                }
                print(f" Error: {str(e)}")
            results['results'].append(result_data)
        self.results['benchmarks'].append(results)
        return results

    @staticmethod
    def _write_test_video(video_path: Path, duration: int, fps: int, size: tuple) -> None:
        """Write a synthetic noise clip with a moving green rectangle to ``video_path``."""
        total_frames = duration * fps
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(str(video_path), fourcc, fps, size)
        try:
            for i in range(total_frames):
                # size is (width, height); frames are (height, width, 3).
                frame = np.random.randint(0, 256, (size[1], size[0], 3), dtype=np.uint8)
                # Slide the rectangle left to right so the clip contains motion.
                x = int((i / total_frames) * size[0])
                cv2.rectangle(frame, (x, 100), (x + 100, 200), (0, 255, 0), -1)
                writer.write(frame)
        finally:
            # Release even if frame generation fails so the file handle is closed.
            writer.release()

    def benchmark_video_processing(self,
                                   duration: int = 5,
                                   fps: int = 30,
                                   size: tuple = (1280, 720)) -> Dict[str, Any]:
        """Benchmark end-to-end processing of a synthetic video clip.

        Args:
            duration: Clip length in seconds.
            fps: Frame rate of the generated clip.
            size: ``(width, height)`` of the generated clip.

        Returns:
            The result dictionary, which is also appended to
            ``self.results['benchmarks']``.
        """
        print("\n=== Video Processing Benchmark ===")
        results = {
            'test': 'video_processing',
            'video_specs': {
                'duration': duration,
                'fps': fps,
                'size': f"{size[0]}x{size[1]}",
                'total_frames': duration * fps,
            },
            'results': [],
        }
        import tempfile

        # The context manager removes the input clip and every output file,
        # including when a run fails part-way through.
        with tempfile.TemporaryDirectory() as workdir:
            video_path = Path(workdir) / "test_video.mp4"
            print(f"Creating test video: {duration}s @ {fps}fps, {size[0]}x{size[1]}")
            self._write_test_video(video_path, duration, fps, size)
            for quality in ['low', 'medium', 'high']:
                print(f"Processing at {quality} quality...")
                try:
                    from api import VideoProcessorAPI
                    # NOTE(review): `quality` only labels the result; it is not
                    # passed to the processor, so the three runs presumably use
                    # identical settings.  Wire it in once the API for selecting
                    # a preset is confirmed.
                    processor = VideoProcessorAPI()
                    output_path = video_path.parent / f"output_{quality}.mp4"
                    start_mem = self._memory_mb()
                    start_time = self._timestamp()
                    stats = processor.process_video(
                        str(video_path),
                        str(output_path),
                        background=None,
                    )
                    elapsed = self._timestamp() - start_time
                    end_mem = self._memory_mb()
                    result_data = {
                        'quality': quality,
                        'total_time': elapsed,
                        'frames_processed': stats.frames_processed,
                        'processing_fps': stats.processing_fps,
                        'time_per_frame': elapsed / stats.frames_processed if stats.frames_processed > 0 else 0,
                        'memory_usage_mb': end_mem - start_mem,
                    }
                    print(f" Processed in {elapsed:.2f}s @ {stats.processing_fps:.1f} FPS")
                except Exception as e:
                    result_data = {
                        'quality': quality,
                        'error': str(e),
                    }
                    print(f" Failed: {str(e)}")
                results['results'].append(result_data)
        self.results['benchmarks'].append(results)
        return results

    def benchmark_batch_processing(self,
                                   batch_sizes: Optional[List[int]] = None,
                                   num_workers_list: Optional[List[int]] = None) -> Dict[str, Any]:
        """Benchmark batch throughput across batch sizes and worker counts.

        Args:
            batch_sizes: Number of 512x512 images per batch.  Defaults to
                ``[1, 5, 10, 20]``.
            num_workers_list: Values passed as ``num_workers``.  Defaults to
                ``[1, 2, 4, 8]``.

        Returns:
            The result dictionary, which is also appended to
            ``self.results['benchmarks']``.
        """
        print("\n=== Batch Processing Benchmark ===")
        batch_sizes = batch_sizes or [1, 5, 10, 20]
        num_workers_list = num_workers_list or [1, 2, 4, 8]
        results = {
            'test': 'batch_processing',
            'results': [],
        }
        # Generate the largest batch once; smaller batches are prefixes of it.
        test_images = [
            np.random.randint(0, 256, (512, 512, 3), dtype=np.uint8)
            for _ in range(max(batch_sizes))
        ]
        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                print(f"Testing batch_size={batch_size}, workers={num_workers}...")
                config = PipelineConfig(
                    batch_size=batch_size,
                    num_workers=num_workers,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False,
                )
                try:
                    pipeline = ProcessingPipeline(config)
                    start_time = self._timestamp()
                    results_batch = pipeline.process_batch(test_images[:batch_size])
                    elapsed = self._timestamp() - start_time
                    successful = sum(1 for r in results_batch if r.success)
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'total_time': elapsed,
                        'time_per_image': elapsed / batch_size,
                        # Guard the division so a zero-length interval cannot raise.
                        'throughput': batch_size / elapsed if elapsed > 0 else 0.0,
                        'successful': successful,
                    }
                    print(f" {elapsed:.2f}s total, {result_data['throughput']:.1f} images/sec")
                except Exception as e:
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'error': str(e),
                    }
                    print(f" Failed: {str(e)}")
                results['results'].append(result_data)
        self.results['benchmarks'].append(results)
        return results

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def save_results(self):
        """Write ``self.results`` as indented JSON to ``self.output_file``.

        Missing parent directories are created first, so a finished run is
        not lost because the target folder does not exist yet.
        """
        target = Path(self.output_file)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, default=self._json_default)
        print(f"\nResults saved to: {self.output_file}")

    def print_summary(self):
        """Print one headline figure per successful result, grouped by test.

        Results carrying an ``'error'`` key are skipped, as are the results
        of test types that have no formatter here.
        """
        formatters = {
            'image_processing':
                lambda r: f" {r['size']} @ {r['quality']}: {r['fps']:.1f} FPS",
            'model_loading':
                lambda r: f" {r['model']}: {r['load_time']:.2f}s",
            'video_processing':
                lambda r: f" {r['quality']}: {r['processing_fps']:.1f} FPS",
            'batch_processing':
                lambda r: f" Batch {r['batch_size']} x {r['num_workers']} workers: {r['throughput']:.1f} img/s",
        }
        print("\n" + "="*50)
        print("BENCHMARK SUMMARY")
        print("="*50)
        for benchmark in self.results['benchmarks']:
            print(f"\n{benchmark['test'].upper()}:")
            describe = formatters.get(benchmark['test'])
            if describe is None:
                continue
            for result in benchmark.get('results', []):
                if 'error' not in result:
                    print(describe(result))
def _positive_int(text: str) -> int:
    """argparse ``type`` callback that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
    return value


def main(argv: Optional[List[str]] = None):
    """Parse command-line options, run the selected benchmarks and report.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) keeps the normal command-line behaviour.
    """
    parser = argparse.ArgumentParser(description='BackgroundFX Pro Performance Benchmark')
    parser.add_argument('--tests', nargs='+',
                        choices=['image', 'model', 'video', 'batch', 'all'],
                        default=['all'],
                        help='Tests to run')
    parser.add_argument('--output', '-o', help='Output file for results')
    # Reject 0 or negative counts here rather than failing inside every
    # image configuration later on.
    parser.add_argument('--iterations', '-i', type=_positive_int, default=5,
                        help='Number of iterations for each test')
    args = parser.parse_args(argv)

    benchmarker = Benchmarker(args.output)
    tests_to_run = args.tests
    if 'all' in tests_to_run:
        tests_to_run = ['image', 'model', 'video', 'batch']

    system_info = benchmarker.results['system_info']
    print("BackgroundFX Pro Performance Benchmark")
    print("="*50)
    print("System Information:")
    print(f" CPU: {system_info['cpu']['model']}")
    print(f" Memory: {system_info['memory']['total_gb']:.1f}GB")
    if system_info['cuda_available']:
        print(f" GPU: {system_info['gpu']['name']}")
    else:
        print(" GPU: Not available")

    # Benchmarks run in this fixed order regardless of the order given on
    # the command line.
    runners = [
        ('image', lambda: benchmarker.benchmark_image_processing(num_iterations=args.iterations)),
        ('model', benchmarker.benchmark_model_loading),
        ('video', benchmarker.benchmark_video_processing),
        ('batch', benchmarker.benchmark_batch_processing),
    ]
    try:
        for test_name, run in runners:
            if test_name in tests_to_run:
                run()
    finally:
        # Save whatever was collected even if a benchmark aborts or the
        # user interrupts the run.
        benchmarker.save_results()
    benchmarker.print_summary()


if __name__ == "__main__":
    main()