Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| import gradio as gr | |
| import os | |
| import tempfile | |
| import shutil | |
| from typing import Optional, Tuple, Union | |
| from huggingface_hub import InferenceClient, whoami | |
| from pathlib import Path | |
| # Initialize Hugging Face Inference Client with fal-ai provider | |
| client = InferenceClient( | |
| provider="fal-ai", | |
| api_key=os.environ.get("HF_TOKEN"), | |
| bill_to="huggingface", | |
| ) | |
| def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool: | |
| """Verifies if the user is a Hugging Face PRO user or part of an enterprise org.""" | |
| if not token: | |
| return False | |
| if isinstance(token, gr.OAuthToken): | |
| token_str = token.token | |
| elif isinstance(token, str): | |
| token_str = token | |
| else: | |
| return False | |
| try: | |
| user_info = whoami(token=token_str) | |
| return ( | |
| user_info.get("isPro", False) or | |
| any(org.get("isEnterprise", False) for org in user_info.get("orgs", [])) | |
| ) | |
| except Exception as e: | |
| print(f"Could not verify user's PRO/Enterprise status: {e}") | |
| return False | |
| def cleanup_temp_files(): | |
| """Clean up old temporary video files to prevent storage overflow.""" | |
| try: | |
| temp_dir = tempfile.gettempdir() | |
| # Clean up old .mp4 files in temp directory | |
| for file_path in Path(temp_dir).glob("*.mp4"): | |
| try: | |
| # Remove files older than 5 minutes | |
| import time | |
| if file_path.stat().st_mtime < (time.time() - 300): | |
| file_path.unlink(missing_ok=True) | |
| except Exception: | |
| pass | |
| except Exception as e: | |
| print(f"Cleanup error: {e}") | |
| def generate_video( | |
| prompt: str, | |
| duration: int = 8, | |
| size: str = "1280x720", | |
| api_key: Optional[str] = None | |
| ) -> Tuple[Optional[str], str]: | |
| """Generate video using Sora-2 through Hugging Face Inference API with fal-ai provider.""" | |
| cleanup_temp_files() | |
| try: | |
| if api_key: | |
| temp_client = InferenceClient( | |
| provider="fal-ai", | |
| api_key=api_key, | |
| bill_to="huggingface", | |
| ) | |
| else: | |
| temp_client = client | |
| if not os.environ.get("HF_TOKEN") and not api_key: | |
| return None, "β Please set HF_TOKEN environment variable." | |
| video_bytes = temp_client.text_to_video( | |
| prompt, | |
| model="akhaliq/sora-2", | |
| ) | |
| temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) | |
| try: | |
| temp_file.write(video_bytes) | |
| temp_file.flush() | |
| video_path = temp_file.name | |
| finally: | |
| temp_file.close() | |
| return video_path, "β Video generated successfully!" | |
| except Exception as e: | |
| return None, f"β Error generating video: {str(e)}" | |
| # --- NEW: image -> video support --- | |
| def generate_video_from_image( | |
| image: Union[str, bytes], | |
| prompt: str, | |
| api_key: Optional[str] = None | |
| ) -> Tuple[Optional[str], str]: | |
| """Generate a video from a single input image + prompt using Sora-2 image-to-video.""" | |
| cleanup_temp_files() | |
| if not prompt or prompt.strip() == "": | |
| return None, "β Please enter a prompt" | |
| try: | |
| if api_key: | |
| temp_client = InferenceClient( | |
| provider="fal-ai", | |
| api_key=api_key, | |
| bill_to="huggingface", | |
| ) | |
| else: | |
| temp_client = client | |
| if not os.environ.get("HF_TOKEN") and not api_key: | |
| return None, "β Please set HF_TOKEN environment variable." | |
| if isinstance(image, str): | |
| with open(image, "rb") as f: | |
| input_image = f.read() | |
| elif isinstance(image, (bytes, bytearray)): | |
| input_image = image | |
| else: | |
| return None, "β Invalid image input. Please upload an image." | |
| video_bytes = temp_client.image_to_video( | |
| input_image, | |
| prompt=prompt, | |
| model="akhaliq/sora-2-image-to-video", | |
| ) | |
| temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) | |
| try: | |
| temp_file.write(video_bytes) | |
| temp_file.flush() | |
| video_path = temp_file.name | |
| finally: | |
| temp_file.close() | |
| return video_path, "β Video generated from image successfully!" | |
| except Exception as e: | |
| return None, f"β Error generating video from image: {str(e)}" | |
| def generate_with_pro_auth( | |
| prompt: str, | |
| oauth_token: Optional[gr.OAuthToken] = None | |
| ) -> Tuple[Optional[str], str]: | |
| """Wrapper function that checks if user is PRO before generating video.""" | |
| if not verify_pro_status(oauth_token): | |
| raise gr.Error("Access Denied. This app is exclusively for Hugging Face PRO users.") | |
| if not prompt or prompt.strip() == "": | |
| return None, "β Please enter a prompt" | |
| return generate_video( | |
| prompt, | |
| duration=8, | |
| size="1280x720", | |
| api_key=None | |
| ) | |
| # --- NEW: PRO-gated wrapper for image -> video --- | |
| def generate_with_pro_auth_image( | |
| prompt: str, | |
| image_path: Optional[str] = None, | |
| oauth_token: Optional[gr.OAuthToken] = None | |
| ) -> Tuple[Optional[str], str]: | |
| """Checks PRO status then calls image->video generator.""" | |
| if not verify_pro_status(oauth_token): | |
| raise gr.Error("Access Denied. This app is exclusively for Hugging Face PRO users.") | |
| if not image_path: | |
| return None, "β Please upload an image" | |
| return generate_video_from_image(image=image_path, prompt=prompt, api_key=None) | |
| def simple_generate(prompt: str) -> Optional[str]: | |
| """Simplified wrapper for examples that only returns video.""" | |
| if not prompt or prompt.strip() == "": | |
| return None | |
| video_path, _ = generate_video(prompt, duration=8, size="1280x720", api_key=None) | |
| return video_path | |
| def create_ui(): | |
| css = ''' | |
| .logo-dark{display: none} | |
| .dark .logo-dark{display: block !important} | |
| .dark .logo-light{display: none} | |
| #sub_title{margin-top: -20px !important} | |
| .pro-badge{ | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| padding: 4px 12px; | |
| border-radius: 20px; | |
| font-size: 0.9em; | |
| font-weight: bold; | |
| display: inline-block; | |
| margin-left: 8px; | |
| } | |
| ''' | |
| with gr.Blocks(title="Sora-2 Text-to-Video Generator", theme=gr.themes.Soft(), css=css) as demo: | |
| gr.HTML(""" | |
| <div style="text-align: center; max-width: 800px; margin: 0 auto;"> | |
| <h1 style="font-size: 2.5em; margin-bottom: 0.5em;"> | |
| π¬ Sora-2 Text-to-Video Generator | |
| <span class="pro-badge">PRO</span> | |
| </h1> | |
| <p style="font-size: 1.1em; color: #666; margin-bottom: 20px;">Generate stunning videos using OpenAI's Sora-2 model</p> | |
| <p id="sub_title" style="font-size: 1em; margin-top: 20px; margin-bottom: 15px;"> | |
| <strong>Exclusive access for Hugging Face PRO users.</strong> | |
| <a href="http://huggingface.co/subscribe/pro?source=sora2_video" target="_blank" style="color: #667eea;">Subscribe to PRO β</a> | |
| </p> | |
| <p style="font-size: 0.9em; color: #999; margin-top: 15px;"> | |
| Built with <a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank" style="color: #667eea;">anycoder</a> | |
| </p> | |
| </div> | |
| """) | |
| gr.LoginButton() | |
| pro_message = gr.Markdown(visible=False) | |
| main_interface = gr.Column(visible=False) | |
| with main_interface: | |
| gr.HTML("""<div style="text-align: center; margin: 20px 0;"> | |
| <p style="color: #28a745; font-weight: bold;">β¨ Welcome PRO User! You have full access to Sora-2.</p> | |
| </div>""") | |
| # Text -> Video | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| prompt_input = gr.Textbox( | |
| label="Enter your prompt", | |
| placeholder="Describe the video you want to create...", | |
| lines=4 | |
| ) | |
| generate_btn = gr.Button("π₯ Generate Video", variant="primary", size="lg") | |
| with gr.Column(scale=1): | |
| video_output = gr.Video(label="Generated Video", height=400, interactive=False, show_download_button=True) | |
| status_output = gr.Textbox(label="Status", interactive=False, visible=True) | |
| generate_btn.click( | |
| fn=generate_with_pro_auth, | |
| inputs=[prompt_input], | |
| outputs=[video_output, status_output], | |
| queue=False | |
| ) | |
| # --- NEW: Image -> Video UI --- | |
| gr.HTML(""" | |
| <div style="text-align: center; margin: 40px 0 10px;"> | |
| <h3 style="margin-bottom: 8px;">πΌοΈ β π¬ Image β Video (beta)</h3> | |
| <p style="color:#666; margin:0;">Turn a single image into a short video with a guiding prompt.</p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| img_prompt_input = gr.Textbox( | |
| label="Describe how the scene should evolve", | |
| placeholder="e.g., The cat starts to dance and spins playfully", | |
| lines=3, | |
| ) | |
| image_input = gr.Image(label="Upload an image", type="filepath") | |
| generate_img_btn = gr.Button("π₯ Generate from Image", variant="primary") | |
| with gr.Column(scale=1): | |
| video_output_img = gr.Video(label="Generated Video (from Image)", height=400, interactive=False, show_download_button=True) | |
| status_output_img = gr.Textbox(label="Status", interactive=False, visible=True) | |
| generate_img_btn.click( | |
| fn=generate_with_pro_auth_image, | |
| inputs=[img_prompt_input, image_input], | |
| outputs=[video_output_img, status_output_img], | |
| queue=False | |
| ) | |
| gr.HTML("""<div style="text-align: center; margin-top: 40px; padding: 20px; border-top: 1px solid #e0e0e0;"> | |
| <h3 style="color: #667eea;">Thank you for being a PRO user! π€</h3> | |
| </div>""") | |
| def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None): | |
| if not profile: | |
| return gr.update(visible=False), gr.update(visible=False) | |
| if verify_pro_status(oauth_token): | |
| return gr.update(visible=True), gr.update(visible=False) | |
| else: | |
| message = "## β¨ Exclusive Access for PRO Users\n\nThis tool is available exclusively for Hugging Face **PRO** members." | |
| return gr.update(visible=False), gr.update(visible=True, value=message) | |
| demo.load(control_access, inputs=None, outputs=[main_interface, pro_message]) | |
| return demo | |
| if __name__ == "__main__": | |
| try: | |
| cleanup_temp_files() | |
| if os.path.exists("gradio_cached_examples"): | |
| shutil.rmtree("gradio_cached_examples", ignore_errors=True) | |
| except Exception as e: | |
| print(f"Initial cleanup error: {e}") | |
| app = create_ui() | |
| app.launch(show_api=False, enable_monitoring=False, quiet=True, max_threads=10) | |
