Spaces:
Sleeping
Sleeping
| import contextlib | |
| import io | |
| import re | |
| import sys | |
| import threading | |
| import streamlit as st | |
| class _Redirect: | |
| class IOStuff(io.StringIO): | |
| def __init__( | |
| self, trigger, max_buffer, buffer_separator, regex, dup, need_dup, on_thread | |
| ): | |
| super().__init__() | |
| self._trigger = trigger | |
| self._max_buffer = max_buffer | |
| self._buffer_separator = buffer_separator | |
| self._regex = regex and re.compile(regex) | |
| self._dup = dup | |
| self._need_dup = need_dup | |
| self._on_thread = on_thread | |
| def write(self, __s: str) -> int: | |
| res = None | |
| if self._on_thread == threading.get_ident(): | |
| if self._max_buffer: | |
| concatenated_len = super().tell() + len(__s) | |
| if concatenated_len > self._max_buffer: | |
| rest = self.get_filtered_output()[ | |
| concatenated_len - self._max_buffer : | |
| ] | |
| if self._buffer_separator is not None: | |
| rest = rest.split(self._buffer_separator, 1)[-1] | |
| super().seek(0) | |
| super().write(rest) | |
| super().truncate(super().tell() + len(__s)) | |
| res = super().write(__s) | |
| self._trigger(self.get_filtered_output()) | |
| if self._on_thread != threading.get_ident() or self._need_dup: | |
| self._dup.write(__s) | |
| return res | |
| def get_filtered_output(self): | |
| if self._regex is None or self._buffer_separator is None: | |
| return self.getvalue() | |
| return self._buffer_separator.join( | |
| filter( | |
| self._regex.search, self.getvalue().split(self._buffer_separator) | |
| ) | |
| ) | |
| def print_at_end(self): | |
| self._trigger(self.get_filtered_output()) | |
| def __init__( | |
| self, | |
| stdout=None, | |
| stderr=False, | |
| format=None, | |
| to=None, | |
| max_buffer=None, | |
| buffer_separator="\n", | |
| regex=None, | |
| duplicate_out=False, | |
| ): | |
| self.io_args = { | |
| "trigger": self._write, | |
| "max_buffer": max_buffer, | |
| "buffer_separator": buffer_separator, | |
| "regex": regex, | |
| "on_thread": threading.get_ident(), | |
| } | |
| self.redirections = [] | |
| self.st = None | |
| self.stderr = stderr is True | |
| self.stdout = stdout is True or (stdout is None and not self.stderr) | |
| self.format = format or "code" | |
| self.to = to | |
| self.fun = None | |
| self.duplicate_out = duplicate_out or None | |
| self.active_nested = None | |
| if not self.stdout and not self.stderr: | |
| raise ValueError("one of stdout or stderr must be True") | |
| if self.format not in ["text", "markdown", "latex", "code", "write"]: | |
| raise ValueError( | |
| f"format need oneof the following: {', '.join(['text', 'markdown', 'latex', 'code', 'write'])}" | |
| ) | |
| if self.to and (not hasattr(self.to, "text") or not hasattr(self.to, "empty")): | |
| raise ValueError(f"'to' is not a streamlit container object") | |
| def __enter__(self): | |
| if self.st is not None: | |
| if self.to is None: | |
| if self.active_nested is None: | |
| self.active_nested = self( | |
| format=self.format, | |
| max_buffer=self.io_args["max_buffer"], | |
| buffer_separator=self.io_args["buffer_separator"], | |
| regex=self.io_args["regex"], | |
| duplicate_out=self.duplicate_out, | |
| ) | |
| return self.active_nested.__enter__() | |
| else: | |
| raise Exception("Already entered") | |
| to = self.to or st | |
| to.text("Logs:") | |
| self.st = to.empty() | |
| self.fun = getattr(self.st, self.format) | |
| io_obj = None | |
| def redirect(to_duplicate, context_redirect): | |
| nonlocal io_obj | |
| io_obj = _Redirect.IOStuff( | |
| need_dup=self.duplicate_out and True, dup=to_duplicate, **self.io_args | |
| ) | |
| redirection = context_redirect(io_obj) | |
| self.redirections.append((redirection, io_obj)) | |
| redirection.__enter__() | |
| if self.stderr: | |
| redirect(sys.stderr, contextlib.redirect_stderr) | |
| if self.stdout: | |
| redirect(sys.stdout, contextlib.redirect_stdout) | |
| return io_obj | |
| def __call__( | |
| self, | |
| to=None, | |
| format=None, | |
| max_buffer=None, | |
| buffer_separator="\n", | |
| regex=None, | |
| duplicate_out=False, | |
| ): | |
| return _Redirect( | |
| self.stdout, | |
| self.stderr, | |
| format=format, | |
| to=to, | |
| max_buffer=max_buffer, | |
| buffer_separator=buffer_separator, | |
| regex=regex, | |
| duplicate_out=duplicate_out, | |
| ) | |
| def __exit__(self, *exc): | |
| if self.active_nested is not None: | |
| nested = self.active_nested | |
| if nested.active_nested is None: | |
| self.active_nested = None | |
| return nested.__exit__(*exc) | |
| res = None | |
| for redirection, io_obj in reversed(self.redirections): | |
| res = redirection.__exit__(*exc) | |
| io_obj.print_at_end() | |
| self.redirections = [] | |
| self.st = None | |
| self.fun = None | |
| return res | |
| def _write(self, data): | |
| self.fun(data) | |
| stdout = _Redirect() | |
| stderr = _Redirect(stderr=True) | |
| stdouterr = _Redirect(stdout=True, stderr=True) | |
| """ | |
| # can be used as | |
| import time | |
| import sys | |
| from random import getrandbits | |
| import streamlit.redirect as rd | |
| st.text('Suboutput:') | |
| so = st.empty() | |
| with rd.stdout, rd.stderr(format='markdown', to=st.sidebar): | |
| print("hello ") | |
| time.sleep(1) | |
| i = 5 | |
| while i > 0: | |
| print("**M**izu? ", file=sys.stdout if getrandbits(1) else sys.stderr) | |
| i -= 1 | |
| with rd.stdout(to=so): | |
| print(f" cica {i}") | |
| if i: | |
| time.sleep(1) | |
| # """ | |