Spaces:
Paused
Paused
| import os | |
| import io | |
| import time | |
| import logging | |
| import sys | |
| from typing import Optional, Dict, Any, List | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image | |
| try: | |
| # Defer heavy imports; may fail if onnxruntime/torch missing | |
| import insightface # type: ignore | |
| from insightface.app import FaceAnalysis # type: ignore | |
| except Exception: | |
| insightface = None # type: ignore | |
| FaceAnalysis = None # type: ignore | |
| logger = logging.getLogger(__name__) | |
| INSWAPPER_ONNX_PATH = os.path.join('models', 'inswapper', 'inswapper_128_fp16.onnx') | |
| ALT_INSWAPPER_PATH = os.path.join('models', 'inswapper', 'inswapper_128.onnx') | |
| CODEFORMER_PATH = os.path.join('models', 'codeformer', 'codeformer.pth') | |
| class FaceSwapPipeline: | |
| """Direct face swap + optional enhancement pipeline. | |
| Lifecycle: | |
| 1. initialize() -> loads detector/recognizer (buffalo_l) and inswapper onnx | |
| 2. set_source_image(image_bytes|np.array) -> extracts source identity face object | |
| 3. process_frame(frame) -> swap all or top-N faces using source face | |
| 4. (optional) CodeFormer enhancement (always attempted if model present) | |
| """ | |
| def __init__(self): | |
| self.initialized = False | |
| self.source_face = None | |
| self.source_img_meta = {} | |
| # Legacy compatibility flags expected by old WebRTC data channel handlers | |
| # 'loaded' previously indicated full reenactment stack ready; here it maps to self.initialized | |
| self.loaded = False | |
| # Single enhancer path: CodeFormer (optional) | |
| self.max_faces = int(os.getenv('MIRAGE_MAX_FACES', '1')) | |
| # CodeFormer config / controls | |
| self.codeformer_enabled = os.getenv('MIRAGE_CODEFORMER_ENABLED', '1').lower() in ('1','true','yes','on') | |
| self.codeformer_frame_stride = int(os.getenv('MIRAGE_CODEFORMER_FRAME_STRIDE', '1') or '1') | |
| if self.codeformer_frame_stride < 1: | |
| self.codeformer_frame_stride = 1 | |
| self.codeformer_face_only = os.getenv('MIRAGE_CODEFORMER_FACE_ONLY', '0').lower() in ('1','true','yes','on') | |
| self.codeformer_face_margin = float(os.getenv('MIRAGE_CODEFORMER_MARGIN', '0.15')) | |
| self._stats = { | |
| 'frames': 0, | |
| 'last_latency_ms': None, | |
| 'avg_latency_ms': None, | |
| 'swap_faces_last': 0, | |
| 'enhanced_frames': 0, | |
| # New diagnostic counters | |
| 'early_no_source': 0, | |
| 'early_uninitialized': 0, | |
| 'frames_no_faces': 0, | |
| 'total_faces_detected': 0, | |
| 'total_faces_swapped': 0, | |
| 'swap_failures': 0, | |
| 'cached_face_reuses': 0, | |
| # Brightness / quality tracking | |
| 'frames_low_brightness': 0, | |
| 'brightness_last': None, | |
| # Similarity metrics | |
| 'last_primary_similarity': None, | |
| } | |
| self._lat_hist: List[float] = [] | |
| self._codeformer_lat_hist: List[float] = [] | |
| self._frame_index = 0 | |
| self._last_faces_cache: List[Any] | None = None | |
| self.app: Optional[FaceAnalysis] = None | |
| self.swapper = None | |
| self.codeformer = None | |
| self.codeformer_fidelity = float(os.getenv('MIRAGE_CODEFORMER_FIDELITY', '0.75')) | |
| self.codeformer_loaded = False | |
| self.codeformer_error: str | None = None | |
| # Debug verbosity for swap decisions | |
| self.swap_debug = os.getenv('MIRAGE_SWAP_DEBUG', '0').lower() in ('1','true','yes','on') | |
| # Brightness compensation configuration | |
| self.enable_brightness_comp = os.getenv('MIRAGE_BRIGHTNESS_COMP', '1').lower() in ('1','true','yes','on') | |
| self.target_brightness = float(os.getenv('MIRAGE_TARGET_BRIGHTNESS', '90')) # mean luminance target (0-255) | |
| self.low_brightness_threshold = float(os.getenv('MIRAGE_LOW_BRIGHTNESS_THRESH', '40')) | |
| # Similarity threshold for logging (cosine similarity typical range [-1,1]) | |
| self.similarity_warn_threshold = float(os.getenv('MIRAGE_SIMILARITY_WARN', '0.15')) | |
| # Temporal reuse configuration | |
| self.face_cache_ttl = int(os.getenv('MIRAGE_FACE_CACHE_TTL', '5') or '5') # frames | |
| self._cached_face = None | |
| self._cached_face_age = 0 | |
| # Aggressive blend toggle for visibility | |
| self.aggressive_blend = os.getenv('MIRAGE_AGGRESSIVE_BLEND', '0').lower() in ('1','true','yes','on') | |
| # Optional face ROI upscaling for tiny faces | |
| self.face_min_size = int(os.getenv('MIRAGE_FACE_MIN_SIZE', '80') or '80') | |
| self.face_upscale_factor = float(os.getenv('MIRAGE_FACE_UPSCALE', '1.6')) | |
| # Toggle for (currently disabled) naive small-face upscale path that caused artifacts | |
| # Default OFF to prevent black rectangle artifacts observed when re-pasting scaled ROI | |
| self.enable_face_upscale = os.getenv('MIRAGE_FACE_UPSCALE_ENABLE', '0').lower() in ('1','true','yes','on') | |
| # Detector preprocessing (CLAHE) low light | |
| self.det_clahe = os.getenv('MIRAGE_DET_CLAHE', '1').lower() in ('1','true','yes','on') | |
| # End-to-end latency markers | |
| self._last_e2e_ms = None | |
| self._e2e_hist: List[float] = [] | |
| # Track model file actually loaded for diagnostics | |
| self.inswapper_model_path: str | None = None | |
| # Resource monitoring | |
| self._gpu_memory_warning_threshold = float(os.getenv('MIRAGE_GPU_MEMORY_WARN_GB', '0.5')) | |
| self._last_memory_check = 0 | |
| self._memory_check_interval = 50 # frames between GPU memory checks | |
| # Periodic structured logging interval (in frames) for runtime diagnostics | |
| try: | |
| # Default every 120 frames unless explicitly disabled (roughly ~4s at 30fps) | |
| self.log_interval = int(os.getenv('MIRAGE_SWAP_LOG_INTERVAL', '120') or '120') | |
| except Exception: | |
| self.log_interval = 0 # disabled if invalid | |
| def _log_periodic(self): | |
| """Emit a concise structured log line every N frames if enabled. | |
| Controlled by MIRAGE_SWAP_LOG_INTERVAL (>0). Keeps log volume bounded while | |
| still providing real-time observability in production (e.g. container logs). | |
| """ | |
| if not self.log_interval or self.log_interval < 1: | |
| return | |
| frames = self._stats['frames'] | |
| if frames == 0 or (frames % self.log_interval) != 0: | |
| return | |
| # Batch access to avoid repeated dict lookups | |
| s = self._stats | |
| lat_last = s['last_latency_ms'] | |
| lat_avg = s['avg_latency_ms'] | |
| brightness = s['brightness_last'] | |
| similarity = s['last_primary_similarity'] | |
| # Pre-format numbers to avoid repeated formatting in join | |
| parts = [ | |
| f"frames={frames}", | |
| f"swap_last={s['swap_faces_last']}", | |
| f"swapped_total={s['total_faces_swapped']}", | |
| f"detected_total={s['total_faces_detected']}", | |
| f"no_face_frames={s['frames_no_faces']}", | |
| f"cache_reuses={s['cached_face_reuses']}", | |
| f"swap_failures={s['swap_failures']}", | |
| f"lat_ms_last={lat_last:.1f}" if lat_last is not None else "lat_ms_last=None", | |
| f"lat_ms_avg={lat_avg:.1f}" if lat_avg is not None else "lat_ms_avg=None", | |
| f"e2e_ms_last={self._last_e2e_ms:.1f}" if self._last_e2e_ms is not None else "e2e_ms_last=None", | |
| f"brightness_last={brightness:.1f}" if brightness is not None else "brightness_last=None", | |
| f"low_brightness_frames={s['frames_low_brightness']}", | |
| f"primary_sim={similarity:.3f}" if similarity is not None else "primary_sim=None", | |
| f"early_uninit={s['early_uninitialized']}", | |
| f"early_no_source={s['early_no_source']}", | |
| f"cf_enhanced={s['enhanced_frames']}", | |
| ] | |
| if self.codeformer_error: | |
| parts.append(f"cf_err={self.codeformer_error}") | |
| logger.info("pipeline_stats " + ' '.join(parts)) | |
| def _check_gpu_memory(self): | |
| """Monitor GPU memory usage to detect resource exhaustion early""" | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| allocated_gb = torch.cuda.memory_allocated() / (1024**3) | |
| reserved_gb = torch.cuda.memory_reserved() / (1024**3) | |
| if allocated_gb > self._gpu_memory_warning_threshold: | |
| logger.warning(f"High GPU memory usage: allocated={allocated_gb:.2f}GB reserved={reserved_gb:.2f}GB") | |
| return False | |
| return True | |
| except Exception: | |
| pass | |
| return True # Assume OK if we can't check | |
| def initialize(self): | |
| if self.initialized: | |
| return True | |
| providers = self._select_providers() | |
| if insightface is None or FaceAnalysis is None: | |
| raise ImportError("insightface (and its deps like onnxruntime) not available. Ensure onnxruntime, onnx, torch installed.") | |
| self.app = FaceAnalysis(name='buffalo_l', providers=providers) | |
| self.app.prepare(ctx_id=0, det_size=(640,640)) | |
| # Capture active providers after prepare (best effort) | |
| try: | |
| self._active_providers = getattr(self.app, 'providers', providers) | |
| except Exception: | |
| self._active_providers = providers | |
| # Load swapper | |
| model_path = INSWAPPER_ONNX_PATH | |
| if not os.path.isfile(model_path): | |
| if os.path.isfile(ALT_INSWAPPER_PATH): | |
| model_path = ALT_INSWAPPER_PATH | |
| else: | |
| raise FileNotFoundError(f"Missing InSwapper model (checked {INSWAPPER_ONNX_PATH} and {ALT_INSWAPPER_PATH})") | |
| self.swapper = insightface.model_zoo.get_model(model_path, providers=providers) | |
| self.inswapper_model_path = model_path | |
| logger.info(f"Loaded InSwapper model: {model_path}") | |
| # Optional CodeFormer enhancer | |
| if self.codeformer_enabled: | |
| self._try_load_codeformer() | |
| self.initialized = True | |
| self.loaded = True # legacy attribute for external checks | |
| logger.info('FaceSwapPipeline initialized') | |
| return True | |
| def _select_providers(self) -> List[str] | None: | |
| """Decide ONNX Runtime providers with GPU preference and diagnostics. | |
| Env controls: | |
| MIRAGE_FORCE_CPU=1 -> force CPU only | |
| MIRAGE_CUDA_ONLY=1 -> request CUDA + CPU fallback (legacy name) | |
| MIRAGE_REQUIRE_GPU=1 -> raise if CUDA provider not available | |
| """ | |
| force_cpu = os.getenv('MIRAGE_FORCE_CPU', '0').lower() in ('1','true','yes','on') | |
| require_gpu = os.getenv('MIRAGE_REQUIRE_GPU', '0').lower() in ('1','true','yes','on') | |
| cuda_only_flag = os.getenv('MIRAGE_CUDA_ONLY', '0').lower() in ('1','true','yes','on') | |
| try: | |
| import onnxruntime as ort # type: ignore | |
| avail = ort.get_available_providers() | |
| try: | |
| ver = ort.__version__ # type: ignore | |
| except Exception: | |
| ver = 'unknown' | |
| logger.info(f"[providers] onnxruntime {ver} available={avail}") | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"ONNX Runtime not importable ({e}); letting insightface choose default providers") | |
| return None | |
| if force_cpu: | |
| logger.info('[providers] MIRAGE_FORCE_CPU=1 -> using CPUExecutionProvider only') | |
| return ['CPUExecutionProvider'] if 'CPUExecutionProvider' in avail else None | |
| providers: List[str] = [] | |
| if 'CUDAExecutionProvider' in avail: | |
| providers.append('CUDAExecutionProvider') | |
| if 'CPUExecutionProvider' in avail: | |
| providers.append('CPUExecutionProvider') | |
| if 'CUDAExecutionProvider' not in providers: | |
| msg = '[providers] CUDAExecutionProvider NOT available; running on CPU' | |
| if require_gpu or cuda_only_flag: | |
| # escalate to warning / potential exception | |
| logger.warning(msg + ' (require_gpu flag set)') | |
| if require_gpu: | |
| raise RuntimeError('GPU required but CUDAExecutionProvider unavailable') | |
| else: | |
| logger.info(msg) | |
| else: | |
| logger.info(f"[providers] Using providers order: {providers}") | |
| return providers or None | |
| def _ensure_repo_clone(self, target_dir: str) -> bool: | |
| """Clone CodeFormer repo shallowly if missing. Returns True if directory exists after call.""" | |
| try: | |
| if os.path.isdir(target_dir) and os.path.isdir(os.path.join(target_dir, '.git')): | |
| return True | |
| import subprocess, shlex | |
| os.makedirs(target_dir, exist_ok=True) | |
| # If directory empty, clone | |
| if not any(os.scandir(target_dir)): | |
| logger.info('Cloning CodeFormer repository (shallow)...') | |
| cmd = f"git clone --depth 1 https://github.com/sczhou/CodeFormer.git {shlex.quote(target_dir)}" | |
| subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| return True | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"CodeFormer auto-clone failed: {e}") | |
| return False | |
| def _try_load_codeformer(self): # pragma: no cover (runtime GPU path) | |
| if not os.path.isfile(CODEFORMER_PATH): | |
| logger.warning(f"CodeFormer weight missing; skipping: {CODEFORMER_PATH}") | |
| return | |
| repo_root = os.path.join('models', 'codeformer_repo') | |
| try: | |
| if self._ensure_repo_clone(repo_root): | |
| for extra in (repo_root, os.path.join(repo_root, 'CodeFormer')): | |
| if os.path.isdir(extra) and extra not in sys.path: | |
| sys.path.insert(0, extra) | |
| except Exception as clone_err: # noqa: BLE001 | |
| logger.debug(f"CodeFormer repo clone/setup failed (continuing with installed packages): {clone_err}") | |
| try: | |
| import torch # type: ignore | |
| except Exception: | |
| logger.warning('Torch missing; cannot enable CodeFormer') | |
| self.codeformer_error = 'torch_missing' | |
| return | |
| # Direct import path used by upstream project (packaged when installed) | |
| # Primary expected path: basicsr.archs.* (weights independent). Some forks use codeformer.archs | |
| CodeFormer = None # type: ignore | |
| try: | |
| from basicsr.archs.codeformer_arch import CodeFormer as _CF # type: ignore | |
| CodeFormer = _CF # type: ignore | |
| except Exception as e: | |
| # Second import path attempt; if both fail capture reason | |
| try: | |
| from codeformer.archs.codeformer_arch import CodeFormer as _CF # type: ignore | |
| CodeFormer = _CF # type: ignore | |
| except Exception as e2: | |
| self.codeformer_error = f"import_failed:{e2}" | |
| logger.warning(f"CodeFormer import failed (basicsr & codeformer paths). Skipping enhancement. Root error: {e2}") | |
| return | |
| try: | |
| from basicsr.archs.rrdbnet_arch import RRDBNet # noqa: F401 | |
| except Exception: | |
| # Basicsr usually present (in requirements); if not, can't proceed | |
| logger.warning('basicsr not available; skipping CodeFormer') | |
| return | |
| # facexlib is required for some preprocessing utilities; warn if absent (not fatal for direct arch usage) | |
| try: # pragma: no cover | |
| import facexlib # type: ignore # noqa: F401 | |
| except Exception: | |
| logger.info('facexlib not installed; continuing (may reduce CodeFormer effectiveness)') | |
| try: | |
| import torch | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| net = CodeFormer(dim_embd=512, codebook_size=1024, n_head=8, n_layers=9, | |
| connect_list=['32','64','128','256']).to(device) | |
| ckpt = torch.load(CODEFORMER_PATH, map_location='cpu') | |
| if 'params_ema' in ckpt: | |
| net.load_state_dict(ckpt['params_ema'], strict=False) | |
| else: | |
| # Some weights store under 'state_dict' | |
| net.load_state_dict(ckpt.get('state_dict', ckpt), strict=False) | |
| net.eval() | |
| fidelity = min(max(self.codeformer_fidelity, 0.0), 1.0) | |
| class _CFWrap: | |
| def __init__(self, net, device, fidelity): | |
| self.net = net | |
| self.device = device | |
| self.fidelity = fidelity | |
| def enhance(self, img_bgr: np.ndarray) -> np.ndarray: | |
| import torch, torch.nn.functional as F # type: ignore | |
| img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) | |
| tensor = torch.from_numpy(img).float().to(self.device) / 255.0 | |
| tensor = tensor.permute(2,0,1).unsqueeze(0) | |
| try: | |
| out = self.net(tensor, w=self.fidelity, adain=True)[0] | |
| except Exception: | |
| out = self.net(tensor, w=self.fidelity)[0] | |
| out = (out.clamp(0,1)*255).byte().permute(1,2,0).cpu().numpy() | |
| return cv2.cvtColor(out, cv2.COLOR_RGB2BGR) | |
| self.codeformer = _CFWrap(net, device, fidelity) | |
| self.codeformer_loaded = True | |
| logger.info('CodeFormer fully loaded') | |
| except Exception as e: | |
| self.codeformer_error = f"init_failed:{e}" | |
| logger.warning(f"CodeFormer final init failed: {e}") | |
| self.codeformer = None | |
| def _decode_image(self, data) -> np.ndarray: | |
| if isinstance(data, bytes): | |
| arr = np.frombuffer(data, np.uint8) | |
| img = cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
| return img | |
| if isinstance(data, np.ndarray): | |
| return data | |
| if hasattr(data, 'read'): | |
| buff = data.read() | |
| arr = np.frombuffer(buff, np.uint8) | |
| return cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
| raise TypeError('Unsupported image input type') | |
| def set_source_image(self, image_input) -> bool: | |
| if not self.initialized: | |
| self.initialize() | |
| img = self._decode_image(image_input) | |
| if img is None: | |
| logger.error('Failed to decode source image') | |
| return False | |
| faces = self.app.get(img) | |
| if not faces: | |
| logger.error('No face detected in source image') | |
| return False | |
| # Choose the largest face by bbox area | |
| def _area(face): | |
| x1,y1,x2,y2 = face.bbox.astype(int) | |
| return (x2-x1)*(y2-y1) | |
| faces.sort(key=_area, reverse=True) | |
| self.source_face = faces[0] | |
| self.source_img_meta = {'resolution': img.shape[:2], 'num_faces': len(faces)} | |
| logger.info('Source face set') | |
| return True | |
| # Legacy method name alias used by some data channel messages | |
| def set_reference_frame(self, image_input) -> bool: # pragma: no cover - thin shim | |
| return self.set_source_image(image_input) | |
| # Audio processing stubs (voice conversion not yet integrated in new simplified pipeline) | |
| def process_audio_chunk(self, pcm_bytes: bytes) -> bytes: # pragma: no cover | |
| """Pass-through audio to satisfy legacy interface expectations. | |
| Future: integrate voice conversion here. For now: return original audio data. | |
| """ | |
| return pcm_bytes | |
| def process_frame(self, frame: np.ndarray) -> np.ndarray: | |
| frame_in_ts = time.time() | |
| if not self.initialized or self.swapper is None or self.app is None: | |
| self._stats['early_uninitialized'] += 1 | |
| if self.swap_debug: | |
| logger.debug('process_frame: pipeline not fully initialized yet') | |
| return frame | |
| if self.source_face is None: | |
| self._stats['early_no_source'] += 1 | |
| if self.swap_debug: | |
| logger.debug('process_frame: no source_face set yet') | |
| return frame | |
| t0 = time.time() | |
| # Brightness analysis (grayscale mean) to understand low-light degradation | |
| try: | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| brightness = float(np.mean(gray)) | |
| self._stats['brightness_last'] = brightness | |
| if brightness < self.low_brightness_threshold: | |
| self._stats['frames_low_brightness'] += 1 | |
| if self.enable_brightness_comp: | |
| # Simple gamma / gain compensation (scale then clip) | |
| gain = self.target_brightness / max(1.0, brightness) | |
| gain = min(gain, 3.0) # clamp to avoid noise amplification | |
| frame = np.clip(frame.astype(np.float32) * gain, 0, 255).astype(np.uint8) | |
| if self.swap_debug: | |
| logger.debug(f'Applied brightness compensation gain={gain:.2f} (brightness={brightness:.1f})') | |
| except Exception: | |
| pass | |
| # Detector preprocessing path for improved low-light detect | |
| det_input = frame | |
| if self.det_clahe: | |
| try: | |
| gray_det = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| if float(np.mean(gray_det)) < (self.low_brightness_threshold + 15): | |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
| eq = clahe.apply(gray_det) | |
| det_input = cv2.cvtColor(eq, cv2.COLOR_GRAY2BGR) | |
| except Exception: | |
| pass | |
| faces = self.app.get(det_input) | |
| self._last_faces_cache = faces | |
| no_detection = not faces | |
| if no_detection: | |
| # Attempt temporal reuse of last successful face if within ttl | |
| if self._cached_face is not None and self._cached_face_age < self.face_cache_ttl: | |
| faces = [self._cached_face] | |
| self._cached_face_age += 1 | |
| if self.swap_debug: | |
| logger.debug(f'process_frame: reusing cached face (age={self._cached_face_age})') | |
| else: | |
| self._cached_face = None | |
| self._cached_face_age = 0 | |
| if self.swap_debug: | |
| logger.debug('process_frame: no faces detected, no valid cache') | |
| self._record_latency(time.time() - t0) | |
| self._stats['swap_faces_last'] = 0 | |
| self._stats['frames_no_faces'] += 1 | |
| self._stats['frames'] += 1 | |
| self._frame_index += 1 | |
| self._log_periodic() | |
| return frame | |
| # Track if we used cached face and accumulate total faces detected | |
| if no_detection and faces: # faces populated from cache | |
| self._stats['cached_face_reuses'] += 1 | |
| elif faces: # fresh detection | |
| self._stats['total_faces_detected'] += len(faces) | |
| # Apply face size filter if enabled | |
| if faces and self.face_min_size > 0: | |
| def _area(face): | |
| x1,y1,x2,y2 = face.bbox.astype(int) | |
| return (x2-x1)*(y2-y1) | |
| filtered_faces = [] | |
| for face in faces: | |
| x1,y1,x2,y2 = face.bbox.astype(int) | |
| width, height = x2-x1, y2-y1 | |
| if min(width, height) >= self.face_min_size: | |
| filtered_faces.append(face) | |
| faces = filtered_faces | |
| # Sort faces by area and keep top-N | |
| if faces: | |
| def _area(face): | |
| x1,y1,x2,y2 = face.bbox.astype(int) | |
| return (x2-x1)*(y2-y1) | |
| faces.sort(key=_area, reverse=True) | |
| # Periodic GPU memory check | |
| if self._frame_index > 0 and (self._frame_index % self._memory_check_interval) == 0: | |
| if not self._check_gpu_memory(): | |
| logger.warning("GPU memory pressure detected - performance may degrade") | |
| out = frame | |
| count = 0 | |
| similarities: List[float] = [] | |
| for idx, f in enumerate(faces[:self.max_faces]): | |
| try: | |
| # Compute similarity to source embedding (cosine) for diagnostics | |
| try: | |
| src_emb = getattr(self.source_face, 'normed_embedding', None) | |
| tgt_emb = getattr(f, 'normed_embedding', None) | |
| sim = None | |
| if src_emb is not None and tgt_emb is not None: | |
| # Both are numpy arrays | |
| a = src_emb.astype(np.float32) | |
| b = tgt_emb.astype(np.float32) | |
| denom = (np.linalg.norm(a)*np.linalg.norm(b) + 1e-6) | |
| sim = float(np.dot(a, b) / denom) | |
| similarities.append(sim) | |
| if idx == 0: | |
| self._stats['last_primary_similarity'] = sim | |
| if self.swap_debug and sim is not None and sim < self.similarity_warn_threshold: | |
| logger.debug(f'Low similarity primary face sim={sim:.3f}') | |
| except Exception: | |
| pass | |
| # NOTE: Previously attempted naive small-face ROI upscale introduced moving black box artifacts | |
| # because face landmark coordinates remained in original frame space. We disable that path by default. | |
| # If re-enabled in future, we must recompute detection landmarks on the upscaled ROI. | |
| try: | |
| out = self.swapper.get(out, f, self.source_face, paste_back=True) | |
| count += 1 | |
| except Exception as e: | |
| self._stats['swap_failures'] += 1 | |
| logger.debug(f"Swap failed for face {idx}: {e}") | |
| except Exception as e: | |
| self._stats['swap_failures'] += 1 | |
| logger.debug(f"Face processing failed for face {idx}: {e}") | |
| self._stats['total_faces_swapped'] += count | |
| # Cache first face for reuse | |
| if faces: | |
| self._cached_face = faces[0] | |
| self._cached_face_age = 0 | |
| # Optional debug overlay for visual confirmation | |
| if count > 0 and os.getenv('MIRAGE_DEBUG_OVERLAY', '0').lower() in ('1','true','yes','on'): | |
| try: | |
| for i, f in enumerate(faces[:self.max_faces]): | |
| x1,y1,x2,y2 = f.bbox.astype(int) | |
| cv2.rectangle(out, (x1,y1), (x2,y2), (0,255,0), 2) | |
| label = 'SWAP' | |
| if i < len(similarities) and similarities[i] is not None: | |
| label += f' {similarities[i]:.2f}' | |
| cv2.putText(out, label, (x1, max(0,y1-5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA) | |
| except Exception: | |
| pass | |
| if self.swap_debug: | |
| logger.debug(f'process_frame: detected={len(faces)} swapped={count} stride={self.codeformer_frame_stride} apply_cf={count>0 and (self._frame_index % self.codeformer_frame_stride == 0)}') | |
| # CodeFormer stride / face-region logic | |
| apply_cf = ( | |
| self.codeformer is not None and | |
| self.codeformer_enabled and | |
| self.codeformer_frame_stride > 0 and | |
| (self._frame_index % self.codeformer_frame_stride == 0) | |
| ) | |
| if count > 0 and apply_cf: | |
| cf_t0 = time.time() | |
| try: | |
| if self.codeformer_face_only and faces: | |
| # Use largest face bbox | |
| f0 = faces[0] | |
| x1,y1,x2,y2 = f0.bbox.astype(int) | |
| h, w = out.shape[:2] | |
| mx = int((x2 - x1) * self.codeformer_face_margin) | |
| my = int((y2 - y1) * self.codeformer_face_margin) | |
| x1c = max(0, x1 - mx); y1c = max(0, y1 - my) | |
| x2c = min(w, x2 + mx); y2c = min(h, y2 + my) | |
| region = out[y1c:y2c, x1c:x2c] | |
| if region.size > 0: | |
| enhanced = self.codeformer.enhance(region) | |
| out[y1c:y2c, x1c:x2c] = enhanced | |
| else: | |
| out = self.codeformer.enhance(out) | |
| self._stats['enhanced_frames'] += 1 | |
| cf_dt = (time.time() - cf_t0)*1000.0 | |
| self._codeformer_lat_hist.append(cf_dt) | |
| if len(self._codeformer_lat_hist) > 200: | |
| self._codeformer_lat_hist.pop(0) | |
| except Exception as e: | |
| logger.debug(f"CodeFormer enhancement failed: {e}") | |
| self._record_latency(time.time() - t0) | |
| self._stats['swap_faces_last'] = count | |
| self._stats['frames'] += 1 | |
| self._frame_index += 1 | |
| # End-to-end latency including pre-detection + swap path | |
| self._last_e2e_ms = (time.time() - frame_in_ts) * 1000.0 | |
| self._e2e_hist.append(self._last_e2e_ms) | |
| if len(self._e2e_hist) > 200: | |
| self._e2e_hist.pop(0) | |
| # Periodic log emission | |
| self._log_periodic() | |
| return out | |
| def _record_latency(self, dt: float): | |
| ms = dt * 1000.0 | |
| self._stats['last_latency_ms'] = ms | |
| self._lat_hist.append(ms) | |
| if len(self._lat_hist) > 200: | |
| self._lat_hist.pop(0) | |
| self._stats['avg_latency_ms'] = float(np.mean(self._lat_hist)) if self._lat_hist else None | |
| def get_stats(self) -> Dict[str, Any]: | |
| cf_avg = float(np.mean(self._codeformer_lat_hist)) if self._codeformer_lat_hist else None | |
| info: Dict[str, Any] = dict( | |
| self._stats, | |
| initialized=self.initialized, | |
| codeformer_fidelity=self.codeformer_fidelity if self.codeformer is not None else None, | |
| codeformer_loaded=self.codeformer_loaded, | |
| codeformer_frame_stride=self.codeformer_frame_stride, | |
| codeformer_face_only=self.codeformer_face_only, | |
| codeformer_avg_latency_ms=cf_avg, | |
| max_faces=self.max_faces, | |
| debug_overlay=os.getenv('MIRAGE_DEBUG_OVERLAY', '0'), | |
| e2e_latency_ms=self._last_e2e_ms, | |
| e2e_latency_avg_ms=(float(np.mean(self._e2e_hist)) if self._e2e_hist else None), | |
| inswapper_model_path=self.inswapper_model_path, | |
| face_upscale_enabled=self.enable_face_upscale, | |
| codeformer_error=self.codeformer_error, | |
| ) | |
| # Provider diagnostics (best-effort) | |
| try: # pragma: no cover | |
| import onnxruntime as ort # type: ignore | |
| info['available_providers'] = ort.get_available_providers() | |
| info['active_providers'] = getattr(self, '_active_providers', None) | |
| except Exception: | |
| pass | |
| return info | |
| # Legacy interface used by webrtc_server data channel | |
| def get_performance_stats(self) -> Dict[str, Any]: # pragma: no cover simple delegate | |
| stats = self.get_stats() | |
| # Provide alias field names expected historically (if any) | |
| stats['frames_processed'] = stats.get('frames') | |
| return stats | |
| # Backwards compatibility for earlier server expecting process_video_frame | |
| def process_video_frame(self, frame: np.ndarray, frame_idx: int | None = None) -> np.ndarray: | |
| return self.process_frame(frame) | |
| def cleanup_resources(self): | |
| """Clean up GPU resources to prevent hang on reconnection""" | |
| try: | |
| if hasattr(self, 'swapper') and self.swapper is not None: | |
| # Force garbage collection of ONNX sessions | |
| del self.swapper | |
| self.swapper = None | |
| if hasattr(self, 'codeformer') and self.codeformer is not None: | |
| del self.codeformer | |
| self.codeformer = None | |
| self.codeformer_loaded = False | |
| # Clear GPU cache if available | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| logger.info("GPU cache cleared during cleanup") | |
| except Exception: | |
| pass | |
| logger.info("Pipeline resources cleaned up") | |
| except Exception as e: | |
| logger.warning(f"Resource cleanup failed: {e}") | |
| def reset_for_reconnection(self): | |
| """Reset pipeline state for clean reconnection without full reinitialization""" | |
| # Clear cached face to prevent stale data | |
| self._cached_face = None | |
| self._cached_face_age = 0 | |
| # Reset some stats but keep totals for debugging | |
| self._stats['swap_faces_last'] = 0 | |
| self._stats['brightness_last'] = None | |
| self._stats['last_primary_similarity'] = None | |
| # Clear latency histories to start fresh | |
| self._lat_hist.clear() | |
| self._e2e_hist.clear() | |
| self._codeformer_lat_hist.clear() | |
| # Force GPU memory cleanup to prevent hanging on reconnection | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| torch.cuda.synchronize() # Ensure all operations complete | |
| logger.info("GPU cache cleared and synchronized during reset") | |
| except Exception: | |
| pass | |
| logger.info("Pipeline state reset for reconnection") | |
| # Singleton access similar to previous pattern | |
| _pipeline_instance: Optional[FaceSwapPipeline] = None | |
| def get_pipeline() -> FaceSwapPipeline: | |
| global _pipeline_instance | |
| if _pipeline_instance is None: | |
| _pipeline_instance = FaceSwapPipeline() | |
| _pipeline_instance.initialize() | |
| return _pipeline_instance | |
| def reset_pipeline(): | |
| """Reset pipeline for clean reconnection - called by WebRTC server on connection reset""" | |
| global _pipeline_instance | |
| if _pipeline_instance is not None: | |
| _pipeline_instance.reset_for_reconnection() | |
| def cleanup_pipeline(): | |
| """Full cleanup for shutdown - releases GPU resources""" | |
| global _pipeline_instance | |
| if _pipeline_instance is not None: | |
| _pipeline_instance.cleanup_resources() | |
| _pipeline_instance = None | |