| """ | |
| Real-time Optimization Module | |
| Implements latency reduction, frame buffering, and GPU optimization | |
| """ | |
| import torch | |
| import torch.nn.functional as F | |
| import numpy as np | |
| import time | |
| import threading | |
| import queue | |
| import logging | |
| from collections import deque | |
| from typing import Dict, Any, Optional, Tuple | |
| import psutil | |
| import gc | |
| logger = logging.getLogger(__name__) | |
class LatencyOptimizer:
    """Optimizes the processing pipeline for minimal latency"""

    def __init__(self, target_latency_ms: float = 250.0):
        self.target_latency_ms = target_latency_ms
        self.latency_history = deque(maxlen=100)
        self.processing_times = {}

        # Adaptive parameters
        self.current_quality = 1.0  # 0.5 to 1.0
        self.current_resolution = (512, 512)
        self.current_fps = 20

        # Performance thresholds
        self.latency_threshold_high = target_latency_ms * 0.8  # 200 ms at the 250 ms default
        self.latency_threshold_low = target_latency_ms * 0.6   # 150 ms at the 250 ms default

        # Adaptation counters
        self.high_latency_count = 0
        self.low_latency_count = 0
        self.adaptation_threshold = 5  # consecutive frames

    def record_latency(self, stage: str, latency_ms: float):
        """Record latency for a processing stage"""
        self.processing_times[stage] = latency_ms

        # Calculate total latency across all recorded stages
        total_latency = sum(self.processing_times.values())
        self.latency_history.append(total_latency)

        # Trigger adaptation if needed
        self._adapt_quality(total_latency)

    def _adapt_quality(self, total_latency: float):
        """Adapt quality based on latency"""
        if total_latency > self.latency_threshold_high:
            self.high_latency_count += 1
            self.low_latency_count = 0
            if self.high_latency_count >= self.adaptation_threshold:
                self._degrade_quality()
                self.high_latency_count = 0
        elif total_latency < self.latency_threshold_low:
            self.low_latency_count += 1
            self.high_latency_count = 0
            # Be more conservative with upgrades than with downgrades
            if self.low_latency_count >= self.adaptation_threshold * 2:
                self._improve_quality()
                self.low_latency_count = 0
        else:
            self.high_latency_count = 0
            self.low_latency_count = 0

    def _degrade_quality(self):
        """Degrade quality to improve latency"""
        if self.current_quality > 0.7:
            # Round to keep the 0.1 steps exact and avoid float drift below 0.7
            self.current_quality = round(self.current_quality - 0.1, 1)
            logger.info(f"Reduced quality to {self.current_quality:.1f}")
        elif self.current_fps > 15:
            self.current_fps -= 2
            logger.info(f"Reduced FPS to {self.current_fps}")
        elif self.current_resolution[0] > 384:
            self.current_resolution = (384, 384)
            logger.info(f"Reduced resolution to {self.current_resolution}")

    def _improve_quality(self):
        """Improve quality when latency allows"""
        if self.current_resolution[0] < 512:
            self.current_resolution = (512, 512)
            logger.info(f"Increased resolution to {self.current_resolution}")
        elif self.current_fps < 20:
            self.current_fps += 2
            logger.info(f"Increased FPS to {self.current_fps}")
        elif self.current_quality < 1.0:
            self.current_quality = round(self.current_quality + 0.1, 1)
            logger.info(f"Increased quality to {self.current_quality:.1f}")

    def get_current_settings(self) -> Dict[str, Any]:
        """Get current adaptive settings"""
        return {
            "quality": self.current_quality,
            "resolution": self.current_resolution,
            "fps": self.current_fps,
            "avg_latency_ms": np.mean(self.latency_history) if self.latency_history else 0,
        }
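

# Illustrative usage sketch for LatencyOptimizer (the stage names below, such as
# "capture" and "inference", are arbitrary labels chosen by the caller, not part
# of this module):
#
#   opt = LatencyOptimizer(target_latency_ms=250.0)
#   opt.record_latency("capture", 40.0)
#   opt.record_latency("inference", 180.0)
#   print(opt.get_current_settings())  # adaptive quality/resolution/fps + avg latency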
class FrameBuffer:
    """Thread-safe frame buffer with overflow protection"""

    def __init__(self, max_size: int = 5):
        self.max_size = max_size
        self.buffer = queue.Queue(maxsize=max_size)
        self.dropped_frames = 0
        self.total_frames = 0

    def put_frame(self, frame: np.ndarray, timestamp: float) -> bool:
        """Add a frame to the buffer; returns False if it could not be stored"""
        self.total_frames += 1
        try:
            self.buffer.put_nowait((frame, timestamp))
            return True
        except queue.Full:
            # Drop the oldest frame and add the new one
            try:
                self.buffer.get_nowait()
                self.buffer.put_nowait((frame, timestamp))
                self.dropped_frames += 1
                return True
            except (queue.Empty, queue.Full):
                # Another thread emptied or refilled the queue in between
                return False

    def get_frame(self) -> Optional[Tuple[np.ndarray, float]]:
        """Get the next frame from the buffer"""
        try:
            return self.buffer.get_nowait()
        except queue.Empty:
            return None

    def get_stats(self) -> Dict[str, Any]:
        """Get buffer statistics"""
        return {
            "size": self.buffer.qsize(),
            "max_size": self.max_size,
            "dropped_frames": self.dropped_frames,
            "total_frames": self.total_frames,
            "drop_rate": self.dropped_frames / max(self.total_frames, 1),
        }
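

# Minimal FrameBuffer sketch (assumption: frames are HxWx3 uint8 numpy arrays and
# timestamps come from the caller; their units only need to be self-consistent):
#
#   buf = FrameBuffer(max_size=5)
#   buf.put_frame(np.zeros((512, 512, 3), dtype=np.uint8), timestamp=0.0)
#   item = buf.get_frame()          # -> (frame, timestamp) or None when empty
#   print(buf.get_stats())          # size, dropped_frames, drop_rate, ...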
class GPUMemoryManager:
    """Manages GPU memory for optimal performance"""

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.memory_threshold = 0.9  # 90% of GPU memory
        self.cleanup_interval = 50  # frames
        self.frame_count = 0

    def optimize_memory(self):
        """Optimize GPU memory usage"""
        if not torch.cuda.is_available():
            return
        self.frame_count += 1

        # Periodic cleanup
        if self.frame_count % self.cleanup_interval == 0:
            self._cleanup_memory()

        # Emergency cleanup if memory usage is high
        if self._get_memory_usage() > self.memory_threshold:
            self._emergency_cleanup()

    def _get_memory_usage(self) -> float:
        """Get current GPU memory usage ratio"""
        if not torch.cuda.is_available():
            return 0.0
        allocated = torch.cuda.memory_allocated()
        total = torch.cuda.get_device_properties(0).total_memory
        return allocated / total

    def _cleanup_memory(self):
        """Perform memory cleanup"""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    def _emergency_cleanup(self):
        """Emergency memory cleanup"""
        logger.warning("High GPU memory usage, performing emergency cleanup")
        self._cleanup_memory()
        # Force garbage collection
        for _ in range(3):
            gc.collect()

    def get_memory_stats(self) -> Dict[str, Any]:
        """Get GPU memory statistics"""
        if not torch.cuda.is_available():
            return {"available": False}
        allocated = torch.cuda.memory_allocated()
        reserved = torch.cuda.memory_reserved()
        total = torch.cuda.get_device_properties(0).total_memory
        return {
            "available": True,
            "allocated_gb": allocated / (1024**3),
            "reserved_gb": reserved / (1024**3),
            "total_gb": total / (1024**3),
            "usage_ratio": allocated / total,
        }
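

# GPUMemoryManager is intended to be called once per processed frame; a sketch:
#
#   gpu_mgr = GPUMemoryManager()
#   gpu_mgr.optimize_memory()          # periodic cache cleanup every `cleanup_interval` frames
#   print(gpu_mgr.get_memory_stats())  # {"available": False} on CPU-only machines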
class AudioSyncManager:
    """Manages audio-video synchronization.

    Timestamps are expected in milliseconds so they can be compared directly
    against max_drift_ms.
    """

    def __init__(self, max_drift_ms: float = 150.0):
        self.max_drift_ms = max_drift_ms
        self.audio_timestamps = deque(maxlen=100)
        self.video_timestamps = deque(maxlen=100)
        self.sync_offset = 0.0

    def add_audio_timestamp(self, timestamp: float):
        """Add an audio timestamp"""
        self.audio_timestamps.append(timestamp)
        self._calculate_sync_offset()

    def add_video_timestamp(self, timestamp: float):
        """Add a video timestamp"""
        self.video_timestamps.append(timestamp)
        self._calculate_sync_offset()

    def _calculate_sync_offset(self):
        """Calculate the current sync offset"""
        if len(self.audio_timestamps) == 0 or len(self.video_timestamps) == 0:
            return
        # Average timestamp difference over the last 10 samples
        audio_avg = np.mean(list(self.audio_timestamps)[-10:])
        video_avg = np.mean(list(self.video_timestamps)[-10:])
        self.sync_offset = audio_avg - video_avg

    def should_drop_video_frame(self, video_timestamp: float) -> bool:
        """Check whether a video frame should be dropped to stay in sync"""
        if len(self.audio_timestamps) == 0:
            return False
        latest_audio = self.audio_timestamps[-1]
        drift = video_timestamp - latest_audio
        return abs(drift) > self.max_drift_ms

    def get_sync_stats(self) -> Dict[str, float]:
        """Get synchronization statistics"""
        return {
            "sync_offset_ms": self.sync_offset,
            "audio_samples": len(self.audio_timestamps),
            "video_samples": len(self.video_timestamps),
        }
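

# AudioSyncManager sketch; the millisecond values below are made up for illustration:
#
#   sync = AudioSyncManager(max_drift_ms=150.0)
#   sync.add_audio_timestamp(1_000.0)        # audio clock at 1000 ms
#   sync.should_drop_video_frame(1_400.0)    # -> True, 400 ms drift exceeds 150 ms
#   sync.should_drop_video_frame(1_050.0)    # -> False, 50 ms drift is within bounds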
class PerformanceProfiler:
    """Profiles system performance for optimization"""

    def __init__(self):
        self.cpu_usage = deque(maxlen=60)  # ~1 minute of samples at ~1 Hz
        self.memory_usage = deque(maxlen=60)
        self.gpu_utilization = deque(maxlen=60)

        # Start monitoring thread
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_system)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _monitor_system(self):
        """Monitor system resources"""
        while self.monitoring:
            try:
                # CPU usage since the previous call (non-blocking; the first
                # sample is 0.0 as a baseline)
                cpu_percent = psutil.cpu_percent(interval=None)
                self.cpu_usage.append(cpu_percent)

                # Memory usage
                memory = psutil.virtual_memory()
                self.memory_usage.append(memory.percent)

                # GPU utilization (if available)
                if torch.cuda.is_available():
                    # Approximate GPU utilization based on memory usage
                    gpu_memory_used = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory
                    self.gpu_utilization.append(gpu_memory_used * 100)
                else:
                    self.gpu_utilization.append(0)
            except Exception as e:
                logger.error(f"System monitoring error: {e}")
            time.sleep(1)

    def stop_monitoring(self):
        """Stop system monitoring"""
        self.monitoring = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join()

    def get_system_stats(self) -> Dict[str, Any]:
        """Get system performance statistics"""
        return {
            "cpu_usage_avg": np.mean(self.cpu_usage) if self.cpu_usage else 0,
            "cpu_usage_max": np.max(self.cpu_usage) if self.cpu_usage else 0,
            "memory_usage_avg": np.mean(self.memory_usage) if self.memory_usage else 0,
            "memory_usage_max": np.max(self.memory_usage) if self.memory_usage else 0,
            "gpu_utilization_avg": np.mean(self.gpu_utilization) if self.gpu_utilization else 0,
            "gpu_utilization_max": np.max(self.gpu_utilization) if self.gpu_utilization else 0,
        }
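

# PerformanceProfiler starts its sampling thread on construction; remember to call
# stop_monitoring() when finished, e.g.:
#
#   profiler = PerformanceProfiler()
#   time.sleep(2)                    # let a couple of samples accumulate
#   print(profiler.get_system_stats())
#   profiler.stop_monitoring()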
class RealTimeOptimizer:
    """Main real-time optimization controller"""

    def __init__(self, target_latency_ms: float = 250.0):
        self.latency_optimizer = LatencyOptimizer(target_latency_ms)
        self.frame_buffer = FrameBuffer()
        self.gpu_manager = GPUMemoryManager()
        self.audio_sync = AudioSyncManager()
        self.profiler = PerformanceProfiler()

        self.stats = {}
        self.last_stats_update = time.time()

    def process_frame(self, frame: np.ndarray, timestamp: float, stage: str = "video") -> bool:
        """Process a frame with optimization"""
        start_time = time.time()

        # Check if frame should be dropped for sync
        if stage == "video" and self.audio_sync.should_drop_video_frame(timestamp):
            return False

        # Add to buffer
        success = self.frame_buffer.put_frame(frame, timestamp)

        # Record processing time
        processing_time = (time.time() - start_time) * 1000
        self.latency_optimizer.record_latency(stage, processing_time)

        # Update timestamps for sync
        if stage == "video":
            self.audio_sync.add_video_timestamp(timestamp)
        elif stage == "audio":
            self.audio_sync.add_audio_timestamp(timestamp)

        # Optimize GPU memory
        self.gpu_manager.optimize_memory()
        return success

    def get_frame(self) -> Optional[Tuple[np.ndarray, float]]:
        """Get next frame from buffer"""
        return self.frame_buffer.get_frame()

    def get_optimization_settings(self) -> Dict[str, Any]:
        """Get current optimization settings"""
        return self.latency_optimizer.get_current_settings()

    def get_comprehensive_stats(self) -> Dict[str, Any]:
        """Get comprehensive performance statistics"""
        now = time.time()
        # Update stats every 2 seconds
        if now - self.last_stats_update > 2.0:
            self.stats = {
                "latency": self.latency_optimizer.get_current_settings(),
                "buffer": self.frame_buffer.get_stats(),
                "gpu": self.gpu_manager.get_memory_stats(),
                "sync": self.audio_sync.get_sync_stats(),
                "system": self.profiler.get_system_stats(),
            }
            self.last_stats_update = now
        return self.stats

    def cleanup(self):
        """Cleanup optimizer resources"""
        self.profiler.stop_monitoring()

# Global optimizer instance
_optimizer_instance = None


def get_realtime_optimizer() -> RealTimeOptimizer:
    """Get or create global optimizer instance"""
    global _optimizer_instance
    if _optimizer_instance is None:
        _optimizer_instance = RealTimeOptimizer()
    return _optimizer_instance
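

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original pipeline): feeds a few
    # synthetic frames through the global optimizer and prints the aggregated stats.
    # The 512x512 RGB frames and millisecond timestamps are illustrative assumptions
    # about what the surrounding application would provide.
    logging.basicConfig(level=logging.INFO)

    optimizer = get_realtime_optimizer()
    start_ms = time.time() * 1000

    for i in range(10):
        frame = np.zeros((512, 512, 3), dtype=np.uint8)
        timestamp_ms = start_ms + i * 50  # ~20 FPS worth of frames
        optimizer.process_frame(frame, timestamp_ms, stage="video")

    # Allow the 2 s stats cache to refresh and the profiler to collect samples
    time.sleep(2.1)
    print("Settings:", optimizer.get_optimization_settings())
    print("Stats:", optimizer.get_comprehensive_stats())
    optimizer.cleanup()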