Spaces:
Runtime error
Runtime error
| from queue import Queue | |
| import argilla as rg | |
| import gradio as gr | |
# Argilla client built with no explicit arguments.
# NOTE(review): presumably the server URL / API key come from the environment — confirm.
client = rg.Argilla()
# Hand-off buffer between the webhook handlers (which put() events) and
# check_incoming_events() (which drains them for the UI).
incoming_events = Queue()
def check_incoming_events() -> dict:
    """Drain all queued webhook events and return them for the JSON component.

    Intended to be called periodically (every 5 seconds) by the UI timer.

    Returns:
        A dict of the form ``{"events": [...]}`` holding every event that was
        waiting in ``incoming_events``, oldest first. The list is empty when
        nothing was queued.
    """
    # Function-scope import: the file itself only imports ``Queue``.
    from queue import Empty

    events = []
    while True:
        try:
            # Never block here: an ``empty()`` check followed by a blocking
            # ``get()`` can hang forever if a concurrent tick drains the queue
            # in between the two calls.
            events.append(incoming_events.get_nowait())
        except Empty:
            break
    return {"events": events}
# Build the page: a short intro plus a JSON panel that is refreshed by a timer.
with gr.Blocks() as demo:
    argilla_server = client.http_client.base_url

    gr.Markdown("## Argilla Events")
    gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).")

    json_component = gr.JSON(label="Incoming argilla events:")

    # Every 5 seconds, pull whatever the webhook handlers queued into the panel.
    events_timer = gr.Timer(5, active=True)
    events_timer.tick(check_incoming_events, outputs=json_component)

# prevent_thread_lock=True lets the script carry on after launching;
# only the first element of the returned triple (the server app) is used.
launch_result = demo.launch(
    prevent_thread_lock=True,
    app_kwargs={"docs_url": "/docs"},
)
server, _, _ = launch_result

# Set up the webhook listeners on the server that Gradio launched
rg.set_webhook_server(server)

# Delete all existing webhooks so only the ones defined by this script remain
for existing_webhook in client.webhooks:
    existing_webhook.delete()
| # Create a webhook for record events | |
async def record_events(event: dict) -> None:
    """Log an incoming record event and queue it for display in the UI.

    Args:
        event: The event payload, stored in ``incoming_events`` unchanged.
    """
    # NOTE(review): no @rg.webhook_listener decorator is visible above this
    # handler in this view — confirm it is actually registered.
    print("Received event", event)

    incoming_events.put(event)
| # Create a webhook for dataset events | |
async def dataset_events(type: str, dataset: rg.Dataset | None = None, **kwargs):
    """Log an incoming dataset event and queue it for display in the UI.

    Args:
        type: Event type string. The name shadows the builtin but is kept
            because it is part of the handler's keyword interface.
        dataset: The dataset the event refers to, or ``None`` when the caller
            does not supply one.
        **kwargs: Any additional event fields; accepted and ignored.
    """
    # NOTE(review): no @rg.webhook_listener decorator is visible above this
    # handler in this view — confirm it is actually registered.
    # ``dataset`` defaults to None, so ``.id`` must not be dereferenced
    # unconditionally (it used to raise AttributeError in that case).
    dataset_id = None if dataset is None else dataset.id
    print(f"Received event {type} for dataset {dataset_id}")
    incoming_events.put((type, dataset))
| # Create a webhook for response events | |
async def response_events(event: dict) -> None:
    """Log an incoming response event and queue it for display in the UI.

    Args:
        event: The event payload, stored in ``incoming_events`` unchanged.
    """
    # NOTE(review): no @rg.webhook_listener decorator is visible above this
    # handler in this view — confirm it is actually registered.
    print("Received event", event)

    incoming_events.put(event)
async def deleted_events(type: str, data: dict, **kwargs) -> None:
    """Log a deletion event and queue it for display in the UI.

    Args:
        type: Event type string (name kept as-is; it is part of the handler's
            keyword interface).
        data: Payload describing the deleted resource.
        **kwargs: Any additional event fields; accepted and ignored.
    """
    # NOTE(review): no @rg.webhook_listener decorator is visible above this
    # handler in this view — confirm it is actually registered.
    print(f"Received event {type} for resource {data}")

    queued_item = (type, data)
    incoming_events.put(queued_item)
# The app was launched with prevent_thread_lock=True so the webhook setup could
# run afterwards; block the main thread now so the script keeps serving.
demo.block_thread()