""" Tests for the processing pipeline. """ import pytest import numpy as np import cv2 from unittest.mock import Mock, patch, MagicMock from pathlib import Path from api.pipeline import ( ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode, PipelineStage ) class TestPipelineConfig: """Test pipeline configuration.""" def test_default_config(self): """Test default configuration values.""" config = PipelineConfig() assert config.mode == ProcessingMode.PHOTO assert config.quality_preset == "high" assert config.use_gpu == True assert config.enable_cache == True def test_custom_config(self): """Test custom configuration.""" config = PipelineConfig( mode=ProcessingMode.VIDEO, quality_preset="ultra", use_gpu=False, batch_size=4 ) assert config.mode == ProcessingMode.VIDEO assert config.quality_preset == "ultra" assert config.use_gpu == False assert config.batch_size == 4 class TestProcessingPipeline: """Test the main processing pipeline.""" @pytest.fixture def mock_pipeline(self, pipeline_config): """Create a pipeline with mocked components.""" with patch('api.pipeline.ModelFactory') as mock_factory: with patch('api.pipeline.DeviceManager') as mock_device: mock_device.return_value.get_device.return_value = 'cpu' mock_factory.return_value.load_model.return_value = Mock() pipeline = ProcessingPipeline(pipeline_config) return pipeline def test_pipeline_initialization(self, mock_pipeline): """Test pipeline initialization.""" assert mock_pipeline is not None assert mock_pipeline.config is not None assert mock_pipeline.current_stage == PipelineStage.INITIALIZATION def test_process_image_success(self, mock_pipeline, sample_image, sample_background): """Test successful image processing.""" # Mock the processing methods mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) result = mock_pipeline.process_image(sample_image, sample_background) assert result is not None assert isinstance(result, PipelineResult) assert result.success == True assert result.output_image is not None def test_process_image_with_effects(self, mock_pipeline, sample_image): """Test image processing with effects.""" mock_pipeline.config.apply_effects = ['bokeh', 'vignette'] # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) result = mock_pipeline.process_image(sample_image, None) assert result is not None assert result.success == True def test_process_image_failure(self, mock_pipeline, sample_image): """Test image processing failure handling.""" # Mock segmentation to fail mock_pipeline._segment_image = Mock(side_effect=Exception("Segmentation failed")) result = mock_pipeline.process_image(sample_image, None) assert result is not None assert result.success == False assert len(result.errors) > 0 @pytest.mark.parametrize("quality", ["low", "medium", "high", "ultra"]) def test_quality_presets(self, mock_pipeline, sample_image, quality): """Test different quality presets.""" mock_pipeline.config.quality_preset = quality # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) result = mock_pipeline.process_image(sample_image, None) assert result is not None assert result.success == True def test_batch_processing(self, mock_pipeline, sample_image): """Test batch processing of multiple images.""" images = [sample_image] * 3 # Mock processing mock_pipeline.process_image = Mock(return_value=PipelineResult( success=True, output_image=sample_image, quality_score=0.9 )) results = mock_pipeline.process_batch(images) assert len(results) == 3 assert all(r.success for r in results) def test_progress_callback(self, mock_pipeline, sample_image): """Test progress callback functionality.""" progress_values = [] def progress_callback(value, message): progress_values.append(value) mock_pipeline.config.progress_callback = progress_callback # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) result = mock_pipeline.process_image(sample_image, None) assert len(progress_values) > 0 assert 0.0 <= max(progress_values) <= 1.0 def test_cache_functionality(self, mock_pipeline, sample_image): """Test caching functionality.""" mock_pipeline.config.enable_cache = True # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) # First call result1 = mock_pipeline.process_image(sample_image, None) # Second call (should use cache) result2 = mock_pipeline.process_image(sample_image, None) assert result1.success == result2.success # Verify segmentation was only called once (cache hit on second call) assert mock_pipeline._segment_image.call_count == 1 def test_memory_management(self, mock_pipeline): """Test memory management and cleanup.""" initial_cache_size = len(mock_pipeline.cache) # Process multiple images to fill cache for i in range(10): image = np.random.randint(0, 255, (512, 512, 3), dtype=np.uint8) mock_pipeline.cache[f"test_{i}"] = PipelineResult(success=True) # Clear cache mock_pipeline.clear_cache() assert len(mock_pipeline.cache) == 0 def test_statistics_tracking(self, mock_pipeline, sample_image): """Test statistics tracking.""" # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) # Process image result = mock_pipeline.process_image(sample_image, None) # Get statistics stats = mock_pipeline.get_statistics() assert 'total_processed' in stats assert stats['total_processed'] > 0 assert 'avg_time' in stats class TestPipelineIntegration: """Integration tests for the pipeline.""" @pytest.mark.integration @pytest.mark.slow def test_end_to_end_processing(self, sample_image, sample_background, temp_dir): """Test end-to-end processing pipeline.""" config = PipelineConfig( use_gpu=False, quality_preset="medium", enable_cache=False ) # Create pipeline (will use real components if available) try: pipeline = ProcessingPipeline(config) except Exception: pytest.skip("Models not available for integration test") # Process image result = pipeline.process_image(sample_image, sample_background) if result.success: assert result.output_image is not None assert result.output_image.shape == sample_image.shape assert result.quality_score > 0 # Save output output_path = temp_dir / "test_output.png" cv2.imwrite(str(output_path), result.output_image) assert output_path.exists() @pytest.mark.integration @pytest.mark.slow def test_video_frame_processing(self, sample_video, temp_dir): """Test processing video frames.""" config = PipelineConfig( mode=ProcessingMode.VIDEO, use_gpu=False, quality_preset="low" ) try: pipeline = ProcessingPipeline(config) except Exception: pytest.skip("Models not available for integration test") # Open video cap = cv2.VideoCapture(sample_video) processed_frames = [] # Process first 5 frames for i in range(5): ret, frame = cap.read() if not ret: break result = pipeline.process_image(frame, None) if result.success: processed_frames.append(result.output_image) cap.release() assert len(processed_frames) > 0 # Save as video if processed_frames: output_path = temp_dir / "test_video_out.mp4" fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(str(output_path), fourcc, 30.0, (processed_frames[0].shape[1], processed_frames[0].shape[0])) for frame in processed_frames: out.write(frame) out.release() assert output_path.exists() class TestPipelinePerformance: """Performance tests for the pipeline.""" @pytest.mark.slow def test_processing_speed(self, mock_pipeline, sample_image, performance_timer): """Test processing speed.""" # Mock processing mock_pipeline._segment_image = Mock(return_value=np.ones((512, 512), dtype=np.uint8) * 255) mock_pipeline.alpha_matting.process = Mock(return_value={ 'alpha': np.ones((512, 512), dtype=np.float32), 'confidence': 0.95 }) with performance_timer as timer: result = mock_pipeline.process_image(sample_image, None) assert result.success == True assert timer.elapsed < 1.0 # Should process in under 1 second @pytest.mark.slow def test_batch_processing_speed(self, mock_pipeline, sample_image, performance_timer): """Test batch processing speed.""" images = [sample_image] * 10 # Mock processing mock_pipeline.process_image = Mock(return_value=PipelineResult( success=True, output_image=sample_image, quality_score=0.9 )) with performance_timer as timer: results = mock_pipeline.process_batch(images) assert len(results) == 10 assert timer.elapsed < 5.0 # Should process 10 images in under 5 seconds def test_memory_usage(self, mock_pipeline, sample_image): """Test memory usage during processing.""" import psutil import os process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB # Process multiple images for _ in range(10): mock_pipeline.process_image(sample_image, None) final_memory = process.memory_info().rss / 1024 / 1024 # MB memory_increase = final_memory - initial_memory # Memory increase should be reasonable (less than 500MB for 10 images) assert memory_increase < 500