Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| NeuCodec Test - Gradio App | |
| Equivalent to nemo and snac test spaces, but for NeuCodec used in NeuTTS-Air models. | |
| Allows testing encode/decode cycles with the neuphonic/neucodec model. | |
| """ | |
| import gradio as gr | |
| import torch | |
| import librosa | |
| import numpy as np | |
| import traceback | |
| import time | |
| # Attempt to import NeuCodec | |
| try: | |
| from neucodec import NeuCodec, DistillNeuCodec | |
| print("NeuCodec modules imported successfully.") | |
| except ImportError as e: | |
| print(f"Error importing NeuCodec: {e}") | |
| raise ImportError("Could not import NeuCodec. Make sure 'neucodec' is installed correctly.") from e | |
# --- Configuration ---
TARGET_SR = 16000  # NeuCodec operates at 16kHz for encoding
OUTPUT_SR = 24000  # NeuCodec outputs at 24kHz
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAME = "neuphonic/neucodec"  # Options: neuphonic/neucodec, neuphonic/distill-neucodec
print(f"Using device: {DEVICE}")

# --- Load Model (Load once globally) ---
# `neucodec` stays None unless the model was fetched, moved to DEVICE and put
# in eval mode successfully. The rest of the file uses `neucodec is None` as
# its "model unavailable" check, so a half-initialised model must never be
# published under this name.
neucodec = None
try:
    print(f"Loading NeuCodec model: {MODEL_NAME}...")
    start_time = time.time()
    # The distilled checkpoint is loaded through its own class.
    if MODEL_NAME == "neuphonic/distill-neucodec":
        _model_cls = DistillNeuCodec
    else:
        _model_cls = NeuCodec
    _loaded_model = _model_cls.from_pretrained(MODEL_NAME)
    _loaded_model = _loaded_model.to(DEVICE)
    _loaded_model.eval()  # Set model to evaluation mode
    # Publish only now: every setup step above has succeeded.
    neucodec = _loaded_model
    end_time = time.time()
    print(f"NeuCodec loaded successfully to {DEVICE}. Time taken: {end_time - start_time:.2f} seconds.")
except Exception as e:
    # Top-level boundary: report the failure and keep the module importable so
    # the caller can decide what to do with `neucodec is None`.
    print(f"FATAL: Error loading NeuCodec: {e}")
    print(traceback.format_exc())
# --- Main Processing Function ---
def _quality_metric_logs(reference, reconstructed):
    """Return log lines comparing two 1-D waveforms sample by sample.

    Only the overlapping prefix of the two arrays is compared. The MSE line is
    always produced when there is an overlap; the SNR is reported only when
    both the reference power and the error power are strictly positive, so no
    division by zero or log of zero is attempted.
    """
    lines = []
    overlap = min(len(reference), len(reconstructed))
    if len(reference) != len(reconstructed):
        lines.append(
            f"Audio length difference: Original {len(reference)} samples, "
            f"Reconstructed {len(reconstructed)} samples"
        )
    if overlap == 0:
        # np.mean over an empty slice would yield nan plus a RuntimeWarning.
        lines.append("Quality metrics skipped: no overlapping samples to compare.")
        return lines

    ref = reference[:overlap]
    rec = reconstructed[:overlap]
    mse = float(np.mean((ref - rec) ** 2))
    lines.append(f"MSE (first {overlap} samples at 24kHz): {mse:.6f}")

    signal_power = float(np.mean(ref ** 2))
    if mse > 0 and signal_power > 0:
        snr_db = 10 * np.log10(signal_power / mse)
        lines.append(f"SNR: {snr_db:.2f} dB")
    else:
        lines.append("SNR: undefined (silent reference or zero reconstruction error)")
    return lines


def process_audio(audio_filepath):
    """
    Loads, resamples, encodes, decodes audio using NeuCodec, and returns results.

    Args:
        audio_filepath: Path of the audio file to process, or None when
            nothing was uploaded.

    Returns:
        A 4-tuple ``(original, resampled, reconstructed, log_text)``. The
        first three items are ``(sample_rate, samples)`` pairs for playback
        (original rate, TARGET_SR and OUTPUT_SR respectively). On any failure
        they are all ``None`` and ``log_text`` carries the error details and
        traceback. Exceptions are never propagated to the caller.
    """
    if neucodec is None:
        return None, None, None, "Error: NeuCodec could not be loaded. Cannot process audio."
    if audio_filepath is None:
        return None, None, None, "Please upload an audio file."

    logs = ["--- Starting Audio Processing with NeuCodec ---"]
    try:
        # 1. Load Audio (native sample rate, all channels) for playback reference.
        logs.append(f"Loading audio file: {audio_filepath}")
        load_start = time.perf_counter()
        original_waveform, original_sr = librosa.load(audio_filepath, sr=None, mono=False)
        logs.append(f"Audio loaded. Original SR: {original_sr} Hz, Shape: {original_waveform.shape}")
        # Convert to mono if the file has more than one channel.
        if original_waveform.ndim > 1:
            logs.append(f"Warning: Input audio has {original_waveform.shape[0]} channels. Converting to mono.")
            original_waveform = librosa.to_mono(original_waveform)
        load_time = time.perf_counter() - load_start
        logs.append(f"Loading time: {load_time:.2f}s")

        # Fail early with a clear message instead of a downstream error.
        if original_waveform.size == 0:
            raise ValueError("Input audio contains no samples.")

        # --- Prepare Original for Playback ---
        original_audio_playback = (original_sr, original_waveform)
        logs.append("Prepared original audio for playback.")

        # 2. Resample to TARGET_SR, the rate used for encoding.
        resample_start = time.perf_counter()
        logs.append(f"Resampling waveform to {TARGET_SR} Hz for encoding...")
        waveform_16k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=TARGET_SR)
        logs.append(f"Resampling complete. New Shape: {waveform_16k.shape}")
        resample_time = time.perf_counter() - resample_start
        logs.append(f"Resampling time: {resample_time:.2f}s")

        # --- Prepare 16kHz version for Playback ---
        resampled_audio_playback = (TARGET_SR, waveform_16k)
        logs.append("Prepared 16kHz audio for playback.")

        # 3. Prepare for NeuCodec Encoding: [batch, channels, samples] = [1, 1, samples].
        # The encoder is handed a CPU tensor, so the tensor is left where
        # torch.from_numpy creates it and the log reports its real device.
        waveform_tensor = torch.from_numpy(waveform_16k).float().unsqueeze(0).unsqueeze(0)
        logs.append(
            f"Waveform prepared for encoding. Shape: {waveform_tensor.shape}, "
            f"Device: {waveform_tensor.device}"
        )

        # 4. Encode Audio using NeuCodec
        logs.append("Encoding audio with NeuCodec...")
        encode_start = time.perf_counter()
        with torch.no_grad():
            encoded_codes = neucodec.encode_code(audio_or_path=waveform_tensor)
        encode_time = time.perf_counter() - encode_start
        if encoded_codes is None:
            log_msg = "Encoding failed: encoded_codes is None"
            logs.append(log_msg)
            raise ValueError(log_msg)
        logs.append(f"Encoding complete. Time: {encode_time:.2f}s")
        logs.append(f"Encoded codes shape: {encoded_codes.shape}")
        logs.append(f"Encoded codes device: {encoded_codes.device}")
        # Log some statistics about the codes
        logs.append(f"Code sequence length: {encoded_codes.shape[-1]}")
        logs.append(f"Code range: [{encoded_codes.min().item():.0f}, {encoded_codes.max().item():.0f}]")
        # Compression ratio: input samples per emitted code element.
        original_samples = waveform_16k.shape[0]
        code_elements = encoded_codes.numel()
        compression_ratio = original_samples / code_elements if code_elements > 0 else 0
        logs.append(
            f"Compression ratio: ~{compression_ratio:.1f}:1 "
            f"({original_samples} samples -> {code_elements} codes)"
        )

        # 5. Decode the Codes using NeuCodec
        logs.append("Decoding the generated codes with NeuCodec...")
        decode_start = time.perf_counter()
        with torch.no_grad():
            reconstructed_waveform = neucodec.decode_code(encoded_codes)
        decode_time = time.perf_counter() - decode_start
        logs.append(
            f"Decoding complete. Reconstructed waveform shape: {reconstructed_waveform.shape}, "
            f"Device: {reconstructed_waveform.device}. Time: {decode_time:.2f}s"
        )

        # 6. Prepare Reconstructed Audio for Playback:
        # move to CPU, drop the singleton dimensions, convert to NumPy.
        reconstructed_audio_np = reconstructed_waveform.cpu().squeeze().numpy()
        logs.append(
            f"Reconstructed audio prepared for playback at {OUTPUT_SR} Hz. "
            f"Shape: {reconstructed_audio_np.shape}"
        )
        reconstructed_audio_playback = (OUTPUT_SR, reconstructed_audio_np)

        # 7. Calculate quality metrics against the original resampled to
        # OUTPUT_SR so both signals share a sample rate.
        logs.append("Calculating quality metrics...")
        original_24k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=OUTPUT_SR)
        logs.extend(_quality_metric_logs(original_24k, reconstructed_audio_np))

        logs.append("\n--- Audio Processing Completed Successfully ---")

        # Summary statistics
        total_time = load_time + resample_time + encode_time + decode_time
        duration = len(original_waveform) / original_sr
        logs.append(f"Total processing time: {total_time:.2f}s")
        logs.append(f"Audio duration: {duration:.2f}s")
        # Guard the ratio: a zero measured time must not turn a successful run
        # into a ZeroDivisionError.
        if total_time > 0:
            logs.append(f"Real-time factor: {duration / total_time:.2f}x")
        return original_audio_playback, resampled_audio_playback, reconstructed_audio_playback, "\n".join(logs)
    except Exception as e:
        # UI boundary: surface every failure in the log box rather than crashing.
        logs.append("\n--- An Error Occurred ---")
        logs.append(f"Error Type: {type(e).__name__}")
        logs.append(f"Error Details: {e}")
        logs.append("\n--- Traceback ---")
        logs.append(traceback.format_exc())
        return None, None, None, "\n".join(logs)
# --- Gradio Interface ---
# Markdown text shown under the title of the demo page.
DESCRIPTION = """
This app demonstrates the **NeuCodec** model (`neuphonic/neucodec`) used in NeuTTS-Air.
**How it works:**
1. Upload an audio file (wav, mp3, flac, etc.).
2. The audio will be automatically resampled to 16kHz for encoding.
3. The 16kHz audio is encoded into discrete codes by NeuCodec.
4. These codes are then decoded back into 24kHz audio by NeuCodec.
5. You can listen to the original, the 16kHz version, and the final reconstructed 24kHz audio.
**Technical details:**
- Input sample rate: 16kHz (for encoding)
- Output sample rate: 24kHz (after decoding)
- Architecture: 50Hz neural audio codec with single codebook
- Hop length: 480 samples
**Note:** If the input is stereo, it will be converted to mono.
"""

# One upload in; three players and a log box out. The output order mirrors
# the 4-tuple returned by process_audio.
iface = gr.Interface(
    title="NeuCodec Demo (16kHz -> 24kHz)",
    description=DESCRIPTION,
    fn=process_audio,
    inputs=gr.Audio(type="filepath", label="Upload Audio File"),
    outputs=[
        gr.Audio(label="Original Audio"),
        gr.Audio(label="16kHz Audio (Input to NeuCodec)"),
        gr.Audio(label="Reconstructed Audio (24kHz Output from NeuCodec)"),
        gr.Textbox(label="Log Output", lines=20),
    ],
    # TODO: add bundled example clips, e.g. ["examples/example1.wav"]
    examples=[],
    cache_examples=False,
)
if __name__ == "__main__":
    # Serve the UI only when the model loaded; otherwise just say why not.
    if neucodec is not None:
        # Startup banner, one item per line.
        print(
            "Launching Gradio Interface...",
            f"Model: {MODEL_NAME}",
            f"Input sample rate: {TARGET_SR} Hz",
            f"Output sample rate: {OUTPUT_SR} Hz",
            f"Device: {DEVICE}",
            sep="\n",
        )
        iface.launch(share=True)
    else:
        print("Cannot launch Gradio interface because NeuCodec failed to load.")