"""
Smoke test for two-stage video processing
THIS IS A NEW FILE - Basic end-to-end test
Tests quality profiles, frame count preservation, and basic functionality
"""
|
|
import os |
|
|
import sys |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import tempfile |
|
|
import logging |
|
|
import time |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
# Put the parent of this file's directory at the front of sys.path
# (presumably so the project packages imported below resolve when this
# file is run directly as a script).
_this_file = os.path.abspath(__file__)
_parent_of_script_dir = os.path.dirname(os.path.dirname(_this_file))
sys.path.insert(0, _parent_of_script_dir)

from processing.two_stage.two_stage_processor import TwoStageProcessor
from models.loaders.matanyone_loader import MatAnyoneLoader

# Configure root logging at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
def create_test_video(path: str, frames: int = 30, fps: int = 30,
                      width: int = 640, height: int = 480) -> int:
    """Write a synthetic test clip and return the frame count read back from it.

    Every frame is a dark-gray canvas with a filled white disc (radius 60)
    whose centre moves along a sine/cosine path around the frame centre, plus
    a smaller tinted disc (radius 20) drawn 20 px above that centre, as a
    crude stand-in for a person.

    Args:
        path: Destination file; encoded with the ``mp4v`` FourCC.
        frames: Number of frames to write.
        fps: Frame rate handed to the video writer.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        The ``CAP_PROP_FRAME_COUNT`` value reported when the written file is
        reopened (not simply the requested ``frames``).

    Raises:
        RuntimeError: If the video writer cannot be opened for ``path``.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(path, fourcc, fps, (width, height))

    # Release the writer on every exit path so the file is always closed,
    # even when opening fails or drawing/writing a frame raises.
    try:
        if not out.isOpened():
            raise RuntimeError(f"Failed to create test video at {path}")

        for i in range(frames):
            frame = np.zeros((height, width, 3), dtype=np.uint8)
            frame[:] = (30, 30, 30)

            # Centre of the moving "person" for this frame.
            x = int(width / 2 + 100 * np.sin(i * 0.2))
            y = int(height / 2 + 50 * np.cos(i * 0.15))
            cv2.circle(frame, (x, y), 60, (255, 255, 255), -1)

            # Smaller tinted disc offset upward from the centre.
            cv2.circle(frame, (x, y - 20), 20, (200, 100, 100), -1)

            out.write(frame)
    finally:
        out.release()

    # Reopen the file and report the frame count the container advertises.
    cap = cv2.VideoCapture(path)
    try:
        actual_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()

    logger.info(f"Created test video: {path} ({actual_frames} frames)")
    return actual_frames
|
|
|
|
|
|
|
|
def verify_output_video(path: str, expected_frames: int) -> bool:
    """Check that ``path`` is a readable video with ``expected_frames`` frames.

    Logs the reason and returns False when the file is missing, is smaller
    than 1000 bytes, cannot be opened, or yields a different number of frames
    when read to the end.  Returns True otherwise.
    """
    if not os.path.exists(path):
        logger.error(f"Output video not found: {path}")
        return False

    size_bytes = os.path.getsize(path)
    if size_bytes < 1000:
        logger.error(f"Output video too small: {size_bytes} bytes")
        return False

    reader = cv2.VideoCapture(path)
    if not reader.isOpened():
        logger.error(f"Cannot open output video: {path}")
        return False

    # Count frames by reading one at a time until read() reports failure.
    decoded = 0
    while reader.read()[0]:
        decoded += 1
    reader.release()

    if decoded == expected_frames:
        logger.info(f"Output verified: {decoded} frames, {size_bytes:,} bytes")
        return True

    logger.error(f"Frame count mismatch: got {decoded}, expected {expected_frames}")
    return False
|
|
|
|
|
|
|
|
def test_quality_profiles():
    """Run the pipeline once per quality profile and verify each output.

    For each of the ``speed``, ``balanced`` and ``max`` profiles this sets the
    ``BFX_QUALITY`` environment variable, builds a ``TwoStageProcessor`` with
    no SAM2 predictor and no MatAnyone model, processes a 30-frame synthetic
    clip against a two-tone background and checks the output's frame count.
    Differences in refinement counts and timings between profiles are only
    logged; they do not influence the result.

    The previous value of ``BFX_QUALITY`` is restored before returning.

    Returns:
        True only if every profile produced an output that passed
        verification; a profile whose pipeline call fails counts as a failure.
    """
    logger.info("="*60)
    logger.info("Testing Quality Profiles")
    logger.info("="*60)

    results = {}

    # Remember the caller's setting so the test does not leak BFX_QUALITY.
    previous_quality = os.environ.get("BFX_QUALITY")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpdir = Path(tmpdir)

            test_video = tmpdir / "test_input.mp4"
            expected_frames = create_test_video(str(test_video), frames=30, fps=30)

            # Mid-gray background with a differently coloured top half.
            background = np.ones((480, 640, 3), dtype=np.uint8) * 128
            background[:240, :] = (100, 150, 200)

            for quality in ["speed", "balanced", "max"]:
                logger.info(f"\nTesting quality mode: {quality}")
                logger.info("-" * 40)

                # Set before the processor is built (presumably where the
                # profile is read - confirm against TwoStageProcessor).
                os.environ["BFX_QUALITY"] = quality

                processor = TwoStageProcessor(
                    sam2_predictor=None,
                    matanyone_model=None
                )

                output_path = tmpdir / f"output_{quality}.mp4"
                start_time = time.time()

                result, message = processor.process_full_pipeline(
                    video_path=str(test_video),
                    background=background,
                    output_path=str(output_path),
                    key_color_mode="auto",
                    chroma_settings=None,
                    progress_callback=lambda p, d: logger.debug(f"{p:.1%}: {d}"),
                    stop_event=None
                )

                process_time = time.time() - start_time

                if result is None:
                    logger.error(f"Processing failed for {quality}: {message}")
                    # Record the failure: an unrecorded profile would be
                    # invisible to the all() check below and the test could
                    # pass with nothing having run successfully.
                    results[quality] = {"success": False}
                    continue

                if verify_output_video(result, expected_frames):
                    results[quality] = {
                        "success": True,
                        "time": process_time,
                        "frames_refined": processor.frames_refined,
                        "total_frames": processor.total_frames_processed
                    }
                    logger.info(f"✓ {quality}: {process_time:.2f}s, "
                                f"{processor.frames_refined}/{processor.total_frames_processed} refined")
                else:
                    results[quality] = {"success": False}
                    logger.error(f"✗ {quality}: verification failed")
    finally:
        if previous_quality is None:
            os.environ.pop("BFX_QUALITY", None)
        else:
            os.environ["BFX_QUALITY"] = previous_quality

    logger.info("\n" + "="*60)
    logger.info("SUMMARY")
    logger.info("="*60)

    all_passed = all(r.get("success", False) for r in results.values())

    # Informational only: report whether the profiles actually behaved
    # differently from one another.
    if all_passed and len(results) >= 2:
        times = [r["time"] for r in results.values() if "time" in r]
        refined_counts = [r["frames_refined"] for r in results.values() if "frames_refined" in r]

        if len(set(refined_counts)) > 1:
            logger.info("✓ Quality profiles show different refinement counts")
        else:
            logger.warning("⚠ All quality profiles refined same number of frames")

        if max(times) - min(times) > 0.1:
            logger.info("✓ Quality profiles show different processing times")
        else:
            logger.warning("⚠ Quality profiles have similar processing times")

    for quality, result in results.items():
        if result.get("success"):
            logger.info(f"✓ {quality:8s}: {result['time']:.2f}s, "
                        f"{result['frames_refined']}/{result['total_frames']} frames refined")
        else:
            logger.info(f"✗ {quality:8s}: FAILED")

    return all_passed
|
|
|
|
|
|
|
|
def test_frame_preservation():
    """Check that the pipeline's output has as many frames as its input.

    Generates synthetic clips of 10, 25, 30 and 60 frames, processes each with
    ``BFX_QUALITY`` set to ``speed`` and a default-constructed
    ``TwoStageProcessor`` using the ``green`` key colour mode, and verifies
    that the output has the frame count reported for the input clip.

    The previous value of ``BFX_QUALITY`` is restored before returning.

    Returns:
        True if every clip is preserved; False as soon as one clip fails
        (remaining clips are not processed).
    """
    logger.info("\n" + "="*60)
    logger.info("Testing Frame Preservation")
    logger.info("="*60)

    # Same uniform background for every clip, so build it once.
    background = np.ones((480, 640, 3), dtype=np.uint8) * 100

    # Remember the caller's setting so the test does not leak BFX_QUALITY.
    previous_quality = os.environ.get("BFX_QUALITY")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpdir = Path(tmpdir)

            test_cases = [10, 25, 30, 60]

            for frame_count in test_cases:
                logger.info(f"\nTesting with {frame_count} frames...")

                test_video = tmpdir / f"test_{frame_count}.mp4"
                expected = create_test_video(str(test_video), frames=frame_count, fps=30)

                os.environ["BFX_QUALITY"] = "speed"

                processor = TwoStageProcessor()
                output_path = tmpdir / f"output_{frame_count}.mp4"

                result, message = processor.process_full_pipeline(
                    video_path=str(test_video),
                    background=background,
                    output_path=str(output_path),
                    key_color_mode="green",
                )

                if result and verify_output_video(result, expected):
                    logger.info(f"✓ {frame_count} frames: preserved correctly")
                else:
                    logger.error(f"✗ {frame_count} frames: FAILED")
                    return False
    finally:
        if previous_quality is None:
            os.environ.pop("BFX_QUALITY", None)
        else:
            os.environ["BFX_QUALITY"] = previous_quality

    logger.info("\n✓ All frame preservation tests passed!")
    return True
|
|
|
|
|
|
|
|
def main():
    """Run all smoke tests and return a process exit code.

    Each test function is called in turn; an exception escaping a test is
    logged with its traceback and counted as a failure so the remaining tests
    still run.

    Returns:
        0 if every test returned a truthy result, 1 otherwise.
    """
    logger.info("\n" + "🔥"*20)
    logger.info("BACKGROUNDFX PRO SMOKE TESTS")
    logger.info("🔥"*20)

    # (label used in the crash message, test callable)
    suites = (
        ("Quality profile", test_quality_profiles),
        ("Frame preservation", test_frame_preservation),
    )

    tests_passed = []
    for label, run_suite in suites:
        try:
            tests_passed.append(run_suite())
        except Exception as e:
            # Top-level boundary: keep going, but keep the traceback too.
            logger.exception(f"{label} test crashed: {e}")
            tests_passed.append(False)

    logger.info("\n" + "="*60)
    if all(tests_passed):
        logger.info("✅ ALL SMOKE TESTS PASSED!")
        logger.info("="*60)
        return 0
    else:
        logger.error("❌ SOME TESTS FAILED")
        logger.info("="*60)
        return 1
|
|
|
|
|
|
|
|
# Script entry point: propagate main()'s return value as the exit status.
# sys.exit is used rather than the bare exit() helper, which is injected by
# the site module and is not guaranteed to exist (e.g. under ``python -S``).
if __name__ == "__main__":
    sys.exit(main())