# NOTE(review): the lines below appear to be Hugging Face Spaces status-page
# residue ("Spaces: Sleeping") captured during a copy/paste — kept as a comment
# so the module remains valid Python.
| import os | |
| import time | |
| import requests | |
| from typing import Optional, Tuple | |
| from ui.statusui import StatusUI | |
| from checks.health_check import check_model_endpoint | |
| from agents.model import huggingface_model_uri | |
def wake_up_endpoint(
    endpoint_uri: str,
    ui,
    max_wait: int = 300,
    initial_delay: float = 3.0,
    backoff_factor: float = 1.5,
    max_retry_delay: float = 10.0
) -> Tuple[bool, Optional[str]]:
    """
    Poll the endpoint until it responds OK or timeout.

    Repeatedly POSTs a tiny "ping" payload to the endpoint with exponential
    backoff between attempts, until the endpoint answers with a 2xx status
    or the overall time budget is exhausted.

    Args:
        endpoint_uri: The endpoint URL to monitor
        ui: UI object for status updates (must expose an ``append(str)`` method)
        max_wait: Maximum total wait time in seconds (minimum 60s enforced)
        initial_delay: Initial delay between retries in seconds
        backoff_factor: Multiplier for exponential backoff
        max_retry_delay: Maximum delay between retries in seconds
    Returns:
        Tuple of (success: bool, error_message: Optional[str])
    """
    # Configuration validation
    max_wait = max(max_wait, 60)
    current_delay = min(initial_delay, max_retry_delay)

    # Prepare request components. The bearer token is optional; anonymous
    # pings still work against public endpoints.
    headers = {}
    if hf_token := os.environ.get("HF_TOKEN"):
        headers["Authorization"] = f"Bearer {hf_token}"
    payload = {"inputs": "ping"}

    def _request_timeout(delay: float) -> float:
        # Keep the per-request timeout below the retry delay (0.8x),
        # and never above 5s so a hung connection can't eat the budget.
        return min(5, delay * 0.8)

    start_time = time.time()
    last_status = None
    while (time.time() - start_time) < max_wait:
        try:
            # Log attempt (only when the target changes, so the log isn't spammed)
            if endpoint_uri != last_status:
                ui.append(f"Pinging endpoint: {endpoint_uri}")
                last_status = endpoint_uri
            # Make the request
            response = requests.post(
                endpoint_uri,
                headers=headers,
                json=payload,
                timeout=_request_timeout(current_delay)
            )
            if response.ok:
                ui.append("✅ Endpoint is awake and responsive")
                return True, None
            # Handle specific HTTP status codes: 503/504 are the expected
            # "still scaling up from zero" responses.
            if response.status_code in {503, 504}:
                status_msg = f"Endpoint warming up (HTTP {response.status_code})"
            else:
                status_msg = f"Unexpected response (HTTP {response.status_code})"
            ui.append(f"{status_msg}, retrying in {current_delay:.1f}s...")
        except requests.exceptions.RequestException as e:
            ui.append(f"Connection error ({type(e).__name__}), retrying in {current_delay:.1f}s...")

        # BUG FIX: previously this slept a full current_delay even when the
        # deadline had passed (or was about to), overrunning max_wait by up
        # to max_retry_delay seconds. Cap the sleep to the remaining budget
        # and skip it entirely once the budget is spent.
        remaining = max_wait - (time.time() - start_time)
        if remaining <= 0:
            break
        time.sleep(min(current_delay, remaining))
        current_delay = min(current_delay * backoff_factor, max_retry_delay)

    # Timeout reached
    error_msg = f"❌ Timed out after {max_wait}s waiting for endpoint"
    ui.append(error_msg)
    return False, error_msg
def run_status_checks():
    """Run all startup prechecks and return the endpoint URI if successful.

    Launches the status UI (non-blocking), resolves the model endpoint URI,
    wakes the endpoint up, then runs the health check. On any failure the
    function returns None; on health-check failure it additionally launches
    the failure-report Gradio UI.

    Returns:
        The endpoint URI string on success, otherwise None.
    """
    ui = StatusUI()
    # Launch the UI non-blocking so we can keep streaming messages to it.
    ui.launch(inbrowser=False, prevent_thread_lock=True)

    # Now send messages
    ui.append("Starting prechecks...")
    time.sleep(0.5)  # Brief pause for UI to initialize
    ui.append("Checking endpoint..")

    endpoint_uri = huggingface_model_uri()  # Get the URI for the endpoint
    ui.append(endpoint_uri)

    # BUG FIX: wake_up_endpoint used to be called twice in a row (the first
    # result was discarded), doubling the worst-case wait to 2 * max_wait.
    # Also, on failure the code logged "Exiting." but then carried on to the
    # health check anyway — now it actually returns.
    success, error_msg = wake_up_endpoint(endpoint_uri, ui)
    if not success:
        ui.append("Warning: Could not wake up the endpoint. Exiting.")
        return None
    ui.append("✅ End point responded OK.")

    is_healthy, status_info = check_model_endpoint(endpoint_uri)  # Test the endpoint
    if not is_healthy:
        # Deferred import: only needed on the failure path.
        from checks.failed_check import create_failed_gradio_ui
        interface = create_failed_gradio_ui(status_info)
        interface.launch(show_error=True, share=True)
        return None
    return endpoint_uri