import gradio as gr
import os
import json
import requests
from huggingface_hub import HfApi, login, whoami
import spaces
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import numpy as np
from utils import (
    authenticate_hf,
    deploy_model,
    get_user_models,
    check_deployment_status,
    get_inference_endpoint
)
from config import APP_CONFIG
class HuggingFaceDeployer:
    def __init__(self):
        self.api = HfApi()
        self.current_token = None
        self.user_info = None
        self.deployment_steps = [
            "1. Authenticate with Hugging Face",
            "2. Select or Create a Model",
            "3. Choose Deployment Settings",
            "4. Deploy the Model",
            "5. Test the Deployment",
            "6. Integrate TTS/STT"
        ]
        self.current_step = 0

    def authenticate(self, token):
        try:
            login(token=token)
            self.current_token = token
            self.user_info = whoami()
            return True, f"✅ Authenticated as {self.user_info['name']}", self.user_info
        except Exception as e:
            return False, f"❌ Authentication failed: {str(e)}", None

    def get_available_models(self):
        if not self.current_token:
            return []
        try:
            models = list(self.api.list_models(author=self.user_info['name'], limit=20))
            return [model.id for model in models]
        except Exception:
            return []
    def deploy_model_to_inference(self, model_id, task="text-generation"):
        try:
            # Derive an endpoint name from the model id
            endpoint_name = f"{model_id.split('/')[-1]}-endpoint"
            # Check whether an endpoint with this name already exists
            try:
                existing_endpoints = list(self.api.list_inference_endpoints())
                for endpoint in existing_endpoints:
                    if endpoint.name == endpoint_name:
                        return True, f"✅ Endpoint already exists: {endpoint.url}", endpoint.url
            except Exception:
                pass
            # Create a new endpoint. huggingface_hub expects the model under
            # `repository=`; the vendor/region/instance values below are
            # reasonable defaults and may need adjusting for your account.
            endpoint = self.api.create_inference_endpoint(
                name=endpoint_name,
                repository=model_id,
                task=task,
                framework="pytorch",
                accelerator="cpu",
                vendor="aws",
                region="us-east-1",
                instance_size="x2",
                instance_type="intel-icl",
                type="public"
            )
            return True, f"✅ Deployment initiated! URL: {endpoint.url}", endpoint.url
        except Exception as e:
            return False, f"❌ Deployment failed: {str(e)}", None
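    # A minimal polling helper (sketch only): newly created endpoints start in a
    # pending/initializing state, so callers may want to block until the endpoint
    # is running. This leans on huggingface_hub's InferenceEndpoint.wait(); the
    # timeout value is an arbitrary assumption, not from the original app.
    def wait_until_running(self, endpoint_name, timeout=300):
        try:
            endpoint = self.api.get_inference_endpoint(endpoint_name)
            endpoint.wait(timeout=timeout)  # raises if not running in time
            return True, endpoint.url
        except Exception as e:
            return False, str(e)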
# Initialize the deployer
deployer = HuggingFaceDeployer()


def create_interface():
    with gr.Blocks(
        title="Hugging Face Model Deployer",
        theme=gr.themes.Soft(),
        css="""
        .step-container { border: 2px solid #e1e5e9; border-radius: 10px; padding: 20px; margin: 10px 0; }
        .step-active { border-color: #0969da; background-color: #f6f8fa; }
        .step-completed { border-color: #1a7f37; background-color: #dcffe4; }
        .tutorial-card { max-width: 800px; margin: 0 auto; }
        .code-block { background-color: #f6f8fa; padding: 15px; border-radius: 5px; font-family: monospace; }
        """
    ) as demo:
        gr.HTML("""
        <div style='text-align: center; padding: 20px;'>
            <h1>🤗 Hugging Face Model Deployer</h1>
            <p>A complete guide to deploying your models with real TTS/STT integration</p>
            <a href='https://huggingface.co/spaces/akhaliq/anycoder' target='_blank'>
                Built with anycoder
            </a>
        </div>
        """)
        with gr.Tabs() as tabs:
            with gr.TabItem("📚 Tutorial & Deploy"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### 🎯 Deployment Steps")
                        step_display = gr.Markdown("\n".join([
                            f"{'✅ ' if i < deployer.current_step else '⬜'} {step}"
                            for i, step in enumerate(deployer.deployment_steps)
                        ]))
                        with gr.Row():
                            prev_btn = gr.Button("⬅️ Previous")
                            next_btn = gr.Button("Next ➡️")
                    with gr.Column(scale=3):
                        # State for managing button visibility
                        prev_state = gr.State(value=True)
                        next_state = gr.State(value=False)
                        # Step 1: Authentication
                        with gr.Group(visible=True) as step1:
                            gr.Markdown("### Step 1: Authenticate with Hugging Face")
                            gr.Markdown("""
                            To deploy models, you need a Hugging Face token with write access.

                            **How to get your token:**
                            1. Go to [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
                            2. Click "New token"
                            3. Give it a name and select "Write" role
                            4. Copy the token
                            """)
                            hf_token = gr.Textbox(
                                label="Hugging Face Token",
                                type="password",
                                placeholder="hf_..."
                            )
                            auth_btn = gr.Button("🔑 Authenticate", variant="primary")
                            auth_status = gr.Markdown("")
                            with gr.Accordion("📝 Authentication Code Example", open=False):
                                gr.Code("""
from huggingface_hub import login

# Login using your token
login(token="your_token_here")

# Verify authentication
from huggingface_hub import whoami
user_info = whoami()
print(f"Authenticated as: {user_info['name']}")
                                """, language="python")
                        # Step 2: Model Selection
                        with gr.Group(visible=False) as step2:
                            gr.Markdown("### Step 2: Select or Create a Model")
                            model_choice = gr.Radio(
                                choices=["Use existing model", "Create new model"],
                                value="Use existing model",
                                label="Model Option"
                            )
                            with gr.Group() as existing_model_group:
                                gr.Markdown("#### Select Your Model")
                                user_models = gr.Dropdown(
                                    choices=[],
                                    label="Your Models",
                                    info="Models from your Hugging Face account"
                                )
                                refresh_models_btn = gr.Button("🔄 Refresh Models")
                            with gr.Group(visible=False) as new_model_group:
                                gr.Markdown("#### Create New Model")
                                new_model_name = gr.Textbox(
                                    label="Model Name",
                                    placeholder="my-awesome-model"
                                )
                                new_model_repo = gr.Textbox(
                                    label="Repository ID",
                                    placeholder="username/model-name"
                                )
                                create_model_btn = gr.Button("🆕 Create Model")
                            selected_model = gr.Textbox(label="Selected Model", interactive=False)
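                            # Steps 1, 4, 5, and 6 each ship a code example, but Step 2
                            # did not. A hedged sketch of the repo-creation call this
                            # step performs; `exist_ok=True` is an assumption, not a
                            # value taken from the original app.
                            with gr.Accordion("📝 Model Creation Code Example", open=False):
                                gr.Code("""
from huggingface_hub import create_repo

# Create an empty model repository under your account
repo_url = create_repo(
    repo_id="username/model-name",
    repo_type="model",
    exist_ok=True
)
print(f"Created: {repo_url}")
                                """, language="python")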
                        # Step 3: Deployment Settings
                        with gr.Group(visible=False) as step3:
                            gr.Markdown("### Step 3: Choose Deployment Settings")
                            gr.Markdown("#### Configuration")
                            deployment_task = gr.Dropdown(
                                choices=["text-generation", "text2text-generation", "fill-mask"],
                                value="text-generation",
                                label="Task Type"
                            )
                            deployment_hardware = gr.Radio(
                                choices=["cpu", "gpu-basic", "gpu-standard"],
                                value="cpu",
                                label="Hardware (GPU requires payment)",
                                info="Start with CPU for free deployment"
                            )
                            deployment_scale = gr.Slider(
                                minimum=1,
                                maximum=10,
                                value=1,
                                step=1,
                                label="Scale",
                                info="Number of replicas"
                            )
                            gr.Markdown("#### Advanced Settings")
                            with gr.Accordion("⚙️ Advanced", open=False):
                                max_tokens = gr.Number(
                                    value=512,
                                    label="Max Tokens",
                                    precision=0
                                )
                                temperature = gr.Slider(
                                    minimum=0.1,
                                    maximum=2.0,
                                    value=0.7,
                                    label="Temperature"
                                )
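                            # How these settings map onto the deployment call: a hedged
                            # sketch only. The min_replica/max_replica pairing and the
                            # GPU instance values shown are assumptions about typical
                            # Inference Endpoints configs, not from the original app.
                            with gr.Accordion("📝 Settings-to-API Mapping Example", open=False):
                                gr.Code("""
from huggingface_hub import HfApi

api = HfApi()
# "Hardware" selects the accelerator; "Scale" becomes the replica range
endpoint = api.create_inference_endpoint(
    name="my-model-endpoint",
    repository="your-username/your-model",
    task="text-generation",
    framework="pytorch",
    accelerator="gpu",          # "cpu" for the free-tier option
    vendor="aws",
    region="us-east-1",
    instance_size="x1",
    instance_type="nvidia-t4",
    min_replica=1,
    max_replica=3,              # upper bound from the Scale slider
    type="public"
)
                                """, language="python")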
                        # Step 4: Deploy Model
                        with gr.Group(visible=False) as step4:
                            gr.Markdown("### Step 4: Deploy the Model")
                            deploy_summary = gr.Markdown("")
                            deploy_btn = gr.Button("🚀 Deploy Model", variant="primary", size="lg")
                            deployment_status = gr.Markdown("")
                            deployment_url = gr.Textbox(label="Deployment URL", interactive=False)
                            gr.Markdown("#### Deployment Code")
                            gr.Code("""
from huggingface_hub import HfApi

api = HfApi()

# Create an inference endpoint (the model goes under `repository=`)
endpoint = api.create_inference_endpoint(
    name="my-model-endpoint",
    repository="your-username/your-model",
    task="text-generation",
    framework="pytorch",
    accelerator="cpu",
    vendor="aws",
    region="us-east-1",
    instance_size="x2",
    instance_type="intel-icl",
    type="public"
)
print(f"Endpoint URL: {endpoint.url}")
                            """, language="python")
                        # Step 5: Test Deployment
                        with gr.Group(visible=False) as step5:
                            gr.Markdown("### Step 5: Test the Deployment")
                            test_input = gr.Textbox(
                                label="Test Prompt",
                                placeholder="Enter a test prompt...",
                                value="Hello, how are you?"
                            )
                            test_btn = gr.Button("🧪 Test Endpoint")
                            test_output = gr.Textbox(label="Model Response", lines=5)
                            gr.Markdown("#### Testing Code")
                            gr.Code("""
import requests

def test_endpoint(url, prompt, token):
    headers = {"Authorization": f"Bearer {token}"}
    data = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 100,
            "temperature": 0.7
        }
    }
    response = requests.post(url, headers=headers, json=data)
    return response.json()[0]["generated_text"]
                            """, language="python")
                        # Step 6: TTS/STT Integration
                        with gr.Group(visible=False) as step6:
                            gr.Markdown("### Step 6: Integrate TTS/STT")
                            gr.Markdown("#### Text-to-Speech")
                            tts_model = gr.Dropdown(
                                choices=["microsoft/speecht5_tts", "facebook/tts_transformer-en-ljspeech"],
                                value="microsoft/speecht5_tts",
                                label="TTS Model"
                            )
                            gr.Markdown("#### Speech-to-Text")
                            stt_model = gr.Dropdown(
                                choices=["openai/whisper-tiny", "facebook/wav2vec2-base-960h"],
                                value="openai/whisper-tiny",
                                label="STT Model"
                            )
                            integrate_btn = gr.Button("🎤🔊 Integrate TTS/STT", variant="primary")
                            integration_status = gr.Markdown("")
                            gr.Markdown("#### Integration Code")
                            gr.Code("""
# Text-to-Speech
from transformers import pipeline

tts = pipeline("text-to-speech", model="microsoft/speecht5_tts")

def speak(text):
    # Note: SpeechT5 also needs a speaker embedding, passed via
    # forward_params={"speaker_embeddings": ...}, to produce audio
    speech = tts(text)
    return speech["audio"]

# Speech-to-Text
stt = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")

def listen(audio):
    result = stt(audio)
    return result["text"]
                            """, language="python")
            with gr.TabItem("🗣️ Voice Chat Demo"):
                gr.Markdown("### Interactive Voice Chat")
                gr.Markdown("Chat with your deployed model using voice input/output")
                with gr.Row():
                    with gr.Column():
                        audio_input = gr.Audio(
                            sources=["microphone"],
                            label="Speak your message",
                            type="numpy"
                        )
                        transcribe_btn = gr.Button("🎤 Transcribe")
                    with gr.Column():
                        text_input = gr.Textbox(
                            label="Or type your message",
                            placeholder="Type your message here..."
                        )
                        send_btn = gr.Button("📤 Send")
                chat_history = gr.Chatbot(label="Chat History", type="messages")
                with gr.Row():
                    with gr.Column():
                        tts_output = gr.Audio(label="Voice Response")
                    with gr.Column():
                        text_response = gr.Textbox(label="Text Response", lines=3)
                # Hidden components for processing
                transcribed_text = gr.Textbox(visible=False)
                model_endpoint_url = gr.Textbox(visible=False)
            with gr.TabItem("📊 Monitor"):
                gr.Markdown("### Deployment Monitoring")
                with gr.Row():
                    with gr.Column():
                        monitor_endpoint = gr.Textbox(
                            label="Endpoint URL",
                            placeholder="Enter endpoint URL to monitor"
                        )
                        monitor_btn = gr.Button("🔍 Get Status")
                    with gr.Column():
                        endpoint_info = gr.JSON(label="Endpoint Information")
                metrics_display = gr.Markdown("### 📈 Usage Metrics")
                with gr.Row():
                    with gr.Column():
                        requests_chart = gr.Plot(label="Requests Over Time")
                    with gr.Column():
                        latency_chart = gr.Plot(label="Response Latency")
        # Event handlers
        def authenticate_user(token):
            success, message, user_info = deployer.authenticate(token)
            if success:
                models = deployer.get_available_models()
                return message, gr.Dropdown(choices=models)
            return message, gr.Dropdown(choices=[])

        # Note: the outputs must match the handler's return values; the original
        # also listed step1 here, which received the user_info dict by mistake
        auth_btn.click(
            authenticate_user,
            inputs=[hf_token],
            outputs=[auth_status, user_models]
        )
        def show_step(step_num):
            steps = [step1, step2, step3, step4, step5, step6]
            updates = [gr.Group(visible=(i == step_num)) for i in range(len(steps))]
            # Update the step checklist display
            step_display_text = "\n".join([
                f"{'✅ ' if i < step_num else '➡️' if i == step_num else '⬜'} {step}"
                for i, step in enumerate(deployer.deployment_steps)
            ])
            return [step_display_text] + updates

        def next_step(current_step):
            if current_step < len(deployer.deployment_steps) - 1:
                deployer.current_step = current_step + 1
                return show_step(current_step + 1)
            return show_step(current_step)

        def prev_step(current_step):
            if current_step > 0:
                deployer.current_step = current_step - 1
                return show_step(current_step - 1)
            return show_step(current_step)

        def update_buttons(step_num):
            prev_btn_text = "⬅️ Previous"
            next_btn_text = "Next ➡️" if step_num < len(deployer.deployment_steps) - 1 else "✅ Complete"
            return prev_btn_text, next_btn_text

        next_btn.click(
            lambda: next_step(deployer.current_step),
            outputs=[step_display, step1, step2, step3, step4, step5, step6]
        ).then(
            lambda: update_buttons(deployer.current_step),
            outputs=[prev_btn, next_btn]
        )
        prev_btn.click(
            lambda: prev_step(deployer.current_step),
            outputs=[step_display, step1, step2, step3, step4, step5, step6]
        ).then(
            lambda: update_buttons(deployer.current_step),
            outputs=[prev_btn, next_btn]
        )
        def handle_model_choice(choice):
            return gr.Group(visible=(choice == "Use existing model")), gr.Group(visible=(choice == "Create new model"))

        model_choice.change(
            handle_model_choice,
            inputs=[model_choice],
            outputs=[existing_model_group, new_model_group]
        )
        refresh_models_btn.click(
            lambda: gr.Dropdown(choices=deployer.get_available_models()),
            outputs=[user_models]
        )

        def select_model(model_name):
            return model_name

        user_models.change(
            select_model,
            inputs=[user_models],
            outputs=[selected_model]
        )

        def deploy_selected_model(model, task, hardware, scale):
            if not model:
                return "❌ Please select a model first", "", ""
            success, message, url = deployer.deploy_model_to_inference(model, task)
            summary = f"""
            **Deployment Summary:**
            - Model: {model}
            - Task: {task}
            - Hardware: {hardware}
            - Scale: {scale}
            """
            return summary, message, url or ""

        deploy_btn.click(
            deploy_selected_model,
            inputs=[selected_model, deployment_task, deployment_hardware, deployment_scale],
            outputs=[deploy_summary, deployment_status, deployment_url]
        )
        def test_deployment(endpoint_url, prompt, model_id):
            if not endpoint_url:
                return "❌ Please deploy a model first"
            try:
                # Query the serverless Inference API for the selected model.
                # The model id is passed in as an input: component values
                # cannot be read via .value at runtime.
                headers = {"Authorization": f"Bearer {deployer.current_token}"}
                data = {
                    "inputs": prompt,
                    "parameters": {
                        "max_new_tokens": 100,
                        "temperature": 0.7,
                        "do_sample": True
                    }
                }
                response = requests.post(
                    f"https://api-inference.huggingface.co/models/{model_id}",
                    headers=headers,
                    json=data
                )
                if response.status_code == 200:
                    result = response.json()
                    if isinstance(result, list) and len(result) > 0:
                        return result[0].get("generated_text", "No response generated")
                    return str(result)
                return f"❌ Error: {response.status_code} - {response.text}"
            except Exception as e:
                return f"❌ Testing failed: {str(e)}"

        test_btn.click(
            test_deployment,
            inputs=[deployment_url, test_input, selected_model],
            outputs=[test_output]
        )
        # Update the model endpoint URL for voice chat
        deployment_url.change(
            lambda x: x,
            inputs=[deployment_url],
            outputs=[model_endpoint_url]
        )

        def transcribe_audio(audio, endpoint_url):
            if audio is None:
                return "Please provide audio input"
            try:
                # Placeholder transcription; a real implementation would run
                # a proper STT model on the audio
                sample_rate, audio_data = audio
                return f"Transcribed: (Audio received at {sample_rate}Hz)"
            except Exception as e:
                return f"Transcription error: {str(e)}"
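        # A hedged sketch of real transcription with the transformers ASR
        # pipeline (defined but not wired into the UI). Assumes
        # openai/whisper-tiny and that gradio delivers numpy audio as
        # (sample_rate, np.ndarray); the lazy cache avoids reloading the model.
        _stt_cache = {}

        def transcribe_audio_whisper(audio):
            if audio is None:
                return "Please provide audio input"
            if "pipe" not in _stt_cache:
                _stt_cache["pipe"] = pipeline(
                    "automatic-speech-recognition", model="openai/whisper-tiny"
                )
            sample_rate, audio_data = audio
            # Downmix to mono and normalize int16 PCM to float32 in [-1, 1]
            if audio_data.ndim > 1:
                audio_data = audio_data.mean(axis=1)
            audio_data = audio_data.astype(np.float32)
            if np.abs(audio_data).max() > 1.0:
                audio_data /= 32768.0
            result = _stt_cache["pipe"]({"sampling_rate": sample_rate, "raw": audio_data})
            return result["text"]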
        def generate_response(text, endpoint_url, history):
            if not text or not endpoint_url:
                return history, ""
            try:
                # Add the user message to the chat history
                history.append({"role": "user", "content": text})
                # Canned response (placeholder for a real model call)
                response = f"This is a response to: {text}"
                history.append({"role": "assistant", "content": response})
                return history, response
            except Exception as e:
                history.append({"role": "assistant", "content": f"Error: {str(e)}"})
                return history, ""
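        # A hedged sketch of what a real generate_response could look like,
        # querying the deployed endpoint instead of echoing. The payload shape
        # matches the text-generation examples above; the 30-second timeout is
        # an assumption, not from the original app.
        def generate_response_via_endpoint(text, endpoint_url, history):
            if not text or not endpoint_url:
                return history, ""
            history.append({"role": "user", "content": text})
            try:
                headers = {"Authorization": f"Bearer {deployer.current_token}"}
                payload = {"inputs": text, "parameters": {"max_new_tokens": 100}}
                resp = requests.post(endpoint_url, headers=headers, json=payload, timeout=30)
                resp.raise_for_status()
                body = resp.json()
                response = body[0]["generated_text"] if isinstance(body, list) else str(body)
            except Exception as e:
                response = f"Error: {str(e)}"
            history.append({"role": "assistant", "content": response})
            return history, response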
        def text_to_speech(text):
            try:
                # Generate placeholder audio: a 440 Hz sine tone, not real TTS
                sample_rate = 22050
                duration = 2.0
                t = np.linspace(0, duration, int(sample_rate * duration))
                audio = np.sin(2 * np.pi * 440 * t) * 0.3
                return (sample_rate, audio)
            except Exception:
                return None
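        # A hedged sketch of real TTS via the transformers text-to-speech
        # pipeline (defined but not wired into the UI). suno/bark-small is
        # used because, unlike SpeechT5, it needs no external speaker
        # embedding; that model choice is an assumption, not from the app.
        _tts_cache = {}

        def text_to_speech_bark(text):
            if "pipe" not in _tts_cache:
                _tts_cache["pipe"] = pipeline("text-to-speech", model="suno/bark-small")
            speech = _tts_cache["pipe"](text)
            # The pipeline returns {"audio": np.ndarray, "sampling_rate": int};
            # gradio's Audio component expects (sample_rate, np.ndarray)
            return (speech["sampling_rate"], np.squeeze(speech["audio"]))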
        # Voice chat event handlers
        transcribe_btn.click(
            transcribe_audio,
            inputs=[audio_input, model_endpoint_url],
            outputs=[transcribed_text]
        )

        def process_voice_transcription(transcription, endpoint_url, chat_history):
            if transcription:
                # The endpoint URL is passed in as an input; component values
                # cannot be read via .value at runtime
                updated_history, response = generate_response(
                    transcription.replace("Transcribed: ", ""), endpoint_url, chat_history
                )
                audio_response = text_to_speech(response)
                return updated_history, response, audio_response, ""
            return chat_history, "", None, transcription

        transcribed_text.change(
            process_voice_transcription,
            inputs=[transcribed_text, model_endpoint_url, chat_history],
            outputs=[chat_history, text_response, tts_output, transcribed_text]
        )

        def send_text_message(text, endpoint_url, chat_history):
            if text:
                updated_history, response = generate_response(text, endpoint_url, chat_history)
                audio_response = text_to_speech(response)
                return updated_history, response, audio_response, ""
            return chat_history, "", None, text

        send_btn.click(
            send_text_message,
            inputs=[text_input, model_endpoint_url, chat_history],
            outputs=[chat_history, text_response, tts_output, text_input]
        )
        def integrate_tts_stt(tts_model_name, stt_model_name):
            status = f"""
            ✅ **Integration Complete!**

            **TTS Model:** {tts_model_name}
            **STT Model:** {stt_model_name}

            Your model now supports:
            - Voice input (Speech-to-Text)
            - Voice output (Text-to-Speech)
            - Real-time conversation

            You can test this in the **Voice Chat Demo** tab!
            """
            return status

        integrate_btn.click(
            integrate_tts_stt,
            inputs=[tts_model, stt_model],
            outputs=[integration_status]
        )
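        # The Monitor tab's "Get Status" button was never wired up. A minimal
        # hedged handler sketch: it only probes the URL over HTTP rather than
        # calling the Inference Endpoints management API, since the textbox
        # holds a URL, not an endpoint name.
        def get_endpoint_status(url):
            if not url:
                return {"error": "No endpoint URL provided"}
            try:
                resp = requests.get(url, timeout=10)
                return {"url": url, "status_code": resp.status_code, "reachable": True}
            except Exception as e:
                return {"url": url, "reachable": False, "error": str(e)}

        monitor_btn.click(
            get_endpoint_status,
            inputs=[monitor_endpoint],
            outputs=[endpoint_info]
        )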
    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        share=True,
        show_error=True,
        show_api=True
    )