"""
Real-time Optimization Module
Implements latency reduction, frame buffering, and GPU optimization
"""
import torch
import torch.nn.functional as F
import numpy as np
import time
import threading
import queue
import logging
from collections import deque
from typing import Dict, Any, Optional, Tuple
import psutil
import gc
logger = logging.getLogger(__name__)
class LatencyOptimizer:
    """Optimizes processing pipeline for minimal latency"""

    def __init__(self, target_latency_ms: float = 250.0):
        self.target_latency_ms = target_latency_ms
        self.latency_history = deque(maxlen=100)
        self.processing_times = {}

        # Adaptive parameters
        self.current_quality = 1.0  # quality scale; _degrade_quality lowers this no further than 0.7
        self.current_resolution = (512, 512)
        self.current_fps = 20

        # Performance thresholds (200 ms / 150 ms at the default 250 ms target)
        self.latency_threshold_high = target_latency_ms * 0.8
        self.latency_threshold_low = target_latency_ms * 0.6

        # Adaptation counters
        self.high_latency_count = 0
        self.low_latency_count = 0
        self.adaptation_threshold = 5  # consecutive frames

    def record_latency(self, stage: str, latency_ms: float):
        """Record latency for a processing stage"""
        self.processing_times[stage] = latency_ms

        # Calculate total latency across all recorded stages
        total_latency = sum(self.processing_times.values())
        self.latency_history.append(total_latency)

        # Trigger adaptation if needed
        self._adapt_quality(total_latency)

    def _adapt_quality(self, total_latency: float):
        """Adapt quality based on latency"""
        if total_latency > self.latency_threshold_high:
            self.high_latency_count += 1
            self.low_latency_count = 0
            if self.high_latency_count >= self.adaptation_threshold:
                self._degrade_quality()
                self.high_latency_count = 0
        elif total_latency < self.latency_threshold_low:
            self.low_latency_count += 1
            self.high_latency_count = 0
            if self.low_latency_count >= self.adaptation_threshold * 2:  # be more conservative with upgrades
                self._improve_quality()
                self.low_latency_count = 0
        else:
            self.high_latency_count = 0
            self.low_latency_count = 0

    def _degrade_quality(self):
        """Degrade quality to improve latency"""
        if self.current_quality > 0.7:
            # Round to one decimal so repeated 0.1 steps don't accumulate
            # floating-point drift (e.g. 0.7000000000000001 passing the > 0.7 check)
            self.current_quality = round(self.current_quality - 0.1, 1)
            logger.info(f"Reduced quality to {self.current_quality:.1f}")
        elif self.current_fps > 15:
            self.current_fps -= 2
            logger.info(f"Reduced FPS to {self.current_fps}")
        elif self.current_resolution[0] > 384:
            self.current_resolution = (384, 384)
            logger.info(f"Reduced resolution to {self.current_resolution}")

    def _improve_quality(self):
        """Improve quality when latency allows"""
        if self.current_resolution[0] < 512:
            self.current_resolution = (512, 512)
            logger.info(f"Increased resolution to {self.current_resolution}")
        elif self.current_fps < 20:
            self.current_fps += 2
            logger.info(f"Increased FPS to {self.current_fps}")
        elif self.current_quality < 1.0:
            # Clamp so floating-point drift cannot push quality above 1.0
            self.current_quality = min(1.0, round(self.current_quality + 0.1, 1))
            logger.info(f"Increased quality to {self.current_quality:.1f}")

    def get_current_settings(self) -> Dict[str, Any]:
        """Get current adaptive settings"""
        return {
            "quality": self.current_quality,
            "resolution": self.current_resolution,
            "fps": self.current_fps,
            "avg_latency_ms": np.mean(self.latency_history) if self.latency_history else 0
        }
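
# A minimal usage sketch (hypothetical stage names, shown here for illustration):
# record per-stage latencies once per frame, then read back the adapted settings.
# Five consecutive frames over the high threshold trigger one degradation step.
#
#   optimizer = LatencyOptimizer(target_latency_ms=250.0)
#   optimizer.record_latency("inference", 180.0)
#   optimizer.record_latency("render", 60.0)  # total 240 ms > 200 ms threshold
#   settings = optimizer.get_current_settings()
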
class FrameBuffer:
    """Thread-safe frame buffer with overflow protection"""

    def __init__(self, max_size: int = 5):
        self.max_size = max_size
        self.buffer = queue.Queue(maxsize=max_size)
        self.dropped_frames = 0
        self.total_frames = 0

    def put_frame(self, frame: np.ndarray, timestamp: float) -> bool:
        """Add frame to buffer, returns False if dropped"""
        self.total_frames += 1
        try:
            self.buffer.put_nowait((frame, timestamp))
            return True
        except queue.Full:
            # Drop the oldest frame to make room for the new one
            try:
                self.buffer.get_nowait()
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime
            try:
                self.buffer.put_nowait((frame, timestamp))
                self.dropped_frames += 1
                return True
            except queue.Full:
                # Another producer refilled the queue between our get and put;
                # drop the new frame instead
                self.dropped_frames += 1
                return False

    def get_frame(self) -> Optional[Tuple[np.ndarray, float]]:
        """Get next frame from buffer"""
        try:
            return self.buffer.get_nowait()
        except queue.Empty:
            return None

    def get_stats(self) -> Dict[str, Any]:
        """Get buffer statistics"""
        return {
            "size": self.buffer.qsize(),
            "max_size": self.max_size,
            "dropped_frames": self.dropped_frames,
            "total_frames": self.total_frames,
            "drop_rate": self.dropped_frames / max(self.total_frames, 1)
        }
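
# A minimal producer/consumer sketch (synthetic frame, for illustration): when
# the queue is full, put_frame() discards the oldest frame, so consumers always
# see the freshest frames.
#
#   buf = FrameBuffer(max_size=5)
#   buf.put_frame(np.zeros((512, 512, 3), dtype=np.uint8), time.time())
#   item = buf.get_frame()   # (frame, timestamp), or None when empty
#   stats = buf.get_stats()  # includes "drop_rate" for monitoring
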
class GPUMemoryManager:
    """Manages GPU memory for optimal performance"""

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.memory_threshold = 0.9  # 90% of GPU memory
        self.cleanup_interval = 50  # frames
        self.frame_count = 0

    def optimize_memory(self):
        """Optimize GPU memory usage"""
        if not torch.cuda.is_available():
            return
        self.frame_count += 1

        # Periodic cleanup
        if self.frame_count % self.cleanup_interval == 0:
            self._cleanup_memory()

        # Emergency cleanup if memory usage is high
        if self._get_memory_usage() > self.memory_threshold:
            self._emergency_cleanup()

    def _get_memory_usage(self) -> float:
        """Get current GPU memory usage ratio"""
        if not torch.cuda.is_available():
            return 0.0
        allocated = torch.cuda.memory_allocated()
        total = torch.cuda.get_device_properties(0).total_memory
        return allocated / total

    def _cleanup_memory(self):
        """Perform memory cleanup"""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    def _emergency_cleanup(self):
        """Emergency memory cleanup"""
        logger.warning("High GPU memory usage, performing emergency cleanup")
        self._cleanup_memory()
        # Force garbage collection
        for _ in range(3):
            gc.collect()

    def get_memory_stats(self) -> Dict[str, Any]:
        """Get GPU memory statistics"""
        if not torch.cuda.is_available():
            return {"available": False}
        allocated = torch.cuda.memory_allocated()
        reserved = torch.cuda.memory_reserved()
        total = torch.cuda.get_device_properties(0).total_memory
        return {
            "available": True,
            "allocated_gb": allocated / (1024**3),
            "reserved_gb": reserved / (1024**3),
            "total_gb": total / (1024**3),
            "usage_ratio": allocated / total
        }
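
# A minimal per-frame sketch: call optimize_memory() once per processed frame.
# Cache cleanup runs every cleanup_interval frames, plus an emergency pass
# whenever allocation exceeds 90% of device memory.
#
#   gpu_mgr = GPUMemoryManager()
#   gpu_mgr.optimize_memory()  # invoke after each inference step
#   print(gpu_mgr.get_memory_stats())
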
class AudioSyncManager:
    """Manages audio-video synchronization.

    Timestamps passed to this class are assumed to share one clock and unit
    (milliseconds), so drift is directly comparable to max_drift_ms.
    """

    def __init__(self, max_drift_ms: float = 150.0):
        self.max_drift_ms = max_drift_ms
        self.audio_timestamps = deque(maxlen=100)
        self.video_timestamps = deque(maxlen=100)
        self.sync_offset = 0.0

    def add_audio_timestamp(self, timestamp: float):
        """Add audio timestamp"""
        self.audio_timestamps.append(timestamp)
        self._calculate_sync_offset()

    def add_video_timestamp(self, timestamp: float):
        """Add video timestamp"""
        self.video_timestamps.append(timestamp)
        self._calculate_sync_offset()

    def _calculate_sync_offset(self):
        """Calculate current sync offset"""
        if len(self.audio_timestamps) == 0 or len(self.video_timestamps) == 0:
            return

        # Average timestamp difference over the last 10 samples of each stream
        audio_avg = np.mean(list(self.audio_timestamps)[-10:])
        video_avg = np.mean(list(self.video_timestamps)[-10:])
        self.sync_offset = audio_avg - video_avg

    def should_drop_video_frame(self, video_timestamp: float) -> bool:
        """Check if video frame should be dropped for sync"""
        if len(self.audio_timestamps) == 0:
            return False
        latest_audio = self.audio_timestamps[-1]
        drift = video_timestamp - latest_audio
        return abs(drift) > self.max_drift_ms

    def get_sync_stats(self) -> Dict[str, float]:
        """Get synchronization statistics"""
        return {
            "sync_offset_ms": self.sync_offset,
            "audio_samples": len(self.audio_timestamps),
            "video_samples": len(self.video_timestamps)
        }
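
# A minimal sync sketch (millisecond timestamps on a shared clock, per the
# class docstring): a video frame drifting more than max_drift_ms from the
# latest audio timestamp is flagged for dropping.
#
#   sync = AudioSyncManager(max_drift_ms=150.0)
#   sync.add_audio_timestamp(1000.0)
#   sync.should_drop_video_frame(1100.0)  # False: |1100 - 1000| <= 150
#   sync.should_drop_video_frame(1200.0)  # True:  |1200 - 1000| > 150
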
class PerformanceProfiler:
    """Profiles system performance for optimization"""

    def __init__(self):
        self.cpu_usage = deque(maxlen=60)  # 1 minute at 1 Hz
        self.memory_usage = deque(maxlen=60)
        self.gpu_utilization = deque(maxlen=60)

        # Start monitoring thread
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_system)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _monitor_system(self):
        """Monitor system resources"""
        while self.monitoring:
            try:
                # CPU usage; interval=1 blocks for one second, which also
                # paces this loop at roughly 1 Hz
                cpu_percent = psutil.cpu_percent(interval=1)
                self.cpu_usage.append(cpu_percent)

                # Memory usage
                memory = psutil.virtual_memory()
                self.memory_usage.append(memory.percent)

                # GPU utilization (if available)
                if torch.cuda.is_available():
                    # Approximate GPU utilization based on memory usage
                    gpu_memory_used = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory
                    self.gpu_utilization.append(gpu_memory_used * 100)
                else:
                    self.gpu_utilization.append(0)
            except Exception as e:
                logger.error(f"System monitoring error: {e}")
                time.sleep(1)  # avoid a tight loop if sampling keeps failing

    def stop_monitoring(self):
        """Stop system monitoring"""
        self.monitoring = False
        if self.monitor_thread.is_alive():
            # The loop may be blocked inside psutil.cpu_percent(interval=1),
            # so bound the wait rather than joining indefinitely
            self.monitor_thread.join(timeout=2.0)

    def get_system_stats(self) -> Dict[str, Any]:
        """Get system performance statistics"""
        return {
            "cpu_usage_avg": np.mean(self.cpu_usage) if self.cpu_usage else 0,
            "cpu_usage_max": np.max(self.cpu_usage) if self.cpu_usage else 0,
            "memory_usage_avg": np.mean(self.memory_usage) if self.memory_usage else 0,
            "memory_usage_max": np.max(self.memory_usage) if self.memory_usage else 0,
            "gpu_utilization_avg": np.mean(self.gpu_utilization) if self.gpu_utilization else 0,
            "gpu_utilization_max": np.max(self.gpu_utilization) if self.gpu_utilization else 0
        }
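
# Note: PerformanceProfiler starts sampling in a background daemon thread as
# soon as it is constructed; call stop_monitoring() when finished so the
# thread exits cleanly.
#
#   profiler = PerformanceProfiler()
#   print(profiler.get_system_stats())
#   profiler.stop_monitoring()
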
class RealTimeOptimizer:
    """Main real-time optimization controller"""

    def __init__(self, target_latency_ms: float = 250.0):
        self.latency_optimizer = LatencyOptimizer(target_latency_ms)
        self.frame_buffer = FrameBuffer()
        self.gpu_manager = GPUMemoryManager()
        self.audio_sync = AudioSyncManager()
        self.profiler = PerformanceProfiler()
        self.stats = {}
        self.last_stats_update = time.time()

    def process_frame(self, frame: np.ndarray, timestamp: float, stage: str = "video") -> bool:
        """Process a frame with optimization"""
        start_time = time.time()

        # Check if frame should be dropped for sync
        if stage == "video" and self.audio_sync.should_drop_video_frame(timestamp):
            return False

        # Add to buffer
        success = self.frame_buffer.put_frame(frame, timestamp)

        # Record processing time
        processing_time = (time.time() - start_time) * 1000
        self.latency_optimizer.record_latency(stage, processing_time)

        # Update timestamps for sync
        if stage == "video":
            self.audio_sync.add_video_timestamp(timestamp)
        elif stage == "audio":
            self.audio_sync.add_audio_timestamp(timestamp)

        # Optimize GPU memory
        self.gpu_manager.optimize_memory()
        return success

    def get_frame(self) -> Optional[Tuple[np.ndarray, float]]:
        """Get next frame from buffer"""
        return self.frame_buffer.get_frame()

    def get_optimization_settings(self) -> Dict[str, Any]:
        """Get current optimization settings"""
        return self.latency_optimizer.get_current_settings()

    def get_comprehensive_stats(self) -> Dict[str, Any]:
        """Get comprehensive performance statistics"""
        now = time.time()

        # Update stats every 2 seconds
        if now - self.last_stats_update > 2.0:
            self.stats = {
                "latency": self.latency_optimizer.get_current_settings(),
                "buffer": self.frame_buffer.get_stats(),
                "gpu": self.gpu_manager.get_memory_stats(),
                "sync": self.audio_sync.get_sync_stats(),
                "system": self.profiler.get_system_stats()
            }
            self.last_stats_update = now
        return self.stats

    def cleanup(self):
        """Cleanup optimizer resources"""
        self.profiler.stop_monitoring()

# Global optimizer instance
_optimizer_instance = None

def get_realtime_optimizer() -> RealTimeOptimizer:
    """Get or create global optimizer instance"""
    global _optimizer_instance
    if _optimizer_instance is None:
        _optimizer_instance = RealTimeOptimizer()
    return _optimizer_instance
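
# A minimal end-to-end smoke test with synthetic black frames (illustrative
# only; assumes millisecond timestamps on a shared clock, matching
# AudioSyncManager's expectation).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    optimizer = get_realtime_optimizer()
    frame = np.zeros((512, 512, 3), dtype=np.uint8)
    start_ms = time.time() * 1000
    for i in range(30):
        ts = start_ms + i * 50  # 20 FPS -> one frame every 50 ms
        optimizer.process_frame(frame, ts, stage="audio")
        optimizer.process_frame(frame, ts, stage="video")
    time.sleep(2.1)  # let the 2-second stats window roll over
    print(optimizer.get_comprehensive_stats())
    optimizer.cleanup()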