"""
Benchmark script for BackgroundFX Pro.

Tests performance across different configurations and hardware.
"""

import argparse
import json
import statistics
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import psutil
import torch

# Make the project root importable so the local `api` and `models` packages
# resolve when this script is run directly.
sys.path.append(str(Path(__file__).parent.parent))

from api import ProcessingPipeline, PipelineConfig
from models import ModelRegistry, ModelLoader


class Benchmarker:
    """Performance benchmarking tool."""

    def __init__(self, output_file: Optional[str] = None):
        """Initialize the benchmarker and collect system information."""
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'system_info': self._get_system_info(),
            'benchmarks': []
        }
        self.output_file = output_file or f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    def _get_system_info(self) -> Dict[str, Any]:
        """Collect system information."""
        info = {
            'cpu': {
                'count': psutil.cpu_count(),
                'frequency': psutil.cpu_freq().current if psutil.cpu_freq() else 0,
                'model': self._get_cpu_model()
            },
            'memory': {
                'total_gb': psutil.virtual_memory().total / (1024**3),
                'available_gb': psutil.virtual_memory().available / (1024**3)
            },
            'gpu': self._get_gpu_info(),
            'python_version': sys.version,
            'torch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available()
        }
        return info

    def _get_cpu_model(self) -> str:
        """Get the CPU model name."""
        try:
            import platform
            return platform.processor()
        except Exception:
            return "Unknown"

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Get GPU information."""
        if torch.cuda.is_available():
            return {
                'available': True,
                'name': torch.cuda.get_device_name(0),
                'memory_gb': torch.cuda.get_device_properties(0).total_memory / (1024**3),
                'compute_capability': torch.cuda.get_device_capability(0)
            }
        return {'available': False}

    def benchmark_image_processing(self,
                                   sizes: Optional[List[tuple]] = None,
                                   qualities: Optional[List[str]] = None,
                                   num_iterations: int = 5) -> Dict[str, Any]:
        """Benchmark image processing performance."""
        print("\n=== Image Processing Benchmark ===")

        sizes = sizes or [(512, 512), (1024, 1024), (1920, 1080)]
        qualities = qualities or ['low', 'medium', 'high']

        results = {
            'test': 'image_processing',
            'iterations': num_iterations,
            'results': []
        }

        for size in sizes:
            for quality in qualities:
                print(f"Testing {size[0]}x{size[1]} @ {quality} quality...")

                # Synthetic test image; size is (width, height), arrays are (height, width, channels).
                image = np.random.randint(0, 255, (*size[::-1], 3), dtype=np.uint8)

                config = PipelineConfig(
                    quality_preset=quality,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    # Warm-up run so model initialization is not counted in the timings.
                    pipeline.process_image(image, None)

                    times = []
                    memory_usage = []

                    for _ in range(num_iterations):
                        start_mem = psutil.Process().memory_info().rss / (1024**2)
                        start_time = time.time()

                        pipeline.process_image(image, None)

                        elapsed = time.time() - start_time
                        end_mem = psutil.Process().memory_info().rss / (1024**2)

                        times.append(elapsed)
                        memory_usage.append(end_mem - start_mem)

                    result_data = {
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'avg_time': statistics.mean(times),
                        'std_time': statistics.stdev(times) if len(times) > 1 else 0,
                        'min_time': min(times),
                        'max_time': max(times),
                        'fps': 1.0 / statistics.mean(times),
                        'avg_memory_mb': statistics.mean(memory_usage)
                    }

                    results['results'].append(result_data)
                    print(f"  Average: {result_data['avg_time']:.3f}s ({result_data['fps']:.1f} FPS)")

                except Exception as e:
                    print(f"  Failed: {str(e)}")
                    results['results'].append({
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'error': str(e)
                    })

        self.results['benchmarks'].append(results)
        return results

    def benchmark_model_loading(self) -> Dict[str, Any]:
        """Benchmark model loading times."""
        print("\n=== Model Loading Benchmark ===")

        results = {
            'test': 'model_loading',
            'results': []
        }

        registry = ModelRegistry()
        loader = ModelLoader(registry, device='cuda' if torch.cuda.is_available() else 'cpu')

        models_to_test = ['rmbg-1.4', 'u2netp', 'modnet']

        for model_id in models_to_test:
            print(f"Loading {model_id}...")

            # Start from a clean slate so each load is measured independently.
            loader.unload_all()

            start_time = time.time()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                loaded = loader.load_model(model_id)

                elapsed = time.time() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                if loaded:
                    result_data = {
                        'model': model_id,
                        'load_time': elapsed,
                        'memory_usage_mb': end_mem - start_mem,
                        'device': loaded.device
                    }
                    print(f"  Loaded in {elapsed:.2f}s, Memory: {end_mem - start_mem:.1f}MB")
                else:
                    result_data = {
                        'model': model_id,
                        'error': 'Failed to load'
                    }
                    print("  Failed to load")

            except Exception as e:
                result_data = {
                    'model': model_id,
                    'error': str(e)
                }
                print(f"  Error: {str(e)}")

            results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_video_processing(self,
                                   duration: int = 5,
                                   fps: int = 30,
                                   size: tuple = (1280, 720)) -> Dict[str, Any]:
        """Benchmark video processing performance."""
        print("\n=== Video Processing Benchmark ===")

        results = {
            'test': 'video_processing',
            'video_specs': {
                'duration': duration,
                'fps': fps,
                'size': f"{size[0]}x{size[1]}",
                'total_frames': duration * fps
            },
            'results': []
        }

        # Create a synthetic test video in a temporary directory.
        import shutil
        import tempfile
        temp_dir = Path(tempfile.mkdtemp())
        video_path = temp_dir / "test_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(video_path), fourcc, fps, size)

        print(f"Creating test video: {duration}s @ {fps}fps, {size[0]}x{size[1]}")
        for i in range(duration * fps):
            frame = np.random.randint(0, 255, (*size[::-1], 3), dtype=np.uint8)
            # Draw a moving rectangle so the content changes between frames.
            x = int((i / (duration * fps)) * size[0])
            cv2.rectangle(frame, (x, 100), (x + 100, 200), (0, 255, 0), -1)
            out.write(frame)
        out.release()

        for quality in ['low', 'medium', 'high']:
            print(f"Processing at {quality} quality...")

            from api import VideoProcessorAPI
            processor = VideoProcessorAPI()

            start_time = time.time()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                output_path = video_path.parent / f"output_{quality}.mp4"
                stats = processor.process_video(
                    str(video_path),
                    str(output_path),
                    background=None
                )

                elapsed = time.time() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                result_data = {
                    'quality': quality,
                    'total_time': elapsed,
                    'frames_processed': stats.frames_processed,
                    'processing_fps': stats.processing_fps,
                    'time_per_frame': elapsed / stats.frames_processed if stats.frames_processed > 0 else 0,
                    'memory_usage_mb': end_mem - start_mem
                }

                print(f"  Processed in {elapsed:.2f}s @ {stats.processing_fps:.1f} FPS")

            except Exception as e:
                result_data = {
                    'quality': quality,
                    'error': str(e)
                }
                print(f"  Failed: {str(e)}")

            results['results'].append(result_data)

        # Clean up the temporary input and output videos.
        shutil.rmtree(temp_dir, ignore_errors=True)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_batch_processing(self,
                                   batch_sizes: Optional[List[int]] = None,
                                   num_workers_list: Optional[List[int]] = None) -> Dict[str, Any]:
        """Benchmark batch processing performance."""
        print("\n=== Batch Processing Benchmark ===")

        batch_sizes = batch_sizes or [1, 5, 10, 20]
        num_workers_list = num_workers_list or [1, 2, 4, 8]

        results = {
            'test': 'batch_processing',
            'results': []
        }

        # Generate enough synthetic test images for the largest batch.
        test_images = []
        for _ in range(max(batch_sizes)):
            img = np.random.randint(0, 255, (512, 512, 3), dtype=np.uint8)
            test_images.append(img)

        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                print(f"Testing batch_size={batch_size}, workers={num_workers}...")

                config = PipelineConfig(
                    batch_size=batch_size,
                    num_workers=num_workers,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    start_time = time.time()
                    results_batch = pipeline.process_batch(test_images[:batch_size])
                    elapsed = time.time() - start_time

                    successful = sum(1 for r in results_batch if r.success)

                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'total_time': elapsed,
                        'time_per_image': elapsed / batch_size,
                        'throughput': batch_size / elapsed,
                        'successful': successful
                    }

                    print(f"  {elapsed:.2f}s total, {result_data['throughput']:.1f} images/sec")

                except Exception as e:
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'error': str(e)
                    }
                    print(f"  Failed: {str(e)}")

                results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def save_results(self):
        """Save benchmark results to a JSON file."""
        with open(self.output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to: {self.output_file}")

    def print_summary(self):
        """Print benchmark summary."""
        print("\n" + "="*50)
        print("BENCHMARK SUMMARY")
        print("="*50)

        for benchmark in self.results['benchmarks']:
            print(f"\n{benchmark['test'].upper()}:")

            if 'results' in benchmark:
                for result in benchmark['results']:
                    if 'error' not in result:
                        if benchmark['test'] == 'image_processing':
                            print(f"  {result['size']} @ {result['quality']}: {result['fps']:.1f} FPS")
                        elif benchmark['test'] == 'model_loading':
                            print(f"  {result['model']}: {result['load_time']:.2f}s")
                        elif benchmark['test'] == 'video_processing':
                            print(f"  {result['quality']}: {result['processing_fps']:.1f} FPS")
                        elif benchmark['test'] == 'batch_processing':
                            print(f"  Batch {result['batch_size']} x {result['num_workers']} workers: {result['throughput']:.1f} img/s")


def main():
    """Main benchmark function."""
    parser = argparse.ArgumentParser(description='BackgroundFX Pro Performance Benchmark')
    parser.add_argument('--tests', nargs='+',
                        choices=['image', 'model', 'video', 'batch', 'all'],
                        default=['all'],
                        help='Tests to run')
    parser.add_argument('--output', '-o', help='Output file for results')
    parser.add_argument('--iterations', '-i', type=int, default=5,
                        help='Number of iterations for each test')

    args = parser.parse_args()

    benchmarker = Benchmarker(args.output)

    tests_to_run = args.tests
    if 'all' in tests_to_run:
        tests_to_run = ['image', 'model', 'video', 'batch']

    print("BackgroundFX Pro Performance Benchmark")
    print("="*50)
    print("System Information:")
    print(f"  CPU: {benchmarker.results['system_info']['cpu']['model']}")
    print(f"  Memory: {benchmarker.results['system_info']['memory']['total_gb']:.1f}GB")
    if benchmarker.results['system_info']['cuda_available']:
        print(f"  GPU: {benchmarker.results['system_info']['gpu']['name']}")
    else:
        print("  GPU: Not available")

    if 'image' in tests_to_run:
        benchmarker.benchmark_image_processing(num_iterations=args.iterations)

    if 'model' in tests_to_run:
        benchmarker.benchmark_model_loading()

    if 'video' in tests_to_run:
        benchmarker.benchmark_video_processing()

    if 'batch' in tests_to_run:
        benchmarker.benchmark_batch_processing()

    benchmarker.save_results()
    benchmarker.print_summary()


if __name__ == "__main__":
    main()