Spaces:
Runtime error
Runtime error
| # generates periodic hearbeats for remote expriment monitoring | |
| from pathlib import Path | |
| import json | |
| from inspect import stack | |
| from .ticker import IntervalTicker | |
| _CURRENT_BEAT_STACK = [] | |
| def get_heartbeat(): | |
| """ | |
| Returns: | |
| The :class:`HeartBeat` object that's currently being used. | |
| Throws an error if no :class:`EventStorage` is currently enabled. | |
| """ | |
| assert len( | |
| _CURRENT_BEAT_STACK | |
| ), "get_heartbeat() has to be called inside a 'with EventStorage(...)' context!" | |
| return _CURRENT_BEAT_STACK[-1] | |
| def get_tqdm_meter(pbar, format_dict): | |
| format_dict['bar_format'] = "{r_bar}" | |
| meter_str = pbar.format_meter(**format_dict) | |
| meter_str = meter_str[2:] | |
| return meter_str | |
| def caller_info(n_stack_up): | |
| info = stack()[1 + n_stack_up] # 1 up as base so that it starts from caller | |
| msg = f"{info.filename}:{info.lineno} - {info.function}" | |
| return msg | |
| class HeartBeat(): | |
| def __init__( | |
| self, pbar, write_interval=10, | |
| output_dir="./", fname="heartbeat.json" | |
| ): | |
| self.pbar = pbar | |
| self.fname = Path(output_dir) / fname | |
| self.ticker = IntervalTicker(write_interval) | |
| self.completed = False | |
| # force one write at the beginning | |
| self.beat(force_write=True, n_stack_up=2) | |
| def beat(self, force_write=False, n_stack_up=1): | |
| on_write_period = self.ticker.tick() | |
| if force_write or on_write_period: | |
| stats = self.stats() | |
| stats['caller'] = caller_info(n_stack_up) | |
| with open(self.fname, "w") as f: | |
| json.dump(stats, f) | |
| def done(self): | |
| self.completed = True | |
| self.beat(force_write=True, n_stack_up=2) | |
| def stats(self): | |
| pbar = self.pbar | |
| fdict = pbar.format_dict | |
| stats = { | |
| "beat": self.ticker.tick_str(), | |
| "done": self.completed, | |
| "meter": get_tqdm_meter(pbar, fdict), | |
| "elapsed": int(fdict['elapsed']) | |
| } | |
| return stats | |
| def __enter__(self): | |
| _CURRENT_BEAT_STACK.append(self) | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| assert _CURRENT_BEAT_STACK[-1] == self | |
| _CURRENT_BEAT_STACK.pop() | |