""" |
|
|
Tests for the processing pipeline. |
|
|
""" |
|
|
|
|
|
import pytest |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from unittest.mock import Mock, patch, MagicMock |
|
|
from pathlib import Path |
|
|
|
|
|
from api.pipeline import ( |
|
|
ProcessingPipeline, |
|
|
PipelineConfig, |
|
|
PipelineResult, |
|
|
ProcessingMode, |
|
|
PipelineStage |
|
|
) |
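

# The segmentation/matting stubs below were repeated verbatim across most
# tests, so they are factored into this helper. A test-local sketch: it
# assumes the `_segment_image` hook and `alpha_matting.process` attribute
# that these tests already mock on the pipeline.
def _stub_stages(pipeline):
    """Replace the heavy pipeline stages with fixed, fast outputs."""
    # Full-foreground 512x512 mask, as if segmentation found a single subject.
    pipeline._segment_image = Mock(
        return_value=np.ones((512, 512), dtype=np.uint8) * 255
    )
    # Fully opaque alpha matte with high confidence.
    pipeline.alpha_matting.process = Mock(return_value={
        'alpha': np.ones((512, 512), dtype=np.float32),
        'confidence': 0.95,
    })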


class TestPipelineConfig:
    """Test pipeline configuration."""

    def test_default_config(self):
        """Test default configuration values."""
        config = PipelineConfig()
        assert config.mode == ProcessingMode.PHOTO
        assert config.quality_preset == "high"
        assert config.use_gpu is True
        assert config.enable_cache is True

    def test_custom_config(self):
        """Test custom configuration."""
        config = PipelineConfig(
            mode=ProcessingMode.VIDEO,
            quality_preset="ultra",
            use_gpu=False,
            batch_size=4,
        )
        assert config.mode == ProcessingMode.VIDEO
        assert config.quality_preset == "ultra"
        assert config.use_gpu is False
        assert config.batch_size == 4


@pytest.fixture
def mock_pipeline(pipeline_config):
    """Create a pipeline with mocked components.

    Defined at module level (rather than inside TestProcessingPipeline) so the
    performance tests further down can request it too; a fixture defined inside
    one test class is not visible to other classes.
    """
    with patch('api.pipeline.ModelFactory') as mock_factory, \
         patch('api.pipeline.DeviceManager') as mock_device:
        mock_device.return_value.get_device.return_value = 'cpu'
        mock_factory.return_value.load_model.return_value = Mock()
        return ProcessingPipeline(pipeline_config)


class TestProcessingPipeline:
    """Test the main processing pipeline."""

    def test_pipeline_initialization(self, mock_pipeline):
        """Test pipeline initialization."""
        assert mock_pipeline is not None
        assert mock_pipeline.config is not None
        assert mock_pipeline.current_stage == PipelineStage.INITIALIZATION

    def test_process_image_success(self, mock_pipeline, sample_image, sample_background):
        """Test successful image processing."""
        _stub_stages(mock_pipeline)

        result = mock_pipeline.process_image(sample_image, sample_background)

        assert result is not None
        assert isinstance(result, PipelineResult)
        assert result.success
        assert result.output_image is not None

    def test_process_image_with_effects(self, mock_pipeline, sample_image):
        """Test image processing with effects."""
        mock_pipeline.config.apply_effects = ['bokeh', 'vignette']
        _stub_stages(mock_pipeline)

        result = mock_pipeline.process_image(sample_image, None)

        assert result is not None
        assert result.success

    def test_process_image_failure(self, mock_pipeline, sample_image):
        """Test image processing failure handling."""
        # A stage failure should be captured in the result, not raised.
        mock_pipeline._segment_image = Mock(side_effect=Exception("Segmentation failed"))

        result = mock_pipeline.process_image(sample_image, None)

        assert result is not None
        assert result.success is False
        assert len(result.errors) > 0

    @pytest.mark.parametrize("quality", ["low", "medium", "high", "ultra"])
    def test_quality_presets(self, mock_pipeline, sample_image, quality):
        """Test different quality presets."""
        mock_pipeline.config.quality_preset = quality
        _stub_stages(mock_pipeline)

        result = mock_pipeline.process_image(sample_image, None)

        assert result is not None
        assert result.success

    def test_batch_processing(self, mock_pipeline, sample_image):
        """Test batch processing of multiple images."""
        images = [sample_image] * 3
        mock_pipeline.process_image = Mock(return_value=PipelineResult(
            success=True,
            output_image=sample_image,
            quality_score=0.9,
        ))

        results = mock_pipeline.process_batch(images)

        assert len(results) == 3
        assert all(r.success for r in results)

    def test_progress_callback(self, mock_pipeline, sample_image):
        """Test progress callback functionality."""
        progress_values = []

        def progress_callback(value, message):
            progress_values.append(value)

        mock_pipeline.config.progress_callback = progress_callback
        _stub_stages(mock_pipeline)

        mock_pipeline.process_image(sample_image, None)

        assert len(progress_values) > 0
        assert 0.0 <= max(progress_values) <= 1.0

    def test_cache_functionality(self, mock_pipeline, sample_image):
        """Test caching functionality."""
        mock_pipeline.config.enable_cache = True
        _stub_stages(mock_pipeline)

        # First call computes; the second identical call should hit the cache.
        result1 = mock_pipeline.process_image(sample_image, None)
        result2 = mock_pipeline.process_image(sample_image, None)

        assert result1.success == result2.success
        # Segmentation ran only once, so the second result came from the cache.
        assert mock_pipeline._segment_image.call_count == 1

    def test_memory_management(self, mock_pipeline):
        """Test memory management and cleanup."""
        # Fill the cache with dummy results, then verify clear_cache empties it.
        for i in range(10):
            mock_pipeline.cache[f"test_{i}"] = PipelineResult(success=True)

        mock_pipeline.clear_cache()

        assert len(mock_pipeline.cache) == 0

    def test_statistics_tracking(self, mock_pipeline, sample_image):
        """Test statistics tracking."""
        _stub_stages(mock_pipeline)

        mock_pipeline.process_image(sample_image, None)

        stats = mock_pipeline.get_statistics()

        assert 'total_processed' in stats
        assert stats['total_processed'] > 0
        assert 'avg_time' in stats
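

# The integration tests below build a real ProcessingPipeline instead of a
# mocked one. They are marked `integration` and `slow` and skip themselves
# when model weights are unavailable, so they can be selected explicitly,
# e.g. with `pytest -m integration`.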


class TestPipelineIntegration:
    """Integration tests for the pipeline."""

    @pytest.mark.integration
    @pytest.mark.slow
    def test_end_to_end_processing(self, sample_image, sample_background, temp_dir):
        """Test end-to-end processing pipeline."""
        config = PipelineConfig(
            use_gpu=False,
            quality_preset="medium",
            enable_cache=False,
        )

        try:
            pipeline = ProcessingPipeline(config)
        except Exception:
            pytest.skip("Models not available for integration test")

        result = pipeline.process_image(sample_image, sample_background)

        if result.success:
            assert result.output_image is not None
            assert result.output_image.shape == sample_image.shape
            assert result.quality_score > 0

            # Only write the output when processing produced an image.
            output_path = temp_dir / "test_output.png"
            cv2.imwrite(str(output_path), result.output_image)
            assert output_path.exists()

    @pytest.mark.integration
    @pytest.mark.slow
    def test_video_frame_processing(self, sample_video, temp_dir):
        """Test processing video frames."""
        config = PipelineConfig(
            mode=ProcessingMode.VIDEO,
            use_gpu=False,
            quality_preset="low",
        )

        try:
            pipeline = ProcessingPipeline(config)
        except Exception:
            pytest.skip("Models not available for integration test")

        # Process the first five frames of the sample clip.
        cap = cv2.VideoCapture(sample_video)
        processed_frames = []
        for _ in range(5):
            ret, frame = cap.read()
            if not ret:
                break
            result = pipeline.process_image(frame, None)
            if result.success:
                processed_frames.append(result.output_image)
        cap.release()

        assert len(processed_frames) > 0

        # Re-encode the processed frames to verify they form writable video.
        output_path = temp_dir / "test_video_out.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(
            str(output_path), fourcc, 30.0,
            (processed_frames[0].shape[1], processed_frames[0].shape[0]),
        )
        for frame in processed_frames:
            out.write(frame)
        out.release()
        assert output_path.exists()
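

# The timing and memory thresholds below are rough upper bounds measured
# against mocked stages; they catch gross regressions rather than asserting
# hardware-specific performance.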


class TestPipelinePerformance:
    """Performance tests for the pipeline."""

    @pytest.mark.slow
    def test_processing_speed(self, mock_pipeline, sample_image, performance_timer):
        """Test processing speed."""
        _stub_stages(mock_pipeline)

        with performance_timer as timer:
            result = mock_pipeline.process_image(sample_image, None)

        assert result.success
        # With mocked stages, one image should take well under a second.
        assert timer.elapsed < 1.0

    @pytest.mark.slow
    def test_batch_processing_speed(self, mock_pipeline, sample_image, performance_timer):
        """Test batch processing speed."""
        images = [sample_image] * 10
        mock_pipeline.process_image = Mock(return_value=PipelineResult(
            success=True,
            output_image=sample_image,
            quality_score=0.9,
        ))

        with performance_timer as timer:
            results = mock_pipeline.process_batch(images)

        assert len(results) == 10
        assert timer.elapsed < 5.0

    def test_memory_usage(self, mock_pipeline, sample_image):
        """Test memory usage during processing."""
        # Skip cleanly when psutil is not installed.
        psutil = pytest.importorskip("psutil")

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MiB

        for _ in range(10):
            mock_pipeline.process_image(sample_image, None)

        final_memory = process.memory_info().rss / 1024 / 1024  # MiB
        memory_increase = final_memory - initial_memory

        # Generous bound: repeated processing should not leak hundreds of MiB.
        assert memory_increase < 500