# Uploaded by MogensR
# Commit 434972c: Rename progress_tracker.py to utils/monitoring/progress_tracker.py
# File size: 16.6 kB
"""
Progress Tracking Module
Handles progress monitoring, ETA calculations, and performance statistics
"""
import time
import logging
from typing import Optional, Callable, Dict, Any, List
from dataclasses import dataclass, field
from collections import deque
logger = logging.getLogger(__name__)
@dataclass
class ProgressSnapshot:
    """Snapshot of progress at a specific point in time.

    One instance is built per ``ProgressTracker.update`` call when
    performance tracking is enabled.
    """

    # Value of time.time() when the snapshot was taken.
    timestamp: float
    # Frame number reported by the caller for this update.
    frame_number: int
    # Name of the processing stage active at the time of the snapshot.
    stage: str
    # Frames-per-second figure computed for this update.
    fps: float
    # Memory usage in MB as reported by the caller; None when not supplied.
    memory_usage_mb: Optional[float] = None
    # Free-form extra metrics; default_factory gives every snapshot its own dict.
    custom_metrics: Dict[str, Any] = field(default_factory=dict)
class ProgressTracker:
    """
    Enhanced progress tracking with detailed statistics and ETA calculations.

    The tracker is driven by repeated calls to ``update``. Each call refreshes
    the rolling FPS figures, the smoothed ETA, the aggregate ``stats`` dict
    and (optionally) appends a ``ProgressSnapshot``. Stage changes are
    recorded in ``stages_completed`` and ``stats['stage_times']``.

    Attributes:
        total_frames: Number of frames expected in this session.
        callback: Optional callable invoked as ``callback(progress, message)``
            on every update; exceptions it raises are logged and swallowed.
        track_performance: When True, a snapshot is stored on every update.
        frame_times: Rolling window (last 100) of per-frame durations.
        fps_history: Rolling window (last 50) of FPS measurements.
        snapshots: All snapshots recorded so far (unbounded list).
        stages_completed: One record per stage, in the order they ran.
        stats: Aggregate statistics, see ``_initial_stats`` for the keys.
    """

    def __init__(self, total_frames: int, callback: Optional[Callable] = None,
                 track_performance: bool = True):
        """
        Create a tracker and start its clock.

        Args:
            total_frames: Number of frames expected to be processed.
            callback: Optional ``callback(progress_fraction, message)``.
            track_performance: Store a snapshot on every update when True.
        """
        self.total_frames = total_frames
        self.callback = callback
        self.track_performance = track_performance

        # Timing data
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        # Performance tracking
        self.frame_times = deque(maxlen=100)  # Keep last 100 frame times
        self.fps_history = deque(maxlen=50)  # Keep last 50 FPS measurements
        self.snapshots: List[ProgressSnapshot] = []

        # Stage tracking
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed = []
        # True once the currently open stage already has an entry in
        # stages_completed; lets _complete_stage refresh instead of append.
        self._stage_recorded = False

        # Statistics
        self.stats = self._initial_stats()

        # ETA calculation
        self.eta_smoothing_factor = 0.2  # For exponential smoothing
        self.smoothed_fps = 0.0

        logger.debug(f"ProgressTracker initialized for {total_frames} frames")

    @staticmethod
    def _initial_stats() -> Dict[str, Any]:
        """Return a fresh statistics dict with every counter at its start value."""
        return {
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'peak_fps': 0.0,
            'slowest_fps': float('inf'),
            'frames_per_second_variance': 0.0,
            'estimated_completion_accuracy': 0.0,
            'stage_times': {},
            'memory_peak_mb': 0.0,
        }

    def _stats_snapshot(self) -> Dict[str, Any]:
        """Return a copy of ``stats`` whose nested ``stage_times`` is also copied."""
        copied = dict(self.stats)
        copied['stage_times'] = dict(self.stats['stage_times'])
        return copied

    def update(self, frame_number: int, stage: str = "",
               custom_metrics: Optional[Dict[str, Any]] = None,
               memory_usage_mb: Optional[float] = None):
        """
        Update progress with comprehensive tracking.

        Args:
            frame_number: Current frame being processed
            stage: Current processing stage description
            custom_metrics: Additional metrics to track
            memory_usage_mb: Current memory usage in MB
        """
        current_time = time.time()

        # Handle stage changes
        if stage and stage != self.current_stage:
            self._complete_stage()
            self.current_stage = stage
            self.stage_start_time = current_time
            self._stage_recorded = False

        # Calculate frame timing. The very first update is skipped because the
        # interval since construction is not a frame duration.
        if self.processed_frames > 0:
            frame_time = current_time - self.last_update_time
            frames_advanced = frame_number - self.processed_frames
            # Callers may report every N frames; spread the interval over the
            # frames actually covered so FPS means frames, not updates.
            if frames_advanced > 1:
                frame_time /= frames_advanced
            self.frame_times.append(frame_time)

        self.processed_frames = frame_number
        self.last_update_time = current_time

        # Calculate performance metrics
        elapsed_time = current_time - self.start_time
        current_fps = self._calculate_current_fps()

        # Update FPS history and smoothing
        if current_fps > 0:
            self.fps_history.append(current_fps)
            self._update_smoothed_fps(current_fps)

        # Calculate ETA
        eta_seconds = self._calculate_eta()
        progress_pct = (
            self.processed_frames / self.total_frames
            if self.total_frames > 0 else 0
        )

        # Update statistics
        self._update_statistics(current_fps, memory_usage_mb)

        # Create snapshot if performance tracking is enabled
        if self.track_performance:
            snapshot = ProgressSnapshot(
                timestamp=current_time,
                frame_number=frame_number,
                stage=self.current_stage,
                fps=current_fps,
                memory_usage_mb=memory_usage_mb,
                custom_metrics=custom_metrics or {}
            )
            self.snapshots.append(snapshot)

        # Generate progress message (uses the stage passed to this call, so
        # the prefix is omitted when the caller did not name a stage)
        message = self._generate_progress_message(
            elapsed_time, current_fps, eta_seconds, stage
        )

        # Call progress callback; a failing callback must not stop processing
        if self.callback:
            try:
                self.callback(progress_pct, message)
            except Exception as e:
                logger.warning(f"Progress callback failed: {e}")

        # Log detailed progress periodically
        if frame_number % 50 == 0 or frame_number == self.total_frames:
            self._log_detailed_progress(progress_pct, current_fps, eta_seconds)

    def _calculate_current_fps(self) -> float:
        """Calculate current FPS from the average of the last 10 frame times."""
        if not self.frame_times:
            return 0.0

        # Use average of recent frame times for stability
        recent_frame_times = list(self.frame_times)[-10:]  # Last 10 frames
        avg_frame_time = sum(recent_frame_times) / len(recent_frame_times)
        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0.0

    def _update_smoothed_fps(self, current_fps: float):
        """Update smoothed FPS using exponential smoothing."""
        if self.smoothed_fps == 0.0:
            # First measurement seeds the average
            self.smoothed_fps = current_fps
        else:
            self.smoothed_fps = (
                self.eta_smoothing_factor * current_fps +
                (1 - self.eta_smoothing_factor) * self.smoothed_fps
            )

    def _calculate_eta(self) -> float:
        """
        Calculate estimated time to completion in seconds.

        Returns:
            Remaining frames divided by the smoothed FPS; 0.0 when nothing has
            been processed yet, when no FPS is known, or when the processed
            count has already reached or passed ``total_frames``.
        """
        if self.processed_frames <= 0 or self.smoothed_fps <= 0:
            return 0.0

        remaining_frames = self.total_frames - self.processed_frames
        # Clamp so overshooting total_frames never yields a negative ETA
        return max(0.0, remaining_frames / self.smoothed_fps)

    def _update_statistics(self, current_fps: float, memory_usage_mb: Optional[float]):
        """
        Update comprehensive statistics.

        Args:
            current_fps: FPS figure for this update (not read here; the FPS
                statistics are derived from ``fps_history``).
            memory_usage_mb: Caller-reported memory usage, or None.
        """
        current_time = time.time()
        self.stats['total_processing_time'] = current_time - self.start_time

        # FPS statistics
        if self.fps_history:
            fps_list = list(self.fps_history)
            self.stats['average_fps'] = sum(fps_list) / len(fps_list)
            self.stats['peak_fps'] = max(fps_list)
            self.stats['slowest_fps'] = min(fps_list)

            # Calculate (population) variance
            avg_fps = self.stats['average_fps']
            variance = sum((fps - avg_fps) ** 2 for fps in fps_list) / len(fps_list)
            self.stats['frames_per_second_variance'] = variance

        # Memory tracking
        if memory_usage_mb and memory_usage_mb > self.stats['memory_peak_mb']:
            self.stats['memory_peak_mb'] = memory_usage_mb

    def _complete_stage(self):
        """
        Record the duration of the current stage.

        Safe to call repeatedly for the same open stage: the first call
        appends a record to ``stages_completed``; later calls refresh that
        record in place instead of appending duplicates.
        """
        if not self.current_stage:
            return

        stage_duration = time.time() - self.stage_start_time
        self.stats['stage_times'][self.current_stage] = stage_duration

        record = {
            'stage': self.current_stage,
            'duration': stage_duration,
            'frames_processed': self.processed_frames
        }
        if self._stage_recorded and self.stages_completed:
            self.stages_completed[-1] = record
        else:
            self.stages_completed.append(record)
            self._stage_recorded = True

        logger.debug(f"Completed stage '{self.current_stage}' in {stage_duration:.2f}s")

    def _generate_progress_message(self, elapsed_time: float, current_fps: float,
                                   eta_seconds: float, stage: str) -> str:
        """
        Generate comprehensive progress message.

        Args:
            elapsed_time: Seconds since tracking started.
            current_fps: FPS figure for this update.
            eta_seconds: Estimated seconds remaining; omitted when not positive.
            stage: Stage label to prefix; omitted when empty.

        Returns:
            A single-line, pipe-separated status string.
        """
        # Base progress info
        message = (
            f"Frame {self.processed_frames}/{self.total_frames} | "
            f"Elapsed: {self._format_time(elapsed_time)} | "
            f"Speed: {current_fps:.1f} fps"
        )

        # Add ETA if meaningful
        if eta_seconds > 0:
            message += f" | ETA: {self._format_time(eta_seconds)}"

        # Add stage information
        if stage:
            message = f"{stage} | {message}"

        # Add performance indicators. fps_history only ever holds positive
        # values, so recent_avg cannot be zero here.
        if self.fps_history and len(self.fps_history) >= 10:
            recent_avg = sum(list(self.fps_history)[-10:]) / 10
            if abs(current_fps - recent_avg) / recent_avg > 0.2:  # 20% difference
                trend = "↗" if current_fps > recent_avg else "↘"
                message += f" {trend}"

        return message

    def _format_time(self, seconds: float) -> str:
        """Format time duration in human-readable format (s / m s / h m)."""
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def _log_detailed_progress(self, progress_pct: float, current_fps: float, eta_seconds: float):
        """Log detailed progress information at INFO level."""
        logger.info(
            f"Progress: {progress_pct*100:.1f}% | "
            f"FPS: {current_fps:.1f} (avg: {self.stats['average_fps']:.1f}) | "
            f"ETA: {self._format_time(eta_seconds)} | "
            f"Stage: {self.current_stage}"
        )

    def set_stage(self, stage: str):
        """Manually set the current processing stage, closing the previous one."""
        if stage != self.current_stage:
            self._complete_stage()
            self.current_stage = stage
            self.stage_start_time = time.time()
            self._stage_recorded = False
            logger.debug(f"Stage changed to: {stage}")

    def add_custom_metric(self, key: str, value: Any):
        """Add a custom metric to the most recent snapshot (no-op if none exist)."""
        if self.snapshots:
            self.snapshots[-1].custom_metrics[key] = value

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive performance summary.

        Records the duration of the current stage before building the summary.
        Calling this more than once does not add duplicate stage records.

        Returns:
            Dict with frame counts, timings, a copy of the statistics, the
            per-stage breakdown and, when data exists, ``stage_percentages``
            and ``performance_analysis``.
        """
        self._complete_stage()  # Complete current stage

        total_time = time.time() - self.start_time

        summary = {
            'total_frames': self.total_frames,
            'processed_frames': self.processed_frames,
            'completion_percentage': (self.processed_frames / self.total_frames * 100) if self.total_frames > 0 else 0,
            'total_processing_time': total_time,
            'overall_fps': self.processed_frames / total_time if total_time > 0 else 0,
            'stages_completed': len(self.stages_completed),
            'current_stage': self.current_stage,
            'statistics': self._stats_snapshot(),
            'stage_breakdown': self.stages_completed.copy()
        }

        # Calculate stage percentages
        if self.stats['stage_times']:
            total_stage_time = sum(self.stats['stage_times'].values())
            summary['stage_percentages'] = {
                stage: (duration / total_stage_time * 100)
                for stage, duration in self.stats['stage_times'].items()
            }

        # Performance analysis
        if self.fps_history:
            summary['performance_analysis'] = {
                'fps_stability': self._calculate_fps_stability(),
                'performance_trend': self._analyze_performance_trend(),
                'bottleneck_detection': self._detect_bottlenecks()
            }

        return summary

    def _calculate_fps_stability(self) -> str:
        """
        Analyze FPS stability.

        Returns:
            "insufficient_data" with fewer than 10 FPS samples, otherwise one
            of "very_stable", "stable", "moderate" or "unstable" based on the
            coefficient of variation (std-dev / mean) of the FPS history.
        """
        if not self.fps_history or len(self.fps_history) < 10:
            return "insufficient_data"

        variance = self.stats['frames_per_second_variance']
        avg_fps = self.stats['average_fps']

        if avg_fps == 0:
            return "unstable"

        coefficient_of_variation = (variance ** 0.5) / avg_fps

        if coefficient_of_variation < 0.1:
            return "very_stable"
        elif coefficient_of_variation < 0.2:
            return "stable"
        elif coefficient_of_variation < 0.4:
            return "moderate"
        else:
            return "unstable"

    def _analyze_performance_trend(self) -> str:
        """
        Analyze performance trend over time.

        Returns:
            "insufficient_data" with fewer than 20 FPS samples, otherwise
            "improving", "degrading" or "stable" depending on whether the
            last quarter's mean FPS differs from the first quarter's by more
            than 10%.
        """
        if len(self.fps_history) < 20:
            return "insufficient_data"

        # Compare first and last quartiles
        fps_list = list(self.fps_history)
        quartile_size = len(fps_list) // 4

        first_quartile_avg = sum(fps_list[:quartile_size]) / quartile_size
        last_quartile_avg = sum(fps_list[-quartile_size:]) / quartile_size

        change_percent = ((last_quartile_avg - first_quartile_avg) / first_quartile_avg) * 100

        if change_percent > 10:
            return "improving"
        elif change_percent < -10:
            return "degrading"
        else:
            return "stable"

    def _detect_bottlenecks(self) -> List[str]:
        """
        Detect potential performance bottlenecks.

        Returns:
            List of labels drawn from "very_low_fps",
            "inconsistent_performance", "high_memory_usage" and
            "stage_imbalance"; empty when none of the checks trigger.
        """
        bottlenecks = []

        # Check for consistently low FPS
        if self.stats['average_fps'] < 0.5:
            bottlenecks.append("very_low_fps")

        # Check for high variance (std-dev above half the mean)
        if self.stats['frames_per_second_variance'] > (self.stats['average_fps'] * 0.5) ** 2:
            bottlenecks.append("inconsistent_performance")

        # Check for memory pressure (if tracked)
        if self.stats['memory_peak_mb'] > 8000:  # 8GB
            bottlenecks.append("high_memory_usage")

        # Check stage timing imbalances
        if self.stats['stage_times']:
            stage_times = list(self.stats['stage_times'].values())
            max_time = max(stage_times)
            avg_time = sum(stage_times) / len(stage_times)

            if max_time > avg_time * 3:
                bottlenecks.append("stage_imbalance")

        return bottlenecks

    def export_performance_data(self) -> Dict[str, Any]:
        """
        Export detailed performance data for analysis.

        Returns:
            Dict with ``metadata``, one plain dict per snapshot, a copy of the
            statistics, a copy of the stage records and the performance
            summary. The containers returned are copies, so mutating them does
            not alter the tracker's internal state.
        """
        export_time = time.time()
        # Build the summary first: it records the current stage, so the
        # copies taken below already include it.
        performance_summary = self.get_performance_summary()

        return {
            'metadata': {
                'total_frames': self.total_frames,
                'tracking_enabled': self.track_performance,
                'start_time': self.start_time,
                'export_time': export_time
            },
            'snapshots': [
                {
                    'timestamp': snap.timestamp,
                    'frame_number': snap.frame_number,
                    'stage': snap.stage,
                    'fps': snap.fps,
                    'memory_usage_mb': snap.memory_usage_mb,
                    'custom_metrics': dict(snap.custom_metrics)
                }
                for snap in self.snapshots
            ],
            'statistics': self._stats_snapshot(),
            'stages': [dict(entry) for entry in self.stages_completed],
            'performance_summary': performance_summary
        }

    def reset(self, new_total_frames: Optional[int] = None):
        """
        Reset tracker for new processing session.

        Args:
            new_total_frames: New expected frame count; the current value is
                kept when None.
        """
        if new_total_frames is not None:
            self.total_frames = new_total_frames

        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        self.frame_times.clear()
        self.fps_history.clear()
        self.snapshots.clear()

        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed.clear()
        self._stage_recorded = False

        self.smoothed_fps = 0.0

        # Reset statistics
        self.stats = self._initial_stats()

        logger.debug("ProgressTracker reset")

    def finalize(self) -> Dict[str, Any]:
        """
        Finalize tracking and return comprehensive results.

        Returns:
            The dict produced by ``get_performance_summary``.
        """
        # get_performance_summary records the current stage itself, so no
        # separate _complete_stage() call is needed here.
        final_summary = self.get_performance_summary()

        logger.info(
            f"Processing completed: {self.processed_frames}/{self.total_frames} frames "
            f"in {self._format_time(final_summary['total_processing_time'])} "
            f"(avg: {final_summary['overall_fps']:.1f} fps)"
        )

        return final_summary