Spaces:
Runtime error
Runtime error
File size: 9,917 Bytes
a2e831a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
#!/usr/bin/env python3
"""
NeuCodec Test - Gradio App
Equivalent to nemo and snac test spaces, but for NeuCodec used in NeuTTS-Air models.
Allows testing encode/decode cycles with the neuphonic/neucodec model.
"""
import gradio as gr
import torch
import librosa
import numpy as np
import traceback
import time
# The codec classes come from the third-party `neucodec` package; fail fast
# with an actionable message when it is missing.
try:
    from neucodec import NeuCodec, DistillNeuCodec
except ImportError as import_error:
    print(f"Error importing NeuCodec: {import_error}")
    raise ImportError("Could not import NeuCodec. Make sure 'neucodec' is installed correctly.") from import_error
else:
    print("NeuCodec modules imported successfully.")
# --- Configuration ---
# Sample rates are in Hz.
TARGET_SR = 16000  # rate the audio is resampled to before encoding
OUTPUT_SR = 24000  # rate used to play back the decoded audio

# Prefer the GPU when torch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Checkpoint to load. Options: neuphonic/neucodec, neuphonic/distill-neucodec
MODEL_NAME = "neuphonic/neucodec"

print(f"Using device: {DEVICE}")
# --- Load Model (Load once globally) ---
# `neucodec` stays None unless *every* loading step succeeds, so the rest of
# the app can rely on `neucodec is None` as the single "model unavailable" check.
neucodec = None
try:
    print(f"Loading NeuCodec model: {MODEL_NAME}...")
    start_time = time.time()
    # Pick the wrapper class that matches the configured checkpoint name.
    if MODEL_NAME == "neuphonic/distill-neucodec":
        _loaded_model = DistillNeuCodec.from_pretrained(MODEL_NAME)
    else:
        _loaded_model = NeuCodec.from_pretrained(MODEL_NAME)
    _loaded_model = _loaded_model.to(DEVICE)
    _loaded_model.eval()  # Set model to evaluation mode
    # Publish the model only now: a failure in .to()/.eval() above must not
    # leave a half-initialised model visible through the global.
    neucodec = _loaded_model
    end_time = time.time()
    print(f"NeuCodec loaded successfully to {DEVICE}. Time taken: {end_time - start_time:.2f} seconds.")
except Exception as e:
    # Top-level boundary: report the failure and keep the app importable so the
    # UI code can show a clear "model could not be loaded" message.
    neucodec = None
    print(f"FATAL: Error loading NeuCodec: {e}")
    print(traceback.format_exc())
# --- Main Processing Function ---
def _quality_metrics(reference, reconstructed):
    """
    Compare two 1-D waveforms over their common prefix.

    Returns:
        A tuple ``(min_len, mse, snr_db)``. ``mse`` is None when the two
        signals share no samples. ``snr_db`` is None whenever it is undefined:
        no overlap, zero noise power (identical signals) or zero signal power
        (silent reference).
    """
    min_len = min(len(reference), len(reconstructed))
    if min_len == 0:
        return 0, None, None

    reference_trimmed = reference[:min_len]
    reconstructed_trimmed = reconstructed[:min_len]
    mse = np.mean((reference_trimmed - reconstructed_trimmed) ** 2)
    signal_power = np.mean(reference_trimmed ** 2)

    # log10 is only meaningful for a strictly positive power ratio.
    if mse > 0 and signal_power > 0:
        snr_db = 10 * np.log10(signal_power / mse)
    else:
        snr_db = None
    return min_len, mse, snr_db


def process_audio(audio_filepath):
    """
    Loads, resamples, encodes, decodes audio using NeuCodec, and returns results.

    Args:
        audio_filepath: Path of the uploaded audio file, or None when nothing
            was uploaded.

    Returns:
        A 4-tuple ``(original, resampled, reconstructed, log_text)``. The first
        three items are ``(sample_rate, numpy_waveform)`` pairs for playback
        (original rate, TARGET_SR and OUTPUT_SR respectively). When the model is
        unavailable, no file was given, or any step fails, the three audio
        items are None and ``log_text`` carries the message / traceback.
    """
    if neucodec is None:
        return None, None, None, "Error: NeuCodec could not be loaded. Cannot process audio."
    if audio_filepath is None:
        return None, None, None, "Please upload an audio file."

    logs = ["--- Starting Audio Processing with NeuCodec ---"]
    try:
        # 1. Load Audio
        logs.append(f"Loading audio file: {audio_filepath}")
        load_start = time.time()
        # Load original audio (for playback reference); sr=None keeps the file's own rate.
        original_waveform, original_sr = librosa.load(audio_filepath, sr=None, mono=False)
        logs.append(f"Audio loaded. Original SR: {original_sr} Hz, Shape: {original_waveform.shape}")

        # Convert to mono if stereo
        if len(original_waveform.shape) > 1:
            logs.append(f"Warning: Input audio has {original_waveform.shape[0]} channels. Converting to mono.")
            original_waveform = librosa.to_mono(original_waveform)

        # Reject empty input early so the user gets a clear message instead of
        # an obscure failure further down the pipeline.
        if original_waveform.size == 0:
            raise ValueError("Input audio contains no samples.")

        load_end = time.time()
        logs.append(f"Loading time: {load_end - load_start:.2f}s")

        # --- Prepare Original for Playback ---
        original_audio_playback = (original_sr, original_waveform)
        logs.append("Prepared original audio for playback.")

        # 2. Resample to 16kHz for encoding (NeuCodec expects 16kHz input)
        resample_start = time.time()
        logs.append(f"Resampling waveform to {TARGET_SR} Hz for encoding...")
        waveform_16k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=TARGET_SR)
        logs.append(f"Resampling complete. New Shape: {waveform_16k.shape}")
        resample_end = time.time()
        logs.append(f"Resampling time: {resample_end - resample_start:.2f}s")

        # --- Prepare 16kHz version for Playback ---
        resampled_audio_playback = (TARGET_SR, waveform_16k)
        logs.append("Prepared 16kHz audio for playback.")

        # 3. Prepare for NeuCodec Encoding
        # NeuCodec expects [batch, channels, samples] format
        waveform_tensor = torch.from_numpy(waveform_16k).float().unsqueeze(0).unsqueeze(0)  # [1, 1, samples]
        waveform_tensor = waveform_tensor.to(DEVICE)
        logs.append(f"Waveform prepared for encoding. Shape: {waveform_tensor.shape}, Device: {DEVICE}")

        # 4. Encode Audio using NeuCodec
        logs.append("Encoding audio with NeuCodec...")
        encode_start = time.time()
        with torch.no_grad():
            # NOTE(review): the tensor is handed over on the CPU even though it
            # was just moved to DEVICE — presumably encode_code does its own
            # device placement; confirm against the neucodec API.
            encoded_codes = neucodec.encode_code(audio_or_path=waveform_tensor.cpu())
        encode_end = time.time()

        if encoded_codes is None:
            log_msg = "Encoding failed: encoded_codes is None"
            logs.append(log_msg)
            raise ValueError(log_msg)

        logs.append(f"Encoding complete. Time: {encode_end - encode_start:.2f}s")
        logs.append(f"Encoded codes shape: {encoded_codes.shape}")
        logs.append(f"Encoded codes device: {encoded_codes.device}")

        # Log some statistics about the codes
        logs.append(f"Code sequence length: {encoded_codes.shape[-1]}")
        logs.append(f"Code range: [{encoded_codes.min().item():.0f}, {encoded_codes.max().item():.0f}]")

        # Calculate compression ratio (input samples per emitted code)
        original_samples = waveform_16k.shape[0]
        code_elements = encoded_codes.numel()
        compression_ratio = original_samples / code_elements if code_elements > 0 else 0
        logs.append(f"Compression ratio: ~{compression_ratio:.1f}:1 ({original_samples} samples -> {code_elements} codes)")

        # 5. Decode the Codes using NeuCodec
        logs.append("Decoding the generated codes with NeuCodec...")
        decode_start = time.time()
        with torch.no_grad():
            reconstructed_waveform = neucodec.decode_code(encoded_codes)
        decode_end = time.time()
        logs.append(f"Decoding complete. Reconstructed waveform shape: {reconstructed_waveform.shape}, Device: {reconstructed_waveform.device}. Time: {decode_end - decode_start:.2f}s")

        # 6. Prepare Reconstructed Audio for Playback
        # Output is at 24kHz. Move to CPU, remove batch and channel dims, convert to NumPy.
        # atleast_1d keeps a one-sample result indexable (squeeze would make it 0-d).
        reconstructed_audio_np = np.atleast_1d(reconstructed_waveform.cpu().squeeze().numpy())
        logs.append(f"Reconstructed audio prepared for playback at {OUTPUT_SR} Hz. Shape: {reconstructed_audio_np.shape}")
        reconstructed_audio_playback = (OUTPUT_SR, reconstructed_audio_np)

        # 7. Calculate quality metrics
        # For comparison, we need to resample original to 24kHz to match reconstructed output
        logs.append("Calculating quality metrics...")
        original_24k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=OUTPUT_SR)

        # Handle length differences (common with codecs): compare the common prefix only.
        min_len, mse, snr_db = _quality_metrics(original_24k, reconstructed_audio_np)
        if len(original_24k) != len(reconstructed_audio_np):
            logs.append(f"Audio length difference: Original {len(original_24k)} samples, Reconstructed {len(reconstructed_audio_np)} samples")
        if mse is None:
            logs.append("Quality metrics skipped: no overlapping samples to compare.")
        else:
            logs.append(f"MSE (first {min_len} samples at 24kHz): {mse:.6f}")
            # SNR is omitted when undefined (silent reference or perfect reconstruction).
            if snr_db is not None:
                logs.append(f"SNR: {snr_db:.2f} dB")

        logs.append("\n--- Audio Processing Completed Successfully ---")

        # Summary statistics
        total_time = (load_end - load_start) + (resample_end - resample_start) + (encode_end - encode_start) + (decode_end - decode_start)
        audio_duration = len(original_waveform) / original_sr
        logs.append(f"Total processing time: {total_time:.2f}s")
        logs.append(f"Audio duration: {audio_duration:.2f}s")
        # Guard the ratio so a zero elapsed time cannot turn a successful run
        # into an error result.
        if total_time > 0:
            logs.append(f"Real-time factor: {audio_duration / total_time:.2f}x")

        return original_audio_playback, resampled_audio_playback, reconstructed_audio_playback, "\n".join(logs)
    except Exception as e:
        # UI boundary: surface the full error in the log panel instead of crashing the app.
        logs.append("\n--- An Error Occurred ---")
        logs.append(f"Error Type: {type(e).__name__}")
        logs.append(f"Error Details: {e}")
        logs.append("\n--- Traceback ---")
        logs.append(traceback.format_exc())
        return None, None, None, "\n".join(logs)
# --- Gradio Interface ---
# Markdown text rendered underneath the page title.
DESCRIPTION = """
This app demonstrates the **NeuCodec** model (`neuphonic/neucodec`) used in NeuTTS-Air.
**How it works:**
1. Upload an audio file (wav, mp3, flac, etc.).
2. The audio will be automatically resampled to 16kHz for encoding.
3. The 16kHz audio is encoded into discrete codes by NeuCodec.
4. These codes are then decoded back into 24kHz audio by NeuCodec.
5. You can listen to the original, the 16kHz version, and the final reconstructed 24kHz audio.
**Technical details:**
- Input sample rate: 16kHz (for encoding)
- Output sample rate: 24kHz (after decoding)
- Architecture: 50Hz neural audio codec with single codebook
- Hop length: 480 samples
**Note:** If the input is stereo, it will be converted to mono.
"""

# The upload widget passes a file path to process_audio.
_upload_component = gr.Audio(type="filepath", label="Upload Audio File")

# One output widget per element of process_audio's 4-tuple result, in order.
_result_components = [
    gr.Audio(label="Original Audio"),
    gr.Audio(label="16kHz Audio (Input to NeuCodec)"),
    gr.Audio(label="Reconstructed Audio (24kHz Output from NeuCodec)"),
    gr.Textbox(label="Log Output", lines=20),
]

iface = gr.Interface(
    fn=process_audio,
    inputs=_upload_component,
    outputs=_result_components,
    title="NeuCodec Demo (16kHz -> 24kHz)",
    description=DESCRIPTION,
    # TODO: add bundled example files, e.g. ["examples/example1.wav"]
    examples=[],
    cache_examples=False,
)
if __name__ == "__main__":
    if neucodec is not None:
        # Echo the effective configuration before starting the server.
        startup_banner = (
            "Launching Gradio Interface...",
            f"Model: {MODEL_NAME}",
            f"Input sample rate: {TARGET_SR} Hz",
            f"Output sample rate: {OUTPUT_SR} Hz",
            f"Device: {DEVICE}",
        )
        for banner_line in startup_banner:
            print(banner_line)
        iface.launch(share=True)
    else:
        # Without a model every request would fail, so do not start the UI at all.
        print("Cannot launch Gradio interface because NeuCodec failed to load.")
|