Spaces:
Runtime error
Runtime error
| from threading import Thread | |
| import gradio as gr | |
| import inspect | |
| from gradio import routes | |
| from typing import List, Type | |
| import requests, os, re, asyncio, queue | |
| import math | |
| import time | |
| import datetime | |
| import requests, json | |
| loop = asyncio.get_event_loop() | |
| # Monkey patch | |
| def get_types(cls_set: List[Type], component: str): | |
| docset = [] | |
| types = [] | |
| if component == "input": | |
| for cls in cls_set: | |
| doc = inspect.getdoc(cls) | |
| doc_lines = doc.split("\n") | |
| docset.append(doc_lines[1].split(":")[-1]) | |
| types.append(doc_lines[1].split(")")[0].split("(")[-1]) | |
| else: | |
| for cls in cls_set: | |
| doc = inspect.getdoc(cls) | |
| doc_lines = doc.split("\n") | |
| docset.append(doc_lines[-1].split(":")[-1]) | |
| types.append(doc_lines[-1].split(")")[0].split("(")[-1]) | |
| return docset, types | |
| routes.get_types = get_types | |
| q = queue.Queue() | |
| # App code | |
| def res(x, id, cdata, url): | |
| global q | |
| arr = [x, id, str(cdata.split(",", 1)[0]), url] | |
| q.put(arr) | |
| print("\n_Done\n\n") | |
| return "Done" | |
| with gr.Blocks() as demo: | |
| count = 0 | |
| aa = gr.Interface( | |
| fn=res, | |
| inputs=["text","text", "text", "text"], | |
| outputs="text", | |
| description="call", | |
| ) | |
| demo.queue(max_size=32).launch(enable_queue=True) |