|
|
""" |
|
|
Progress Tracking Module

Handles progress monitoring, ETA calculations, and performance statistics.
|
|
""" |
|
|
|
|
|
import time |
|
|
import logging |
|
|
from typing import Optional, Callable, Dict, Any, List |
|
|
from dataclasses import dataclass, field |
|
|
from collections import deque |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass
class ProgressSnapshot:
    """Snapshot of progress at a specific point in time.

    Instances are plain records; the class adds no behaviour beyond what
    ``@dataclass`` generates (``__init__``, ``__repr__``, ``__eq__``).
    """

    # Time at which the snapshot was taken, as a float number of seconds.
    timestamp: float
    # Frame number reported when the snapshot was taken.
    frame_number: int
    # Name of the processing stage that was active.
    stage: str
    # Frames-per-second estimate at that moment.
    fps: float
    # Memory usage in MB, or None when it was not supplied.
    memory_usage_mb: Optional[float] = None
    # Free-form extra metrics; default_factory gives each snapshot its own dict.
    custom_metrics: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
class ProgressTracker:
    """
    Enhanced progress tracking with detailed statistics and ETA calculations

    The tracker is driven by repeated calls to :meth:`update`. Each call
    refreshes the rolling FPS estimate, the exponentially smoothed FPS used
    for the ETA, and the ``stats`` dictionary; optionally stores a
    ``ProgressSnapshot``; and invokes the optional callback as
    ``callback(progress_fraction, message)``.

    All durations are measured with ``time.time()``, so ``start_time`` and
    snapshot timestamps are wall-clock values.
    """

    # The entry in ``stages_completed`` that belongs to the *current* visit of
    # ``current_stage``, or None when that visit has not been recorded yet.
    # Keeping a handle on it lets ``_complete_stage`` refresh the entry instead
    # of appending a duplicate every time a summary is requested.
    _open_stage_record: Optional[Dict[str, Any]] = None

    def __init__(self, total_frames: int,
                 callback: Optional[Callable[[float, str], None]] = None,
                 track_performance: bool = True):
        """
        Create a tracker and start its clock.

        Args:
            total_frames: Number of frames expected; used for the progress
                fraction and the ETA.
            callback: Optional callable invoked on every update with
                ``(progress_fraction, message)``.
            track_performance: When True, every update stores a
                ``ProgressSnapshot`` in ``self.snapshots``.
        """
        self.total_frames = total_frames
        self.callback = callback
        self.track_performance = track_performance

        # Timing state
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        # Rolling windows: per-frame durations and FPS samples.
        self.frame_times = deque(maxlen=100)
        self.fps_history = deque(maxlen=50)
        # NOTE: grows by one entry per update while track_performance is True.
        self.snapshots: List[ProgressSnapshot] = []

        # Stage bookkeeping
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed: List[Dict[str, Any]] = []
        self._open_stage_record = None

        self.stats = self._new_stats()

        # Weight of the newest FPS sample in the exponential moving average.
        self.eta_smoothing_factor = 0.2
        self.smoothed_fps = 0.0

        logger.debug(f"ProgressTracker initialized for {total_frames} frames")

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh statistics dictionary with every counter at its initial value."""
        return {
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'peak_fps': 0.0,
            # Remains +inf until the first FPS sample has been recorded.
            'slowest_fps': float('inf'),
            'frames_per_second_variance': 0.0,
            # Present for consumers of the dict; not updated anywhere in this class.
            'estimated_completion_accuracy': 0.0,
            'stage_times': {},
            'memory_peak_mb': 0.0,
        }

    def _stats_snapshot(self) -> Dict[str, Any]:
        """Return a copy of ``stats`` whose ``stage_times`` dict is also copied."""
        snapshot = dict(self.stats)
        snapshot['stage_times'] = dict(self.stats['stage_times'])
        return snapshot

    def _begin_stage(self, stage: str, started_at: float) -> None:
        """Close the current stage and make ``stage`` the active one."""
        self._complete_stage()
        self.current_stage = stage
        self.stage_start_time = started_at
        # The new stage visit has no record yet.
        self._open_stage_record = None

    def update(self, frame_number: int, stage: str = "",
               custom_metrics: Optional[Dict[str, Any]] = None,
               memory_usage_mb: Optional[float] = None) -> None:
        """
        Update progress with comprehensive tracking

        Args:
            frame_number: Current frame being processed
            stage: Current processing stage description
            custom_metrics: Additional metrics to track
            memory_usage_mb: Current memory usage in MB
        """
        current_time = time.time()

        # A non-empty, different stage name switches stages.
        if stage and stage != self.current_stage:
            self._begin_stage(stage, current_time)

        # The very first interval is skipped because it is measured from
        # construction time rather than from a previous update.
        if self.processed_frames > 0:
            frame_time = current_time - self.last_update_time
            frames_advanced = frame_number - self.processed_frames
            if frames_advanced > 1:
                # Several frames were completed since the last call: spread
                # the interval over them so FPS/ETA stay per-frame values.
                frame_time /= frames_advanced
            self.frame_times.append(frame_time)

        self.processed_frames = frame_number
        self.last_update_time = current_time

        elapsed_time = current_time - self.start_time
        current_fps = self._calculate_current_fps()

        if current_fps > 0:
            self.fps_history.append(current_fps)
            self._update_smoothed_fps(current_fps)

        eta_seconds = self._calculate_eta()
        if self.total_frames > 0:
            # Clamp so the callback always receives a fraction in [0, 1].
            progress_pct = min(max(self.processed_frames / self.total_frames, 0.0), 1.0)
        else:
            progress_pct = 0.0

        self._update_statistics(current_fps, memory_usage_mb)

        if self.track_performance:
            snapshot = ProgressSnapshot(
                timestamp=current_time,
                frame_number=frame_number,
                stage=self.current_stage,
                fps=current_fps,
                memory_usage_mb=memory_usage_mb,
                # Copy so add_custom_metric never mutates the caller's dict.
                custom_metrics=dict(custom_metrics) if custom_metrics else {}
            )
            self.snapshots.append(snapshot)

        message = self._generate_progress_message(
            elapsed_time, current_fps, eta_seconds, stage
        )

        if self.callback:
            # Best effort: a failing callback must not abort processing.
            try:
                self.callback(progress_pct, message)
            except Exception as e:
                logger.warning(f"Progress callback failed: {e}")

        # Detailed log line every 50th frame and on the final frame.
        if frame_number % 50 == 0 or frame_number == self.total_frames:
            self._log_detailed_progress(progress_pct, current_fps, eta_seconds)

    def _calculate_current_fps(self) -> float:
        """Calculate current FPS from the mean of the last (up to) 10 frame times."""
        if not self.frame_times:
            return 0.0

        recent_frame_times = list(self.frame_times)[-10:]
        avg_frame_time = sum(recent_frame_times) / len(recent_frame_times)

        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0.0

    def _update_smoothed_fps(self, current_fps: float) -> None:
        """Update smoothed FPS using exponential smoothing"""
        if self.smoothed_fps == 0.0:
            # First sample seeds the average.
            self.smoothed_fps = current_fps
        else:
            self.smoothed_fps = (
                self.eta_smoothing_factor * current_fps +
                (1 - self.eta_smoothing_factor) * self.smoothed_fps
            )

    def _calculate_eta(self) -> float:
        """
        Calculate estimated time to completion in seconds.

        Returns 0.0 when no frames have been processed, when no FPS estimate
        exists yet, or when ``processed_frames`` already reached or passed
        ``total_frames``.
        """
        if self.processed_frames <= 0 or self.smoothed_fps <= 0:
            return 0.0

        remaining_frames = max(self.total_frames - self.processed_frames, 0)
        return remaining_frames / self.smoothed_fps

    def _update_statistics(self, current_fps: float,
                           memory_usage_mb: Optional[float]) -> None:
        """Update comprehensive statistics.

        ``current_fps`` is accepted for interface compatibility; the FPS
        figures are derived from ``fps_history``.
        """
        self.stats['total_processing_time'] = time.time() - self.start_time

        if self.fps_history:
            fps_list = list(self.fps_history)
            avg_fps = sum(fps_list) / len(fps_list)
            self.stats['average_fps'] = avg_fps
            self.stats['peak_fps'] = max(fps_list)
            self.stats['slowest_fps'] = min(fps_list)
            # Population variance over the FPS window.
            self.stats['frames_per_second_variance'] = (
                sum((fps - avg_fps) ** 2 for fps in fps_list) / len(fps_list)
            )

        if memory_usage_mb is not None and memory_usage_mb > self.stats['memory_peak_mb']:
            self.stats['memory_peak_mb'] = memory_usage_mb

    def _complete_stage(self) -> None:
        """
        Record the duration of the current stage.

        The first call for a stage visit appends an entry to
        ``stages_completed``; further calls before the stage changes refresh
        that same entry, so requesting a summary repeatedly never creates
        duplicates. ``stats['stage_times']`` holds the total time per stage
        name, summed over every visit.
        """
        if not self.current_stage:
            return

        stage_duration = time.time() - self.stage_start_time

        record = self._open_stage_record
        if record is None:
            record = {'stage': self.current_stage}
            self.stages_completed.append(record)
            self._open_stage_record = record
        record['duration'] = stage_duration
        record['frames_processed'] = self.processed_frames

        self.stats['stage_times'][self.current_stage] = sum(
            entry['duration'] for entry in self.stages_completed
            if entry['stage'] == self.current_stage
        )
        logger.debug(f"Completed stage '{self.current_stage}' in {stage_duration:.2f}s")

    def _generate_progress_message(self, elapsed_time: float, current_fps: float,
                                   eta_seconds: float, stage: str) -> str:
        """Generate comprehensive progress message"""
        message = (
            f"Frame {self.processed_frames}/{self.total_frames} | "
            f"Elapsed: {self._format_time(elapsed_time)} | "
            f"Speed: {current_fps:.1f} fps"
        )

        if eta_seconds > 0:
            message += f" | ETA: {self._format_time(eta_seconds)}"

        if stage:
            message = f"{stage} | {message}"

        # Append a trend arrow when the current FPS deviates by more than 20%
        # from the mean of the last 10 FPS samples.
        if len(self.fps_history) >= 10:
            recent_avg = sum(list(self.fps_history)[-10:]) / 10
            if recent_avg > 0 and abs(current_fps - recent_avg) / recent_avg > 0.2:
                trend = "↗" if current_fps > recent_avg else "↘"
                message += f" {trend}"

        return message

    def _format_time(self, seconds: float) -> str:
        """Format time duration in human-readable format.

        Yields ``"Ns"`` below one minute, ``"Mm Ss"`` below one hour and
        ``"Hh Mm"`` otherwise. Negative input is treated as zero.
        """
        seconds = max(seconds, 0)
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def _log_detailed_progress(self, progress_pct: float, current_fps: float,
                               eta_seconds: float) -> None:
        """Log detailed progress information"""
        logger.info(
            f"Progress: {progress_pct*100:.1f}% | "
            f"FPS: {current_fps:.1f} (avg: {self.stats['average_fps']:.1f}) | "
            f"ETA: {self._format_time(eta_seconds)} | "
            f"Stage: {self.current_stage}"
        )

    def set_stage(self, stage: str) -> None:
        """Manually set the current processing stage"""
        if stage != self.current_stage:
            self._begin_stage(stage, time.time())
            logger.debug(f"Stage changed to: {stage}")

    def add_custom_metric(self, key: str, value: Any) -> None:
        """Add a custom metric to the most recent snapshot (no-op without snapshots)."""
        if self.snapshots:
            self.snapshots[-1].custom_metrics[key] = value

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive performance summary

        The duration of the stage currently in progress is refreshed first so
        that it is part of the result. The returned dictionaries and lists are
        copies; later tracking does not change them.
        """
        self._complete_stage()

        total_time = time.time() - self.start_time
        statistics = self._stats_snapshot()

        summary = {
            'total_frames': self.total_frames,
            'processed_frames': self.processed_frames,
            'completion_percentage': (self.processed_frames / self.total_frames * 100) if self.total_frames > 0 else 0,
            'total_processing_time': total_time,
            'overall_fps': self.processed_frames / total_time if total_time > 0 else 0,
            'stages_completed': len(self.stages_completed),
            'current_stage': self.current_stage,
            'statistics': statistics,
            'stage_breakdown': [dict(entry) for entry in self.stages_completed]
        }

        stage_times = statistics['stage_times']
        if stage_times:
            total_stage_time = sum(stage_times.values())
            # All durations can be 0.0 on a coarse clock; report 0% then
            # instead of dividing by zero.
            summary['stage_percentages'] = {
                stage: (duration / total_stage_time * 100) if total_stage_time > 0 else 0.0
                for stage, duration in stage_times.items()
            }

        if self.fps_history:
            summary['performance_analysis'] = {
                'fps_stability': self._calculate_fps_stability(),
                'performance_trend': self._analyze_performance_trend(),
                'bottleneck_detection': self._detect_bottlenecks()
            }

        return summary

    def _calculate_fps_stability(self) -> str:
        """Classify FPS stability from the coefficient of variation (std / mean)."""
        if len(self.fps_history) < 10:
            return "insufficient_data"

        variance = self.stats['frames_per_second_variance']
        avg_fps = self.stats['average_fps']

        if avg_fps == 0:
            return "unstable"

        coefficient_of_variation = (variance ** 0.5) / avg_fps

        if coefficient_of_variation < 0.1:
            return "very_stable"
        elif coefficient_of_variation < 0.2:
            return "stable"
        elif coefficient_of_variation < 0.4:
            return "moderate"
        else:
            return "unstable"

    def _analyze_performance_trend(self) -> str:
        """Compare the mean FPS of the first and last quarter of the FPS window."""
        if len(self.fps_history) < 20:
            return "insufficient_data"

        fps_list = list(self.fps_history)
        quartile_size = len(fps_list) // 4

        first_quartile_avg = sum(fps_list[:quartile_size]) / quartile_size
        last_quartile_avg = sum(fps_list[-quartile_size:]) / quartile_size

        if first_quartile_avg <= 0:
            # No meaningful baseline to compare against.
            return "insufficient_data"

        change_percent = ((last_quartile_avg - first_quartile_avg) / first_quartile_avg) * 100

        if change_percent > 10:
            return "improving"
        elif change_percent < -10:
            return "degrading"
        else:
            return "stable"

    def _detect_bottlenecks(self) -> List[str]:
        """Detect potential performance bottlenecks from the current ``stats``."""
        bottlenecks = []

        # Average throughput below half a frame per second.
        if self.stats['average_fps'] < 0.5:
            bottlenecks.append("very_low_fps")

        # Standard deviation larger than half of the mean FPS.
        if self.stats['frames_per_second_variance'] > (self.stats['average_fps'] * 0.5) ** 2:
            bottlenecks.append("inconsistent_performance")

        # Peak memory above 8000 MB.
        if self.stats['memory_peak_mb'] > 8000:
            bottlenecks.append("high_memory_usage")

        # One stage taking more than three times the average stage time.
        if self.stats['stage_times']:
            stage_times = list(self.stats['stage_times'].values())
            max_time = max(stage_times)
            avg_time = sum(stage_times) / len(stage_times)

            if max_time > avg_time * 3:
                bottlenecks.append("stage_imbalance")

        return bottlenecks

    def export_performance_data(self) -> Dict[str, Any]:
        """
        Export detailed performance data for analysis

        The summary is built first so that stage bookkeeping is current; the
        exported containers are copies of the tracker's internal state.
        """
        summary = self.get_performance_summary()

        return {
            'metadata': {
                'total_frames': self.total_frames,
                'tracking_enabled': self.track_performance,
                'start_time': self.start_time,
                'export_time': time.time()
            },
            'snapshots': [
                {
                    'timestamp': snap.timestamp,
                    'frame_number': snap.frame_number,
                    'stage': snap.stage,
                    'fps': snap.fps,
                    'memory_usage_mb': snap.memory_usage_mb,
                    'custom_metrics': dict(snap.custom_metrics)
                }
                for snap in self.snapshots
            ],
            'statistics': self._stats_snapshot(),
            'stages': [dict(entry) for entry in self.stages_completed],
            'performance_summary': summary
        }

    def reset(self, new_total_frames: Optional[int] = None) -> None:
        """Reset tracker for new processing session.

        Args:
            new_total_frames: New expected frame count; the previous value is
                kept when None.
        """
        if new_total_frames is not None:
            self.total_frames = new_total_frames

        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0
        # Containers are cleared in place so existing references stay valid.
        self.frame_times.clear()
        self.fps_history.clear()
        self.snapshots.clear()
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed.clear()
        self._open_stage_record = None
        self.smoothed_fps = 0.0

        self.stats = self._new_stats()

        logger.debug("ProgressTracker reset")

    def finalize(self) -> Dict[str, Any]:
        """Finalize tracking and return comprehensive results"""
        # get_performance_summary() records the stage still in progress.
        final_summary = self.get_performance_summary()

        logger.info(
            f"Processing completed: {self.processed_frames}/{self.total_frames} frames "
            f"in {self._format_time(final_summary['total_processing_time'])} "
            f"(avg: {final_summary['overall_fps']:.1f} fps)"
        )

        return final_summary