# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. from collections import defaultdict from contextlib import contextmanager from functools import partial import numpy as np from mmengine import Timer class RunningAverage(): r"""A helper class to calculate running average in a sliding window. Args: window (int): The size of the sliding window. """ def __init__(self, window: int = 1): self.window = window self._data = [] def update(self, value): """Update a new data sample.""" self._data.append(value) self._data = self._data[-self.window:] def average(self): """Get the average value of current window.""" return np.mean(self._data) class StopWatch: r"""A helper class to measure FPS and detailed time consuming of each phase in a video processing loop or similar scenarios. Args: window (int): The sliding window size to calculate the running average of the time consuming. Example: >>> from mmpose.utils import StopWatch >>> import time >>> stop_watch = StopWatch(window=10) >>> with stop_watch.timeit('total'): >>> time.sleep(0.1) >>> # 'timeit' support nested use >>> with stop_watch.timeit('phase1'): >>> time.sleep(0.1) >>> with stop_watch.timeit('phase2'): >>> time.sleep(0.2) >>> time.sleep(0.2) >>> report = stop_watch.report() """ def __init__(self, window=1): self.window = window self._record = defaultdict(partial(RunningAverage, window=self.window)) self._timer_stack = [] @contextmanager def timeit(self, timer_name='_FPS_'): """Timing a code snippet with an assigned name. Args: timer_name (str): The unique name of the interested code snippet to handle multiple timers and generate reports. Note that '_FPS_' is a special key that the measurement will be in `fps` instead of `millisecond`. Also see `report` and `report_strings`. Default: '_FPS_'. Note: This function should always be used in a `with` statement, as shown in the example. """ self._timer_stack.append((timer_name, Timer())) try: yield finally: timer_name, timer = self._timer_stack.pop() self._record[timer_name].update(timer.since_start()) def report(self, key=None): """Report timing information. Returns: dict: The key is the timer name and the value is the \ corresponding average time consuming. """ result = { name: r.average() * 1000. for name, r in self._record.items() } if '_FPS_' in result: result['_FPS_'] = 1000. / result.pop('_FPS_') if key is None: return result return result[key] def report_strings(self): """Report timing information in texture strings. Returns: list(str): Each element is the information string of a timed \ event, in format of '{timer_name}: {time_in_ms}'. \ Specially, if timer_name is '_FPS_', the result will \ be converted to fps. """ result = self.report() strings = [] if '_FPS_' in result: strings.append(f'FPS: {result["_FPS_"]:>5.1f}') strings += [f'{name}: {val:>3.0f}' for name, val in result.items()] return strings def reset(self): self._record = defaultdict(list) self._active_timer_stack = []