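# Gradio Space: step-by-step guide for deploying a Hugging Face model to an
# Inference Endpoint, plus a voice-chat demo (placeholder TTS/STT) and a
# monitoring tab.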
import gradio as gr
import os
import json
import requests
from huggingface_hub import HfApi, login, whoami
import spaces
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import numpy as np
from utils import (
authenticate_hf,
deploy_model,
get_user_models,
check_deployment_status,
get_inference_endpoint
)
from config import APP_CONFIG
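# Thin wrapper around HfApi: authentication, listing the user's models, and
# creating Inference Endpoints for them.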
class HuggingFaceDeployer:
def __init__(self):
self.api = HfApi()
self.current_token = None
self.user_info = None
self.deployment_steps = [
"1. Authenticate with Hugging Face",
"2. Select or Create a Model",
"3. Choose Deployment Settings",
"4. Deploy the Model",
"5. Test the Deployment",
"6. Integrate TTS/STT"
]
self.current_step = 0
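# Log in with a user access token and cache the account info from whoami()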
def authenticate(self, token):
try:
login(token=token)
self.current_token = token
self.user_info = whoami()
return True, f"✅ Authenticated as {self.user_info['name']}", self.user_info
except Exception as e:
return False, f"❌ Authentication failed: {str(e)}", None
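# Return up to 20 model repo ids owned by the authenticated user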
def get_available_models(self):
if not self.current_token:
return []
try:
models = list(self.api.list_models(author=self.user_info['name'], limit=20))
return [model.id for model in models]
except Exception:
return []
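# Reuse an existing Inference Endpoint for this model if present, otherwise create one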
def deploy_model_to_inference(self, model_id, task="text-generation"):
try:
# Create inference endpoint
endpoint_name = f"{model_id.split('/')[-1]}-endpoint"
# Check if endpoint already exists
try:
existing_endpoints = list(self.api.list_inference_endpoints())
for endpoint in existing_endpoints:
if endpoint.name == endpoint_name:
return True, f"✅ Endpoint already exists: {endpoint.url}", endpoint.url
except Exception:
pass
# Create a new endpoint. create_inference_endpoint expects the model repo via
# `repository`; the vendor/region/instance values below are common CPU defaults
# and may need adjusting for your account.
endpoint = self.api.create_inference_endpoint(
name=endpoint_name,
repository=model_id,
task=task,
framework="pytorch",
accelerator="cpu",
vendor="aws",
region="us-east-1",
instance_size="x2",
instance_type="intel-icl",
type="public"
)
return True, f"✅ Deployment initiated! URL: {endpoint.url}", endpoint.url
except Exception as e:
return False, f"❌ Deployment failed: {str(e)}", None
# Initialize the deployer
deployer = HuggingFaceDeployer()
def create_interface():
with gr.Blocks(
title="Hugging Face Model Deployer",
theme=gr.themes.Soft(),
css="""
.step-container { border: 2px solid #e1e5e9; border-radius: 10px; padding: 20px; margin: 10px 0; }
.step-active { border-color: #0969da; background-color: #f6f8fa; }
.step-completed { border-color: #1a7f37; background-color: #dcffe4; }
.tutorial-card { max-width: 800px; margin: 0 auto; }
.code-block { background-color: #f6f8fa; padding: 15px; border-radius: 5px; font-family: monospace; }
"""
) as demo:
gr.HTML("""
<div style='text-align: center; padding: 20px;'>
<h1>🤗 Hugging Face Model Deployer</h1>
<p>Complete guide to deploy your models with real TTS/STT integration</p>
<a href='https://huggingface.co/spaces/akhaliq/anycoder' target='_blank'>
Built with anycoder
</a>
</div>
""")
with gr.Tabs() as tabs:
with gr.TabItem("📚 Tutorial & Deploy"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 🎯 Deployment Steps")
step_display = gr.Markdown("\n".join([
f"{'βœ…' if i < deployer.current_step else 'β­•'} {step}"
for i, step in enumerate(deployer.deployment_steps)
]))
with gr.Row():
prev_btn = gr.Button("⬅️ Previous")
next_btn = gr.Button("Next ➑️")
with gr.Column(scale=3):
# State for managing button visibility
prev_state = gr.State(value=True)
next_state = gr.State(value=False)
# Step 1: Authentication
with gr.Group(visible=True) as step1:
gr.Markdown("### Step 1: Authenticate with Hugging Face")
gr.Markdown("""
To deploy models, you need a Hugging Face token with write access.
**How to get your token:**
1. Go to [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
2. Click "New token"
3. Give it a name and select "Write" role
4. Copy the token
""")
hf_token = gr.Textbox(
label="Hugging Face Token",
type="password",
placeholder="hf_..."
)
auth_btn = gr.Button("🔐 Authenticate", variant="primary")
auth_status = gr.Markdown("")
with gr.Accordion("📖 Authentication Code Example", open=False):
gr.Code("""
from huggingface_hub import login
# Login using your token
login(token="your_token_here")
# Verify authentication
from huggingface_hub import whoami
user_info = whoami()
print(f"Authenticated as: {user_info['name']}")
""", language="python")
# Step 2: Model Selection
with gr.Group(visible=False) as step2:
gr.Markdown("### Step 2: Select or Create a Model")
model_choice = gr.Radio(
choices=["Use existing model", "Create new model"],
value="Use existing model",
label="Model Option"
)
with gr.Group() as existing_model_group:
gr.Markdown("#### Select Your Model")
user_models = gr.Dropdown(
choices=[],
label="Your Models",
info="Models from your Hugging Face account"
)
refresh_models_btn = gr.Button("🔄 Refresh Models")
with gr.Group(visible=False) as new_model_group:
gr.Markdown("#### Create New Model")
new_model_name = gr.Textbox(
label="Model Name",
placeholder="my-awesome-model"
)
new_model_repo = gr.Textbox(
label="Repository ID",
placeholder="username/model-name"
)
create_model_btn = gr.Button("📝 Create Model")
selected_model = gr.Textbox(label="Selected Model", interactive=False)
# Step 3: Deployment Settings
with gr.Group(visible=False) as step3:
gr.Markdown("### Step 3: Choose Deployment Settings")
gr.Markdown("#### Configuration")
deployment_task = gr.Dropdown(
choices=["text-generation", "text2text-generation", "fill-mask"],
value="text-generation",
label="Task Type"
)
deployment_hardware = gr.Radio(
choices=["cpu", "gpu-basic", "gpu-standard"],
value="cpu",
label="Hardware (GPU requires payment)",
info="Start with CPU for free deployment"
)
deployment_scale = gr.Slider(
minimum=1,
maximum=10,
value=1,
label="Scale",
info="Number of replicas"
)
gr.Markdown("#### Advanced Settings")
with gr.Accordion("⚙️ Advanced", open=False):
max_tokens = gr.Number(
value=512,
label="Max Tokens",
precision=0
)
temperature = gr.Slider(
minimum=0.1,
maximum=2.0,
value=0.7,
label="Temperature"
)
# Step 4: Deploy Model
with gr.Group(visible=False) as step4:
gr.Markdown("### Step 4: Deploy the Model")
deploy_summary = gr.Markdown("")
deploy_btn = gr.Button("🚀 Deploy Model", variant="primary", size="lg")
deployment_status = gr.Markdown("")
deployment_url = gr.Textbox(label="Deployment URL", interactive=False)
gr.Markdown("#### Deployment Code")
gr.Code("""
from huggingface_hub import HfApi
api = HfApi()
# Create an Inference Endpoint (repository is the model repo id; the
# vendor/region/instance values are examples and may need adjusting)
endpoint = api.create_inference_endpoint(
name="my-model-endpoint",
repository="your-username/your-model",
task="text-generation",
framework="pytorch",
accelerator="cpu",
vendor="aws",
region="us-east-1",
instance_size="x2",
instance_type="intel-icl",
type="public"
)
print(f"Endpoint URL: {endpoint.url}")
""", language="python")
# Step 5: Test Deployment
with gr.Group(visible=False) as step5:
gr.Markdown("### Step 5: Test the Deployment")
test_input = gr.Textbox(
label="Test Prompt",
placeholder="Enter a test prompt...",
value="Hello, how are you?"
)
test_btn = gr.Button("🧪 Test Endpoint")
test_output = gr.Textbox(label="Model Response", lines=5)
gr.Markdown("#### Testing Code")
gr.Code("""
import requests
def test_endpoint(url, token, prompt):
headers = {"Authorization": f"Bearer {token}"}
data = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 100,
"temperature": 0.7
}
}
response = requests.post(url, headers=headers, json=data)
return response.json()[0]["generated_text"]
""", language="python")
# Step 6: TTS/STT Integration
with gr.Group(visible=False) as step6:
gr.Markdown("### Step 6: Integrate TTS/STT")
gr.Markdown("#### Text-to-Speech")
tts_model = gr.Dropdown(
choices=["microsoft/speecht5_tts", "facebook/tts_transformer-en-ljspeech"],
value="microsoft/speecht5_tts",
label="TTS Model"
)
gr.Markdown("#### Speech-to-Text")
stt_model = gr.Dropdown(
choices=["openai/whisper-tiny", "facebook/wav2vec2-base-960h"],
value="openai/whisper-tiny",
label="STT Model"
)
integrate_btn = gr.Button("🎤🔊 Integrate TTS/STT", variant="primary")
integration_status = gr.Markdown("")
gr.Markdown("#### Integration Code")
gr.Code("""
# Text-to-Speech
from transformers import pipeline
tts = pipeline("text-to-speech", model="microsoft/speecht5_tts")
def speak(text):
speech = tts(text)
return speech["audio"]
# Speech-to-Text
stt = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")
def listen(audio):
result = stt(audio)
return result["text"]
""", language="python")
with gr.TabItem("🗣️ Voice Chat Demo"):
gr.Markdown("### Interactive Voice Chat")
gr.Markdown("Chat with your deployed model using voice input/output")
with gr.Row():
with gr.Column():
audio_input = gr.Audio(
sources=["microphone"],
label="Speak your message",
type="numpy"
)
transcribe_btn = gr.Button("🎤 Transcribe")
with gr.Column():
text_input = gr.Textbox(
label="Or type your message",
placeholder="Type your message here..."
)
send_btn = gr.Button("📤 Send")
chat_history = gr.Chatbot(label="Chat History", type="messages")
with gr.Row():
with gr.Column():
tts_output = gr.Audio(label="Voice Response")
with gr.Column():
text_response = gr.Textbox(label="Text Response", lines=3)
# Hidden components for processing
transcribed_text = gr.Textbox(visible=False)
model_endpoint_url = gr.Textbox(visible=False)
with gr.TabItem("📊 Monitor"):
gr.Markdown("### Deployment Monitoring")
with gr.Row():
with gr.Column():
monitor_endpoint = gr.Textbox(
label="Endpoint URL",
placeholder="Enter endpoint URL to monitor"
)
monitor_btn = gr.Button("📈 Get Status")
with gr.Column():
endpoint_info = gr.JSON(label="Endpoint Information")
metrics_display = gr.Markdown("### 📊 Usage Metrics")
with gr.Row():
with gr.Column():
requests_chart = gr.Plot(label="Requests Over Time")
with gr.Column():
latency_chart = gr.Plot(label="Response Latency")
# Event handlers
def authenticate_user(token):
success, message, _user_info = deployer.authenticate(token)
if success:
models = deployer.get_available_models()
return message, gr.Dropdown(choices=models)
return message, gr.Dropdown(choices=[])
auth_btn.click(
authenticate_user,
inputs=[hf_token],
outputs=[auth_status, user_models]
)
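# Show only the requested step group and rebuild the checklist markdown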
def show_step(step_num):
steps = [step1, step2, step3, step4, step5, step6]
updates = [gr.Group(visible=(i == step_num)) for i in range(len(steps))]
# Update step display
step_display_text = "\n".join([
f"{'✅' if i < step_num else '➡️' if i == step_num else '⭕'} {step}"
for i, step in enumerate(deployer.deployment_steps)
])
# Button labels are handled separately by update_buttons
return [step_display_text] + updates
def next_step(current_step):
if current_step < len(deployer.deployment_steps) - 1:
deployer.current_step = current_step + 1
return show_step(current_step + 1)
return show_step(current_step)
def prev_step(current_step):
if current_step > 0:
deployer.current_step = current_step - 1
return show_step(current_step - 1)
return show_step(current_step)
def update_buttons(step_num):
prev_btn_text = "⬅️ Previous"
next_btn_text = "Next ➡️" if step_num < len(deployer.deployment_steps) - 1 else "✅ Complete"
return prev_btn_text, next_btn_text
next_btn.click(
lambda: next_step(deployer.current_step),
outputs=[step_display, step1, step2, step3, step4, step5, step6]
).then(
lambda: update_buttons(deployer.current_step),
outputs=[prev_btn, next_btn]
)
prev_btn.click(
lambda: prev_step(deployer.current_step),
outputs=[step_display, step1, step2, step3, step4, step5, step6]
).then(
lambda: update_buttons(deployer.current_step),
outputs=[prev_btn, next_btn]
)
def handle_model_choice(choice):
return gr.Group(visible=(choice == "Use existing model")), gr.Group(visible=(choice == "Create new model"))
model_choice.change(
handle_model_choice,
inputs=[model_choice],
outputs=[existing_model_group, new_model_group]
)
refresh_models_btn.click(
lambda: gr.Dropdown(choices=deployer.get_available_models()),
outputs=[user_models]
)
def select_model(model_name):
return model_name
user_models.change(
select_model,
inputs=[user_models],
outputs=[selected_model]
)
def deploy_selected_model(model, task, hardware, scale):
if not model:
return "❌ Please select a model first", "", ""
success, message, url = deployer.deploy_model_to_inference(model, task)
summary = f"""
**Deployment Summary:**
- Model: {model}
- Task: {task}
- Hardware: {hardware}
- Scale: {scale}
"""
return summary, message, url or ""
deploy_btn.click(
deploy_selected_model,
inputs=[selected_model, deployment_task, deployment_hardware, deployment_scale],
outputs=[deploy_summary, deployment_status, deployment_url]
)
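# Send the test prompt to the serverless Inference API for the selected model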
def test_deployment(endpoint_url, prompt, model_id):
if not endpoint_url:
return "❌ Please deploy a model first"
try:
# Query the serverless Hugging Face Inference API for the selected model
headers = {"Authorization": f"Bearer {deployer.current_token}"}
data = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 100,
"temperature": 0.7,
"do_sample": True
}
}
response = requests.post(
f"https://api-inference.huggingface.co/models/{model_id}",
headers=headers,
json=data
)
if response.status_code != 200:
return f"❌ Error: {response.status_code} - {response.text}"
result = response.json()
if isinstance(result, list) and len(result) > 0:
return result[0].get("generated_text", "No response generated")
return "No response generated"
except Exception as e:
return f"❌ Testing failed: {str(e)}"
test_btn.click(
test_deployment,
inputs=[deployment_url, test_input, selected_model],
outputs=[test_output]
)
# Update model endpoint URL for voice chat
deployment_url.change(
lambda x: x,
inputs=[deployment_url],
outputs=[model_endpoint_url]
)
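# Placeholder STT below: it only acknowledges the audio. A minimal real-ASR
# sketch (assuming a transformers Whisper pipeline, with the int16 samples
# converted to normalized float32 first):
#   asr = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")
#   text = asr({"sampling_rate": sample_rate, "raw": audio_float32})["text"]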
@spaces.GPU
def transcribe_audio(audio, endpoint_url):
if audio is None:
return "Please provide audio input"
try:
# Simple transcription using a basic approach
sample_rate, audio_data = audio
# In a real implementation, you'd use a proper STT model
return f"Transcribed: (Audio received at {sample_rate}Hz)"
except Exception as e:
return f"Transcription error: {str(e)}"
@spaces.GPU
def generate_response(text, endpoint_url, history):
if not text or not endpoint_url:
return history, ""
try:
# Add user message
history.append({"role": "user", "content": text})
# Generate response (simplified)
response = f"This is a response to: {text}"
history.append({"role": "assistant", "content": response})
return history, response
except Exception as e:
history.append({"role": "assistant", "content": f"Error: {str(e)}"})
return history, ""
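# Placeholder TTS below: returns a 440 Hz sine tone rather than synthesized
# speech. A real pipeline such as microsoft/speecht5_tts would also require
# speaker embeddings to be passed in.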
def text_to_speech(text):
try:
# Generate simple audio (placeholder)
sample_rate = 22050
duration = 2.0
t = np.linspace(0, duration, int(sample_rate * duration))
# Simple sine wave as placeholder
audio = np.sin(2 * np.pi * 440 * t) * 0.3
return (sample_rate, audio)
except Exception as e:
return None
# Voice chat event handlers
transcribe_btn.click(
transcribe_audio,
inputs=[audio_input, model_endpoint_url],
outputs=[transcribed_text]
)
def process_voice_transcription(transcription, chat_history, endpoint_url):
if transcription:
updated_history, response = generate_response(transcription.replace("Transcribed: ", ""), endpoint_url, chat_history)
audio_response = text_to_speech(response)
return updated_history, response, audio_response, ""
return chat_history, "", None, transcription
transcribed_text.change(
process_voice_transcription,
inputs=[transcribed_text, chat_history, model_endpoint_url],
outputs=[chat_history, text_response, tts_output, transcribed_text]
)
def send_text_message(text, chat_history, endpoint_url):
if text:
updated_history, response = generate_response(text, endpoint_url, chat_history)
audio_response = text_to_speech(response)
return updated_history, response, audio_response, ""
return chat_history, "", None, text
send_btn.click(
send_text_message,
inputs=[text_input, chat_history, model_endpoint_url],
outputs=[chat_history, text_response, tts_output, text_input]
)
def integrate_tts_stt(tts_model_name, stt_model_name):
status = f"""
✅ **Integration Complete!**
**TTS Model:** {tts_model_name}
**STT Model:** {stt_model_name}
Your model now supports:
- Voice input (Speech-to-Text)
- Voice output (Text-to-Speech)
- Real-time conversation
You can test this in the **Voice Chat Demo** tab!
"""
return status
integrate_btn.click(
integrate_tts_stt,
inputs=[tts_model, stt_model],
outputs=[integration_status]
)
return demo
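# Entry point. Note: share=True generally has no effect when the app runs
# inside a Hugging Face Space.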
if __name__ == "__main__":
demo = create_interface()
demo.launch(
share=True,
show_error=True,
show_api=True
)