Spaces:
Running
Running
| import gradio as gr | |
| import torch | |
| import os | |
| import time | |
| import subprocess | |
| import tempfile | |
| # --- Try to import ctransformers for GGUF, provide helpful message if not found --- | |
| try: | |
| from ctransformers import AutoModelForCausalLM as AutoModelForCausalLM_GGUF | |
| from ctransformers.llm import LLM | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| GGUF_AVAILABLE = True | |
| except ImportError: | |
| GGUF_AVAILABLE = False | |
| print("WARNING: 'ctransformers' not found. This app relies on it for efficient CPU inference.") | |
| print("Please install it with: pip install ctransformers transformers") | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| # --- Configuration for Models and Generation --- | |
| ORIGINAL_MODEL_ID = "HuggingFaceTB/SmolLM2-360M-Instruct" | |
| GGUF_MODEL_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF" | |
| GGUF_MODEL_FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf" | |
| # --- Generation Parameters --- | |
| MAX_NEW_TOKENS = 256 | |
| TEMPERATURE = 0.7 | |
| TOP_K = 50 | |
| TOP_P = 0.95 | |
| DO_SAMPLE = True # This parameter is primarily for Hugging Face transformers.Model.generate() | |
| # Global model and tokenizer | |
| model = None | |
| tokenizer = None | |
| device = "cpu" | |
| # --- Festival Audio Function --- | |
| def speak_text_festival_to_file(text): | |
| """ | |
| Uses Festival to speak the given text and saves the output to a temporary WAV file. | |
| Returns the path to the generated audio file, or None on error. | |
| """ | |
| if not text.strip(): | |
| print("No text provided for Festival to speak.") | |
| return None | |
| # Create a temporary WAV file for Festival output | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file: | |
| audio_filepath = temp_audio_file.name | |
| try: | |
| # Festival command to synthesize text and save to a WAV file | |
| festival_command = f""" | |
| (set! utt (SayText "{text.replace('"', '\\"')}")) | |
| (utt.save.wave utt "{audio_filepath}") | |
| """ | |
| # Execute Festival via subprocess | |
| process = subprocess.Popen(['festival', '--pipe'], | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True) | |
| stdout, stderr = process.communicate(input=festival_command) | |
| if process.returncode != 0: | |
| print(f"Error speaking text with Festival. Return code: {process.returncode}") | |
| print(f"Festival stderr: {stderr}") | |
| if os.path.exists(audio_filepath): | |
| os.remove(audio_filepath) | |
| return None | |
| if not os.path.exists(audio_filepath) or os.path.getsize(audio_filepath) == 0: | |
| print(f"Festival did not create a valid WAV file at {audio_filepath}. Stderr: {stderr}") | |
| if os.path.exists(audio_filepath): | |
| os.remove(audio_filepath) | |
| return None | |
| print(f"Audio saved to: {audio_filepath}") | |
| return audio_filepath | |
| except FileNotFoundError: | |
| print("Error: Festival executable not found. Make sure Festival is installed and in your PATH.") | |
| if os.path.exists(audio_filepath): | |
| os.remove(audio_filepath) | |
| return None | |
| except Exception as e: | |
| print(f"An unexpected error occurred during Festival processing: {e}") | |
| if os.path.exists(audio_filepath): | |
| os.remove(audio_filepath) | |
| return None | |
| # --- Model Loading Function --- | |
| def load_model_for_zerocpu(): | |
| global model, tokenizer, device | |
| if GGUF_AVAILABLE: | |
| print(f"Attempting to load GGUF model '{GGUF_MODEL_ID}' (file: '{GGUF_MODEL_FILENAME}') for ZeroCPU...") | |
| try: | |
| model = AutoModelForCausalLM_GGUF.from_pretrained( | |
| GGUF_MODEL_ID, | |
| model_file=GGUF_MODEL_FILENAME, | |
| model_type="llama", | |
| gpu_layers=0 | |
| ) | |
| tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| print(f"GGUF model '{GGUF_MODEL_ID}' loaded successfully for CPU.") | |
| return | |
| except Exception as e: | |
| print(f"WARNING: Could not load GGUF model '{GGUF_MODEL_ID}' from '{GGUF_MODEL_FILENAME}': {e}") | |
| print(f"Falling back to standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU (will be slower without GGUF quantization).") | |
| else: | |
| print("WARNING: ctransformers is not available. Will load standard Hugging Face model directly.") | |
| print(f"Loading standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU...") | |
| try: | |
| model = AutoModelForCausalLM.from_pretrained(ORIGINAL_MODEL_ID) | |
| tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| model.to(device) | |
| print(f"Standard model '{ORIGINAL_MODEL_ID}' loaded successfully on CPU.") | |
| except Exception as e: | |
| print(f"CRITICAL ERROR: Could not load standard model '{ORIGINAL_MODEL_ID}' on CPU: {e}") | |
| print("Please ensure the model ID is correct, you have enough RAM, and dependencies are installed.") | |
| model = None | |
| tokenizer = None | |
| # --- Inference Function for Gradio Blocks --- | |
| # This function yields tuples for streaming text and then the final audio. | |
| def predict_chat_with_audio_and_streaming(message: str, history: list): | |
| if model is None or tokenizer is None: | |
| # history will now be a list of dictionaries, so yield accordingly | |
| yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": "Error: Model or tokenizer failed to load."}], None | |
| return | |
| # Initialize llm_messages with a system message | |
| llm_messages = [{"role": "system", "content": "You are a friendly chatbot."}] | |
| # Iterate through the history (list of dictionaries) and convert it to the LLM message format | |
| # The history from Gradio's Chatbot (type='messages') is already in the desired format | |
| for item in history: | |
| llm_messages.append(item) | |
| # Add the current user message | |
| llm_messages.append({"role": "user", "content": message}) | |
| generated_text = "" | |
| start_time = time.time() | |
| if GGUF_AVAILABLE and isinstance(model, LLM): | |
| prompt_input = tokenizer.apply_chat_template(llm_messages, tokenize=False, add_generation_prompt=True) | |
| for token in model( | |
| prompt_input, | |
| max_new_tokens=MAX_NEW_TOKENS, | |
| temperature=TEMPERATURE, | |
| top_k=TOP_K, | |
| top_p=TOP_P, | |
| repetition_penalty=1.1, | |
| stop=["User:", "\nUser", "\n#", "\n##", "<|endoftext|>", "<|im_end|>"], | |
| stream=True | |
| ): | |
| generated_text += token | |
| # Strip common special tokens before yielding | |
| cleaned_text = generated_text.replace("<|im_end|>", "").replace("<|endoftext|>", "").strip() | |
| # Yield the current state of history (list of dictionaries) and an empty audio output for streaming text | |
| yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": cleaned_text}], None | |
| else: | |
| input_text = tokenizer.apply_chat_template(llm_messages, tokenize=False, add_generation_prompt=True) | |
| inputs = tokenizer.encode(input_text, return_tensors="pt").to(device) | |
| outputs = model.generate( | |
| inputs, | |
| max_length=inputs.shape[-1] + MAX_NEW_TOKENS, | |
| temperature=TEMPERATURE, | |
| top_k=TOP_K, | |
| top_p=TOP_P, | |
| do_sample=DO_SAMPLE, | |
| pad_token_id=tokenizer.pad_token_id | |
| ) | |
| generated_text = tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True).strip() | |
| # Strip common special tokens from the final generated text | |
| generated_text = generated_text.replace("<|im_end|>", "").replace("<|endoftext|>", "").strip() | |
| # Yield the full text response before audio generation | |
| yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": generated_text}], None | |
| end_time = time.time() | |
| print(f"Inference Time for this turn: {end_time - start_time:.2f} seconds") | |
| # After streaming is complete and full text is gathered | |
| audio_file_path = speak_text_festival_to_file(generated_text) | |
| # Yield the final state with audio file | |
| yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": generated_text}], audio_file_path | |
| # --- Gradio Interface Setup --- | |
| if __name__ == "__main__": | |
| load_model_for_zerocpu() | |
| # chatbot_initial_value is already in the correct format for type='messages' | |
| chatbot_initial_value = [{"role": "assistant", "content": "Hello! I'm an AI assistant. I'm currently running in a CPU-only environment for efficient demonstration. How can I help you today?"}] | |
| # Gradio Blocks for more flexible layout | |
| with gr.Blocks(theme="soft", title="SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS") as demo: | |
| gr.Markdown( | |
| """ | |
| # SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS | |
| This Space demonstrates an LLM for efficient CPU-only inference. | |
| **Note:** For ZeroCPU, this app prioritizes `tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf` (a GGUF-quantized model | |
| like TinyLlama) due to better CPU performance than `HuggingFaceTB/SmolLM2-360M-Instruct` | |
| without GGUF. Expect varied responses each run due to randomized generation. | |
| **Festival TTS:** The chatbot's responses will also be spoken aloud using the local Festival Speech Synthesis System. | |
| """ | |
| ) | |
| # The main Chatbot display component | |
| chatbot_display = gr.Chatbot(value=chatbot_initial_value, height=500, label="Chat History", type='messages') | |
| # Audio component for the last response | |
| audio_output = gr.Audio(label="Chatbot Audio Response", type="filepath", autoplay=True) | |
| # Textbox for user input | |
| msg = gr.Textbox(placeholder="Ask me a question...", container=False, scale=7) | |
| # Submit button | |
| submit_btn = gr.Button("Send") | |
| # Define example inputs for the textbox | |
| # For examples, when type='messages', it expects a list of lists where each inner list | |
| # represents a user message for the input textbox. The output is still the chat history. | |
| examples_data = [ | |
| ["What is the capital of France?"], | |
| ["Can you tell me a fun fact about outer space?"], | |
| ["What's the best way to stay motivated?"], | |
| ] | |
| # Gradio Examples | |
| gr.Examples( | |
| examples=examples_data, | |
| inputs=[msg], | |
| fn=predict_chat_with_audio_and_streaming, | |
| outputs=[chatbot_display, audio_output], | |
| cache_examples=False, | |
| ) | |
| # Event listeners for submission | |
| msg.submit(predict_chat_with_audio_and_streaming, | |
| inputs=[msg, chatbot_display], | |
| outputs=[chatbot_display, audio_output]) | |
| submit_btn.click(predict_chat_with_audio_and_streaming, | |
| inputs=[msg, chatbot_display], | |
| outputs=[chatbot_display, audio_output]) | |
| # Clear textbox after submission for better UX | |
| msg.submit(lambda: "", outputs=[msg]) | |
| submit_btn.click(lambda: "", outputs=[msg]) | |
| demo.launch() | |