Spaces:
Build error
Build error
| """ | |
| Utilities for making Streamlit work better with async operations. | |
| This module provides functions to help execute async code in Streamlit. | |
| """ | |
| import asyncio | |
| import concurrent.futures | |
| import functools | |
| import threading | |
| import nest_asyncio | |
| from typing import Any, Callable, TypeVar, cast | |
| T = TypeVar('T') | |
| # Apply nest_asyncio to allow nested event loops | |
| try: | |
| nest_asyncio.apply() | |
| except ImportError: | |
| pass # If nest_asyncio isn't available, continue without it | |
| def run_async(func: Callable[..., Any]) -> Callable[..., Any]: | |
| """ | |
| Decorator to safely run an async function in Streamlit. | |
| This decorator handles the complexities of running async code in Streamlit, | |
| which can have issues with event loops. | |
| Args: | |
| func: The async function to run | |
| Returns: | |
| A wrapper function that can be called synchronously | |
| """ | |
| def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| return run_in_thread_pool(lambda: run_in_executor(func, *args, **kwargs)) | |
| return wrapper | |
| def run_in_thread_pool(func: Callable[[], T]) -> T: | |
| """ | |
| Run a function in a thread pool to isolate it from Streamlit's event loop. | |
| Args: | |
| func: The function to run | |
| Returns: | |
| The result of the function | |
| """ | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | |
| future = executor.submit(func) | |
| return cast(T, future.result(timeout=60)) # 60 second timeout | |
| def run_in_executor(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: | |
| """ | |
| Run an async function in a new event loop within the current thread. | |
| Args: | |
| func: The async function to run | |
| *args: Arguments to pass to the function | |
| **kwargs: Keyword arguments to pass to the function | |
| Returns: | |
| The result of the async function | |
| """ | |
| async_result = None | |
| try: | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| if asyncio.iscoroutinefunction(func): | |
| # If it's already an async function | |
| coro = func(*args, **kwargs) | |
| else: | |
| # If it's a regular function that returns a coroutine | |
| coro = func(*args, **kwargs) | |
| async_result = loop.run_until_complete(coro) | |
| finally: | |
| try: | |
| if loop and not loop.is_closed(): | |
| loop.close() | |
| except Exception: | |
| pass # Ignore errors while closing the loop | |
| return async_result | |
| # Example usage: | |
| # @run_async | |
| # async def fetch_data_from_blockchain(address): | |
| # # Your async code here | |
| # return result | |
| # | |
| # # Then in your Streamlit app: | |
| # data = fetch_data_from_blockchain("0x123...") |