File size: 2,698 Bytes
4585d4c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Utilities for making Streamlit work better with async operations.
This module provides functions to help execute async code in Streamlit.
"""

import asyncio
import concurrent.futures
import functools
import threading
from typing import Any, Callable, TypeVar, cast

# nest_asyncio is an optional third-party dependency. The import itself must
# live inside the guard: an unconditional top-level import would raise before
# any later `except ImportError` had a chance to handle the missing package.
try:
    import nest_asyncio
except ImportError:
    # Keep the name bound so the rest of the module can test for availability.
    nest_asyncio = None  # type: ignore[assignment]

T = TypeVar('T')

# Apply nest_asyncio to allow nested event loops (skipped when not installed)
if nest_asyncio is not None:
    nest_asyncio.apply()


def run_async(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Turn an async function into one that can be called synchronously.

    The returned wrapper forwards its arguments to ``run_in_executor`` and
    hands that call to ``run_in_thread_pool``, so the coroutine is driven
    from a worker thread rather than from the caller's thread.

    Args:
        func: The async function to wrap

    Returns:
        A synchronous callable carrying ``func``'s metadata (name, docstring)
        that returns whatever the async function returns
    """
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Bind the target and its arguments now; the thread pool only
        # accepts a zero-argument callable.
        job = functools.partial(run_in_executor, func, *args, **kwargs)
        return run_in_thread_pool(job)

    return wrapper


def run_in_thread_pool(func: Callable[[], T], timeout: float = 60) -> T:
    """
    Run a function in a single worker thread and wait for its result.

    Args:
        func: The zero-argument function to run
        timeout: Maximum number of seconds to wait for the result
            (defaults to 60)

    Returns:
        The result of the function

    Raises:
        concurrent.futures.TimeoutError: If ``func`` has not finished within
            ``timeout`` seconds. The worker thread cannot be interrupted, so
            it keeps running in the background until ``func`` returns.
        Exception: Any exception raised by ``func`` is re-raised here.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func)
        return cast(T, future.result(timeout=timeout))
    finally:
        # Do not wait for the worker here: using the executor as a context
        # manager joins the thread on exit, which would keep the caller
        # blocked after a timeout and make the timeout ineffective.
        executor.shutdown(wait=False)


def run_in_executor(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """
    Run an async function in a new event loop within the current thread.

    A fresh event loop is created, installed as the current loop for this
    thread, used to drive the coroutine to completion, and then closed and
    uninstalled again.

    Args:
        func: The async function to run (or a regular function that returns
            a coroutine)
        *args: Arguments to pass to the function
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The result of the async function

    Raises:
        Exception: Whatever ``func`` or the coroutine it returns raises is
            propagated to the caller.
    """
    # Create the loop before the try block so the cleanup below never refers
    # to an unbound name if loop creation itself fails.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        # The call is the same for an `async def` function and for a regular
        # function returning a coroutine: both produce the awaitable to run.
        coro = func(*args, **kwargs)
        return loop.run_until_complete(coro)
    finally:
        # Cleanup is best-effort: an error here must not mask the result or
        # the exception coming from the coroutine itself.
        try:
            # Uninstall first so this thread is not left with a closed loop
            # registered as its current event loop.
            asyncio.set_event_loop(None)
            if not loop.is_closed():
                loop.close()
        except Exception:
            pass


# Example usage:
# @run_async
# async def fetch_data_from_blockchain(address):
#     # Your async code here
#     return result
#
# # Then in your Streamlit app:
# data = fetch_data_from_blockchain("0x123...")