import os import time import tempfile import shutil from pathlib import Path from typing import Optional, Tuple, Union import gradio as gr from huggingface_hub import InferenceClient, whoami # ========================= # Inference client (fal-ai) # ========================= client = InferenceClient( provider="fal-ai", api_key=os.environ.get("HF_TOKEN"), bill_to="huggingface", ) # ========================= # Auth / PRO helpers # ========================= def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool: """Verifies if the user is a Hugging Face PRO user or part of an enterprise org.""" if not token: return False if isinstance(token, gr.OAuthToken): token_str = token.token elif isinstance(token, str): token_str = token else: return False try: user_info = whoami(token=token_str) return ( user_info.get("isPro", False) or any(org.get("isEnterprise", False) for org in user_info.get("orgs", [])) ) except Exception as e: print(f"Could not verify user's PRO/Enterprise status: {e}") return False # ========================= # Storage hygiene # ========================= def cleanup_temp_files(): """Clean up old temporary .mp4 files to prevent storage overflow.""" try: temp_dir = tempfile.gettempdir() for file_path in Path(temp_dir).glob("*.mp4"): try: # Remove files older than 5 minutes if file_path.stat().st_mtime < (time.time() - 300): file_path.unlink(missing_ok=True) except Exception: pass # Ignore errors for individual files except Exception as e: print(f"Cleanup error: {e}") def _write_video_bytes_to_tempfile(video_bytes: bytes) -> str: temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) try: temp_file.write(video_bytes) temp_file.flush() return temp_file.name finally: temp_file.close() # ========================= # Generation (Text → Video) # ========================= def generate_video( prompt: str, duration: int = 8, size: str = "1280x720", api_key: Optional[str] = None, ) -> Tuple[Optional[str], str]: """ Generate video using Sora-2 via Hugging Face Inference API (fal-ai provider). Returns (video_path, status_message). """ cleanup_temp_files() try: # Use provided API key or environment variable temp_client = ( InferenceClient(provider="fal-ai", api_key=api_key, bill_to="huggingface") if api_key else client ) if not (api_key or os.environ.get("HF_TOKEN")): return None, "❌ Please set HF_TOKEN environment variable." # Call text-to-video video_bytes = temp_client.text_to_video( prompt, model="akhaliq/sora-2", # If your backend supports these, you can forward them as kwargs: # duration=duration, # size=size, ) video_path = _write_video_bytes_to_tempfile(video_bytes) return video_path, "✅ Video generated successfully!" except Exception as e: return None, f"❌ Error generating video: {str(e)}" # ========================= # Generation (Image → Video) # ========================= def generate_video_from_image( prompt: str, image_path: str, api_key: Optional[str] = None, ) -> Tuple[Optional[str], str]: """ Generate video from a single input image using Sora-2 image-to-video. Returns (video_path, status_message). """ cleanup_temp_files() if not image_path or not Path(image_path).exists(): return None, "❌ Please upload an image." try: temp_client = ( InferenceClient(provider="fal-ai", api_key=api_key, bill_to="huggingface") if api_key else client ) if not (api_key or os.environ.get("HF_TOKEN")): return None, "❌ Please set HF_TOKEN environment variable." with open(image_path, "rb") as f: input_image = f.read() video_bytes = temp_client.image_to_video( input_image, prompt=prompt or "", model="akhaliq/sora-2-image-to-video", ) video_path = _write_video_bytes_to_tempfile(video_bytes) return video_path, "✅ Video generated successfully from image!" except Exception as e: return None, f"❌ Error generating video from image: {str(e)}" # ========================= # PRO wrapper (uses request) # ========================= def generate_with_pro_auth( mode: str, prompt: str, image_path: Optional[str], request: gr.Request, ) -> Tuple[Optional[str], str]: """ Check PRO status from the request's OAuth token, then route to the appropriate generation function based on mode. """ oauth_token = getattr(request, "oauth_token", None) if not verify_pro_status(oauth_token): raise gr.Error( "Access Denied. This app is exclusively for Hugging Face PRO users. Please subscribe to PRO to use this app." ) if mode == "Text → Video": if not prompt or not prompt.strip(): return None, "❌ Please enter a prompt." return generate_video(prompt, duration=8, size="1280x720", api_key=None) # Image → Video if not image_path: return None, "❌ Please upload an image." # Prompt is optional for image→video; pass empty string if not provided return generate_video_from_image(prompt or "", image_path, api_key=None) def simple_generate(prompt: str) -> Optional[str]: """Examples: only return the video path (text→video).""" if not prompt or not prompt.strip(): return None video_path, _ = generate_video(prompt, duration=8, size="1280x720", api_key=None) return video_path # ========================= # UI # ========================= def create_ui(): css = ''' .logo-dark{display: none} .dark .logo-dark{display: block !important} .dark .logo-light{display: none} #sub_title{margin-top: -20px !important} .pro-badge{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 4px 12px; border-radius: 20px; font-size: 0.9em; font-weight: bold; display: inline-block; margin-left: 8px; } ''' with gr.Blocks(title="Sora-2 Text-to-Video Generator", theme=gr.themes.Soft(), css=css) as demo: gr.HTML("""
Generate stunning videos using OpenAI's Sora-2 model
Exclusive access for Hugging Face PRO users. Subscribe to PRO →
Built with anycoder
✨ Welcome PRO User! You have full access to Sora-2.
Join thousands of creators who are already using PRO tools to bring their ideas to life.
""" return gr.update(visible=False), gr.update(visible=True, value=message) demo.load( control_access, inputs=None, outputs=[main_interface, pro_message], ) return demo # ========================= # Entrypoint # ========================= if __name__ == "__main__": # Clean up any leftover files on startup try: cleanup_temp_files() if os.path.exists("gradio_cached_examples"): shutil.rmtree("gradio_cached_examples", ignore_errors=True) except Exception as e: print(f"Initial cleanup error: {e}") app = create_ui() app.launch( show_api=False, enable_monitoring=False, quiet=True, max_threads=10, )